source: powerbar/serial-npm4000.py@ 156

Last change on this file since 156 was 156, checked in by Rick van der Zwet, 14 years ago

Work in progress, lets make it look pretty

  • Property svn:executable set to *
File size: 10.7 KB
Line 
1#!/usr/bin/env python
2# NPM 4000 powerbar management script, to be used instead of the Windows
3# application.
4#
5# TODO: Not all features are ported yet (like amp monitoring)
6# TODO: Error handling not implemented
7#
8# Licence: BSD
9# Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $
10# Rick van der Zwet <info@rickvanderzwet.nl>
11
12import serial
13import sys
14import time
15import random
16import getopt
17
# Set to True to enable verbose communication output (debugging)
DEBUG = False

# Defaults for the command line options
opt_serial_port = '/dev/ttyUSB0'
# Default password; matches the reference login frame (12 34 56 78).
# The previous value 0x1245678 was missing the digit 3.
opt_password = 0x12345678
opt_address_code = 0xFFFF
opt_baudrate = 19200

# The serial connection and the port status are cached globally to avoid
# overhead.
# NOTE: this rebinds the name 'serial', which hides the 'serial' module
# imported at the top of the file from every later module-level lookup.
serial = None
port_status_synced = False
ports_status = None
31
def make_checksum(command):
    """ Return the XOR of all byte values in command (0 for an empty one) """
    checksum = 0
    for byte in command:
        checksum = checksum ^ byte
    return checksum
38
39
40
def debug(msg):
    """ Print msg, but only when the module level DEBUG flag is set """
    if not DEBUG:
        return
    print(msg)
45
46
47
def hex_to_str(command):
    """ Format byte values as space separated, two digit lowercase hex """
    return " ".join("%02x" % byte for byte in command)
51
52
53
def str_to_hex(s):
    """ Return the list of integer character codes of the string 's' """
    return list(map(ord, s))
57
58
59
def port_to_hex(port_number):
    """ Convert an integer port number to the port byte used on the wire.

    Ports 1-8 map to 0xa1-0xa8, ports 9-16 to 0xb1-0xb8 and ports 17-24 to
    0xc1-0xc8.

    Raises ValueError when port_number is outside 1-24. An explicit raise is
    used instead of assert, because asserts are removed by 'python -O' and
    the function would then continue with an unset or wrong value.
    """
    if not 1 <= port_number <= 24:
        raise ValueError("Invalid port port_number (%i)" % port_number)
    # bank 0/1/2 selects the 0xa0/0xb0/0xc0 base, offset is 0-7 in that bank
    bank, offset = divmod(port_number - 1, 8)
    port = 0xa0 + bank * 0x10 + offset + 1
    debug("%i - %02x" % (port_number, port))
    return port
76
77
78
def hex_to_port(port):
    """ Convert a port byte as used on the wire to a human port number.

    The high nibble selects the bank (0xa0 -> ports 1-8, 0xb0 -> ports 9-16,
    0xc0 -> ports 17-24), the low nibble is the position 1-8 in that bank.

    Raises ValueError for an unknown bank or a position outside 1-8. An
    explicit raise is used instead of assert, because asserts are removed by
    'python -O'.
    """
    bank_offsets = {0xa0: 0, 0xb0: 8, 0xc0: 16}
    base = port & 0xf0
    index = port & 0x0f
    # Position 0 and 9-15 do not exist; 0xa9 must not be read as port 9 (B1)
    if base not in bank_offsets or not 1 <= index <= 8:
        raise ValueError("Invalid port (%02x)" % port)
    port_number = index + bank_offsets[base]
    debug("%02x - %i" % (port, port_number))
    return port_number
93
94
95
def send_raw_command(raw_command, response_size=1024):
    """ Write raw_command to the serial device and return the reply.

    raw_command is a list of byte values. At most response_size bytes are
    read back and returned as a list of byte values. Both directions are
    printed in hex.
    """
    print("Going to send: " + hex_to_str(raw_command))
    payload = "".join(map(chr, raw_command))
    serial.write(payload)

    reply = serial.read(response_size)
    reply_bytes = str_to_hex(reply)
    print("Received: %s (%i)" % (hex_to_str(reply_bytes), len(reply)))
    return reply_bytes
106
107
108
def address_to_num(address):
    """ Combine the two address bytes [high, low] into a single integer """
    high, low = address[0], address[1]
    return (high << 8) ^ low
