source: powerbar/serial-npm4000.py@ 154

Last change on this file since 154 was 154, checked in by Rick van der Zwet, 14 years ago

Initial working version of serial NPM4000 driver

  • Property svn:executable set to *
File size: 11.3 KB
RevLine 
[154]1#!/usr/bin/env python
2# NPM 4000 powerbar management script, to be used instead of the Windows
3# application.
4#
5# XXX: Not all features are ported yet (like amp monitoring)
6# XXX: Some dirty poking around with hex representations and numeric representations of ports
7# XXX: Make proper classes for use
8# XXX: Documentation
9#
10# Licence: BSD
11# Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $
12# Rick van der Zwet <info@rickvanderzwet.nl>
13
14import serial
15import sys
16import time
17import random
18
# Switch for debug(): its messages are printed only while this is True.
DEBUG = False
20
def make_checksum(command):
    """Return the XOR of all values in 'command'.

    'command' is an iterable of integers (byte values). An empty
    iterable gives 0.
    """
    checksum = 0
    for byte in command:
        checksum = checksum ^ byte
    return checksum
26
def debug(msg):
    """Print 'msg', but only while the module-level DEBUG flag is set."""
    if not DEBUG:
        return
    # A single parenthesised argument prints exactly like 'print msg'.
    print(msg)
30
def hex_to_str(command):
    """Format byte values as two-digit lowercase hex, separated by spaces.

    Example: [0x55, 0x07, 0xff] -> "55 07 ff". An empty list gives "".
    """
    return " ".join("%02x" % byte for byte in command)
33
def str_to_hex(s):
    """Return the character codes (ord values) of 's' as a list, in order."""
    codes = []
    for char in s:
        codes.append(ord(char))
    return codes
37
def port_to_hex(port_number):
    """Translate a port number (1..24) into its one-byte port code.

    Ports 1-8 map to 0xa1-0xa8, ports 9-16 to 0xb1-0xb8 and ports 17-24
    to 0xc1-0xc8.

    Raises AssertionError when 'port_number' is outside 1..24.
    """
    if not 1 <= port_number <= 24:
        # Raised explicitly: an 'assert' statement is removed under
        # 'python -O', which used to leave 'port' unbound. The exception
        # type is kept so existing callers see no difference.
        raise AssertionError("Invalid port port_number (%i)" % port_number)

    # bank 0/1/2 selects the 0xa0/0xb0/0xc0 group, slot is 0..7 inside it.
    bank, slot = divmod(port_number - 1, 8)
    port = (0xa0, 0xb0, 0xc0)[bank] + slot + 1
    debug("%i - %02x" % (port_number, port))
    return port
49
50
def hex_to_port(port):
    """Translate a one-byte port code back into its port number (1..24).

    Inverse of port_to_hex(): the high nibble selects the group
    (0xa0 -> ports 1-8, 0xb0 -> 9-16, 0xc0 -> 17-24) and the low nibble is
    the 1-based slot inside that group.

    Raises AssertionError for an unknown group or a slot outside 1..8.
    """
    base = port & 0xf0
    index = port & 0x0f
    group_offset = {0xa0: 0, 0xb0: 8, 0xc0: 16}
    # Slot 0 and slots 9..15 do not exist; they used to be accepted and
    # produced numbers that belong to no port or to a port in another group.
    if base not in group_offset or not 1 <= index <= 8:
        # Explicit raise instead of 'assert', which 'python -O' strips.
        raise AssertionError("Invalid port (%02x)" % port)

    port_number = group_offset[base] + index
    debug("%02x - %i" % (port, port_number))
    return port_number
64
65
def send_raw_command(ser, raw_command, response_size=1024):
    """Send 'raw_command' to the serial device and return the response.

    ser           -- object offering write() and read(); presumably the
                     serial.Serial instance opened by this script
    raw_command   -- list of byte values, sent as-is (no checksum is added)
    response_size -- maximum number of bytes to read back

    Returns the received bytes as a list of integers. The list is as long
    as whatever ser.read() delivered, which may be less than
    'response_size'.

    Side effects: prints the outgoing and incoming bytes and writes the
    outgoing bytes to 'input.bin' in the current directory.
    """
    print("Going to send: " + hex_to_str(raw_command))
    send_line = "".join([chr(item) for item in raw_command])

    # Dump of the exact bytes sent, for offline inspection. Binary mode so
    # no newline translation can alter them; 'with' closes the file even if
    # the write fails.
    with open('input.bin', 'wb') as dump:
        dump.write(send_line)

    ser.write(send_line)
    recv_line = ser.read(response_size)
    recv_command = str_to_hex(recv_line)
    print("Received: %s (%i)" % (hex_to_str(recv_command), len(recv_line)))

    # Previously commented out, which made every caller receive None.
    return recv_command
