source: powerbar/serial-npm4000.py@ 159

Last change on this file since 159 was 159, checked in by Rick van der Zwet, 14 years ago

Features :-)

  • Property svn:executable set to *
File size: 12.7 KB
Line 
1#!/usr/bin/env python
# NPM 4000 powerbar management script, to be used instead of the Windows
# application.
#
# TODO: Not all features are ported yet (like amp monitoring)
# TODO: Error handling not implemented
#
# Licence: BSD
# Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $
# Rick van der Zwet <info@rickvanderzwet.nl>
11
12import serial
13import sys
14import time
15import random
16import getopt
17
# Verbose communication output (debugging) is switched on by setting this
# to True
DEBUG = False

# Defaults for the command line options
opt_serial_port, opt_baudrate = '/dev/ttyUSB0', 19200
opt_password, opt_address_code = 0x12345678, 0xFFFF
opt_delay = 0.0

# The serial connection and the port status are cached globally to avoid
# overhead
ser = None
port_status_synced = False
# grid_status is meant for the segments A, B, C
ports_status = ports_ampere = grid_status = None
35
def make_checksum(command):
    """ Fold all items of 'command' together with XOR and return the result

    An empty command yields 0.
    """
    checksum = 0
    for byte in command:
        checksum = checksum ^ byte
    return checksum
42
43
44
def debug(msg):
    """ Print 'msg', but only while the module level DEBUG flag is set """
    if not DEBUG:
        return
    print(msg)
49
50
51
def hex_to_str(command):
    """ Render 'command' as space separated, two digit lowercase hex values """
    rendered = []
    for byte in command:
        rendered.append("%02x" % byte)
    return " ".join(rendered)
55
56
57
def str_to_hex(s):
    """ List with the ordinal value of every character in 's' """
    return list(map(ord, s))
61
62
63
def port_to_hex(port_number):
    """ Convert integer port number to hexadecimal presentation as internal
    location

    Ports 1-8 map to 0xa1-0xa8, ports 9-16 to 0xb1-0xb8 and ports 17-24 to
    0xc1-0xc8. Any other port number raises an AssertionError.
    """
    # Raise explicitly instead of using 'assert False': assert statements are
    # removed when Python runs with -O, which would let invalid port numbers
    # slip through. The exception type is unchanged.
    if port_number < 1 or port_number > 24:
        raise AssertionError("Invalid port port_number (%i)" % port_number)
    if port_number <= 8:
        port = 0xa0 + port_number
    elif port_number <= 16:
        port = 0xb0 + (port_number - 8)
    else:
        port = 0xc0 + (port_number - 16)
    debug("%i - %02x" % (port_number, port))
    return port
80
81
82
def hex_to_port(port):
    """ Convert hexadecimal port to human port number

    This is the inverse of port_to_hex: 0xa1-0xa8 give 1-8, 0xb1-0xb8 give
    9-16 and 0xc1-0xc8 give 17-24. Any other value raises an AssertionError.
    """
    # Port number of the first port in a segment, minus one
    segment_offset = {0xa0: 0, 0xb0: 8, 0xc0: 16}
    base = port & 0xf0
    index = port & 0x0f
    # Reject everything port_to_hex can not produce: unknown segments, an
    # index outside 1-8 (0xa9 would otherwise alias port 9 of the next
    # segment) and values with bits set above the first byte. Raised
    # explicitly because 'assert' statements are removed under -O.
    if base not in segment_offset or not 1 <= index <= 8 or port != base + index:
        raise AssertionError("Invalid port (%02x)" % port)
    port_number = index + segment_offset[base]
    debug("%02x - %i" % (port, port_number))
    return port_number
97
98
99
def send_raw_command(raw_command, response_size=1024):
    """ Write 'raw_command' to the serial device and read the response

    The items of 'raw_command' are converted to characters before they are
    written. At most 'response_size' bytes are read back and returned as list
    of ordinal values.
    """
    debug("Going to send: " + hex_to_str(raw_command))
    ser.write("".join(map(chr, raw_command)))
    answer = ser.read(response_size)
    answer_values = str_to_hex(answer)
    debug("Received: %s (%i)" % (hex_to_str(answer_values), len(answer)))
    return answer_values