112
113
114
def num_to_address(npm_number):
    """ Split an address number into its [high, low] byte pair """
    high, low = divmod(npm_number, 0x100)
    return [high, low]
118
119
120
def bin_reverse(number,width=8):
    """ Reverse the bit order of number, padded with zeros to width bits.

    Done on the binary text form of the number: pad, flip, parse back.
    """
    bits = "{0:b}".format(number).zfill(width)
    flipped = "".join(reversed(bits))
    return int(flipped, 2)
124
125#send_command(ser, login_line, 5)
126# Note: you will need to wait at least timeout * 24 to complete
127#ser.timeout = 13
128#send_command(ser, allon_line, 6)
129#send_command(ser, alloff_line,6)
130
131#send_command(ser, power_on_interval_05_line,6)
132#send_command(ser,port_on_cmd(24),6)
133#send_command(ser,port_off_cmd(24),6)
134#retval = send_command(ser, refresh_line, 42)
135#print get_port_status(retval)
136#while True:
137# port_number = random.randint(1,24)
138# print port_number
139# send_command(ser, login_line, 5)
140# send_command(ser, port_on_cmd(port_number),6)
141# send_command(ser, login_line, 5)
142# send_command(ser, port_off_cmd(port_number),6)
143
144# Login cycle
145#command = action_login + device_id + password
146#send_command(ser, command)
# Reference implementation lines
# A = action, B = address, C = password, P = port
# Mostly of type [A, A, B, B, C, C, C, C] or [A, A, B, B] or [A, A, B, B, P]
# Every entry is (command, response_size, timeout): the number of bytes to
# read back and the serial timeout to use while waiting for them.
line = {
    'login': ([0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78], 5, 1),
    'status': ([0xd1, 0x03, 0xff, 0xff], 42, 1),
    'allon': ([0xb1, 0x03, 0xff, 0xff], 6, 13),
    'alloff': ([0xc1, 0x03, 0xff, 0xff], 6, 13),
    'port_on': ([0xb2, 0x04, 0xff, 0xff, 0xa1], 6, 1),
    'port_off': ([0xc2, 0x04, 0xff, 0xff, 0xa1], 6, 1),
    'power_on_interval_125': ([0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28], 5, 1),
    'power_on_interval_05': ([0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3], 5, 1),
    'change_address_code': ([0x05, 0x00, 0xff, 0xff, 0x00, 0x04], 5, 1),
    'modify_password': ([0xd3, 0x07, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11], 5, 1),
}
162
def num_to_hex(number):
    """ Split a number into a list of byte values, most significant first.

    Returns [] when number is None, so an absent command argument adds
    nothing to a frame. The result holds integers (not hex strings), so it
    can be concatenated with the other frame bytes and fed to the checksum.

    Raises ValueError for a negative number.
    """
    if number is None:
        return []
    if number < 0:
        raise ValueError("Cannot encode negative number (%i)" % number)
    digits = "%x" % number
    # Pad to an even number of hex digits so every byte gets two of them
    if len(digits) % 2:
        digits = "0" + digits
    return [int(digits[pos:pos + 2], 16) for pos in range(0, len(digits), 2)]
169
170
171
def send_command(action, argument=None):
    """ Build the frame for action, send it and return the device response.

    action is a key of the 'line' table, which supplies the leading action
    bytes, the response size and the serial timeout. The frame is made of
    the two action bytes, the two byte address code, the optional argument
    bytes and a trailing XOR checksum over everything before it.
    """
    (command, response_size, timeout) = line[action]
    # The documented layout is [A, A, B, B, ...]: keep BOTH action bytes of
    # the reference frame. The address goes through num_to_address so it is
    # always two bytes, also for address codes below 0x100.
    command = command[0:2] + num_to_address(opt_address_code) + num_to_hex(argument)
    serial.timeout = timeout
    raw_command = command + [make_checksum(command)]
    return send_raw_command(raw_command, response_size)
179
180
def action_login():
    """ Send the login command with the configured password """
    response = send_command('login', opt_password)
    return response
184
def action_status():
    """ Log in, request the status and return the raw response """
    action_login()
    response = send_command('status')
    return response