81
def send_command(ser, command, response_size=1024):
    """Append the XOR checksum to 'command' and send the result.

    'command' is a list of byte values without checksum. The return value
    is whatever send_raw_command() returns.
    """
    checksum = make_checksum(command)
    framed = command + [checksum]
    return send_raw_command(ser, framed, response_size)
85
86
87
88#print "%02X" % make_checksum([0xAA, 0x03, 0xFF, 0xFF])
89
90
91#[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00
92#[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 28 00 28 00 00 10 ff
93#[01:38:14] Comm: Refresh
94
95#55 07 ff ff 12 34 56 78 5a
# Pieces of the login frame quoted in the comment above
# (55 07 | ff ff | 12 34 56 78 | checksum 5a).
# NOTE(review): only the commented-out login/refresh experiments below
# refer to these three names; the live code uses login_line instead.
action_login = [0x55, 0x07]
device_id = [0xff, 0xff]
password = [0x12, 0x34, 0x56, 0x78]
99
# Open the serial line to the powerbar at module load time:
# device /dev/ttyUSB0, 19200 baud, reads time out after 5 seconds.
ser = serial.Serial('/dev/ttyUSB0', 19200, timeout=5)
# Show the port settings that are actually in effect.
print ser
102
103# Login cycle
104#command = action_login + device_id + password
105#send_command(ser, command)
106
107# Refresh cycle
108#action_refresh = [0xd2, 0x00]
109#raw_command = command + [0x5a] + device_id + action_refresh
110#send_raw_command(ser, raw_command)
111# 55 07 ff ff 12 34 56 78 5a c2 04 ff ff a1 67
112#send_raw_command(ser, [0x55,0x07,0xff,0xff,0x12,0x34,0x56,0x78,0x5a])
113# 55 07 ff ff 12 34 56 78 5a d1 03 ff ff d2 00
114#send_raw_command(ser, [0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78, 0x5a, 0xd1, 0x03, 0xff, 0xff, 0xd2, 0x00])
115
116# 55 07 FF FF 12 34 56 78 5A D1 03 FF FF D2 00
# Command templates, each a list of byte values. Bytes 2 and 3 are 0xff 0xff
# in every template (the same value as device_id).

# Login frame without checksum; send_command() appends it (0x5a).
login_line = [0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78]
# The trailing 0xd2 already is the XOR checksum of the preceding bytes.
refresh_line = [0xd1, 0x03, 0xff, 0xff, 0xd2]
# All ports on / all ports off, without checksum.
allon_line = [0xb1, 0x03, 0xff, 0xff]
alloff_line = [0xc1, 0x03, 0xff, 0xff]
# Single port on / off, without checksum. Index 4 is the port code and is
# overwritten by port_on_cmd() / port_off_cmd(); 0xa1 is port 1.
port_on_line = [0xb2, 0x04, 0xff, 0xff, 0xa1]
port_off_line = [0xc2, 0x04, 0xff,0xff, 0xa1]
# Power-on interval settings. The last byte is the XOR checksum of the
# preceding bytes; byte 4 (0xfa / 0x01) is presumably the interval value --
# its unit is not visible here, TODO confirm.
power_on_interval_125_line = [0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28]
power_on_interval_05_line = [0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3]
125
def port_on_cmd(port_number):
    """Return a 'switch port on' command (without checksum) for a port.

    'port_number' is the port number 1..24; port_to_hex() rejects anything
    else.

    A new list is returned on every call. The port_on_line template is
    copied rather than modified, so earlier results and the template itself
    are not changed by later calls.
    """
    line = list(port_on_line)
    line[4] = port_to_hex(port_number)
    return line
130
def port_off_cmd(port_number):
    """Return a 'switch port off' command (without checksum) for a port.

    'port_number' is the port number 1..24; port_to_hex() rejects anything
    else.

    A new list is returned on every call. The port_off_line template is
    copied rather than modified, so earlier results and the template itself
    are not changed by later calls.
    """
    line = list(port_off_line)
    line[4] = port_to_hex(port_number)
    return line
