source: powerbar/serial-npm4000.py@ 158

Last change on this file since 158 was 158, checked in by Rick van der Zwet, 14 years ago

We got a lift off :-)

  • Property svn:executable set to *
File size: 11.4 KB
RevLine 
[154]1#!/usr/bin/env python
2# NPM 4000 powerbar managment script, to be used instead of the windows
3# application.
4#
[156]5# TODO: Not all features are ported yet (like amp monitoring)
6# TODO: Error handling not implemented
[154]7#
8# Licence: BSD
9# Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $
10# Rick van der Zwet <info@rickvanderzwet.nl>
11
12import serial
13import sys
14import time
15import random
[156]16import getopt
[154]17
[156]18# Set to true to enable verbose communication aka debugging
[154]19DEBUG = False
20
[156]21# Default for options
22opt_serial_port = '/dev/ttyUSB0'
[158]23opt_password = 0x12345678
[156]24opt_address_code = 0xFFFF
25opt_baudrate = 19200
26
27# Serial connection port status is cached globally to avoid overhead
[157]28ser = None
[156]29port_status_synced = False
30ports_status = None
31
[154]32def make_checksum(command):
[156]33 """ Generate CRC checksum using XOR on all bytes """
[154]34 crc = 0
35 for item in command:
36 crc ^= item
37 return crc
38
[156]39
40
[154]41def debug(msg):
[156]42 """ Print debug statement if DEBUG is set """
43 if DEBUG:
44 print msg
[154]45
[156]46
47
[154]48def hex_to_str(command):
[156]49 """ Human readable representation of command """
[154]50 return " ".join(["%02x" % item for item in command])
51
[156]52
53
[154]54def str_to_hex(s):
[156]55 """ Hexadecimal string representation of 's' """
[154]56 return [ord(x) for x in s]
57
[156]58
59
[154]60def port_to_hex(port_number):
[156]61 """ Convert integer port number to hexadecimal presentation as internal
62 location
63 """
64 if port_number < 1:
65 assert False, "Invalid port port_number (%i)" % port_number
[154]66 if port_number <= 8:
67 port = 0xa0 + port_number
68 elif port_number <= 16:
69 port = 0xb0 + (port_number - 8)
70 elif port_number <= 24:
71 port = 0xc0 + (port_number - 16)
72 else:
73 assert False, "Invalid port port_number (%i)" % port_number
74 debug("%i - %02x" % (port_number, port))
75 return port
76
77
[156]78
[154]79def hex_to_port(port):
[156]80 """ Convert hexadecimal port to human port number """
[154]81 base = port & 0xf0
82 index = port & 0x0f
83 if (base ^ 0xa0) == 0:
84 port_number = index + 0
85 elif (base ^ 0xb0) == 0:
86 port_number = index + 8
87 elif (base ^ 0xc0) == 0:
88 port_number = index + 16
89 else:
90 assert False, "Invalid port (%02x)" % port
91 debug("%02x - %i" % (port, port_number))
92 return port_number
93
94
95
[156]96def send_raw_command(raw_command, response_size=1024):
97 """ Send raw command to serial device and wait for response """
98
[158]99 debug("Going to send: " + hex_to_str(raw_command))
[154]100 send_line = "".join([chr(item) for item in raw_command])
[157]101 ser.write(send_line)
102 recv_line = ser.read(response_size)
[154]103 recv_command = str_to_hex(recv_line)
[158]104 debug("Received: %s (%i)" % (hex_to_str(recv_command), len(recv_line)))
[155]105 return(recv_command)
[154]106
107
108
[156]109def address_to_num(address):
110 """ Convert internal address representation to integer """
111 return (address[0] << 8) ^ address[1]
[154]112
113
114
[156]115def num_to_address(npm_number):
116 """ Convert address number to internal representation """
117 return [npm_number >> 8, npm_number & 0x00ff]
[154]118
119
120
[156]121def bin_reverse(number,width=8):
122 """Little hacking using string logic to binary reverse number"""
123 return int(bin(number)[2:].zfill(width)[::-1],2)
124
125#send_command(ser, login_line, 5)
126# Note: you will need to wait at least timeout * 24 to complete
127#ser.timeout = 13
128#send_command(ser, allon_line, 6)
129#send_command(ser, alloff_line,6)
130
131#send_command(ser, power_on_interval_05_line,6)
132#send_command(ser,port_on_cmd(24),6)
133#send_command(ser,port_off_cmd(24),6)
134#retval = send_command(ser, refresh_line, 42)
135#print get_port_status(retval)
136#while True:
137# port_number = random.randint(1,24)
138# print port_number
139# send_command(ser, login_line, 5)
140# send_command(ser, port_on_cmd(port_number),6)
141# send_command(ser, login_line, 5)
142# send_command(ser, port_off_cmd(port_number),6)
143
[154]144# Login cycle
145#command = action_login + device_id + password
146#send_command(ser, command)
[156]147# Reference implementation lines
148# A = action, B = address, C = password, P = port
149# Mostly of type [A, A, B, B, C, C, C, C] or [A, A, B, B] or [A, A, B, B, P]
150# (command, return_type, timeout)
151line = dict()
152line['login'] = ([0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78], 5, 1)
153line['status'] = ([0xd1, 0x03, 0xff, 0xff], 42, 1)
154line['allon'] = ([0xb1, 0x03, 0xff, 0xff], 6, 13)
155line['alloff'] = ([0xc1, 0x03, 0xff, 0xff], 6, 13)
156line['port_on'] = ([0xb2, 0x04, 0xff, 0xff, 0xa1], 6, 1)
157line['port_off'] = ([0xc2, 0x04, 0xff,0xff, 0xa1], 6, 1)
158line['power_on_interval_125'] = ([0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28], 5, 1)
159line['power_on_interval_05'] = ([0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3], 5, 1)
160line['change_address_code'] = ([0x05, 0x00, 0xff, 0xff, 0x00, 0x04], 5, 1)
161line['modify_password'] = ([0xd3, 0x07, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11], 5, 1)
[154]162
[156]163def num_to_hex(number):
164 """ Number to internal hexadecimal representation """
165 if number == None:
166 return []
167 length = len(hex(number)[2:]) + (len(hex(number)[2:]) % 2)
[157]168 return [int(hex(number)[2:].zfill(length)[x:x+2],16) for x in range(0,length,2)]
[154]169
[158]170#print hex_to_str(num_to_hex(opt_password))
171#exit(0)
[154]172
173
[158]174
175def send_command(action, argument=[]):
[156]176 """ Send CRC computed command to serial device and wait for response """
177 (command, response_size, timeout) = line[action]
[158]178 if not isinstance(argument, list):
179 argument = num_to_hex(argument)
180 command = command[0:2] + num_to_hex(opt_address_code) + argument
[156]181 serial.timeout = timeout
182 raw_command = command + [make_checksum(command)]
183 return send_raw_command(raw_command, response_size)
[154]184
[155]185
[156]186def action_login():
187 """ Login to device """
188 return send_command('login', opt_password)
[155]189
[156]190def action_status():
191 """ Get port status from device """
192 action_login()
193 return send_command('status')
[155]194
[156]195def action_port_on(port):
196 """ Enable port on device """
[158]197 global ports_status
198 ports_status = None
199
[156]200 action_login()
[158]201 return send_command('port_on', port_to_hex(port))
[155]202
[156]203def action_port_off(port):
204 """ Disable port on device """
[158]205 global ports_status
206 ports_status = None
207
[156]208 action_login()
[158]209 return send_command('port_off', port_to_hex(port))
[156]210
211def get_ports_status():
[158]212 global ports_status
[155]213 # Example port 1 is off
214 # d1 28 ff ff fe ff ff
215 # ^^ ^^ ^^
[156]216 # TODO: Implement Ampere monitoring
217 #[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00
218 #[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00
219 # 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
220 # 28 00 28 00 00 10 ff
221 #[01:38:14] Comm: Refresh
[155]222 port_array = [False] * 25
[156]223
[157]224 action_login()
[156]225 retval = send_command('status')
[155]226 status_array = bin_reverse((bin_reverse(retval[4]) << 16) ^ (bin_reverse(retval[5]) << 8) ^ bin_reverse(retval[6]),24)
227 for port in range(0,24):
228 if (status_array & (1 << port)) > 0:
229 port_array[port+1] = True
[156]230
231 # Update global state
232 ports_status = port_array
[155]233 return port_array
234
235
[156]236def get_port_status(port):
237 """ Get specific port status """
[158]238 global ports_status
239
240 if ports_status:
[156]241 return ports_status[port]
242 else:
243 return get_ports_status()[port]
[154]244
245
[156]246
[154]247
248
249def runTest():
250 print "TEST: Start will all port Offline"
251 doCommand('allPortsOff')
252
253 print "TEST: All On and Off again, in mass execution"
254 doCommand('allPortsOn',0x01)
255 doCommand('allPortsOff')
256
257 print "TEST: Enable and disable ports one by one"
258 for i in range(1,25):
259 print getPortName(i)
260 doCommand('portOn', getPortHex(i))
261 doCommand('portOff', getPortHex(i))
262
263 print "TEST: Send known error"
264 doCommand('knownError')
265
[158]266def bool_to_str(boolean, raw=False):
267 if not raw:
268 return str(boolean)
269 elif boolean:
270 return "1"
271 else:
272 return "0"
273
[154]274
[156]275def usage(msg="",exitcode=None):
276 print """%s
[154]277Usage %s arguments
278Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $
279
280Arguments:
281 [-h|--help] Reading right know
[156]282 [-d|--debug] Print extra communication output
[158]283 [-r|--raw] Status(es) is bits like output
[156]284 --serialport= Serial Port to connect to [%s]
285 --password= Password to use in hex notation [%s]
286 --addresscode= Internal device number in hex notation [%s]
287 [-s <port>|--status=<port>] Current port(s) configuration
[154]288 [-t <port>|--toggle=<port>] Toggle port(s)
289 [-o <port>|--on=<port>] Turn on port(s)
290 [-f <port>|--off=<port>] Turn off port(s)
[156]291 --allon Turn all ports on using internal safety [TODO: Implement]
292 --alloff Turn all ports off using internal safety [TODO: Implement]
293 --changepassword= Change password [TODO: Implement]
294 --changeaddresscode= Change addresscode [TODO: Implement]
295 --changetimerdelay= Change internal timer delay [TODO: Implement]
296 --delay= Delay used between port operations [TODO: Implement]
297 --baudrate= Bautrate used for communication (19200,9600) [%s]
[154]298
299Note: <port> has different notations:
300 Numeric value of port 1,2,3,4,5,..
301 Actual value of port A1,..,A8,B1,..,B8,C1,..,C8
302 All ports all
[156]303 """ % (msg, sys.argv[0], opt_serial_port, hex(opt_password), hex(opt_address_code), opt_baudrate)
304 if exitcode:
305 sys.exit(exitcode)
[154]306
307def main():
[158]308 global DEBUG, ser, opt_serial_port, opt_password, opt_address_code, opt_baudrate
[154]309 try:
310 opts, args = getopt.getopt(sys.argv[1:],
[158]311 "df:s:t:ro:v",
312 ["debug", "verbose", "serialport=", "password=",
313 "addresscode=","toggle=","off=", "on=", "status=", "buadrate=", "raw="])
[154]314 except getopt.GetoptError, err:
[156]315 usage(str(err),2)
[154]316
317 opt_port = None
318 opt_action = None
[158]319 opt_raw = False
[154]320 for o, a in opts:
[157]321 debug("%s : %s" % (o, a))
322 if o in ["-d", "--debug"]:
[156]323 DEBUG = True
[157]324 elif o in ["-h", "--help"]:
[156]325 usage("",0)
[157]326 elif o in ["--addresscode"]:
[156]327 opt_address_code = int(a,16)
[157]328 elif o in ["--password"]:
[156]329 opt_passwd = int(a,16)
[157]330 elif o in ["--buadrate"]:
[156]331 opt_baudrate = a
[157]332 elif o in ["--serialport"]:
[156]333 opt_serial_port = a
[157]334 elif o in ["-s", "--status"]:
[154]335 opt_action = "status"
336 opt_port = a
[157]337 elif o in ["-t","--toggle"]:
[154]338 opt_action = "toggle"
339 opt_port = a
[157]340 elif o in ["-f","--off"]:
[154]341 opt_action = "off"
342 opt_port = a
[158]343 elif o in ["-r","--raw"]:
344 opt_raw = True
[157]345 elif o in ["-o","--on"]:
[154]346 opt_action = "on"
347 opt_port = a
348 else:
349 assert False, "unhandled option"
350
[157]351 if (opt_port == None):
352 usage("[ERROR] No port defined",2)
353 elif (opt_action == None):
354 usage("[ERROR] No action defined",2)
[154]355
356
[156]357 # Resolve port to proper numbers array
358 ports = []
359 for port in opt_port.split(','):
[157]360 debug("Raw port: %s" % port)
[156]361 if port == "all":
362 ports.extend(range(1,25))
[157]363 elif port[0] in "ABCabc":
364 print hex_to_port(int(port,16))
365 ports.append(hex_to_port(int(port,16)))
[156]366 else:
[157]367 ports.append(int(port))
368 debug("Operating on ports " + str(ports))
[154]369
[157]370 ser = serial.Serial(opt_serial_port, opt_baudrate, timeout=5)
[156]371 debug(serial)
372
[154]373
[156]374 # Status needs real integers, hack
375 for port in ports:
376 if opt_action == "status":
[158]377 print bool_to_str(get_port_status(port),opt_raw),
[156]378 elif opt_action == "toggle":
379 if get_port_status(port):
380 action_port_off(port)
[154]381 else:
[156]382 action_port_on(port)
383 elif opt_action == "on":
384 action_port_on(port)
385 elif opt_action == "off":
386 action_port_off(port)
387 else:
388 assert False, "Option '%s' invalid" % opt_action
[154]389
[157]390 ser.close()
[154]391
392if __name__ == "__main__":
393 main()
Note: See TracBrowser for help on using the repository browser.