source: powerbar/serial-npm4000.py@ 157

Last change on this file since 157 was 157, checked in by Rick van der Zwet, 14 years ago

More hacks to test

  • Property svn:executable set to *
File size: 10.9 KB
Line 
1#!/usr/bin/env python
2# NPM 4000 powerbar management script, to be used instead of the windows
3# application.
4#
5# TODO: Not all features are ported yet (like amp monitoring)
6# TODO: Error handling not implemented
7#
8# Licence: BSD
9# Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $
10# Rick van der Zwet <info@rickvanderzwet.nl>
11
12import serial
13import sys
14import time
15import random
16import getopt
17
# Set to True to enable verbose communication output (debugging).
# NOTE(review): debugging is currently forced on; the -d/--debug command line
# switch only matters once this default is set back to False.
DEBUG = True

# Defaults for the command line options
opt_serial_port = '/dev/ttyUSB0'
# Login password, sent as the four bytes 12 34 56 78 (the same bytes the
# reference login frame 55 07 ff ff 12 34 56 78 carries).
opt_password = 0x12345678
opt_address_code = 0xFFFF
opt_baudrate = 19200

# Serial connection and port status are cached globally to avoid overhead
ser = None
port_status_synced = False
ports_status = None
32
def make_checksum(command):
    """ Return the XOR of all byte values in 'command' (0 for an empty one) """
    checksum = 0
    for byte in command:
        checksum = checksum ^ byte
    return checksum
39
40
41
def debug(msg):
    """ Print 'msg' to standard output, but only while DEBUG is enabled """
    if not DEBUG:
        return
    print(msg)
46
47
48
def hex_to_str(command):
    """ Render a list of byte values as space separated two digit hex """
    rendered = []
    for byte in command:
        rendered.append("%02x" % byte)
    return " ".join(rendered)
52
53
54
def str_to_hex(s):
    """ Return the list of character codes (ord) of every character in 's' """
    return list(map(ord, s))
58
59
60
def port_to_hex(port_number):
    """ Convert integer port number to hexadecimal presentation as internal
    location

    Ports 1..8 map to 0xa1..0xa8, 9..16 to 0xb1..0xb8 and 17..24 to
    0xc1..0xc8.

    Raises AssertionError for a port number outside 1..24. The error is
    raised explicitly instead of through an 'assert' statement, so the
    check is still performed when Python runs with -O.
    """
    if not 1 <= port_number <= 24:
        raise AssertionError("Invalid port port_number (%i)" % port_number)
    # bank selects the 0xa0 / 0xb0 / 0xc0 group, offset the position in it
    bank, offset = divmod(port_number - 1, 8)
    port = 0xa0 + (bank << 4) + offset + 1
    debug("%i - %02x" % (port_number, port))
    return port
77
78
79
def hex_to_port(port):
    """ Convert hexadecimal port to human port number

    0xa1..0xa8 map to ports 1..8, 0xb1..0xb8 to 9..16 and 0xc1..0xc8 to
    17..24.

    Raises AssertionError for any other value. This includes codes whose
    low nibble is not 1..8 (0xa9 would otherwise alias port 9, which is
    really 0xb1) and values that do not fit in one byte. The error is raised
    explicitly so the check is still performed when Python runs with -O.
    """
    bank_offsets = {0xa0: 0, 0xb0: 8, 0xc0: 16}
    base = port & 0xf0
    index = port & 0x0f
    valid = (port == (base | index)
             and base in bank_offsets
             and 1 <= index <= 8)
    if not valid:
        raise AssertionError("Invalid port (%02x)" % port)
    port_number = index + bank_offsets[base]
    debug("%02x - %i" % (port, port_number))
    return port_number
94
95
96
def send_raw_command(raw_command, response_size=1024):
    """ Write 'raw_command' (list of byte values) to the global serial
    connection, read up to 'response_size' bytes back and return them as
    a list of byte values. Both directions are printed in hex.
    """
    print("Going to send: " + hex_to_str(raw_command))
    payload = "".join(map(chr, raw_command))
    ser.write(payload)

    reply = ser.read(response_size)
    reply_bytes = str_to_hex(reply)
    print("Received: %s (%i)" % (hex_to_str(reply_bytes), len(reply)))
    return reply_bytes