110
111
112
def address_to_num(address):
    """ Combine the first two items of 'address' into a single integer, the
    first item being the high byte
    """
    high = address[0]
    low = address[1]
    return (high << 8) ^ low
116
117
118
def num_to_address(npm_number):
    """ Split address number into the [high, low] pair used internally """
    high = npm_number >> 8
    low = npm_number & 0xff
    return [high, low]
122
123
124
def bin_reverse(number, width=8):
    """ Reverse the bit order of 'number' using its binary string form

    The binary digits are left padded with zeros up to 'width' before they
    are reversed.
    """
    digits = bin(number)[2:]
    padded = digits.zfill(width)
    return int("".join(reversed(padded)), 2)
128
129#send_command(ser, login_line, 5)
130# Note: you will need to wait at least timeout * 24 to complete
131#ser.timeout = 13
132#send_command(ser, allon_line, 6)
133#send_command(ser, alloff_line,6)
134
135#send_command(ser, power_on_interval_05_line,6)
136#send_command(ser,port_on_cmd(24),6)
137#send_command(ser,port_off_cmd(24),6)
138#retval = send_command(ser, refresh_line, 42)
139#print get_port_status(retval)
140#while True:
141# port_number = random.randint(1,24)
142# print port_number
143# send_command(ser, login_line, 5)
144# send_command(ser, port_on_cmd(port_number),6)
145# send_command(ser, login_line, 5)
146# send_command(ser, port_off_cmd(port_number),6)
147
148# Login cycle
149#command = action_login + device_id + password
150#send_command(ser, command)
# Reference command lines, as template for every supported action
# A = action, B = address, C = password, P = port
# Mostly of type [A, A, B, B, C, C, C, C] or [A, A, B, B] or [A, A, B, B, P]
# Every entry is a tuple (command, response size, timeout)
line = {
    'login': ([0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78], 5, 1),
    'status': ([0xd1, 0x03, 0xff, 0xff], 42, 1),
    'allon': ([0xb1, 0x03, 0xff, 0xff], 6, 13),
    'alloff': ([0xc1, 0x03, 0xff, 0xff], 6, 13),
    'port_on': ([0xb2, 0x04, 0xff, 0xff, 0xa1], 6, 1),
    'port_off': ([0xc2, 0x04, 0xff, 0xff, 0xa1], 6, 1),
    'power_on_interval_125': ([0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28], 5, 1),
    'power_on_interval_05': ([0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3], 5, 1),
    'change_address_code': ([0x05, 0x00, 0xff, 0xff, 0x00, 0x04], 5, 1),
    'modify_password': ([0xd3, 0x07, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11], 5, 1),
}
166
def num_to_hex(number):
    """ Number to internal hexadecimal representation

    Returns the big-endian list of byte values of 'number', using as few
    bytes as possible (0 gives [0]). None gives an empty list. A negative
    number raises a ValueError.
    """
    if number is None:
        return []
    if number < 0:
        raise ValueError("Cannot convert negative number (%i)" % number)
    # Format with %x instead of slicing hex(): hex() appends an 'L' to long
    # integers on Python 2, which corrupts the digit string
    digits = "%x" % number
    if len(digits) % 2:
        # Pad to a whole number of bytes
        digits = "0" + digits
    return [int(digits[pos:pos + 2], 16) for pos in range(0, len(digits), 2)]
