source: powerbar/serial-npm4000.py@ 158

Last change on this file since 158 was 158, checked in by Rick van der Zwet, 14 years ago

We got a lift off :-)

  • Property svn:executable set to *
File size: 11.4 KB
Line 
1#!/usr/bin/env python
# NPM 4000 powerbar management script, to be used instead of the Windows
# application.
4#
5# TODO: Not all features are ported yet (like amp monitoring)
6# TODO: Error handling not implemented
7#
8# Licence: BSD
9# Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $
10# Rick van der Zwet <info@rickvanderzwet.nl>
11
12import serial
13import sys
14import time
15import random
16import getopt
17
# Verbose tracing of the serial traffic (a.k.a. debugging); off by default
DEBUG = False

# Option defaults; main() overrides these from the command line
opt_baudrate = 19200
opt_serial_port = '/dev/ttyUSB0'
opt_address_code = 0xffff
opt_password = 0x12345678

# The serial connection and the last known port states are kept at module
# level so they can be reused between calls instead of being re-created
port_status_synced = False
ser = ports_status = None
31
def make_checksum(command):
    """ Fold all byte values of 'command' together with XOR

    Returns 0 for an empty command. The result is what gets appended to a
    frame as its trailing check byte.
    """
    checksum = 0
    for byte in command:
        checksum = checksum ^ byte
    return checksum
38
39
40
def debug(msg):
    """ Print 'msg' to stdout, but only when the module level DEBUG flag is
    set
    """
    if DEBUG:
        # Call form with a single argument prints the same text on
        # Python 2 and Python 3
        print(msg)
45
46
47
def hex_to_str(command):
    """ Render a list of byte values as space separated, two digit lower
    case hex (e.g. [0x55, 0x07] becomes "55 07")
    """
    octets = []
    for value in command:
        octets.append("%02x" % value)
    return " ".join(octets)
51
52
53
def str_to_hex(s):
    """ List of integer byte/character values of 's'

    Accepts text as well as bytes/bytearray, so data read from the serial
    port can be converted on both Python 2 (str) and Python 3 (bytes).
    """
    if isinstance(s, (bytes, bytearray)):
        # Iterating bytes yields 1-char strings on Python 2 but ints on
        # Python 3; going through bytearray yields ints on both
        return list(bytearray(s))
    return [ord(x) for x in s]
57
58
59
def port_to_hex(port_number):
    """ Convert integer port number to hexadecimal presentation as internal
    location

    Ports 1..8 map to 0xa1..0xa8, 9..16 to 0xb1..0xb8 and 17..24 to
    0xc1..0xc8.

    Raises ValueError when port_number is outside 1..24 (raised explicitly
    so the check also holds when Python runs with -O, which strips asserts).
    """
    if not 1 <= port_number <= 24:
        raise ValueError("Invalid port port_number (%i)" % port_number)
    # bank 0/1/2 selects the 0xa0/0xb0/0xc0 base, offset is the 0-based
    # position inside that bank of eight
    bank, offset = divmod(port_number - 1, 8)
    port = 0xa0 + (bank << 4) + offset + 1
    debug("%i - %02x" % (port_number, port))
    return port
76
77
78
def hex_to_port(port):
    """ Convert hexadecimal port to human port number

    0xa1..0xa8 map to 1..8, 0xb1..0xb8 to 9..16 and 0xc1..0xc8 to 17..24.

    Raises ValueError for any other value: an unknown bank nibble, or an
    index nibble outside 1..8 (which would otherwise yield port 0 or a
    number that belongs to the next bank).
    """
    bank_offset = {0xa0: 0, 0xb0: 8, 0xc0: 16}
    base = port & 0xf0
    index = port & 0x0f
    # 'port != base | index' rejects values with bits above the low byte
    if base not in bank_offset or not 1 <= index <= 8 or port != (base | index):
        raise ValueError("Invalid port (%02x)" % port)
    port_number = index + bank_offset[base]
    debug("%02x - %i" % (port, port_number))
    return port_number
93
94
95
def send_raw_command(raw_command, response_size=1024):
    """ Send raw command to serial device and wait for response

    raw_command   -- list of byte values (0..255) written as-is
    response_size -- number of bytes to read back; the read returns early
                     with fewer bytes when the port timeout expires

    Returns the received bytes as a list of integers (possibly shorter than
    response_size, or empty).
    """
    debug("Going to send: " + hex_to_str(raw_command))
    # bytearray is accepted by pySerial on Python 2.6+ and Python 3, whereas
    # a chr()-joined string is text (not bytes) on Python 3
    ser.write(bytearray(raw_command))
    recv_line = ser.read(response_size)
    # Works for both str (Python 2) and bytes (Python 3) read results
    recv_command = list(bytearray(recv_line))
    debug("Received: %s (%i)" % (hex_to_str(recv_command), len(recv_line)))
    return recv_command