135
136# Note: you will need to wait at least timeout * 24 to complete
137#ser.timeout = 13
138#send_raw_command(ser, allon_line, 6)
139#send_raw_command(ser, alloff_line,6)
140#send_raw_command(ser, refresh_line, 42)
141#send_raw_command(ser, power_on_interval_05_line,6)
142#send_raw_command(ser,one_off_line,6)
# Exercise loop: pick a random port, switch it on and off again, forever.
# Every command is preceded by a login. The numbers 5 and 6 are the
# response sizes to read back.
try:
    while True:
        port_number = random.randint(1, 24)
        print(port_number)
        send_command(ser, login_line, 5)
        send_command(ser, port_on_cmd(port_number), 6)
        send_command(ser, login_line, 5)
        send_command(ser, port_off_cmd(port_number), 6)
except KeyboardInterrupt:
    # The loop has no exit condition of its own; Ctrl-C is the way out.
    pass
finally:
    # Used to sit unreachable behind the endless loop. Now the serial port
    # is released on Ctrl-C and also when a command raises.
    ser.close()

sys.exit(0)
158
# XOR all the 8bit values to get a checksum
def checksum(s):
    """Return the XOR of the hex byte pairs in 's' as two uppercase hex digits.

    's' is a string of hexadecimal digits that is read two digits at a
    time. An empty string gives "00".
    """
    pairs = [s[start:start + 2] for start in range(0, len(s), 2)]
    crc = 0
    for pair in pairs:
        crc ^= int(pair, 16)
    return "%02X" % crc
165
# Defaults for the login password and the device address code; main() can
# replace both from the command line (--password, --addresscode).
passwd = 0x12345678
addr_code = 0xFFFF

# When True, dprint() prints its messages (set by -v/--verbose in main()).
verbose = False

# Command name -> (command code, timeout). getCode() returns the first
# element, getTimeout() the second.
# NOTE(review): the unit of the timeout is not visible here -- presumably
# seconds, confirm against the code that consumes getTimeout().
line2code = { 'login' : (0x5507, 1),
              'portOn' : (0xB204, 2),
              'portOff' : (0xC204, 2),
              'allPortsOff' : (0xC103, 6),
              'allPortsOn' : (0xB104, 15),
              'status' : (0xD103, 2),
              'knownError' : (0xFFFF, 1),
            }

# Socket used throughout the code
# NOTE(review): the import lines at the top of this file do not import
# 'socket', so this line would raise NameError if it were ever reached.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
182
def getCode(type):
    """Return the command code stored in line2code for command name 'type'.

    Raises KeyError when the name is not in line2code.
    """
    entry = line2code[type]
    return entry[0]
185
def getTimeout(type):
    """Return the timeout stored in line2code for command name 'type'.

    Raises KeyError when the name is not in line2code.
    """
    entry = line2code[type]
    return entry[1]
188
def dprint(msg):
    """Print 'msg' behind a '[YYYY-MM-DD HH:MM:SS]' stamp when verbose is set."""
    if not verbose:
        return
    stamp = time.strftime('[%Y-%m-%d %H:%M:%S]')
    # One pre-formatted string gives the same "stamp msg" line as the
    # two-argument print statement did.
    print("%s %s" % (stamp, msg))
192
def getPortNumber(name):
    """Translate a port name such as 'A1' into its port number (1..24).

    The name is one group letter 'A', 'B' or 'C' followed by one slot digit
    '1'..'8': A1..A8 -> 1..8, B1..B8 -> 9..16, C1..C8 -> 17..24.

    Raises ValueError for anything else, including an empty name, a slot
    of 0 or 9, and trailing characters (which used to be silently ignored).
    """
    groups = "ABC"
    slots = "12345678"
    # Validate the whole name before converting any part of it.
    if len(name) != 2 or name[0] not in groups or name[1] not in slots:
        raise ValueError("Port %s not defined" % name)

    return groups.index(name[0]) * 8 + int(name[1])
207
def getPortName(number):
    """Translate a port number (1..24) into its name 'A1'..'C8'.

    1..8 -> A1..A8, 9..16 -> B1..B8, 17..24 -> C1..C8.

    Raises ValueError when 'number' is outside 1..24. Zero and negative
    numbers used to slip through as names like 'A0'.
    """
    if not 1 <= number <= 24:
        raise ValueError("Port %i not defined" % number)

    # group 0/1/2 selects the letter, slot is 0..7 inside the group.
    group, slot = divmod(number - 1, 8)
    return "%s%i" % ("ABC"[group], slot + 1)
219
def getPortHex(number):
    """Return the port code of port 'number', for example 1 -> 0xA1.

    A port name like 'A1' is also valid hexadecimal, so reading the name in
    base 16 yields the code directly.
    """
    port_name = getPortName(number)
    return int(port_name, 16)