107
108
109
def address_to_num(address):
    """ Combine the two element address list [high, low] into one integer """
    high = address[0]
    low = address[1]
    return (high << 8) ^ low
113
114
115
def num_to_address(npm_number):
    """ Split an address number into the two element list [high, low] """
    high, low = divmod(npm_number, 0x100)
    return [high, low]
119
120
121
def bin_reverse(number,width=8):
    """Mirror the binary digits of 'number', zero padded to 'width' digits"""
    digits = bin(number)[2:]
    padded = digits.zfill(width)
    mirrored = "".join(reversed(padded))
    return int(mirrored, 2)
125
126#send_command(ser, login_line, 5)
127# Note: you will need to wait at least timeout * 24 to complete
128#ser.timeout = 13
129#send_command(ser, allon_line, 6)
130#send_command(ser, alloff_line,6)
131
132#send_command(ser, power_on_interval_05_line,6)
133#send_command(ser,port_on_cmd(24),6)
134#send_command(ser,port_off_cmd(24),6)
135#retval = send_command(ser, refresh_line, 42)
136#print get_port_status(retval)
137#while True:
138# port_number = random.randint(1,24)
139# print port_number
140# send_command(ser, login_line, 5)
141# send_command(ser, port_on_cmd(port_number),6)
142# send_command(ser, login_line, 5)
143# send_command(ser, port_off_cmd(port_number),6)
144
145# Login cycle
146#command = action_login + device_id + password
147#send_command(ser, command)
# Reference implementation lines
# A = action, B = address, C = password, P = port
# Mostly of type [A, A, B, B, C, C, C, C] or [A, A, B, B] or [A, A, B, B, P]
# Every entry is a tuple (command, response size, timeout)
line = {
    'login': ([0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78], 5, 1),
    'status': ([0xd1, 0x03, 0xff, 0xff], 42, 1),
    'allon': ([0xb1, 0x03, 0xff, 0xff], 6, 13),
    'alloff': ([0xc1, 0x03, 0xff, 0xff], 6, 13),
    'port_on': ([0xb2, 0x04, 0xff, 0xff, 0xa1], 6, 1),
    'port_off': ([0xc2, 0x04, 0xff, 0xff, 0xa1], 6, 1),
    'power_on_interval_125': ([0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28], 5, 1),
    'power_on_interval_05': ([0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3], 5, 1),
    'change_address_code': ([0x05, 0x00, 0xff, 0xff, 0x00, 0x04], 5, 1),
    'modify_password': ([0xd3, 0x07, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11], 5, 1),
}
163
def num_to_hex(number, width=None):
    """ Number to internal hexadecimal representation

    Returns the big-endian list of byte values of 'number', e.g.
    0x12345678 -> [0x12, 0x34, 0x56, 0x78]. None yields an empty list.

    'width' is an optional minimum number of bytes; shorter results are
    left padded with zero bytes (longer results are never truncated).

    Raises ValueError for a negative number.
    """
    if number is None:
        return []
    if number < 0:
        raise ValueError("Cannot encode negative number (%i)" % number)
    # '%x' never carries the 'L' suffix hex() adds to Python 2 longs
    digits = "%x" % number
    if len(digits) % 2:
        digits = "0" + digits
    octets = [int(digits[pos:pos + 2], 16) for pos in range(0, len(digits), 2)]
    if width is not None and len(octets) < width:
        octets = [0] * (width - len(octets)) + octets
    return octets
170
171
172
def send_command(action, argument=None):
    """ Send CRC computed command to serial device and wait for response

    'action' is a key of the 'line' table; only the first two bytes of the
    reference frame are reused. They are followed by the two byte address
    code, the bytes of the optional numeric 'argument' and the checksum.

    Returns the response as a list of byte values.
    Raises KeyError when 'action' is not present in 'line'.
    """
    (command, response_size, timeout) = line[action]
    # num_to_address always yields two bytes, so address codes below 0x100
    # keep their leading zero byte
    command = command[0:2] + num_to_address(opt_address_code) + num_to_hex(argument)
    # The timeout belongs to the open port object, not to the serial module
    ser.timeout = timeout
    raw_command = command + [make_checksum(command)]
    return send_raw_command(raw_command, response_size)