106
107
108
def address_to_num(address):
    """ Combine a [high, low] byte pair into a single integer address """
    high = address[0]
    low = address[1]
    return (high << 8) ^ low
112
113
114
def num_to_address(npm_number):
    """ Split an address number into its [high, low] byte pair

    The low entry is always 0..255; the high entry carries everything above
    the low byte.
    """
    high, low = divmod(npm_number, 0x100)
    return [high, low]
118
119
120
def bin_reverse(number, width=8):
    """ Mirror the bit pattern of 'number'

    The number is written out in binary, left padded with zeros to at least
    'width' digits, and the digit string is read back in reverse order.
    """
    digits = "{0:b}".format(number).zfill(width)
    mirrored = "".join(reversed(digits))
    return int(mirrored, 2)
124
125#send_command(ser, login_line, 5)
126# Note: you will need to wait at least timeout * 24 to complete
127#ser.timeout = 13
128#send_command(ser, allon_line, 6)
129#send_command(ser, alloff_line,6)
130
131#send_command(ser, power_on_interval_05_line,6)
132#send_command(ser,port_on_cmd(24),6)
133#send_command(ser,port_off_cmd(24),6)
134#retval = send_command(ser, refresh_line, 42)
135#print get_port_status(retval)
136#while True:
137# port_number = random.randint(1,24)
138# print port_number
139# send_command(ser, login_line, 5)
140# send_command(ser, port_on_cmd(port_number),6)
141# send_command(ser, login_line, 5)
142# send_command(ser, port_off_cmd(port_number),6)
143
144# Login cycle
145#command = action_login + device_id + password
146#send_command(ser, command)
# Reference frames, keyed by action name.
# Frame layout legend: A = action, B = address, C = password, P = port;
# frames are mostly [A, A, B, B, C, C, C, C], [A, A, B, B] or
# [A, A, B, B, P].
# Each entry is a tuple (frame, response size, timeout) as unpacked by
# send_command().
line = {
    'login': ([0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78], 5, 1),
    'status': ([0xd1, 0x03, 0xff, 0xff], 42, 1),
    'allon': ([0xb1, 0x03, 0xff, 0xff], 6, 13),
    'alloff': ([0xc1, 0x03, 0xff, 0xff], 6, 13),
    'port_on': ([0xb2, 0x04, 0xff, 0xff, 0xa1], 6, 1),
    'port_off': ([0xc2, 0x04, 0xff, 0xff, 0xa1], 6, 1),
    'power_on_interval_125': ([0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28], 5, 1),
    'power_on_interval_05': ([0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3], 5, 1),
    'change_address_code': ([0x05, 0x00, 0xff, 0xff, 0x00, 0x04], 5, 1),
    'modify_password': ([0xd3, 0x07, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11], 5, 1),
}
162
def num_to_hex(number):
    """ Number to internal hexadecimal representation

    Returns the big-endian list of byte values of 'number', using as few
    bytes as possible (0 gives [0]). None gives an empty list.

    Raises ValueError for negative numbers.
    """
    if number is None:
        return []
    if number < 0:
        raise ValueError("Cannot convert negative number (%i)" % number)
    # "%x" never carries the trailing 'L' that hex() adds for Python 2 longs
    digits = "%x" % number
    if len(digits) % 2:
        # Pad to whole bytes
        digits = "0" + digits
    return [int(digits[x:x + 2], 16) for x in range(0, len(digits), 2)]
169
170#print hex_to_str(num_to_hex(opt_password))
171#exit(0)
172
173
174
def send_command(action, argument=None):
    """ Send checksummed command to serial device and wait for response

    action   -- key into the 'line' table; supplies the two leading frame
                bytes, the expected response size and the read timeout
    argument -- optional payload: a list of byte values is used as-is, any
                other value is converted with num_to_hex()

    The frame sent is: the two leading bytes of the reference frame, the
    two byte address code, the argument bytes and a trailing XOR check byte.

    Returns the response as a list of integers.
    """
    (template, response_size, timeout) = line[action]
    if argument is None:
        argument = []
    elif not isinstance(argument, list):
        argument = num_to_hex(argument)
    # The address field is always two bytes wide; num_to_address() keeps a
    # leading zero byte (e.g. address 0x0004) that a minimal-width
    # conversion would drop
    command = template[0:2] + num_to_address(opt_address_code) + argument
    # Timeout has to be set on the open port object, not on the module
    ser.timeout = timeout
    raw_command = command + [make_checksum(command)]
    return send_raw_command(raw_command, response_size)