173
174#print hex_to_str(num_to_hex(opt_password))
175#exit(0)
176
177
178
def send_command(action, argument=None):
    """ Send CRC computed command to serial device and wait for response

    'action' is a key of the 'line' table, which supplies the command
    template, the response size and the timeout. 'argument' is either a list
    of byte values which is used as is, or a number which is converted with
    num_to_hex. Without argument nothing is appended.

    Returns the response as delivered by send_raw_command.
    """
    (template, response_size, timeout) = line[action]
    if argument is None:
        argument = []
    elif not isinstance(argument, list):
        argument = num_to_hex(argument)
    # Only the first two bytes of the template are reused. The address is
    # encoded with num_to_address so it always takes two bytes; num_to_hex
    # would drop a leading zero byte (0x0001 would become a single byte).
    command = template[0:2] + num_to_address(opt_address_code) + argument
    # The timeout has to be set on the open connection, not on the 'serial'
    # module, for it to have any effect on the read
    ser.timeout = timeout
    raw_command = command + [make_checksum(command)]
    return send_raw_command(raw_command, response_size)
188
189
def action_login():
    """ Login to device

    Sends the 'login' command with opt_password encoded as exactly four
    bytes, high byte first, and returns the response of the device. Raises a
    ValueError when the password does not fit in four bytes.
    """
    if not 0 <= opt_password <= 0xFFFFFFFF:
        raise ValueError("Password does not fit in 4 bytes (%x)" % opt_password)
    # Build the bytes explicitly: handing the number to send_command would
    # drop leading zero bytes (0x00001234 would be sent as two bytes), while
    # the login template in the 'line' table carries four password bytes
    password = [(opt_password >> shift) & 0xff for shift in (24, 16, 8, 0)]
    return send_command('login', password)
193
def action_status():
    """ Login and request the status from the device, returning its response """
    action_login()
    response = send_command('status')
    return response
198
def action_port_on(port):
    """ Login and switch 'port' on, returning the response of the device """
    global port_status_synced
    # The cached port status no longer matches the device after this call
    port_status_synced = False

    action_login()
    location = port_to_hex(port)
    return send_command('port_on', location)
206
def action_port_off(port):
    """ Login and switch 'port' off, returning the response of the device """
    global port_status_synced
    # The cached port status no longer matches the device after this call
    port_status_synced = False

    action_login()
    location = port_to_hex(port)
    return send_command('port_off', location)
