source: powerbar/serial-npm4000.py@ 160

Last change on this file since 160 was 160, checked in by Rick van der Zwet, 14 years ago

Pinball test to test all ports in some kind of pinball style foo

  • Property svn:executable set to *
File size: 13.1 KB
Line 
1#!/usr/bin/env python
2# NPM 4000 powerbar managment script, to be used instead of the windows
3# application.
4#
5# TODO: Not all features are ported yet (like amp monitoring)
6# TODO: Error handling not implemented
7#
8# Licence: BSD
9# Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $
10# Rick van der Zwet <info@rickvanderzwet.nl>
11
12import serial
13import sys
14import time
15import random
16import getopt
17
18# Set to true to enable verbose communication aka debugging
19DEBUG = False
20
21# Default for options
22opt_serial_port = '/dev/ttyUSB0'
23opt_password = 0x12345678
24opt_address_code = 0xFFFF
25opt_baudrate = 19200
26opt_delay = 0.0
27
28# Serial connection port status is cached globally to avoid overhead
29ser = None
30port_status_synced = False
31ports_status = None
32ports_ampere = None
33# Segment A, B, C
34grid_status = None
35
36def make_checksum(command):
37 """ Generate CRC checksum using XOR on all bytes """
38 crc = 0
39 for item in command:
40 crc ^= item
41 return crc
42
43
44
45def debug(msg):
46 """ Print debug statement if DEBUG is set """
47 if DEBUG:
48 print msg
49
50
51
52def hex_to_str(command):
53 """ Human readable representation of command """
54 return " ".join(["%02x" % item for item in command])
55
56
57
58def str_to_hex(s):
59 """ Hexadecimal string representation of 's' """
60 return [ord(x) for x in s]
61
62
63
64def port_to_hex(port_number):
65 """ Convert integer port number to hexadecimal presentation as internal
66 location
67 """
68 if port_number < 1:
69 assert False, "Invalid port port_number (%i)" % port_number
70 if port_number <= 8:
71 port = 0xa0 + port_number
72 elif port_number <= 16:
73 port = 0xb0 + (port_number - 8)
74 elif port_number <= 24:
75 port = 0xc0 + (port_number - 16)
76 else:
77 assert False, "Invalid port port_number (%i)" % port_number
78 debug("%i - %02x" % (port_number, port))
79 return port
80
81
82
83def hex_to_port(port):
84 """ Convert hexadecimal port to human port number """
85 base = port & 0xf0
86 index = port & 0x0f
87 if (base ^ 0xa0) == 0:
88 port_number = index + 0
89 elif (base ^ 0xb0) == 0:
90 port_number = index + 8
91 elif (base ^ 0xc0) == 0:
92 port_number = index + 16
93 else:
94 assert False, "Invalid port (%02x)" % port
95 debug("%02x - %i" % (port, port_number))
96 return port_number
97
98
99
100def send_raw_command(raw_command, response_size=1024):
101 """ Send raw command to serial device and wait for response """
102
103 debug("Going to send: " + hex_to_str(raw_command))
104 send_line = "".join([chr(item) for item in raw_command])
105 ser.write(send_line)
106 recv_line = ser.read(response_size)
107 recv_command = str_to_hex(recv_line)
108 debug("Received: %s (%i)" % (hex_to_str(recv_command), len(recv_line)))
109 return(recv_command)
110
111
112
113def address_to_num(address):
114 """ Convert internal address representation to integer """
115 return (address[0] << 8) ^ address[1]
116
117
118
119def num_to_address(npm_number):
120 """ Convert address number to internal representation """
121 return [npm_number >> 8, npm_number & 0x00ff]
122
123
124
125def bin_reverse(number,width=8):
126 """Little hacking using string logic to binary reverse number"""
127 return int(bin(number)[2:].zfill(width)[::-1],2)
128
129
130# Login cycle
131#command = action_login + device_id + password
132#send_command(ser, command)
133# Reference implementation lines
134# A = action, B = address, C = password, P = port
135# Mostly of type [A, A, B, B, C, C, C, C] or [A, A, B, B] or [A, A, B, B, P]
136# (command, return_type, timeout)
137line = dict()
138line['login'] = ([0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78], 5, 1)
139line['status'] = ([0xd1, 0x03, 0xff, 0xff], 42, 1)
140line['allon'] = ([0xb1, 0x03, 0xff, 0xff], 6, 13)
141line['alloff'] = ([0xc1, 0x03, 0xff, 0xff], 6, 13)
142line['port_on'] = ([0xb2, 0x04, 0xff, 0xff, 0xa1], 6, 1)
143line['port_off'] = ([0xc2, 0x04, 0xff,0xff, 0xa1], 6, 1)
144line['power_on_interval_125'] = ([0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28], 5, 1)
145line['power_on_interval_05'] = ([0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3], 5, 1)
146line['change_address_code'] = ([0x05, 0x00, 0xff, 0xff, 0x00, 0x04], 5, 1)
147line['modify_password'] = ([0xd3, 0x07, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11], 5, 1)
148
149def num_to_hex(number):
150 """ Number to internal hexadecimal representation """
151 if number == None:
152 return []
153 length = len(hex(number)[2:]) + (len(hex(number)[2:]) % 2)
154 return [int(hex(number)[2:].zfill(length)[x:x+2],16) for x in range(0,length,2)]
155
156#print hex_to_str(num_to_hex(opt_password))
157#exit(0)
158
159
160
161def send_command(action, argument=[]):
162 """ Send CRC computed command to serial device and wait for response """
163 (command, response_size, timeout) = line[action]
164 if not isinstance(argument, list):
165 argument = num_to_hex(argument)
166 command = command[0:2] + num_to_hex(opt_address_code) + argument
167 serial.timeout = timeout
168 raw_command = command + [make_checksum(command)]
169 return send_raw_command(raw_command, response_size)
170
171
172def action_login():
173 """ Login to device """
174 return send_command('login', opt_password)
175
176def action_status():
177 """ Get port status from device """
178 action_login()
179 return send_command('status')
180
181def action_port_on(port):
182 """ Enable port on device """
183 global port_status_synced
184 port_status_synced = False
185
186 action_login()
187 return send_command('port_on', port_to_hex(port))
188
189def action_port_off(port):
190 """ Disable port on device """
191 global port_status_synced
192 port_status_synced = False
193
194 action_login()
195 return send_command('port_off', port_to_hex(port))
196
197def action_toggle_port(port):
198 """ Toggle port state """
199 if get_port_status(port):
200 action_port_off(port)
201 else:
202 action_port_on(port)
203
204def get_ports_status():
205 global ports_status, port_status_synced, ports_ampere, grid_status
206 ports_status_synced = False
207 # Example port 1 is off
208 # d1 28 ff ff fe ff ff
209 # ^^ ^^ ^^
210 # TODO: Implement Ampere monitoring
211 #[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00
212 #[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00
213 # 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
214 # 28 00 28 00 00 10 ff
215 #[01:38:14] Comm: Refresh
216 port_array = [False] * 25
217 ports_ampere = [0] * 25
218 grid_status = [0] * 3
219
220 action_login()
221 retval = send_command('status')
222 status_array = bin_reverse((bin_reverse(retval[4]) << 16) ^ (bin_reverse(retval[5]) << 8) ^ bin_reverse(retval[6]),24)
223 for port in range(0,24):
224 if (status_array & (1 << port)) > 0:
225 port_array[port+1] = True
226 # retval[7] is the sensor state?
227 ports_ampere = [0] + retval[8:16] + retval[17:25] + retval[26:34]
228 grid_status = [retval[16], retval[25], retval[34]]
229
230 # Update global state
231 ports_status = port_array
232 port_status_synced = True
233
234 return port_array
235
236
237
238def get_port_status(port):
239 """ Get specific port status """
240 global ports_status, port_status_synced
241
242 if not port_status_synced:
243 get_ports_status()
244 return ports_status[port]
245
246
247
248def get_port_ampere(port):
249 """ Get specific port ampere usage """
250 global ports_ampere, port_status_synced
251
252 if not port_status_synced:
253 get_ports_status()
254 return ports_ampere[port]
255
256
257
258def get_grid_status():
259 """ Get grid ampere usage """
260 global grid_status, ports_status_synced
261
262 if not port_status_synced:
263 get_ports_status()
264 return grid_status
265
266
267
268def bool_to_str(boolean, raw=False):
269 if not raw:
270 return str(boolean)
271 elif boolean:
272 return "1"
273 else:
274 return "0"
275
276
277def usage(msg="",exitcode=None):
278 if msg:
279 msg = "[ERROR] %s" % msg
280 print """%(msg)s
281Usage %(argv)s arguments
282Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $
283
284Arguments:
285 [-h|--help] Reading right know
286 [-a|--ampere] Display ampere readings
287 [-d|--debug] Print extra communication output
288 [-r|--raw] Status(es) is bits like output
289 --serialport=<path> Serial Port to connect to [%(serialport)s]
290 --password=<hex> Password to use in hex notation [%(password)s]
291 --addresscode=<hex> Internal device number in hex notation [%(addresscode)s]
292 --delay=<int> Delay used between port operations [%(delay)s]
293 --baudrate=<int> Bautrate used for communication (19200,9600) [%(baudrate)s]
294 [-s <port>|--status=<port>] Current port(s) configuration
295 [-t <port>|--toggle=<port>] Toggle port(s)
296 [-o <port>|--on=<port>] Turn on port(s)
297 [-f <port>|--off=<port>] Turn off port(s)
298 --allon Turn all ports on using internal safety [TODO: Implement]
299 --alloff Turn all ports off using internal safety [TODO: Implement]
300 --changepassword= Change password [TODO: Implement]
301 --changeaddresscode= Change addresscode [TODO: Implement]
302 --changetimerdelay= Change internal timer delay [TODO: Implement]
303 --pinballtest=<int> Randomly toggle ports for number of times]
304 [-p <port>|--port=<port>] Ports needed to be used
305
306Note: [TODO: Implement] bit codes are in the source code, feel free to drop me
307an email <info@rickvanderzwet.nl> if you really to need to be in there.
308
309Note: <port> has different notations:
310 Numeric value of port 1,2,3,4,5,..
311 Actual value of port A1,..,A8,B1,..,B8,C1,..,C8
312 All ports all
313
314%(msg)s
315 """ % { 'argv' : sys.argv[0],
316 'msg' : msg,
317 'serialport' : opt_serial_port,
318 'password' : opt_password,
319 'addresscode' : opt_address_code,
320 'delay' : opt_delay,
321 'baudrate' : opt_baudrate,
322 }
323 if exitcode != None:
324 sys.exit(exitcode)
325
326def main():
327 global DEBUG, ser, opt_serial_port, opt_password, opt_address_code, opt_baudrate, opt_delay, opt_pinballtest
328 try:
329 opts, args = getopt.getopt(sys.argv[1:],
330 "adhf:s:t:ro:p:v",
331 ["ampere", "debug", "delay=", "help", "verbose", "serialport=", "port=", "password=",
332 "addresscode=","toggle=","off=", "on=", "status=", "buadrate=", "raw=", "pinballtest="])
333 except getopt.GetoptError, err:
334 usage(str(err),2)
335
336 opt_port = None
337 opt_action = None
338 opt_raw = False
339 opt_ampere = False
340 opt_pinballtest = None
341 for o, a in opts:
342 debug("%s : %s" % (o, a))
343 if o in ["-a", "--ampere"]:
344 opt_ampere = True
345 elif o in ["-d", "--debug"]:
346 DEBUG = True
347 elif o in ["--delay"]:
348 opt_delay = float(a)
349 elif o in ["-h", "--help"]:
350 usage("",0)
351 elif o in ["--addresscode"]:
352 opt_address_code = int(a,16)
353 elif o in ["--password"]:
354 opt_passwd = int(a,16)
355 elif o in ["-p","--port"]:
356 opt_port = a
357 elif o in ["--pinballtest"]:
358 opt_action = 'pinballtest'
359 opt_pinballtest = int(a)
360 elif o in ["--buadrate"]:
361 opt_baudrate = a
362 elif o in ["--serialport"]:
363 opt_serial_port = a
364 elif o in ["-s", "--status"]:
365 opt_action = "status"
366 opt_port = a
367 elif o in ["-t","--toggle"]:
368 opt_action = "toggle"
369 opt_port = a
370 elif o in ["-f","--off"]:
371 opt_action = "off"
372 opt_port = a
373 elif o in ["-r","--raw"]:
374 opt_raw = True
375 elif o in ["-o","--on"]:
376 opt_action = "on"
377 opt_port = a
378 else:
379 assert False, "unhandled option"
380
381 if (opt_port == None):
382 usage("No port defined",2)
383 elif (opt_action == None):
384 usage("No action defined",2)
385
386
387 # Resolve port to proper numbers array
388 ports = []
389 for port in opt_port.split(','):
390 debug("Raw port: %s" % port)
391 if port == "all":
392 ports.extend(range(1,25))
393 elif port[0] in "ABCabc":
394 print hex_to_port(int(port,16))
395 ports.append(hex_to_port(int(port,16)))
396 else:
397 ports.append(int(port))
398 debug("Operating on ports " + str(ports))
399
400 # Open serial port
401 ser = serial.Serial(opt_serial_port, opt_baudrate, timeout=5)
402 debug(serial)
403
404 if opt_pinballtest:
405 for count in range(0,opt_pinballtest):
406 port = random.choice(ports)
407 print "[%04i] Toggle port %02i" % (count, port)
408 action_toggle_port(port)
409 # Backoff time
410 time.sleep(opt_delay)
411 sys.exit(0)
412
413 # Status needs real integers, hack
414 for port in ports:
415 if opt_action == "status":
416 if opt_raw:
417 print bool_to_str(get_port_status(port),opt_raw),
418 else:
419 ampere_str = ""
420 if opt_ampere:
421 ampere_str = "[%s]" % get_port_ampere(port)
422 print "Port %02i : %s %s" % (port, get_port_status(port), ampere_str)
423 elif opt_action == "toggle":
424 action_toggle_port(port)
425 elif opt_action == "on":
426 action_port_on(port)
427 elif opt_action == "off":
428 action_port_off(port)
429 else:
430 assert False, "Option '%s' invalid" % opt_action
431 # Backoff if we need to be patient
432 time.sleep(opt_delay)
433 sys.stdout.flush()
434
435 # Be nice and close correctly
436 ser.close()
437
438 if opt_ampere:
439 if opt_raw:
440 print " ".join(map(str, get_grid_status()))
441 else:
442 print "Grid A: %s" % grid_status[0]
443 print "Grid B: %s" % grid_status[1]
444 print "Grid C: %s" % grid_status[2]
445
446if __name__ == "__main__":
447 main()
Note: See TracBrowser for help on using the repository browser.