222
def generateCommand(type, arg=None):
    """Build the hex-string command for command name 'type'.

    Layout: four hex digits command code, four hex digits addr_code, the
    optional argument, then the two-digit checksum of everything before it.

    type -- command name, looked up through getCode()
    arg  -- optional integer argument; omitted from the command when None
    """
    command = "%04X%04X" % (getCode(type), addr_code)
    if arg is not None:
        arg_hex = "%X" % arg
        # checksum() reads the string two digits at a time, so the argument
        # must occupy whole bytes. "%02X" only pads to two digits and left
        # e.g. 0x2345678 at seven digits, misaligning every byte pair.
        # NOTE(review): leading zero *bytes* are still dropped, because the
        # expected argument width is not known here.
        if len(arg_hex) % 2:
            arg_hex = "0" + arg_hex
        command += arg_hex
    command += checksum(command)
    return command
230
231
def doCommand(type, arg=None):
    """Log in with the module-level passwd, then send command 'type'.

    Returns what sendCommand() returns for the actual command; the result
    of the login is not looked at.
    """
    sendCommand('login', passwd)
    #XXX: Check if command returned succesfull D128FF
    return sendCommand(type, arg)
237
238
def getPortState(port, raw_status):
    """Return True when port number 'port' is on according to 'raw_status'.

    The port configuration is a 24-bit field, one bit per port, in the
    awkward order A8,A7,..,A1,B8,..,B1,C8,..,C1 (most significant first).

    port       -- port number 1..24
    raw_status -- hex string; characters 8..13 hold the 24-bit field

    Raises ValueError when 'port' is outside 1..24. Such a port used to be
    tested with an all-ones mask and came back True whenever any port was
    on.
    """
    if not 1 <= port <= 24:
        raise ValueError("Port %i not defined" % port)

    # Portion of the status we are interested in
    ports_state = int(raw_status[8:14], 16)

    # Group A sits in bits 16..23, group B in bits 8..15 and group C in
    # bits 0..7; within a group slot 1 is the lowest bit.
    group, slot = divmod(port - 1, 8)
    port_bit_location = 1 << ((2 - group) * 8 + slot)
    return bool(port_bit_location & ports_state)
257
def getPortStatus(i):
    """Query the device, print the state of port number 'i' and return it.

    Prints one line 'Port NN [name]: 1' (on) or '... 0' (off) and returns
    the state as a boolean. The previous code returned an undefined name.
    """
    raw_status = doCommand('status')
    state = bool(getPortState(i, raw_status))
    print("Port %02i [%s]: %i" % (i, getPortName(i), state))
    return state


def getStatusAll():
    """Query the device once and print the state of all 24 ports.

    main() calls this name for '--status=all'. Its body used to sit,
    unreachable, behind the return statement of getPortStatus().
    """
    raw_status = doCommand('status')
    for i in range(1, 25):
        state = bool(getPortState(i, raw_status))
        print("Port %02i [%s]: %i" % (i, getPortName(i), state))
274
def togglePort(port):
    """Switch the port with port code 'port' to the opposite state.

    'port' is the port code (0xA1..0xC8), which is what main() passes in
    and what the portOn/portOff commands take.
    """
    # getPortState() needs the current status and a port *number*. It used
    # to be called with the port code alone, which raised a TypeError.
    raw_status = doCommand('status')
    if getPortState(hex_to_port(port), raw_status):
        doCommand('portOff', port)
    else:
        doCommand('portOn', port)
280
281
def runTest():
    """Run a fixed command sequence against the device as a smoke test.

    Order: all ports off, all on and off again, every port 1..24 on and
    off one by one, and finally the 'knownError' command.
    """
    print("TEST: Start will all port Offline")
    doCommand('allPortsOff')

    print("TEST: All On and Off again, in mass execution")
    doCommand('allPortsOn', 0x01)
    doCommand('allPortsOff')

    print("TEST: Enable and disable ports one by one")
    for port_number in range(1, 25):
        print(getPortName(port_number))
        port_code = getPortHex(port_number)
        doCommand('portOn', port_code)
        doCommand('portOff', port_code)

    print("TEST: Send known error")
    doCommand('knownError')
