source: powerbar/serial-npm4000.py@ 241

Last change on this file since 241 was 162, checked in by Rick van der Zwet, 14 years ago

Make sure Id get set nicely

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 13.9 KB
Line 
1#!/usr/bin/env python
2# NPM 4000 powerbar managment script, to be used instead of the windows
3# application.
4#
5# TODO: Not all features are ported yet (like amp monitoring)
6# TODO: Error handling not implemented
7#
8# Licence: BSD
9# Version: $Id: serial-npm4000.py 162 2010-07-16 13:53:19Z rick $
10# Rick van der Zwet <info@rickvanderzwet.nl>
11
12import serial
13import sys
14import time
15import random
16import getopt
17
18# Set to true to enable verbose communication aka debugging
19DEBUG = False
20
21# Default for options
22opt_serial_port = '/dev/ttyUSB0'
23opt_password = 0x12345678
24opt_address_code = 0xFFFF
25opt_baudrate = 19200
26opt_delay = 0.0
27
28# Serial connection port status is cached globally to avoid overhead
29ser = None
30port_status_synced = False
31ports_status = None
32ports_ampere = None
33# Segment A, B, C
34grid_status = None
35
36def make_checksum(command):
37 """ Generate CRC checksum using XOR on all bytes """
38 crc = 0
39 for item in command:
40 crc ^= item
41 return crc
42
43
44
45def debug(msg):
46 """ Print debug statement if DEBUG is set """
47 if DEBUG:
48 print msg
49
50
51
52def hex_to_str(command):
53 """ Human readable representation of command """
54 return " ".join(["%02x" % item for item in command])
55
56
57
58def str_to_hex(s):
59 """ Hexadecimal string representation of 's' """
60 return [ord(x) for x in s]
61
62
63
64def port_to_hex(port_number):
65 """ Convert integer port number to hexadecimal presentation as internal
66 location
67 """
68 if port_number < 1:
69 assert False, "Invalid port port_number (%i)" % port_number
70 if port_number <= 8:
71 port = 0xa0 + port_number
72 elif port_number <= 16:
73 port = 0xb0 + (port_number - 8)
74 elif port_number <= 24:
75 port = 0xc0 + (port_number - 16)
76 else:
77 assert False, "Invalid port port_number (%i)" % port_number
78 debug("%i - %02x" % (port_number, port))
79 return port
80
81
82
83def hex_to_port(port):
84 """ Convert hexadecimal port to human port number """
85 base = port & 0xf0
86 index = port & 0x0f
87 if (base ^ 0xa0) == 0:
88 port_number = index + 0
89 elif (base ^ 0xb0) == 0:
90 port_number = index + 8
91 elif (base ^ 0xc0) == 0:
92 port_number = index + 16
93 else:
94 assert False, "Invalid port (%02x)" % port
95 debug("%02x - %i" % (port, port_number))
96 return port_number
97
98
99
100def send_raw_command(raw_command, response_size=1024):
101 """ Send raw command to serial device and wait for response """
102
103 debug("Going to send: " + hex_to_str(raw_command))
104 send_line = "".join([chr(item) for item in raw_command])
105 ser.write(send_line)
106 recv_line = ser.read(response_size)
107 recv_command = str_to_hex(recv_line)
108 debug("Received: %s (%i)" % (hex_to_str(recv_command), len(recv_line)))
109 return(recv_command)
110
111
112
113def address_to_num(address):
114 """ Convert internal address representation to integer """
115 return (address[0] << 8) ^ address[1]
116
117
118
119def num_to_address(npm_number):
120 """ Convert address number to internal representation """
121 return [npm_number >> 8, npm_number & 0x00ff]
122
123
124
125def bin_reverse(number,width=8):
126 """Little hacking using string logic to binary reverse number"""
127 return int(bin(number)[2:].zfill(width)[::-1],2)
128
129
130# Login cycle
131#command = action_login + device_id + password
132#send_command(ser, command)
133# Reference implementation lines
134# A = action, B = address, C = password, P = port
135# Mostly of type [A, A, B, B, C, C, C, C] or [A, A, B, B] or [A, A, B, B, P]
136# (command, return_type, timeout)
137line = dict()
138line['login'] = ([0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78], 5, 1)
139line['status'] = ([0xd1, 0x03, 0xff, 0xff], 42, 1)
140line['allon'] = ([0xb1, 0x03, 0xff, 0xff], 6, 13)
141line['alloff'] = ([0xc1, 0x03, 0xff, 0xff], 6, 13)
142line['port_on'] = ([0xb2, 0x04, 0xff, 0xff, 0xa1], 6, 1)
143line['port_off'] = ([0xc2, 0x04, 0xff,0xff, 0xa1], 6, 1)
144line['power_on_interval_125'] = ([0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28], 5, 1)
145line['power_on_interval_05'] = ([0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3], 5, 1)
146line['change_address_code'] = ([0x05, 0x00, 0xff, 0xff, 0x00, 0x04], 5, 1)
147line['modify_password'] = ([0xd3, 0x07, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11], 5, 1)
148
149def num_to_hex(number):
150 """ Number to internal hexadecimal representation """
151 if number == None:
152 return []
153 length = len(hex(number)[2:]) + (len(hex(number)[2:]) % 2)
154 return [int(hex(number)[2:].zfill(length)[x:x+2],16) for x in range(0,length,2)]
155
156#print hex_to_str(num_to_hex(opt_password))
157#exit(0)
158
159
160
161def send_command(action, argument=[]):
162 """ Send CRC computed command to serial device and wait for response """
163 (command, response_size, timeout) = line[action]
164 if not isinstance(argument, list):
165 argument = num_to_hex(argument)
166 command = command[0:2] + num_to_hex(opt_address_code) + argument
167 serial.timeout = timeout
168 raw_command = command + [make_checksum(command)]
169 return send_raw_command(raw_command, response_size)
170
171
172def action_login():
173 """ Login to device """
174 return send_command('login', opt_password)
175
176def action_status():
177 """ Get port status from device """
178 action_login()
179 return send_command('status')
180
181def action_port_on(port):
182 """ Enable port on device """
183 global port_status_synced
184 port_status_synced = False
185
186 action_login()
187 return send_command('port_on', port_to_hex(port))
188
189def action_port_off(port):
190 """ Disable port on device """
191 global port_status_synced
192 port_status_synced = False
193
194 action_login()
195 return send_command('port_off', port_to_hex(port))
196
197def action_toggle_port(port):
198 """ Toggle port state """
199 if get_port_status(port):
200 action_port_off(port)
201 else:
202 action_port_on(port)
203
204def get_ports_status():
205 global ports_status, port_status_synced, ports_ampere, grid_status
206 ports_status_synced = False
207 # Example port 1 is off
208 # d1 28 ff ff fe ff ff
209 # ^^ ^^ ^^
210 # TODO: Implement Ampere monitoring
211 #[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00
212 #[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00
213 # 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
214 # 28 00 28 00 00 10 ff
215 #[01:38:14] Comm: Refresh
216 port_array = [False] * 25
217 ports_ampere = [0] * 25
218 grid_status = [0] * 3
219
220 action_login()
221 retval = send_command('status')
222 status_array = bin_reverse((bin_reverse(retval[4]) << 16) ^ (bin_reverse(retval[5]) << 8) ^ bin_reverse(retval[6]),24)
223 for port in range(0,24):
224 if (status_array & (1 << port)) > 0:
225 port_array[port+1] = True
226 # retval[7] is the sensor state?
227 ports_ampere = [0] + retval[8:16] + retval[17:25] + retval[26:34]
228 grid_status = [retval[16], retval[25], retval[34]]
229
230 # Update global state
231 ports_status = port_array
232 port_status_synced = True
233
234 return port_array
235
236
237
238def get_port_status(port):
239 """ Get specific port status """
240 global ports_status, port_status_synced
241
242 if not port_status_synced:
243 get_ports_status()
244 return ports_status[port]
245
246
247
248def get_port_ampere(port):
249 """ Get specific port ampere usage """
250 global ports_ampere, port_status_synced
251
252 if not port_status_synced:
253 get_ports_status()
254 return ports_ampere[port]
255
256
257
258def get_grid_status():
259 """ Get grid ampere usage """
260 global grid_status, ports_status_synced
261
262 if not port_status_synced:
263 get_ports_status()
264 return grid_status
265
266
267
268def bool_to_str(boolean, raw=False):
269 if not raw:
270 return str(boolean)
271 elif boolean:
272 return "1"
273 else:
274 return "0"
275
276
277def usage(msg="",exitcode=None):
278 if msg:
279 msg = "[ERROR] %s" % msg
280 print """%(msg)s
281Usage %(argv)s arguments
282Version: $Id: serial-npm4000.py 162 2010-07-16 13:53:19Z rick $
283
284Arguments:
285 [-h|--help] Reading right know
286 [-a|--ampere] Display ampere readings
287 [-d|--debug] Print extra communication output
288 [-r|--raw] Status(es) is bits like output
289 --serialport=<path> Serial Port to connect to [%(serialport)s]
290 --password=<hex> Password to use in hex notation [%(password)s]
291 --addresscode=<hex> Internal device number in hex notation [%(addresscode)s]
292 --delay=<int> Delay used between port operations [%(delay)s]
293 --baudrate=<int> Bautrate used for communication (19200,9600) [%(baudrate)s]
294 [-s <port>|--status=<port>] Current port(s) configuration
295 [-t <port>|--toggle=<port>] Toggle port(s)
296 [-o <port>|--on=<port>] Turn on port(s)
297 [-f <port>|--off=<port>] Turn off port(s)
298 --allon Turn all ports on using internal safety [TODO: Implement]
299 --alloff Turn all ports off using internal safety [TODO: Implement]
300 --changepassword= Change password [TODO: Implement]
301 --changeaddresscode= Change addresscode [TODO: Implement]
302 --changetimerdelay= Change internal timer delay [TODO: Implement]
303 --pinballtest=<int> Randomly toggle ports for number of times]
304 --wheel_of_fortune Wheel of fortune implementation
305 [-p <port>|--port=<port>] Ports needed to be used
306
307Note: [TODO: Implement] bit codes are in the source code, feel free to drop me
308an email <info@rickvanderzwet.nl> if you really to need to be in there.
309
310Note: <port> has different notations:
311 Numeric value of port 1,2,3,4,5,..
312 Actual value of port A1,..,A8,B1,..,B8,C1,..,C8
313 All ports all
314
315%(msg)s
316 """ % { 'argv' : sys.argv[0],
317 'msg' : msg,
318 'serialport' : opt_serial_port,
319 'password' : opt_password,
320 'addresscode' : opt_address_code,
321 'delay' : opt_delay,
322 'baudrate' : opt_baudrate,
323 }
324 if exitcode != None:
325 sys.exit(exitcode)
326
327def main():
328 global DEBUG, ser, opt_serial_port, opt_password, opt_address_code, opt_baudrate, opt_delay, opt_pinballtest
329 try:
330 opts, args = getopt.getopt(sys.argv[1:],
331 "adhf:s:t:ro:p:v",
332 ["ampere", "debug", "delay=", "help", "verbose", "serialport=",
333 "port=", "password=", "addresscode=","toggle=","off=", "on=",
334 "status=", "buadrate=", "raw=", "pinballtest=",
335 "wheel_of_fortune"])
336 except getopt.GetoptError, err:
337 usage(str(err),2)
338
339 opt_port = None
340 opt_action = None
341 opt_raw = False
342 opt_ampere = False
343 opt_pinballtest = None
344 for o, a in opts:
345 debug("%s : %s" % (o, a))
346 if o in ["-a", "--ampere"]:
347 opt_ampere = True
348 elif o in ["-d", "--debug"]:
349 DEBUG = True
350 elif o in ["--delay"]:
351 opt_delay = float(a)
352 elif o in ["-h", "--help"]:
353 usage("",0)
354 elif o in ["--addresscode"]:
355 opt_address_code = int(a,16)
356 elif o in ["--password"]:
357 opt_passwd = int(a,16)
358 elif o in ["-p","--port"]:
359 opt_port = a
360 elif o in ["--pinballtest"]:
361 opt_action = 'pinballtest'
362 opt_pinballtest = int(a)
363 elif o in ["--buadrate"]:
364 opt_baudrate = a
365 elif o in ["--serialport"]:
366 opt_serial_port = a
367 elif o in ["-s", "--status"]:
368 opt_action = "status"
369 opt_port = a
370 elif o in ["-t","--toggle"]:
371 opt_action = "toggle"
372 opt_port = a
373 elif o in ["-f","--off"]:
374 opt_action = "off"
375 opt_port = a
376 elif o in ["-r","--raw"]:
377 opt_raw = True
378 elif o in ["-o","--on"]:
379 opt_action = "on"
380 opt_port = a
381 elif o in ["--wheel_of_fortune"]:
382 opt_action = 'wheel_of_fortune'
383 opt_port = "all"
384 else:
385 assert False, "unhandled option"
386
387 if (opt_port == None):
388 usage("No port defined",2)
389 elif (opt_action == None):
390 usage("No action defined",2)
391
392
393 # Resolve port to proper numbers array
394 ports = []
395 for port in opt_port.split(','):
396 debug("Raw port: %s" % port)
397 if port == "all":
398 ports.extend(range(1,25))
399 elif port[0] in "ABCabc":
400 print hex_to_port(int(port,16))
401 ports.append(hex_to_port(int(port,16)))
402 else:
403 ports.append(int(port))
404 debug("Operating on ports " + str(ports))
405
406 # Open serial port
407 ser = serial.Serial(opt_serial_port, opt_baudrate, timeout=5)
408 debug(serial)
409
410 if opt_action == 'pinballtest':
411 for count in range(0,opt_pinballtest):
412 port = random.choice(ports)
413 print "[%04i] Toggle port %02i" % (count, port)
414 action_toggle_port(port)
415 # Backoff time
416 time.sleep(opt_delay)
417 sys.exit(0)
418 elif opt_action == 'wheel_of_fortune':
419 # First turn all ports off
420 for port in ports:
421 action_port_off(port)
422
423 port = random.choice(ports)
424 total_time = 0.0
425 for c in range(1,random.randint(10,200)):
426 # Not all should be evaluated (50%)
427 if random.randint(0,200) > 100:
428 continue
429 action_port_on(port)
430 sleep_time = float(c) / 1000
431 total_time += sleep_time
432 time.sleep(sleep_time)
433 action_port_off(port)
434 print "[%03i] Port %i (%f)" % (c, port, sleep_time)
435 port = ((port + 1) % 25)
436 if port == 0:
437 port += 1
438
439 print "Total time: %f" % total_time
440 # Initial result
441 action_port_on(port)
442 sys.exit(0)
443
444 # Status needs real integers, hack
445 for port in ports:
446 if opt_action == "status":
447 if opt_raw:
448 print bool_to_str(get_port_status(port),opt_raw),
449 else:
450 ampere_str = ""
451 if opt_ampere:
452 ampere_str = "[%s]" % get_port_ampere(port)
453 print "Port %02i : %s %s" % (port, get_port_status(port), ampere_str)
454 elif opt_action == "toggle":
455 action_toggle_port(port)
456 elif opt_action == "on":
457 action_port_on(port)
458 elif opt_action == "off":
459 action_port_off(port)
460 else:
461 assert False, "Option '%s' invalid" % opt_action
462 # Backoff if we need to be patient
463 time.sleep(opt_delay)
464 sys.stdout.flush()
465
466 # Be nice and close correctly
467 ser.close()
468
469 if opt_ampere:
470 if opt_raw:
471 print " ".join(map(str, get_grid_status()))
472 else:
473 print "Grid A: %s" % grid_status[0]
474 print "Grid B: %s" % grid_status[1]
475 print "Grid C: %s" % grid_status[2]
476
477if __name__ == "__main__":
478 main()
Note: See TracBrowser for help on using the repository browser.