Changeset 156
- Timestamp:
- Jul 16, 2010, 10:24:29 AM (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
powerbar/serial-npm4000.py
r155 r156 3 3 # application. 4 4 # 5 # XXX: Not all features are ported yet (like amp monitoring) 6 # XXX: Some dirty poking around with hex representations and numeric representations of ports 7 # c2 04 ff ff a1 67XXX: Make proper classes for use 8 # XXX: Documentation 5 # TODO: Not all features are ported yet (like amp monitoring) 6 # TODO: Error handling not implemented 9 7 # 10 8 # Licence: BSD … … 16 14 import time 17 15 import random 18 16 import getopt 17 18 # Set to true to enable verbose communication aka debugging 19 19 DEBUG = False 20 20 21 # Default for options 22 opt_serial_port = '/dev/ttyUSB0' 23 opt_password = 0x1245678 24 opt_address_code = 0xFFFF 25 opt_baudrate = 19200 26 27 # Serial connection port status is cached globally to avoid overhead 28 serial = None 29 port_status_synced = False 30 ports_status = None 31 21 32 def make_checksum(command): 33 """ Generate CRC checksum using XOR on all bytes """ 22 34 crc = 0 23 35 for item in command: … … 25 37 return crc 26 38 39 40 27 41 def debug(msg): 28 if DEBUG: 29 print msg 42 """ Print debug statement if DEBUG is set """ 43 if DEBUG: 44 print msg 45 46 30 47 31 48 def hex_to_str(command): 49 """ Human readable representation of command """ 32 50 return " ".join(["%02x" % item for item in command]) 33 51 52 53 34 54 def str_to_hex(s): 35 """ Hexadecimal string representation of 's'"""55 """ Hexadecimal string representation of 's' """ 36 56 return [ord(x) for x in s] 37 57 58 59 38 60 def port_to_hex(port_number): 61 """ Convert integer port number to hexadecimal presentation as internal 62 location 63 """ 64 if port_number < 1: 65 assert False, "Invalid port port_number (%i)" % port_number 39 66 if port_number <= 8: 40 67 port = 0xa0 + port_number … … 49 76 50 77 78 51 79 def hex_to_port(port): 80 """ Convert hexadecimal port to human port number """ 52 81 base = port & 0xf0 53 82 index = port & 0x0f … … 64 93 65 94 66 def send_raw_command(ser, raw_command, response_size=1024): 67 """ Send command to serial device and wait for response """ 95 96 def send_raw_command(raw_command, response_size=1024): 97 """ Send raw command to serial device and wait for response """ 68 98 69 99 print "Going to send: " + hex_to_str(raw_command) 70 100 send_line = "".join([chr(item) for item in raw_command]) 71 f = open('input.bin','w') 72 f.write(send_line) 73 f.close() 74 75 ser.write(send_line) 76 recv_line = ser.read(response_size) 101 serial.write(send_line) 102 recv_line = serial.read(response_size) 77 103 recv_command = str_to_hex(recv_line) 78 104 print "Received: %s (%i)" % (hex_to_str(recv_command), len(recv_line)) 79 105 return(recv_command) 80 106 81 def send_command(ser, command, response_size=1024): 82 raw_command = command + [make_checksum(command)] 83 return send_raw_command(ser, raw_command, response_size) 84 85 86 87 #print "%02X" % make_checksum([0xAA, 0x03, 0xFF, 0xFF]) 88 89 90 #[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00 91 #[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 28 00 28 00 00 10 ff 92 #[01:38:14] Comm: Refresh 93 94 #55 07 ff ff 12 34 56 78 5a 95 action_login = [0x55, 0x07] 96 device_id = [0xff, 0xff] 97 password = [0x12, 0x34, 0x56, 0x78] 98 99 ser = serial.Serial('/dev/ttyUSB0', 19200, timeout=5) 100 print ser 101 102 # Login cycle 103 #command = action_login + device_id + password 104 #send_command(ser, command) 105 106 # Refresh cycle 107 #action_refresh = [0xd2, 0x00] 108 #raw_command = command + [0x5a] + device_id + action_refresh 109 #send_raw_command(ser, raw_command) 110 # 55 07 ff ff 12 34 56 78 5a c2 04 ff ff a1 67 111 #send_raw_command(ser, [0x55,0x07,0xff,0xff,0x12,0x34,0x56,0x78,0x5a]) 112 # 55 07 ff ff 12 34 56 78 5a d1 03 ff ff d2 00 113 #send_raw_command(ser, [0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78, 0x5a, 0xd1, 0x03, 0xff, 0xff, 0xd2, 0x00]) 114 115 # 55 07 FF FF 12 34 56 78 5A D1 03 FF FF D2 00 116 login_line = [0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78] 117 refresh_line = [0xd1, 0x03, 0xff, 0xff] 118 allon_line = [0xb1, 0x03, 0xff, 0xff] 119 alloff_line = [0xc1, 0x03, 0xff, 0xff] 120 port_on_line = [0xb2, 0x04, 0xff, 0xff, 0xa1] 121 port_off_line = [0xc2, 0x04, 0xff,0xff, 0xa1] 122 power_on_interval_125_line = [0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28] 123 power_on_interval_05_line = [0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3] 124 change_address_code_line = [0x05, 0x00, 0xff, 0xff, 0x00, 0x04] 125 modify_password_line = [0xd3, 0x07, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11] 126 127 def port_on_cmd(port_number): 128 line = port_on_line 129 line[4] = port_to_hex(port_number) 130 return line 131 132 def port_off_cmd(port_number): 133 line = port_off_line 134 line[4] = port_to_hex(port_number) 135 return line 107 136 108 137 109 def address_to_num(address): 110 """ Convert internal address representation to integer """ 138 111 return (address[0] << 8) ^ address[1] 139 112 113 114 140 115 def num_to_address(npm_number): 116 """ Convert address number to internal representation """ 141 117 return [npm_number >> 8, npm_number & 0x00ff] 118 119 142 120 143 121 def bin_reverse(number,width=8): 144 122 """Little hacking using string logic to binary reverse number""" 145 123 return int(bin(number)[2:].zfill(width)[::-1],2) 146 147 148 def get_port_status(retval):149 # Example port 1 is off150 # d1 28 ff ff fe ff ff151 # ^^ ^^ ^^152 port_array = [False] * 25153 status_array = bin_reverse((bin_reverse(retval[4]) << 16) ^ (bin_reverse(retval[5]) << 8) ^ bin_reverse(retval[6]),24)154 for port in range(0,24):155 if (status_array & (1 << port)) > 0:156 port_array[port+1] = True157 return port_array158 159 124 160 125 #send_command(ser, login_line, 5) … … 169 134 #retval = send_command(ser, refresh_line, 42) 170 135 #print get_port_status(retval) 171 while True: 172 port_number = random.randint(1,24) 173 print port_number 174 send_command(ser, login_line, 5) 175 send_command(ser, port_on_cmd(port_number),6) 176 send_command(ser, login_line, 5) 177 send_command(ser, port_off_cmd(port_number),6) 136 #while True: 137 # port_number = random.randint(1,24) 138 # print port_number 139 # send_command(ser, login_line, 5) 140 # send_command(ser, port_on_cmd(port_number),6) 141 # send_command(ser, login_line, 5) 142 # send_command(ser, port_off_cmd(port_number),6) 143 144 # Login cycle 145 #command = action_login + device_id + password 146 #send_command(ser, command) 147 # Reference implementation lines 148 # A = action, B = address, C = password, P = port 149 # Mostly of type [A, A, B, B, C, C, C, C] or [A, A, B, B] or [A, A, B, B, P] 150 # (command, return_type, timeout) 151 line = dict() 152 line['login'] = ([0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78], 5, 1) 153 line['status'] = ([0xd1, 0x03, 0xff, 0xff], 42, 1) 154 line['allon'] = ([0xb1, 0x03, 0xff, 0xff], 6, 13) 155 line['alloff'] = ([0xc1, 0x03, 0xff, 0xff], 6, 13) 156 line['port_on'] = ([0xb2, 0x04, 0xff, 0xff, 0xa1], 6, 1) 157 line['port_off'] = ([0xc2, 0x04, 0xff,0xff, 0xa1], 6, 1) 158 line['power_on_interval_125'] = ([0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28], 5, 1) 159 line['power_on_interval_05'] = ([0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3], 5, 1) 160 line['change_address_code'] = ([0x05, 0x00, 0xff, 0xff, 0x00, 0x04], 5, 1) 161 line['modify_password'] = ([0xd3, 0x07, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11], 5, 1) 162 163 def num_to_hex(number): 164 """ Number to internal hexadecimal representation """ 165 if number == None: 166 return [] 167 length = len(hex(number)[2:]) + (len(hex(number)[2:]) % 2) 168 return [hex(number)[2:].zfill(length)[x:x+2] for x in range(0,length,2)] 169 170 171 172 def send_command(action, argument=None): 173 """ Send CRC computed command to serial device and wait for response """ 174 (command, response_size, timeout) = line[action] 175 command = command[0:1] + num_to_hex(opt_address_code) + num_to_hex(argument) 176 serial.timeout = timeout 177 raw_command = command + [make_checksum(command)] 178 return send_raw_command(raw_command, response_size) 179 180 181 def action_login(): 182 """ Login to device """ 183 return send_command('login', opt_password) 184 185 def action_status(): 186 """ Get port status from device """ 187 action_login() 188 return send_command('status') 189 190 def action_port_on(port): 191 """ Enable port on device """ 192 action_login() 193 port_status_synced = False 194 return send_command('port_on', port) 195 196 def action_port_off(port): 197 """ Disable port on device """ 198 action_login() 199 port_status_synced = False 200 return send_command('port_off', port) 201 202 203 def get_ports_status(): 204 # Example port 1 is off 205 # d1 28 ff ff fe ff ff 206 # ^^ ^^ ^^ 207 # TODO: Implement Ampere monitoring 208 #[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00 209 #[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00 210 # 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 211 # 28 00 28 00 00 10 ff 212 #[01:38:14] Comm: Refresh 213 port_array = [False] * 25 214 215 retval = send_command('status') 216 status_array = bin_reverse((bin_reverse(retval[4]) << 16) ^ (bin_reverse(retval[5]) << 8) ^ bin_reverse(retval[6]),24) 217 for port in range(0,24): 218 if (status_array & (1 << port)) > 0: 219 port_array[port+1] = True 220 221 # Update global state 222 ports_status = port_array 223 port_status_synced = True 224 return port_array 225 226 227 def get_port_status(port): 228 """ Get specific port status """ 229 if port_status_synced: 230 return ports_status[port] 231 else: 232 return get_ports_status()[port] 233 234 178 235 179 ser.close()180 exit(0)181 182 def generateCommand(type,arg=None):183 if arg == None:184 command = "%04X%04X" % (getCode(type), addr_code)185 else:186 command = "%04X%04X%02X" % (getCode(type), addr_code, arg)187 command += checksum(command)188 return (command)189 190 191 def doCommand(type, arg=None):192 sendCommand('login', passwd)193 retval = sendCommand(type, arg)194 #XXX: Check if command returned succesfull D128FF195 return(retval)196 197 198 def getPortState(port, raw_status):199 """Port configuration is put into 24 bits, every port has one bit200 representing the statewith a awkward order:201 A8,A7,A6,A5,A4,A3,A2,A1,B8,..,B1,C8,..,C1"""202 # Portion of retval we are interested in203 ports_state = int(raw_status[8:14],16)204 port_bit_location = -1205 if port <= 8:206 port_bit_location = (1 << 15 << port)207 elif port <= 16:208 port_bit_location = (1 << 7 << (port - 8))209 elif port <= 24:210 port_bit_location = (1 << (port - 16 - 1))211 212 if port_bit_location & ports_state:213 return True214 else:215 return False216 217 def getPortStatus(i):218 raw_status = doCommand('status')219 print "Port %02i [%s]:" % (i, getPortName(i)),220 if getPortState(i,raw_status):221 print "1"222 else:223 print "0"224 return(retval)225 226 raw_status= doCommand('status')227 for i in range(1,25):228 print "Port %02i [%s]:" % (i, getPortName(i)),229 if getPortState(i,raw_status):230 print "1"231 else:232 print "0"233 234 def togglePort(port):235 if getPortState(port):236 doCommand('portOff',port)237 else:238 doCommand('portOn', port)239 236 240 237 … … 257 254 258 255 259 def usage(): 260 print """ 256 257 def usage(msg="",exitcode=None): 258 print """%s 261 259 Usage %s arguments 262 260 Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $ … … 264 262 Arguments: 265 263 [-h|--help] Reading right know 266 [-v|--verbose] Print extra communication output 267 --host= IP adress of FQDN hostname [%s] 268 --port= Port to connect to [%s] 269 --password= Password to use in hex notation [0x1234568] 270 --addresscode= Internal device number in hex notation [0xFFFF] 271 [-s|--status] Current port configuration 264 [-d|--debug] Print extra communication output 265 --serialport= Serial Port to connect to [%s] 266 --password= Password to use in hex notation [%s] 267 --addresscode= Internal device number in hex notation [%s] 268 [-s <port>|--status=<port>] Current port(s) configuration 272 269 [-t <port>|--toggle=<port>] Toggle port(s) 273 270 [-o <port>|--on=<port>] Turn on port(s) 274 271 [-f <port>|--off=<port>] Turn off port(s) 272 --allon Turn all ports on using internal safety [TODO: Implement] 273 --alloff Turn all ports off using internal safety [TODO: Implement] 274 --changepassword= Change password [TODO: Implement] 275 --changeaddresscode= Change addresscode [TODO: Implement] 276 --changetimerdelay= Change internal timer delay [TODO: Implement] 277 --delay= Delay used between port operations [TODO: Implement] 278 --baudrate= Bautrate used for communication (19200,9600) [%s] 275 279 276 280 Note: <port> has different notations: … … 278 282 Actual value of port A1,..,A8,B1,..,B8,C1,..,C8 279 283 All ports all 280 """ % (sys.argv[0], inet_addr, inet_port) 284 """ % (msg, sys.argv[0], opt_serial_port, hex(opt_password), hex(opt_address_code), opt_baudrate) 285 if exitcode: 286 sys.exit(exitcode) 281 287 282 288 def main(): 283 global verbose, addr_code, inet_addr, inet_port, passwd, s284 289 try: 285 290 opts, args = getopt.getopt(sys.argv[1:], 286 " hf:s:t:o:v", ["help","verbose","host=", "port=", "password=", "addresscode=","toggle=","off=", "on=", "status="])291 "df:s:t:o:v", ["debug","verbose", "serialport=", "password=", "addresscode=","toggle=","off=", "on=", "status=", "buadrate="]) 287 292 except getopt.GetoptError, err: 288 # print help information and exit: 289 print str(err) # will print something like "option -a not recognized" 290 usage() 291 sys.exit(2) 293 usage(str(err),2) 292 294 293 295 opt_port = None 294 296 opt_action = None 295 297 for o, a in opts: 296 if o in ("- v", "--verbose"):297 verbose= True298 if o in ("-d", "--debug"): 299 DEBUG = True 298 300 elif o in ("-h", "--help"): 299 usage() 300 sys.exit() 301 usage("",0) 301 302 elif o in ("--addresscode"): 302 addr_code = int(a,16) 303 elif o in ("--host"): 304 inet_addr = a 303 opt_address_code = int(a,16) 305 304 elif o in ("--password"): 306 passwd = int(a,16) 307 elif o in ("--port"): 308 inet_port = a 305 opt_passwd = int(a,16) 306 elif o in ("--buadrate"): 307 opt_baudrate = a 308 elif o in ("--serialport"): 309 opt_serial_port = a 309 310 elif o in ("-s", "--status"): 310 311 opt_action = "status" … … 323 324 324 325 if (opt_port == None or opt_action == None): 325 usage() 326 sys.exit(2) 327 328 dprint ('action: ' + opt_action + ' port: ' + opt_port) 326 usage("[ERROR] No port or action defined",2) 327 328 329 # Resolve port to proper numbers array 330 ports = [] 331 for port in opt_port.split(','): 332 if port == "all": 333 ports.extend(range(1,25)) 334 elif port[0] in ("A","B","C"): 335 ports.extend(int(port,16)) 336 else: 337 ports.extend(int(port)) 338 debug("Operating on ports " + ",".join(port)) 339 340 serial = serial.Serial(opt_serial_port, opt_baudrate, timeout=5) 341 debug(serial) 342 329 343 330 344 # Status needs real integers, hack 331 if opt_action == "status": 332 if opt_port == "all": 333 getStatusAll() 345 for port in ports: 346 if opt_action == "status": 347 action_status(port) 348 elif opt_action == "toggle": 349 if get_port_status(port): 350 action_port_off(port) 334 351 else: 335 print "XXX: Implement" 336 sys.exit(0) 337 338 # Resolve port to proper number 339 if opt_port == "all": 340 None # Blank resolution, as it is done elsewhere 341 elif opt_port[0] in ("A","B","C"): 342 opt_port = int(opt_port,16) 343 dprint('Hexcode of port: %i' % opt_port) 344 else: 345 # Dirty hack to have conversion and checking at the same time 346 opt_port = getPortHex(int(opt_port)) 347 dprint('Hexcode of port: %i' % opt_port) 348 349 if opt_action == "toggle": 350 if opt_port == "all": 351 for i in range(1,25): 352 togglePort(getPortHex(i)) 353 else: 354 togglePort(opt_port) 355 elif opt_action == "on": 356 if opt_port == "all": 357 doCommand("allPortsOn",0x01) 358 else: 359 doCommand("portOn", opt_port) 360 elif opt_action == "off": 361 if opt_port == "all": 362 doCommand("allPortsOff"); 363 else: 364 doCommand("portOn", opt_port) 365 352 action_port_on(port) 353 elif opt_action == "on": 354 action_port_on(port) 355 elif opt_action == "off": 356 action_port_off(port) 357 else: 358 assert False, "Option '%s' invalid" % opt_action 359 360 serial.close() 366 361 367 362 if __name__ == "__main__":
Note:
See TracChangeset
for help on using the changeset viewer.