298
299
def usage():
    """Print the command line help.

    The text is filled in with the script name and the current values of
    the module-level inet_addr and inet_port.
    NOTE(review): main() only declares those two names global; no visible
    line assigns them a default, so calling this before --host and --port
    were parsed presumably raises NameError -- confirm.
    """
    # Two factual corrections to the text: the default password is
    # 0x12345678 (as set in passwd), and -s/--status takes a <port>
    # argument, as main() declares it to getopt.
    print("""
Usage %s arguments
Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $

Arguments:
  [-h|--help]                   Reading right know
  [-v|--verbose]                Print extra communication output
  --host=                       IP adress of FQDN hostname [%s]
  --port=                       Port to connect to [%s]
  --password=                   Password to use in hex notation [0x12345678]
  --addresscode=                Internal device number in hex notation [0xFFFF]
  [-s <port>|--status=<port>]   Current port configuration
  [-t <port>|--toggle=<port>]   Toggle port(s)
  [-o <port>|--on=<port>]       Turn on port(s)
  [-f <port>|--off=<port>]      Turn off port(s)

Note: <port> has different notations:
  Numeric value of port         1,2,3,4,5,..
  Actual value of port          A1,..,A8,B1,..,B8,C1,..,C8
  All ports                     all
 """ % (sys.argv[0], inet_addr, inet_port))
322
def main():
    """Parse the command line and run the requested port action.

    Exactly one action (status, toggle, on, off) with a port is expected;
    the port is 'all', a name 'A1'..'C8' or a number 1..24. Exits with
    status 2 after printing the help when the arguments are unusable.
    """
    global verbose, addr_code, inet_addr, inet_port, passwd, s
    # Imported here because the import lines at the top of the file do not
    # bring in getopt.
    import getopt

    try:
        opts, args = getopt.getopt(
            sys.argv[1:],
            "hf:s:t:o:v",
            ["help", "verbose", "host=", "port=", "password=",
             "addresscode=", "toggle=", "off=", "on=", "status="])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        usage()
        sys.exit(2)

    opt_port = None
    opt_action = None
    for o, a in opts:
        if o in ("-v", "--verbose"):
            verbose = True
        elif o in ("-h", "--help"):
            usage()
            sys.exit()
        # Plain equality below: '("--host")' is a string, not a tuple, so
        # the former 'o in (...)' was a substring test.
        elif o == "--addresscode":
            addr_code = int(a, 16)
        elif o == "--host":
            inet_addr = a
        elif o == "--password":
            passwd = int(a, 16)
        elif o == "--port":
            inet_port = a
        elif o in ("-s", "--status"):
            opt_action = "status"
            opt_port = a
        elif o in ("-t", "--toggle"):
            opt_action = "toggle"
            opt_port = a
        elif o in ("-f", "--off"):
            opt_action = "off"
            opt_port = a
        elif o in ("-o", "--on"):
            opt_action = "on"
            opt_port = a
        else:
            assert False, "unhandled option"

    if opt_port is None or opt_action is None:
        usage()
        sys.exit(2)

    dprint('action: ' + opt_action + ' port: ' + opt_port)

    # Status needs real integers, hack
    if opt_action == "status":
        if opt_port == "all":
            getStatusAll()
        else:
            print("XXX: Implement")
        sys.exit(0)

    # Resolve port to proper number
    if opt_port == "all":
        pass  # Blank resolution, as it is done elsewhere
    elif opt_port[:1] in ("A", "B", "C"):
        # The name doubles as the hex port code, but only A1..C8 exist.
        # 'A9' or 'AF' would otherwise be sent to the device unchecked.
        if len(opt_port) != 2 or opt_port[1] not in "12345678":
            print("Port %s not defined" % opt_port)
            usage()
            sys.exit(2)
        opt_port = int(opt_port, 16)
        dprint('Hexcode of port: %i' % opt_port)
    else:
        # Dirty hack to have conversion and checking at the same time
        try:
            opt_port = getPortHex(int(opt_port))
        except ValueError as err:
            # Not a number, or a number outside the known ports.
            print(str(err))
            usage()
            sys.exit(2)
        dprint('Hexcode of port: %i' % opt_port)

    if opt_action == "toggle":
        if opt_port == "all":
            for i in range(1, 25):
                togglePort(getPortHex(i))
        else:
            togglePort(opt_port)
    elif opt_action == "on":
        if opt_port == "all":
            doCommand("allPortsOn", 0x01)
        else:
            doCommand("portOn", opt_port)
    elif opt_action == "off":
        if opt_port == "all":
            doCommand("allPortsOff")
        else:
            # Was "portOn": switching a single port off turned it on.
            doCommand("portOff", opt_port)
406
407
# Script entry point for the option-driven interface.
# NOTE(review): further up, the module runs its port exercise loop and then
# calls sys.exit(0), so this guard is presumably never reached -- confirm
# whether the exercise loop or main() is the intended entry.
if __name__ == "__main__":
    main()
Note: See TracBrowser for help on using the repository browser.