214
def get_ports_status():
    """ Fetch the status of all ports from the device and update the cache

    Logs in, sends the 'status' command and parses the response into the
    module level ports_status, ports_ampere and grid_status. The cache is
    only replaced, and marked as synced, after the response parsed fully.

    Returns a list of 25 booleans, indexed by port number. Index 0 is an
    unused placeholder.

    Raises an IndexError when the response is too short to be parsed.
    """
    global ports_status, port_status_synced, ports_ampere, grid_status
    # Whatever is cached is not trusted until the new response is parsed
    port_status_synced = False
    # Example port 1 is off
    # d1 28 ff ff fe ff ff
    #             ^^ ^^ ^^
    #[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00
    #[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00
    #                 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
    #                 28 00 28 00 00 10 ff
    #[01:38:14] Comm: Refresh
    action_login()
    retval = send_command('status')
    # The highest index used below is 34
    if len(retval) < 35:
        raise IndexError("Status response too short (%i bytes)" % len(retval))

    # retval[4:7] hold one bit per port, lowest bit first: port 1 is bit 0
    # of retval[4], port 9 is bit 0 of retval[5], port 17 bit 0 of retval[6]
    port_array = [False] * 25
    for port in range(0, 24):
        if retval[4 + port // 8] & (1 << (port % 8)):
            port_array[port + 1] = True
    # retval[7] is the sensor state?
    # Eight port values followed by one grid value, three times over
    ampere_array = [0] + retval[8:16] + retval[17:25] + retval[26:34]
    grid_array = [retval[16], retval[25], retval[34]]

    # Update global state
    ports_status = port_array
    ports_ampere = ampere_array
    grid_status = grid_array
    port_status_synced = True

    return port_array
246
247
248
def get_port_status(port):
    """ Status of 'port', fetched from the device first when the cached
    status is not in sync
    """
    # get_ports_status returns the list it stores in ports_status
    states = ports_status if port_status_synced else get_ports_status()
    return states[port]
256
257
258
def get_port_ampere(port):
    """ Ampere value of 'port', fetched from the device first when the
    cached status is not in sync
    """
    out_of_sync = not port_status_synced
    if out_of_sync:
        get_ports_status()
    readings = ports_ampere
    return readings[port]
266
267
268
def get_grid_status():
    """ Grid values, fetched from the device first when the cached status is
    not in sync
    """
    out_of_sync = not port_status_synced
    if out_of_sync:
        get_ports_status()
    return grid_status
276
277
278
def bool_to_str(boolean, raw=False):
    """ Text form of 'boolean': "1" or "0" when 'raw' is set, str(boolean)
    otherwise
    """
    if raw:
        return "1" if boolean else "0"
    return str(boolean)
286
287
def usage(msg="",exitcode=None):
    """ Print 'msg' followed by the usage text and optionally exit

    The text shows the current defaults of the options. The program exits
    with 'exitcode' when one is given, including exit code 0; with the
    default of None the function just returns.
    """
    print("""%s
Usage %s arguments
Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $

Arguments:
 [-h|--help] Reading right know
 [-a|--ampere] Display ampere readings
 [-d|--debug] Print extra communication output
 [-r|--raw] Status(es) is bits like output
 --serialport=<path> Serial Port to connect to [%s]
 --password=<hex> Password to use in hex notation [%s]
 --addresscode=<hex> Internal device number in hex notation [%s]
 --delay=<int> Delay used between port operations [%s]
 --baudrate=<int> Bautrate used for communication (19200,9600) [%s]
 [-s <port>|--status=<port>] Current port(s) configuration
 [-t <port>|--toggle=<port>] Toggle port(s)
 [-o <port>|--on=<port>] Turn on port(s)
 [-f <port>|--off=<port>] Turn off port(s)
 --allon Turn all ports on using internal safety [TODO: Implement]
 --alloff Turn all ports off using internal safety [TODO: Implement]
 --changepassword= Change password [TODO: Implement]
 --changeaddresscode= Change addresscode [TODO: Implement]
 --changetimerdelay= Change internal timer delay [TODO: Implement]

Note: [TODO: Implement] bit codes are in the source code, feel free to drop me
an email <info@rickvanderzwet.nl> if you really to need to be in there.

Note: <port> has different notations:
 Numeric value of port 1,2,3,4,5,..
 Actual value of port A1,..,A8,B1,..,B8,C1,..,C8
 All ports all
 """ % (msg, sys.argv[0], opt_serial_port, hex(opt_password), hex(opt_address_code), opt_delay, opt_baudrate))
    # Compare against None: a plain truth test would never exit on the
    # success exit code 0, as used for --help
    if exitcode is not None:
        sys.exit(exitcode)
323
def main():
    """ Parse the command line and run the requested action on the ports

    Exactly one of the status, toggle, on or off actions is executed for
    every port given, with opt_delay seconds of sleep after each port. With
    the ampere option the grid status is printed at the end. Invalid
    arguments print the usage text and exit with code 2.
    """
    global DEBUG, ser, opt_serial_port, opt_password, opt_address_code, opt_baudrate, opt_delay
    try:
        # 'buadrate=' is the historic misspelling, still accepted next to the
        # documented 'baudrate='. 'raw' is a flag and takes no argument.
        opts, args = getopt.getopt(sys.argv[1:],
            "adf:hs:t:ro:v",
            ["ampere", "debug", "delay=", "help", "verbose", "serialport=",
             "password=", "addresscode=", "toggle=", "off=", "on=", "status=",
             "baudrate=", "buadrate=", "raw"])
    except getopt.GetoptError as err:
        usage(str(err),2)

    opt_port = None
    opt_action = None
    opt_raw = False
    opt_ampere = False
    for o, a in opts:
        debug("%s : %s" % (o, a))
        if o in ["-a", "--ampere"]:
            opt_ampere = True
        elif o in ["-d", "--debug", "-v", "--verbose"]:
            # Verbose is an alias: DEBUG enables verbose communication
            DEBUG = True
        elif o in ["--delay"]:
            opt_delay = float(a)
        elif o in ["-h", "--help"]:
            usage("",0)
            # Exit here as well, help must never fall through to an action
            sys.exit(0)
        elif o in ["--addresscode"]:
            opt_address_code = int(a,16)
        elif o in ["--password"]:
            opt_password = int(a,16)
        elif o in ["--baudrate", "--buadrate"]:
            opt_baudrate = int(a)
        elif o in ["--serialport"]:
            opt_serial_port = a
        elif o in ["-s", "--status"]:
            opt_action = "status"
            opt_port = a
        elif o in ["-t","--toggle"]:
            opt_action = "toggle"
            opt_port = a
        elif o in ["-f","--off"]:
            opt_action = "off"
            opt_port = a
        elif o in ["-r","--raw"]:
            opt_raw = True
        elif o in ["-o","--on"]:
            opt_action = "on"
            opt_port = a
        else:
            raise AssertionError("unhandled option")

    if opt_port is None:
        usage("[ERROR] No port defined",2)
    elif opt_action is None:
        usage("[ERROR] No action defined",2)

    # Resolve port to proper numbers array
    ports = []
    for port in opt_port.split(','):
        debug("Raw port: %s" % port)
        if port == "all":
            ports.extend(range(1,25))
            continue
        try:
            if port[:1].upper() in ("A", "B", "C"):
                # Actual value notation like A1 or c8, read as hexadecimal
                number = hex_to_port(int(port,16))
            else:
                number = int(port)
        except (ValueError, AssertionError):
            # hex_to_port reports unknown values with an AssertionError
            usage("[ERROR] Invalid port '%s'" % port,2)
        if not 1 <= number <= 24:
            usage("[ERROR] Invalid port '%s'" % port,2)
        ports.append(number)
    debug("Operating on ports " + str(ports))

    # Open serial port
    ser = serial.Serial(opt_serial_port, opt_baudrate, timeout=5)
    debug(ser)

    # Raw values share one space separated line, closed at the very end
    raw_line_open = False
    grid = None
    try:
        for port in ports:
            if opt_action == "status":
                if opt_raw:
                    if raw_line_open:
                        sys.stdout.write(" ")
                    sys.stdout.write(bool_to_str(get_port_status(port),opt_raw))
                    raw_line_open = True
                else:
                    ampere_str = ""
                    if opt_ampere:
                        ampere_str = "[%s]" % get_port_ampere(port)
                    print("Port %02i : %s %s" % (port, get_port_status(port), ampere_str))
            elif opt_action == "toggle":
                if get_port_status(port):
                    action_port_off(port)
                else:
                    action_port_on(port)
            elif opt_action == "on":
                action_port_on(port)
            elif opt_action == "off":
                action_port_off(port)
            else:
                raise AssertionError("Option '%s' invalid" % opt_action)
            # Backoff if we need to be patient
            time.sleep(opt_delay)
            sys.stdout.flush()

        # Has to happen while the port is still open: after an on, off or
        # toggle the cache is out of sync and the device is queried again
        if opt_ampere:
            grid = get_grid_status()
    finally:
        # Be nice and close correctly
        ser.close()

    if opt_ampere:
        if opt_raw:
            if raw_line_open:
                sys.stdout.write(" ")
            sys.stdout.write(" ".join(map(str, grid)) + "\n")
            raw_line_open = False
        else:
            print("Grid A: %s" % grid[0])
            print("Grid B: %s" % grid[1])
            print("Grid C: %s" % grid[2])
    if raw_line_open:
        sys.stdout.write("\n")

if __name__ == "__main__":
    main()
Note: See TracBrowser for help on using the repository browser.