184
185
def action_login():
    """ Login to device

    The password is always sent as four bytes, most significant first, so
    passwords with leading zero bytes (e.g. 0x00001234) still produce a
    full-length login frame.
    NOTE(review): bits above the low 32 of opt_password are ignored here --
    confirm the device password is indeed limited to four bytes.
    """
    password = [(opt_password >> shift) & 0xff for shift in (24, 16, 8, 0)]
    return send_command('login', password)
189
def action_status():
    """ Log in and request the raw status frame; returns the response """
    action_login()
    response = send_command('status')
    return response
194
def action_port_on(port):
    """ Switch 'port' (human port number) on; returns the device response """
    global ports_status

    # The cached port states are stale once a port is switched
    ports_status = None

    action_login()
    location = port_to_hex(port)
    return send_command('port_on', location)
202
def action_port_off(port):
    """ Switch 'port' (human port number) off; returns the device response """
    global ports_status

    # The cached port states are stale once a port is switched
    ports_status = None

    action_login()
    location = port_to_hex(port)
    return send_command('port_off', location)
210
def get_ports_status():
    """ Query the device and return the state of all ports

    Returns a list of 25 booleans indexed by human port number (index 0 is
    unused and always False); True means the port's status bit is set. The
    result is also stored in the module level 'ports_status' cache.

    Raises IOError when the device answers with fewer than the 7 bytes
    needed to decode the port states (e.g. on a read timeout).
    """
    global ports_status
    # Example port 1 is off
    # d1 28 ff ff fe ff ff
    #             ^^ ^^ ^^
    # TODO: Implement Ampere monitoring
    #[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00
    #[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00
    #                 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
    #                 28 00 28 00 00 10 ff
    #[01:38:14] Comm: Refresh
    action_login()
    retval = send_command('status')
    if len(retval) < 7:
        raise IOError("Status response too short (%i bytes)" % len(retval))

    # Bytes 4..6 hold ports 1-8, 9-16 and 17-24, lowest bit first, so the
    # three bytes form one little-endian 24 bit field in which bit n is
    # port n+1
    status_bits = retval[4] | (retval[5] << 8) | (retval[6] << 16)
    port_array = [False] + [bool(status_bits & (1 << bit)) for bit in range(24)]

    # Update global state
    ports_status = port_array
    return port_array
234
235
def get_port_status(port):
    """ State of a single port, taken from the cached port states when
    available and fetched from the device otherwise
    """
    global ports_status

    states = ports_status
    if not states:
        states = get_ports_status()
    return states[port]
244
245
246
247
248
def runTest():
    """ Manual exercise run: everything off, everything on and off again,
    each of the 24 ports on and off in turn, then a deliberately bad command

    NOTE(review): doCommand, getPortName and getPortHex are not defined in
    this file and nothing here calls runTest -- presumably a leftover from
    an earlier version of the script; confirm before relying on it.
    """
    print "TEST: Start will all port Offline"
    doCommand('allPortsOff')

    print "TEST: All On and Off again, in mass execution"
    doCommand('allPortsOn',0x01)
    doCommand('allPortsOff')

    print "TEST: Enable and disable ports one by one"
    # Human port numbers 1..24
    for i in range(1,25):
        print getPortName(i)
        doCommand('portOn', getPortHex(i))
        doCommand('portOff', getPortHex(i))

    print "TEST: Send known error"
    doCommand('knownError')
265
def bool_to_str(boolean, raw=False):
    """ Text form of a port state: "1"/"0" when 'raw' is set, otherwise
    whatever str() makes of the value
    """
    if raw:
        return "1" if boolean else "0"
    return str(boolean)
273
274
def usage(msg="", exitcode=None):
    """ Print the help text, preceded by 'msg', and optionally exit

    msg      -- line printed above the help text (e.g. an error message)
    exitcode -- when not None the process exits with this code, including
                exit code 0; when None the function simply returns
    """
    print("""%s
Usage %s arguments
Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $

Arguments:
 [-h|--help] Reading right know
 [-d|--debug] Print extra communication output
 [-r|--raw] Status(es) is bits like output
 --serialport= Serial Port to connect to [%s]
 --password= Password to use in hex notation [%s]
 --addresscode= Internal device number in hex notation [%s]
 [-s <port>|--status=<port>] Current port(s) configuration
 [-t <port>|--toggle=<port>] Toggle port(s)
 [-o <port>|--on=<port>] Turn on port(s)
 [-f <port>|--off=<port>] Turn off port(s)
 --allon Turn all ports on using internal safety [TODO: Implement]
 --alloff Turn all ports off using internal safety [TODO: Implement]
 --changepassword= Change password [TODO: Implement]
 --changeaddresscode= Change addresscode [TODO: Implement]
 --changetimerdelay= Change internal timer delay [TODO: Implement]
 --delay= Delay used between port operations [TODO: Implement]
 --baudrate= Bautrate used for communication (19200,9600) [%s]

Note: <port> has different notations:
 Numeric value of port 1,2,3,4,5,..
 Actual value of port A1,..,A8,B1,..,B8,C1,..,C8
 All ports all
 """ % (msg, sys.argv[0], opt_serial_port, hex(opt_password), hex(opt_address_code), opt_baudrate))
    # Compare against None: a plain truth test would skip the exit for the
    # perfectly valid exit code 0
    if exitcode is not None:
        sys.exit(exitcode)