180
181
def action_login():
    """ Send the login command with the configured password and return the
    response of the device
    """
    response = send_command('login', opt_password)
    return response
185
def action_status():
    """ Login and request the status frame; returns the raw response """
    action_login()
    response = send_command('status')
    return response
190
def action_port_on(port):
    """ Enable port on device

    'port' is the port number (1..24); it is translated to the wire code
    (0xa1..0xc8) used by the reference 'port_on' frame. The cached port
    status is invalidated. Returns the raw response of the device.
    """
    global port_status_synced
    action_login()
    port_status_synced = False
    return send_command('port_on', port_to_hex(port))

def action_port_off(port):
    """ Disable port on device

    'port' is the port number (1..24); it is translated to the wire code
    (0xa1..0xc8) used by the reference 'port_off' frame. The cached port
    status is invalidated. Returns the raw response of the device.
    """
    global port_status_synced
    action_login()
    port_status_synced = False
    return send_command('port_off', port_to_hex(port))

def get_ports_status():
    """ Query the device and return the status of all ports

    Returns a list of 25 booleans: index 0 is unused, index n is True when
    the status bit of port n is set. The result is also stored in the
    global cache ('ports_status' / 'port_status_synced').

    Raises IndexError when the response is too short to hold the three
    status bytes.
    """
    # Example port 1 is off
    # d1 28 ff ff fe ff ff
    #             ^^ ^^ ^^
    # TODO: Implement Ampere monitoring
    #[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00
    #[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00
    #                 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
    #                 28 00 28 00 00 10 ff
    #[01:38:14] Comm: Refresh
    global ports_status, port_status_synced
    port_array = [False] * 25

    action_login()
    retval = send_command('status')
    if len(retval) < 7:
        raise IndexError("Short status response (%i bytes)" % len(retval))
    # Byte 4 holds ports 1..8 (bit 0 = port 1), byte 5 ports 9..16 and
    # byte 6 ports 17..24. This is what reversing each byte and then the
    # whole 24 bit word boils down to.
    status_array = retval[4] | (retval[5] << 8) | (retval[6] << 16)
    for port in range(0,24):
        if status_array & (1 << port):
            port_array[port+1] = True

    # Update global state
    ports_status = port_array
    port_status_synced = True
    return port_array
226
227
def get_port_status(port):
    """ Return the status of one port, from the cached status list while it
    is marked as synced and from a fresh device query otherwise
    """
    snapshot = ports_status if port_status_synced else get_ports_status()
    return snapshot[port]
234
235
236
237
238
def runTest():
    """ Exercise the device: everything off, everything on and off again,
    every port on and off one by one and finally a known bad command.

    NOTE(review): doCommand, getPortName and getPortHex are not defined in
    the visible source -- confirm they still exist before calling this.
    """
    print("TEST: Start will all port Offline")
    doCommand('allPortsOff')

    print("TEST: All On and Off again, in mass execution")
    doCommand('allPortsOn',0x01)
    doCommand('allPortsOff')

    print("TEST: Enable and disable ports one by one")
    for port in range(1,25):
        print(getPortName(port))
        for step in ('portOn', 'portOff'):
            doCommand(step, getPortHex(port))

    print("TEST: Send known error")
    doCommand('knownError')
255
256
257
def usage(msg="",exitcode=None):
    """ Print 'msg' followed by the command line help text

    When 'exitcode' is given -- including 0, as used for --help -- the
    process exits with that status afterwards. With the default None the
    function simply returns.
    """
    print("""%s
Usage %s arguments
Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $

Arguments:
 [-h|--help] Reading right know
 [-d|--debug] Print extra communication output
 --serialport= Serial Port to connect to [%s]
 --password= Password to use in hex notation [%s]
 --addresscode= Internal device number in hex notation [%s]
 [-s <port>|--status=<port>] Current port(s) configuration
 [-t <port>|--toggle=<port>] Toggle port(s)
 [-o <port>|--on=<port>] Turn on port(s)
 [-f <port>|--off=<port>] Turn off port(s)
 --allon Turn all ports on using internal safety [TODO: Implement]
 --alloff Turn all ports off using internal safety [TODO: Implement]
 --changepassword= Change password [TODO: Implement]
 --changeaddresscode= Change addresscode [TODO: Implement]
 --changetimerdelay= Change internal timer delay [TODO: Implement]
 --delay= Delay used between port operations [TODO: Implement]
 --baudrate= Bautrate used for communication (19200,9600) [%s]

Note: <port> has different notations:
 Numeric value of port 1,2,3,4,5,..
 Actual value of port A1,..,A8,B1,..,B8,C1,..,C8
 All ports all
 """ % (msg, sys.argv[0], opt_serial_port, hex(opt_password), hex(opt_address_code), opt_baudrate))
    # 'is not None' rather than truthiness: exit code 0 must exit as well
    if exitcode is not None:
        sys.exit(exitcode)
