source: powerbar/serial-npm4000.py@ 155

Last change on this file since 155 was 155, checked in by Rick van der Zwet, 14 years ago

Status notifications from ports

  • Property svn:executable set to *
File size: 10.8 KB
Line 
1#!/usr/bin/env python
2# NPM 4000 powerbar managment script, to be used instead of the windows
3# application.
4#
5# XXX: Not all features are ported yet (like amp monitoring)
6# XXX: Some dirty poking around with hex representations and numeric representations of ports
7# c2 04 ff ff a1 67XXX: Make proper classes for use
8# XXX: Documentation
9#
10# Licence: BSD
11# Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $
12# Rick van der Zwet <info@rickvanderzwet.nl>
13
14import serial
15import sys
16import time
17import random
18
19DEBUG = False
20
21def make_checksum(command):
22 crc = 0
23 for item in command:
24 crc ^= item
25 return crc
26
27def debug(msg):
28 if DEBUG:
29 print msg
30
31def hex_to_str(command):
32 return " ".join(["%02x" % item for item in command])
33
34def str_to_hex(s):
35 """Hexadecimal string representation of 's'"""
36 return [ord(x) for x in s]
37
38def port_to_hex(port_number):
39 if port_number <= 8:
40 port = 0xa0 + port_number
41 elif port_number <= 16:
42 port = 0xb0 + (port_number - 8)
43 elif port_number <= 24:
44 port = 0xc0 + (port_number - 16)
45 else:
46 assert False, "Invalid port port_number (%i)" % port_number
47 debug("%i - %02x" % (port_number, port))
48 return port
49
50
51def hex_to_port(port):
52 base = port & 0xf0
53 index = port & 0x0f
54 if (base ^ 0xa0) == 0:
55 port_number = index + 0
56 elif (base ^ 0xb0) == 0:
57 port_number = index + 8
58 elif (base ^ 0xc0) == 0:
59 port_number = index + 16
60 else:
61 assert False, "Invalid port (%02x)" % port
62 debug("%02x - %i" % (port, port_number))
63 return port_number
64
65
66def send_raw_command(ser, raw_command, response_size=1024):
67 """ Send command to serial device and wait for response """
68
69 print "Going to send: " + hex_to_str(raw_command)
70 send_line = "".join([chr(item) for item in raw_command])
71 f = open('input.bin','w')
72 f.write(send_line)
73 f.close()
74
75 ser.write(send_line)
76 recv_line = ser.read(response_size)
77 recv_command = str_to_hex(recv_line)
78 print "Received: %s (%i)" % (hex_to_str(recv_command), len(recv_line))
79 return(recv_command)
80
81def send_command(ser, command, response_size=1024):
82 raw_command = command + [make_checksum(command)]
83 return send_raw_command(ser, raw_command, response_size)
84
85
86
87#print "%02X" % make_checksum([0xAA, 0x03, 0xFF, 0xFF])
88
89
90#[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00
91#[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 28 00 28 00 00 10 ff
92#[01:38:14] Comm: Refresh
93
94#55 07 ff ff 12 34 56 78 5a
95action_login = [0x55, 0x07]
96device_id = [0xff, 0xff]
97password = [0x12, 0x34, 0x56, 0x78]
98
99ser = serial.Serial('/dev/ttyUSB0', 19200, timeout=5)
100print ser
101
102# Login cycle
103#command = action_login + device_id + password
104#send_command(ser, command)
105
106# Refresh cycle
107#action_refresh = [0xd2, 0x00]
108#raw_command = command + [0x5a] + device_id + action_refresh
109#send_raw_command(ser, raw_command)
110# 55 07 ff ff 12 34 56 78 5a c2 04 ff ff a1 67
111#send_raw_command(ser, [0x55,0x07,0xff,0xff,0x12,0x34,0x56,0x78,0x5a])
112# 55 07 ff ff 12 34 56 78 5a d1 03 ff ff d2 00
113#send_raw_command(ser, [0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78, 0x5a, 0xd1, 0x03, 0xff, 0xff, 0xd2, 0x00])
114
115# 55 07 FF FF 12 34 56 78 5A D1 03 FF FF D2 00
116login_line = [0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78]
117refresh_line = [0xd1, 0x03, 0xff, 0xff]
118allon_line = [0xb1, 0x03, 0xff, 0xff]
119alloff_line = [0xc1, 0x03, 0xff, 0xff]
120port_on_line = [0xb2, 0x04, 0xff, 0xff, 0xa1]
121port_off_line = [0xc2, 0x04, 0xff,0xff, 0xa1]
122power_on_interval_125_line = [0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28]
123power_on_interval_05_line = [0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3]
124change_address_code_line = [0x05, 0x00, 0xff, 0xff, 0x00, 0x04]
125modify_password_line = [0xd3, 0x07, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11]
126
127def port_on_cmd(port_number):
128 line = port_on_line
129 line[4] = port_to_hex(port_number)
130 return line
131
132def port_off_cmd(port_number):
133 line = port_off_line
134 line[4] = port_to_hex(port_number)
135 return line
136
137def address_to_num(address):
138 return (address[0] << 8) ^ address[1]
139
140def num_to_address(npm_number):
141 return [npm_number >> 8, npm_number & 0x00ff]
142
143def bin_reverse(number,width=8):
144 """Little hacking using string logic to binary reverse number"""
145 return int(bin(number)[2:].zfill(width)[::-1],2)
146
147
148def get_port_status(retval):
149 # Example port 1 is off
150 # d1 28 ff ff fe ff ff
151 # ^^ ^^ ^^
152 port_array = [False] * 25
153 status_array = bin_reverse((bin_reverse(retval[4]) << 16) ^ (bin_reverse(retval[5]) << 8) ^ bin_reverse(retval[6]),24)
154 for port in range(0,24):
155 if (status_array & (1 << port)) > 0:
156 port_array[port+1] = True
157 return port_array
158
159
160#send_command(ser, login_line, 5)
161# Note: you will need to wait at least timeout * 24 to complete
162#ser.timeout = 13
163#send_command(ser, allon_line, 6)
164#send_command(ser, alloff_line,6)
165
166#send_command(ser, power_on_interval_05_line,6)
167#send_command(ser,port_on_cmd(24),6)
168#send_command(ser,port_off_cmd(24),6)
169#retval = send_command(ser, refresh_line, 42)
170#print get_port_status(retval)
171while True:
172 port_number = random.randint(1,24)
173 print port_number
174 send_command(ser, login_line, 5)
175 send_command(ser, port_on_cmd(port_number),6)
176 send_command(ser, login_line, 5)
177 send_command(ser, port_off_cmd(port_number),6)
178
179ser.close()
180exit(0)
181
182def generateCommand(type,arg=None):
183 if arg == None:
184 command = "%04X%04X" % (getCode(type), addr_code)
185 else:
186 command = "%04X%04X%02X" % (getCode(type), addr_code, arg)
187 command += checksum(command)
188 return (command)
189
190
191def doCommand(type, arg=None):
192 sendCommand('login', passwd)
193 retval = sendCommand(type, arg)
194 #XXX: Check if command returned succesfull D128FF
195 return(retval)
196
197
198def getPortState(port, raw_status):
199 """Port configuration is put into 24 bits, every port has one bit
200 representing the statewith a awkward order:
201 A8,A7,A6,A5,A4,A3,A2,A1,B8,..,B1,C8,..,C1"""
202 # Portion of retval we are interested in
203 ports_state = int(raw_status[8:14],16)
204 port_bit_location = -1
205 if port <= 8:
206 port_bit_location = (1 << 15 << port)
207 elif port <= 16:
208 port_bit_location = (1 << 7 << (port - 8))
209 elif port <= 24:
210 port_bit_location = (1 << (port - 16 - 1))
211
212 if port_bit_location & ports_state:
213 return True
214 else:
215 return False
216
217def getPortStatus(i):
218 raw_status = doCommand('status')
219 print "Port %02i [%s]:" % (i, getPortName(i)),
220 if getPortState(i,raw_status):
221 print "1"
222 else:
223 print "0"
224 return(retval)
225
226 raw_status= doCommand('status')
227 for i in range(1,25):
228 print "Port %02i [%s]:" % (i, getPortName(i)),
229 if getPortState(i,raw_status):
230 print "1"
231 else:
232 print "0"
233
234def togglePort(port):
235 if getPortState(port):
236 doCommand('portOff',port)
237 else:
238 doCommand('portOn', port)
239
240
241def runTest():
242 print "TEST: Start will all port Offline"
243 doCommand('allPortsOff')
244
245 print "TEST: All On and Off again, in mass execution"
246 doCommand('allPortsOn',0x01)
247 doCommand('allPortsOff')
248
249 print "TEST: Enable and disable ports one by one"
250 for i in range(1,25):
251 print getPortName(i)
252 doCommand('portOn', getPortHex(i))
253 doCommand('portOff', getPortHex(i))
254
255 print "TEST: Send known error"
256 doCommand('knownError')
257
258
259def usage():
260 print """
261Usage %s arguments
262Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $
263
264Arguments:
265 [-h|--help] Reading right know
266 [-v|--verbose] Print extra communication output
267 --host= IP adress of FQDN hostname [%s]
268 --port= Port to connect to [%s]
269 --password= Password to use in hex notation [0x1234568]
270 --addresscode= Internal device number in hex notation [0xFFFF]
271 [-s|--status] Current port configuration
272 [-t <port>|--toggle=<port>] Toggle port(s)
273 [-o <port>|--on=<port>] Turn on port(s)
274 [-f <port>|--off=<port>] Turn off port(s)
275
276Note: <port> has different notations:
277 Numeric value of port 1,2,3,4,5,..
278 Actual value of port A1,..,A8,B1,..,B8,C1,..,C8
279 All ports all
280 """ % (sys.argv[0], inet_addr, inet_port)
281
282def main():
283 global verbose, addr_code, inet_addr, inet_port, passwd, s
284 try:
285 opts, args = getopt.getopt(sys.argv[1:],
286 "hf:s:t:o:v", ["help","verbose","host=", "port=", "password=", "addresscode=","toggle=","off=", "on=", "status="])
287 except getopt.GetoptError, err:
288 # print help information and exit:
289 print str(err) # will print something like "option -a not recognized"
290 usage()
291 sys.exit(2)
292
293 opt_port = None
294 opt_action = None
295 for o, a in opts:
296 if o in ("-v", "--verbose"):
297 verbose = True
298 elif o in ("-h", "--help"):
299 usage()
300 sys.exit()
301 elif o in ("--addresscode"):
302 addr_code = int(a,16)
303 elif o in ("--host"):
304 inet_addr = a
305 elif o in ("--password"):
306 passwd = int(a,16)
307 elif o in ("--port"):
308 inet_port = a
309 elif o in ("-s", "--status"):
310 opt_action = "status"
311 opt_port = a
312 elif o in ("-t","--toggle"):
313 opt_action = "toggle"
314 opt_port = a
315 elif o in ("-f","--off"):
316 opt_action = "off"
317 opt_port = a
318 elif o in ("-o","--on"):
319 opt_action = "on"
320 opt_port = a
321 else:
322 assert False, "unhandled option"
323
324 if (opt_port == None or opt_action == None):
325 usage()
326 sys.exit(2)
327
328 dprint ('action: ' + opt_action + ' port: ' + opt_port)
329
330 # Status needs real integers, hack
331 if opt_action == "status":
332 if opt_port == "all":
333 getStatusAll()
334 else:
335 print "XXX: Implement"
336 sys.exit(0)
337
338 # Resolve port to proper number
339 if opt_port == "all":
340 None # Blank resolution, as it is done elsewhere
341 elif opt_port[0] in ("A","B","C"):
342 opt_port = int(opt_port,16)
343 dprint('Hexcode of port: %i' % opt_port)
344 else:
345 # Dirty hack to have conversion and checking at the same time
346 opt_port = getPortHex(int(opt_port))
347 dprint('Hexcode of port: %i' % opt_port)
348
349 if opt_action == "toggle":
350 if opt_port == "all":
351 for i in range(1,25):
352 togglePort(getPortHex(i))
353 else:
354 togglePort(opt_port)
355 elif opt_action == "on":
356 if opt_port == "all":
357 doCommand("allPortsOn",0x01)
358 else:
359 doCommand("portOn", opt_port)
360 elif opt_action == "off":
361 if opt_port == "all":
362 doCommand("allPortsOff");
363 else:
364 doCommand("portOn", opt_port)
365
366
367if __name__ == "__main__":
368 main()
Note: See TracBrowser for help on using the repository browser.