189
def action_port_on(port):
    """ Enable a port on the device.

    port is the human port number (1-24); it is converted to the port byte
    used on the wire (0xa1 for port 1, as in the reference frame) before it
    is sent. Returns the raw device response.
    """
    # Without the global statement the assignment below would only create a
    # local name and the cached status would never be invalidated.
    global port_status_synced
    action_login()
    port_status_synced = False
    return send_command('port_on', port_to_hex(port))
195
def action_port_off(port):
    """ Disable a port on the device.

    port is the human port number (1-24); it is converted to the port byte
    used on the wire (0xa1 for port 1, as in the reference frame) before it
    is sent. Returns the raw device response.
    """
    # Without the global statement the assignment below would only create a
    # local name and the cached status would never be invalidated.
    global port_status_synced
    action_login()
    port_status_synced = False
    return send_command('port_off', port_to_hex(port))
201
202
def get_ports_status():
    """ Query the device and return the on/off state of all ports.

    Returns a list of 25 booleans indexed by human port number (1-24);
    index 0 is unused and always False. The result is also stored in the
    module level cache (ports_status) and the cache is marked as synced.

    Raises IOError when the response is too short to hold the status bytes.
    """
    # Needed to update the module level cache instead of creating locals
    global ports_status, port_status_synced

    # Example port 1 is off
    # d1 28 ff ff fe ff ff
    #             ^^ ^^ ^^
    # TODO: Implement Ampere monitoring
    #[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00
    #[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00
    #                 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
    #                 28 00 28 00 00 10 ff
    #[01:38:14] Comm: Refresh
    port_array = [False] * 25

    # action_status() logs in first, like every other action does
    retval = action_status()
    if len(retval) < 7:
        raise IOError("Status response too short (%i bytes)" % len(retval))

    # Bytes 4, 5 and 6 carry ports 1-8, 9-16 and 17-24, lowest bit first
    # (0xfe in byte 4 means port 1 is off).
    status_bits = retval[4] | (retval[5] << 8) | (retval[6] << 16)
    for port in range(0, 24):
        if status_bits & (1 << port):
            port_array[port + 1] = True

    # Update global state
    ports_status = port_array
    port_status_synced = True
    return port_array
225
226
def get_port_status(port):
    """ Return the status of one port, from the cache when it is synced """
    statuses = ports_status if port_status_synced else get_ports_status()
    return statuses[port]
233
234
235
236
237
def runTest():
    """ Run a switching test sequence against the device.

    NOTE(review): doCommand, getPortName and getPortHex are not defined in
    this file, so this presumably predates the current helpers - confirm
    before calling it.
    """
    print("TEST: Start will all port Offline")
    doCommand('allPortsOff')

    print("TEST: All On and Off again, in mass execution")
    doCommand('allPortsOn',0x01)
    doCommand('allPortsOff')

    print("TEST: Enable and disable ports one by one")
    for port_number in range(1,25):
        print(getPortName(port_number))
        doCommand('portOn', getPortHex(port_number))
        doCommand('portOff', getPortHex(port_number))

    print("TEST: Send known error")
    doCommand('knownError')
254
255
256
def usage(msg="",exitcode=None):
    """ Print msg followed by the usage text, then optionally exit.

    The process exits with exitcode whenever one is given, including 0 (the
    previous truthiness test skipped the exit for 0, so --help kept
    running). With exitcode=None the function returns normally.
    """
    print("""%s
Usage %s arguments
Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $

Arguments:
  [-h|--help]                   Reading right know
  [-d|--debug]                  Print extra communication output
  --serialport=                 Serial Port to connect to [%s]
  --password=                   Password to use in hex notation [%s]
  --addresscode=                Internal device number in hex notation [%s]
  [-s <port>|--status=<port>]   Current port(s) configuration
  [-t <port>|--toggle=<port>]   Toggle port(s)
  [-o <port>|--on=<port>]       Turn on port(s)
  [-f <port>|--off=<port>]      Turn off port(s)
  --allon                       Turn all ports on using internal safety [TODO: Implement]
  --alloff                      Turn all ports off using internal safety [TODO: Implement]
  --changepassword=             Change password [TODO: Implement]
  --changeaddresscode=          Change addresscode [TODO: Implement]
  --changetimerdelay=           Change internal timer delay [TODO: Implement]
  --delay=                      Delay used between port operations [TODO: Implement]
  --baudrate=                   Bautrate used for communication (19200,9600) [%s]

Note: <port> has different notations:
  Numeric value of port         1,2,3,4,5,..
  Actual value of port          A1,..,A8,B1,..,B8,C1,..,C8
  All ports                     all
  """ % (msg, sys.argv[0], opt_serial_port, hex(opt_password), hex(opt_address_code), opt_baudrate))
    if exitcode is not None:
        sys.exit(exitcode)
