source: powerbar/serial-npm4000.py@ 403

Last change on this file since 403 was 162, checked in by Rick van der Zwet, 14 years ago

Make sure Id get set nicely

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 13.9 KB
RevLine 
[154]1#!/usr/bin/env python
2# NPM 4000 powerbar managment script, to be used instead of the windows
3# application.
4#
[156]5# TODO: Not all features are ported yet (like amp monitoring)
6# TODO: Error handling not implemented
[154]7#
8# Licence: BSD
[162]9# Version: $Id: serial-npm4000.py 162 2010-07-16 13:53:19Z rick $
[154]10# Rick van der Zwet <info@rickvanderzwet.nl>
11
12import serial
13import sys
14import time
15import random
[156]16import getopt
[154]17
[156]18# Set to true to enable verbose communication aka debugging
[154]19DEBUG = False
20
[156]21# Default for options
22opt_serial_port = '/dev/ttyUSB0'
[158]23opt_password = 0x12345678
[156]24opt_address_code = 0xFFFF
25opt_baudrate = 19200
[159]26opt_delay = 0.0
[156]27
28# Serial connection port status is cached globally to avoid overhead
[157]29ser = None
[156]30port_status_synced = False
31ports_status = None
[159]32ports_ampere = None
33# Segment A, B, C
34grid_status = None
[156]35
[154]36def make_checksum(command):
[156]37 """ Generate CRC checksum using XOR on all bytes """
[154]38 crc = 0
39 for item in command:
40 crc ^= item
41 return crc
42
[156]43
44
[154]45def debug(msg):
[156]46 """ Print debug statement if DEBUG is set """
47 if DEBUG:
48 print msg
[154]49
[156]50
51
[154]52def hex_to_str(command):
[156]53 """ Human readable representation of command """
[154]54 return " ".join(["%02x" % item for item in command])
55
[156]56
57
[154]58def str_to_hex(s):
[156]59 """ Hexadecimal string representation of 's' """
[154]60 return [ord(x) for x in s]
61
[156]62
63
[154]64def port_to_hex(port_number):
[156]65 """ Convert integer port number to hexadecimal presentation as internal
66 location
67 """
68 if port_number < 1:
69 assert False, "Invalid port port_number (%i)" % port_number
[154]70 if port_number <= 8:
71 port = 0xa0 + port_number
72 elif port_number <= 16:
73 port = 0xb0 + (port_number - 8)
74 elif port_number <= 24:
75 port = 0xc0 + (port_number - 16)
76 else:
77 assert False, "Invalid port port_number (%i)" % port_number
78 debug("%i - %02x" % (port_number, port))
79 return port
80
81
[156]82
[154]83def hex_to_port(port):
[156]84 """ Convert hexadecimal port to human port number """
[154]85 base = port & 0xf0
86 index = port & 0x0f
87 if (base ^ 0xa0) == 0:
88 port_number = index + 0
89 elif (base ^ 0xb0) == 0:
90 port_number = index + 8
91 elif (base ^ 0xc0) == 0:
92 port_number = index + 16
93 else:
94 assert False, "Invalid port (%02x)" % port
95 debug("%02x - %i" % (port, port_number))
96 return port_number
97
98
99
[156]100def send_raw_command(raw_command, response_size=1024):
101 """ Send raw command to serial device and wait for response """
102
[158]103 debug("Going to send: " + hex_to_str(raw_command))
[154]104 send_line = "".join([chr(item) for item in raw_command])
[157]105 ser.write(send_line)
106 recv_line = ser.read(response_size)
[154]107 recv_command = str_to_hex(recv_line)
[158]108 debug("Received: %s (%i)" % (hex_to_str(recv_command), len(recv_line)))
[155]109 return(recv_command)
[154]110
111
112
[156]113def address_to_num(address):
114 """ Convert internal address representation to integer """
115 return (address[0] << 8) ^ address[1]
[154]116
117
118
[156]119def num_to_address(npm_number):
120 """ Convert address number to internal representation """
121 return [npm_number >> 8, npm_number & 0x00ff]
[154]122
123
124
[156]125def bin_reverse(number,width=8):
126 """Little hacking using string logic to binary reverse number"""
127 return int(bin(number)[2:].zfill(width)[::-1],2)
128
129
[154]130# Login cycle
131#command = action_login + device_id + password
132#send_command(ser, command)
[156]133# Reference implementation lines
134# A = action, B = address, C = password, P = port
135# Mostly of type [A, A, B, B, C, C, C, C] or [A, A, B, B] or [A, A, B, B, P]
136# (command, return_type, timeout)
137line = dict()
138line['login'] = ([0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78], 5, 1)
139line['status'] = ([0xd1, 0x03, 0xff, 0xff], 42, 1)
140line['allon'] = ([0xb1, 0x03, 0xff, 0xff], 6, 13)
141line['alloff'] = ([0xc1, 0x03, 0xff, 0xff], 6, 13)
142line['port_on'] = ([0xb2, 0x04, 0xff, 0xff, 0xa1], 6, 1)
143line['port_off'] = ([0xc2, 0x04, 0xff,0xff, 0xa1], 6, 1)
144line['power_on_interval_125'] = ([0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28], 5, 1)
145line['power_on_interval_05'] = ([0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3], 5, 1)
146line['change_address_code'] = ([0x05, 0x00, 0xff, 0xff, 0x00, 0x04], 5, 1)
147line['modify_password'] = ([0xd3, 0x07, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11], 5, 1)
[154]148
[156]149def num_to_hex(number):
150 """ Number to internal hexadecimal representation """
151 if number == None:
152 return []
153 length = len(hex(number)[2:]) + (len(hex(number)[2:]) % 2)
[157]154 return [int(hex(number)[2:].zfill(length)[x:x+2],16) for x in range(0,length,2)]
[154]155
[158]156#print hex_to_str(num_to_hex(opt_password))
157#exit(0)
[154]158
159
[158]160
161def send_command(action, argument=[]):
[156]162 """ Send CRC computed command to serial device and wait for response """
163 (command, response_size, timeout) = line[action]
[158]164 if not isinstance(argument, list):
165 argument = num_to_hex(argument)
166 command = command[0:2] + num_to_hex(opt_address_code) + argument
[156]167 serial.timeout = timeout
168 raw_command = command + [make_checksum(command)]
169 return send_raw_command(raw_command, response_size)
[154]170
[155]171
[156]172def action_login():
173 """ Login to device """
174 return send_command('login', opt_password)
[155]175
[156]176def action_status():
177 """ Get port status from device """
178 action_login()
179 return send_command('status')
[155]180
[156]181def action_port_on(port):
182 """ Enable port on device """
[159]183 global port_status_synced
184 port_status_synced = False
[158]185
[156]186 action_login()
[158]187 return send_command('port_on', port_to_hex(port))
[155]188
[156]189def action_port_off(port):
190 """ Disable port on device """
[159]191 global port_status_synced
192 port_status_synced = False
[158]193
[156]194 action_login()
[158]195 return send_command('port_off', port_to_hex(port))
[156]196
[160]197def action_toggle_port(port):
198 """ Toggle port state """
199 if get_port_status(port):
200 action_port_off(port)
201 else:
202 action_port_on(port)
203
[156]204def get_ports_status():
[159]205 global ports_status, port_status_synced, ports_ampere, grid_status
206 ports_status_synced = False
[155]207 # Example port 1 is off
208 # d1 28 ff ff fe ff ff
209 # ^^ ^^ ^^
[156]210 # TODO: Implement Ampere monitoring
211 #[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00
212 #[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00
213 # 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
214 # 28 00 28 00 00 10 ff
215 #[01:38:14] Comm: Refresh
[155]216 port_array = [False] * 25
[159]217 ports_ampere = [0] * 25
218 grid_status = [0] * 3
[156]219
[157]220 action_login()
[156]221 retval = send_command('status')
[155]222 status_array = bin_reverse((bin_reverse(retval[4]) << 16) ^ (bin_reverse(retval[5]) << 8) ^ bin_reverse(retval[6]),24)
223 for port in range(0,24):
224 if (status_array & (1 << port)) > 0:
225 port_array[port+1] = True
[159]226 # retval[7] is the sensor state?
227 ports_ampere = [0] + retval[8:16] + retval[17:25] + retval[26:34]
228 grid_status = [retval[16], retval[25], retval[34]]
[156]229
230 # Update global state
231 ports_status = port_array
[159]232 port_status_synced = True
233
[155]234 return port_array
235
236
[159]237
[156]238def get_port_status(port):
239 """ Get specific port status """
[159]240 global ports_status, port_status_synced
[158]241
[159]242 if not port_status_synced:
243 get_ports_status()
244 return ports_status[port]
[154]245
246
247
[159]248def get_port_ampere(port):
249 """ Get specific port ampere usage """
250 global ports_ampere, port_status_synced
[154]251
[159]252 if not port_status_synced:
253 get_ports_status()
254 return ports_ampere[port]
[154]255
256
257
[159]258def get_grid_status():
259 """ Get grid ampere usage """
260 global grid_status, ports_status_synced
261
262 if not port_status_synced:
263 get_ports_status()
264 return grid_status
265
[154]266
[159]267
[158]268def bool_to_str(boolean, raw=False):
269 if not raw:
270 return str(boolean)
271 elif boolean:
272 return "1"
273 else:
274 return "0"
275
[154]276
[156]277def usage(msg="",exitcode=None):
[160]278 if msg:
279 msg = "[ERROR] %s" % msg
280 print """%(msg)s
281Usage %(argv)s arguments
[162]282Version: $Id: serial-npm4000.py 162 2010-07-16 13:53:19Z rick $
[154]283
284Arguments:
285 [-h|--help] Reading right know
[159]286 [-a|--ampere] Display ampere readings
[156]287 [-d|--debug] Print extra communication output
[158]288 [-r|--raw] Status(es) is bits like output
[160]289 --serialport=<path> Serial Port to connect to [%(serialport)s]
290 --password=<hex> Password to use in hex notation [%(password)s]
291 --addresscode=<hex> Internal device number in hex notation [%(addresscode)s]
292 --delay=<int> Delay used between port operations [%(delay)s]
293 --baudrate=<int> Bautrate used for communication (19200,9600) [%(baudrate)s]
[156]294 [-s <port>|--status=<port>] Current port(s) configuration
[154]295 [-t <port>|--toggle=<port>] Toggle port(s)
296 [-o <port>|--on=<port>] Turn on port(s)
297 [-f <port>|--off=<port>] Turn off port(s)
[156]298 --allon Turn all ports on using internal safety [TODO: Implement]
299 --alloff Turn all ports off using internal safety [TODO: Implement]
300 --changepassword= Change password [TODO: Implement]
301 --changeaddresscode= Change addresscode [TODO: Implement]
302 --changetimerdelay= Change internal timer delay [TODO: Implement]
[160]303 --pinballtest=<int> Randomly toggle ports for number of times]
[161]304 --wheel_of_fortune Wheel of fortune implementation
[160]305 [-p <port>|--port=<port>] Ports needed to be used
[154]306
[159]307Note: [TODO: Implement] bit codes are in the source code, feel free to drop me
308an email <info@rickvanderzwet.nl> if you really to need to be in there.
309
[154]310Note: <port> has different notations:
311 Numeric value of port 1,2,3,4,5,..
312 Actual value of port A1,..,A8,B1,..,B8,C1,..,C8
313 All ports all
[160]314
315%(msg)s
316 """ % { 'argv' : sys.argv[0],
317 'msg' : msg,
318 'serialport' : opt_serial_port,
319 'password' : opt_password,
320 'addresscode' : opt_address_code,
321 'delay' : opt_delay,
322 'baudrate' : opt_baudrate,
323 }
324 if exitcode != None:
[156]325 sys.exit(exitcode)
[154]326
327def main():
[160]328 global DEBUG, ser, opt_serial_port, opt_password, opt_address_code, opt_baudrate, opt_delay, opt_pinballtest
[154]329 try:
330 opts, args = getopt.getopt(sys.argv[1:],
[160]331 "adhf:s:t:ro:p:v",
[161]332 ["ampere", "debug", "delay=", "help", "verbose", "serialport=",
333 "port=", "password=", "addresscode=","toggle=","off=", "on=",
334 "status=", "buadrate=", "raw=", "pinballtest=",
335 "wheel_of_fortune"])
[154]336 except getopt.GetoptError, err:
[156]337 usage(str(err),2)
[154]338
339 opt_port = None
340 opt_action = None
[158]341 opt_raw = False
[159]342 opt_ampere = False
[160]343 opt_pinballtest = None
[154]344 for o, a in opts:
[157]345 debug("%s : %s" % (o, a))
[159]346 if o in ["-a", "--ampere"]:
347 opt_ampere = True
348 elif o in ["-d", "--debug"]:
[156]349 DEBUG = True
[159]350 elif o in ["--delay"]:
351 opt_delay = float(a)
[157]352 elif o in ["-h", "--help"]:
[156]353 usage("",0)
[157]354 elif o in ["--addresscode"]:
[156]355 opt_address_code = int(a,16)
[157]356 elif o in ["--password"]:
[156]357 opt_passwd = int(a,16)
[160]358 elif o in ["-p","--port"]:
359 opt_port = a
360 elif o in ["--pinballtest"]:
361 opt_action = 'pinballtest'
362 opt_pinballtest = int(a)
[157]363 elif o in ["--buadrate"]:
[156]364 opt_baudrate = a
[157]365 elif o in ["--serialport"]:
[156]366 opt_serial_port = a
[157]367 elif o in ["-s", "--status"]:
[154]368 opt_action = "status"
369 opt_port = a
[157]370 elif o in ["-t","--toggle"]:
[154]371 opt_action = "toggle"
372 opt_port = a
[157]373 elif o in ["-f","--off"]:
[154]374 opt_action = "off"
375 opt_port = a
[158]376 elif o in ["-r","--raw"]:
377 opt_raw = True
[157]378 elif o in ["-o","--on"]:
[154]379 opt_action = "on"
380 opt_port = a
[161]381 elif o in ["--wheel_of_fortune"]:
382 opt_action = 'wheel_of_fortune'
383 opt_port = "all"
[154]384 else:
385 assert False, "unhandled option"
386
[157]387 if (opt_port == None):
[160]388 usage("No port defined",2)
[157]389 elif (opt_action == None):
[160]390 usage("No action defined",2)
[154]391
392
[156]393 # Resolve port to proper numbers array
394 ports = []
395 for port in opt_port.split(','):
[157]396 debug("Raw port: %s" % port)
[156]397 if port == "all":
398 ports.extend(range(1,25))
[157]399 elif port[0] in "ABCabc":
400 print hex_to_port(int(port,16))
401 ports.append(hex_to_port(int(port,16)))
[156]402 else:
[157]403 ports.append(int(port))
404 debug("Operating on ports " + str(ports))
[154]405
[159]406 # Open serial port
[157]407 ser = serial.Serial(opt_serial_port, opt_baudrate, timeout=5)
[156]408 debug(serial)
[154]409
[161]410 if opt_action == 'pinballtest':
[160]411 for count in range(0,opt_pinballtest):
412 port = random.choice(ports)
413 print "[%04i] Toggle port %02i" % (count, port)
414 action_toggle_port(port)
415 # Backoff time
416 time.sleep(opt_delay)
417 sys.exit(0)
[161]418 elif opt_action == 'wheel_of_fortune':
419 # First turn all ports off
420 for port in ports:
421 action_port_off(port)
422
423 port = random.choice(ports)
424 total_time = 0.0
425 for c in range(1,random.randint(10,200)):
426 # Not all should be evaluated (50%)
427 if random.randint(0,200) > 100:
428 continue
429 action_port_on(port)
430 sleep_time = float(c) / 1000
431 total_time += sleep_time
432 time.sleep(sleep_time)
433 action_port_off(port)
434 print "[%03i] Port %i (%f)" % (c, port, sleep_time)
435 port = ((port + 1) % 25)
436 if port == 0:
437 port += 1
438
439 print "Total time: %f" % total_time
440 # Initial result
441 action_port_on(port)
442 sys.exit(0)
[160]443
[156]444 # Status needs real integers, hack
445 for port in ports:
446 if opt_action == "status":
[159]447 if opt_raw:
448 print bool_to_str(get_port_status(port),opt_raw),
449 else:
450 ampere_str = ""
451 if opt_ampere:
452 ampere_str = "[%s]" % get_port_ampere(port)
453 print "Port %02i : %s %s" % (port, get_port_status(port), ampere_str)
[156]454 elif opt_action == "toggle":
[160]455 action_toggle_port(port)
[156]456 elif opt_action == "on":
457 action_port_on(port)
458 elif opt_action == "off":
459 action_port_off(port)
460 else:
461 assert False, "Option '%s' invalid" % opt_action
[159]462 # Backoff if we need to be patient
463 time.sleep(opt_delay)
464 sys.stdout.flush()
[154]465
[159]466 # Be nice and close correctly
[157]467 ser.close()
[154]468
[159]469 if opt_ampere:
470 if opt_raw:
471 print " ".join(map(str, get_grid_status()))
472 else:
473 print "Grid A: %s" % grid_status[0]
474 print "Grid B: %s" % grid_status[1]
475 print "Grid C: %s" % grid_status[2]
476
[154]477if __name__ == "__main__":
478 main()
Note: See TracBrowser for help on using the repository browser.