288
def main():
    """ Parse the command line, open the serial port and run the requested
    action (status, toggle, on, off) on every selected port.

    Exits through usage() with status 2 on invalid options, values or
    ports and with status 0 after --help. The serial port is always closed
    again, also when an action fails.
    """
    global ser, opt_serial_port, opt_password, opt_address_code, opt_baudrate
    global DEBUG
    try:
        # '--buadrate' is kept as an alias of '--baudrate' for callers that
        # relied on the historical spelling
        opts, args = getopt.getopt(sys.argv[1:],
            "dhf:s:t:o:v",
            ["debug", "verbose", "help", "serialport=", "password=",
             "addresscode=", "toggle=", "off=", "on=", "status=",
             "baudrate=", "buadrate="])
    except getopt.GetoptError as err:
        usage(str(err),2)

    opt_port = None
    opt_action = None
    # Last action option on the command line wins
    actions = {
        "-s": "status", "--status": "status",
        "-t": "toggle", "--toggle": "toggle",
        "-f": "off", "--off": "off",
        "-o": "on", "--on": "on",
    }
    for o, a in opts:
        debug("%s : %s" % (o, a))
        try:
            if o in ("-d", "--debug", "-v", "--verbose"):
                # -v/--verbose were accepted but unhandled; treated as an
                # alias of --debug
                DEBUG = True
            elif o in ("-h", "--help"):
                usage("",0)
                sys.exit(0)
            elif o == "--addresscode":
                opt_address_code = int(a,16)
            elif o == "--password":
                opt_password = int(a,16)
            elif o in ("--baudrate", "--buadrate"):
                opt_baudrate = int(a)
            elif o == "--serialport":
                opt_serial_port = a
            elif o in actions:
                opt_action = actions[o]
                opt_port = a
            else:
                raise AssertionError("unhandled option")
        except ValueError:
            usage("[ERROR] Invalid value '%s' for option %s" % (a, o),2)

    if opt_port is None:
        usage("[ERROR] No port defined",2)
    elif opt_action is None:
        usage("[ERROR] No action defined",2)

    # Resolve port to proper numbers array
    ports = []
    for port in opt_port.split(','):
        debug("Raw port: %s" % port)
        if port == "all":
            ports.extend(range(1,25))
            continue
        try:
            if port[:1].upper() in ("A", "B", "C"):
                # Bank notation such as A1 or c8
                number = hex_to_port(int(port,16))
            else:
                number = int(port)
        except (ValueError, AssertionError):
            usage("[ERROR] Invalid port '%s'" % port,2)
        if not 1 <= number <= 24:
            usage("[ERROR] Invalid port '%s'" % port,2)
        ports.append(number)
    debug("Operating on ports " + str(ports))

    ser = serial.Serial(opt_serial_port, opt_baudrate, timeout=5)
    debug(ser)

    try:
        for port in ports:
            if opt_action == "status":
                state = "on" if get_port_status(port) else "off"
                print("Port %i is %s" % (port, state))
            elif opt_action == "toggle":
                if get_port_status(port):
                    action_port_off(port)
                else:
                    action_port_on(port)
            elif opt_action == "on":
                action_port_on(port)
            elif opt_action == "off":
                action_port_off(port)
            else:
                raise AssertionError("Option '%s' invalid" % opt_action)
    finally:
        ser.close()

if __name__ == "__main__":
    main()
Note: See TracBrowser for help on using the repository browser.