287
def _parse_ports(spec):
    """ Expand a comma separated <port> specification into port numbers.

    Accepts plain numbers (1-24), bank notation (A1-A8, B1-B8, C1-C8) and
    the keyword 'all'. Raises ValueError for anything else.
    """
    ports = []
    for port in spec.split(','):
        if port == "all":
            ports.extend(range(1, 25))
        elif port[:1] in ("A", "B", "C"):
            if len(port) != 2 or port[1] not in "12345678":
                raise ValueError("Invalid port (%s)" % port)
            # 'A1' read as hexadecimal is the port byte 0xa1 used on the wire
            ports.append(hex_to_port(int(port, 16)))
        else:
            number = int(port)
            if not 1 <= number <= 24:
                raise ValueError("Invalid port number (%i)" % number)
            ports.append(number)
    return ports


def main():
    """ Parse the command line, open the serial port and run the action.

    Exits with status 2 (after printing the usage text) on invalid options,
    option values or ports. The serial connection is always closed again,
    also when an action fails.
    """
    # The option values, the debug flag and the connection are module level
    # state read by the helper functions, so they are rebound as globals.
    global DEBUG, serial
    global opt_serial_port, opt_password, opt_address_code, opt_baudrate

    try:
        opts, args = getopt.getopt(sys.argv[1:],
            "dhf:s:t:o:v",
            ["debug", "verbose", "help", "serialport=", "password=",
             "addresscode=", "toggle=", "off=", "on=", "status=",
             # 'buadrate' is the old, misspelled name; kept as an alias
             "baudrate=", "buadrate="])
    except getopt.GetoptError as err:
        usage(str(err), 2)

    port_actions = {
        "-s": "status", "--status": "status",
        "-t": "toggle", "--toggle": "toggle",
        "-f": "off", "--off": "off",
        "-o": "on", "--on": "on",
    }
    numeric_bases = {
        "--addresscode": 16,
        "--password": 16,
        "--baudrate": 10,
        "--buadrate": 10,
    }

    opt_port = None
    opt_action = None
    for o, a in opts:
        if o in ("-d", "--debug", "-v", "--verbose"):
            DEBUG = True
        elif o in ("-h", "--help"):
            usage("", 0)
            sys.exit(0)
        elif o in numeric_bases:
            try:
                value = int(a, numeric_bases[o])
            except ValueError:
                usage("[ERROR] Invalid value '%s' for %s" % (a, o), 2)
            if o == "--addresscode":
                opt_address_code = value
            elif o == "--password":
                opt_password = value
            else:
                opt_baudrate = value
        elif o == "--serialport":
            opt_serial_port = a
        elif o in port_actions:
            opt_action = port_actions[o]
            opt_port = a
        else:
            raise AssertionError("unhandled option")

    if opt_port is None or opt_action is None:
        usage("[ERROR] No port or action defined", 2)

    # Resolve port to proper numbers array
    try:
        ports = _parse_ports(opt_port)
    except ValueError as err:
        usage("[ERROR] %s" % err, 2)
    debug("Operating on ports " + ",".join([str(port) for port in ports]))

    # The module level name 'serial' holds the connection (or None), so the
    # pyserial module is imported under another name to reach its Serial
    # class.
    import serial as pyserial
    serial = pyserial.Serial(opt_serial_port, opt_baudrate, timeout=5)
    debug(serial)

    try:
        for port in ports:
            if opt_action == "status":
                state = "on" if get_port_status(port) else "off"
                print("Port %i: %s" % (port, state))
            elif opt_action == "toggle":
                if get_port_status(port):
                    action_port_off(port)
                else:
                    action_port_on(port)
            elif opt_action == "on":
                action_port_on(port)
            elif opt_action == "off":
                action_port_off(port)
            else:
                raise AssertionError("Option '%s' invalid" % opt_action)
    finally:
        serial.close()


if __name__ == "__main__":
    main()
Note: See TracBrowser for help on using the repository browser.