306
def _parse_ports(spec):
    """ Translate a comma separated <port> specification into a list of
    human port numbers

    Each entry is either a number (1..24), bank notation (A1..A8, B1..B8,
    C1..C8, case insensitive) or the keyword 'all'.

    Raises ValueError for an entry that is none of these.
    """
    ports = []
    for port in spec.split(','):
        debug("Raw port: %s" % port)
        if port == "all":
            ports.extend(range(1, 25))
            continue
        if port[:1].upper() in ("A", "B", "C"):
            # Bank notation reads as a hex byte: 'A1' is 0xa1
            if len(port) != 2 or port[1] not in "12345678":
                raise ValueError("Invalid port '%s'" % port)
            number = hex_to_port(int(port, 16))
        else:
            try:
                number = int(port)
            except ValueError:
                raise ValueError("Invalid port '%s'" % port)
        if not 1 <= number <= 24:
            raise ValueError("Invalid port '%s'" % port)
        ports.append(number)
    return ports


def main():
    """ Command line entry point: parse options, open the serial port and
    run the requested action (status/toggle/on/off) on every selected port

    Exits through usage() with code 2 on invalid options or port
    specifications.
    """
    global DEBUG, ser, opt_serial_port, opt_password, opt_address_code, opt_baudrate
    try:
        # 'buadrate=' is kept as an alias for the historical misspelling
        opts, args = getopt.getopt(sys.argv[1:],
            "dhf:s:t:ro:v",
            ["debug", "verbose", "help", "serialport=", "password=",
             "addresscode=", "toggle=", "off=", "on=", "status=",
             "baudrate=", "buadrate=", "raw"])
    except getopt.GetoptError as err:
        usage(str(err), 2)

    opt_port = None
    opt_action = None
    opt_raw = False
    for o, a in opts:
        debug("%s : %s" % (o, a))
        try:
            if o in ["-d", "--debug", "-v", "--verbose"]:
                DEBUG = True
            elif o in ["-h", "--help"]:
                usage("", 0)
                # Exit explicitly so help never falls through to the port
                # checks, however usage() treats exit code 0
                sys.exit(0)
            elif o in ["--addresscode"]:
                opt_address_code = int(a, 16)
            elif o in ["--password"]:
                opt_password = int(a, 16)
            elif o in ["--baudrate", "--buadrate"]:
                opt_baudrate = int(a)
            elif o in ["--serialport"]:
                opt_serial_port = a
            elif o in ["-s", "--status"]:
                opt_action = "status"
                opt_port = a
            elif o in ["-t", "--toggle"]:
                opt_action = "toggle"
                opt_port = a
            elif o in ["-f", "--off"]:
                opt_action = "off"
                opt_port = a
            elif o in ["-r", "--raw"]:
                opt_raw = True
            elif o in ["-o", "--on"]:
                opt_action = "on"
                opt_port = a
            else:
                assert False, "unhandled option"
        except ValueError:
            usage("[ERROR] Invalid value '%s' for option %s" % (a, o), 2)

    if opt_port is None:
        usage("[ERROR] No port defined", 2)
    elif opt_action is None:
        usage("[ERROR] No action defined", 2)

    # Resolve port to proper numbers array
    try:
        ports = _parse_ports(opt_port)
    except ValueError as err:
        usage("[ERROR] %s" % err, 2)
    debug("Operating on ports " + str(ports))

    ser = serial.Serial(opt_serial_port, opt_baudrate, timeout=5)
    debug(ser)

    try:
        wrote_status = False
        for port in ports:
            if opt_action == "status":
                # All states go on one line, separated by single spaces
                if wrote_status:
                    sys.stdout.write(" ")
                sys.stdout.write(bool_to_str(get_port_status(port), opt_raw))
                wrote_status = True
            elif opt_action == "toggle":
                if get_port_status(port):
                    action_port_off(port)
                else:
                    action_port_on(port)
            elif opt_action == "on":
                action_port_on(port)
            elif opt_action == "off":
                action_port_off(port)
            else:
                assert False, "Option '%s' invalid" % opt_action
        if wrote_status:
            sys.stdout.write("\n")
    finally:
        # Release the serial port even when a command fails half way
        ser.close()


if __name__ == "__main__":
    main()
Note: See TracBrowser for help on using the repository browser.