#!/usr/bin/env python
# NPM 4000 powerbar management script, to be used instead of the windows
# application.
#
# XXX: Not all features are ported yet (like amp monitoring)
# XXX: Some dirty poking around with hex representations and numeric
#      representations of ports
# XXX: Make proper classes for use
# XXX: Documentation
#
# Example frame (switch port A1 off, XOR checksum as last byte):
#   c2 04 ff ff a1 67
#
# The file holds two generations of code:
#   1. serial-port helpers plus a random port toggle test (this is what runs
#      when the script is executed);
#   2. an older command line front-end (``main()``) written for a TCP
#      connection, which is kept for reference but is not wired up.
#
# The code is written so that it runs on both Python 2.6+ and Python 3.
#
# Licence: BSD
# Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $
# Rick van der Zwet

import getopt
import random
import socket
import sys
import time

try:
    import serial
except ImportError:
    # pyserial is only required to actually talk to the device; keeping the
    # import optional lets the pure helper functions be imported without it.
    serial = None

DEBUG = False


def make_checksum(command):
    """Return the XOR of all byte values in ``command``.

    ``command`` is an iterable of integers; an empty iterable yields 0.
    """
    crc = 0
    for item in command:
        crc ^= item
    return crc


def debug(msg):
    """Print ``msg`` only when the module-level ``DEBUG`` flag is set."""
    if DEBUG:
        print(msg)


def hex_to_str(command):
    """Format a sequence of byte values as space separated two-digit hex."""
    return " ".join("%02x" % item for item in command)


def str_to_hex(s):
    """Return the list of byte/character ordinals contained in ``s``.

    Accepts byte strings (what a serial read returns) as well as text
    strings.
    """
    if isinstance(s, (bytes, bytearray)):
        # Iterating bytes yields ints on Python 3 and 1-char strings on
        # Python 2; going through bytearray gives ints on both.
        return list(bytearray(s))
    return [ord(x) for x in s]


def port_to_hex(port_number):
    """Convert a port number (1..24) to its wire code.

    Ports 1-8 map to 0xa1-0xa8, 9-16 to 0xb1-0xb8 and 17-24 to 0xc1-0xc8.

    Raises ValueError for a port number outside 1..24.
    """
    if not 1 <= port_number <= 24:
        raise ValueError("Invalid port port_number (%i)" % port_number)
    bank, index = divmod(port_number - 1, 8)
    port = 0xa0 + (bank << 4) + index + 1
    debug("%i - %02x" % (port_number, port))
    return port


def hex_to_port(port):
    """Convert a wire code (0xa1..0xa8, 0xb1..0xb8, 0xc1..0xc8) to 1..24.

    Raises ValueError when the high nibble is not a/b/c or the low nibble is
    not in 1..8.
    """
    bank_offset = {0xa0: 0, 0xb0: 8, 0xc0: 16}
    base = port & 0xf0
    index = port & 0x0f
    if base not in bank_offset or not 1 <= index <= 8:
        raise ValueError("Invalid port (%02x)" % port)
    port_number = index + bank_offset[base]
    debug("%02x - %i" % (port, port_number))
    return port_number


def send_raw_command(ser, raw_command, response_size=1024):
    """Send ``raw_command`` to the serial device and wait for the response.

    ser           -- object offering ``write(data)`` and ``read(size)``
    raw_command   -- sequence of byte values, sent exactly as given
    response_size -- maximum number of bytes to read back

    The outgoing frame is also dumped to ``input.bin`` in the current
    directory. Returns the received bytes as a list of integers.
    """
    print("Going to send: " + hex_to_str(raw_command))
    # bytes(bytearray(...)) gives a byte string on Python 2 and Python 3.
    send_line = bytes(bytearray(raw_command))
    with open('input.bin', 'wb') as f:
        f.write(send_line)
    ser.write(send_line)
    recv_line = ser.read(response_size)
    recv_command = str_to_hex(recv_line)
    print("Received: %s (%i)" % (hex_to_str(recv_command), len(recv_line)))
    return recv_command


def send_command(ser, command, response_size=1024):
    """Append the XOR checksum to ``command`` and send it.

    Returns whatever ``send_raw_command`` returns.
    """
    raw_command = command + [make_checksum(command)]
    return send_raw_command(ser, raw_command, response_size)


# Captured traffic of the windows application, kept as protocol notes:
#print "%02X" % make_checksum([0xAA, 0x03, 0xFF, 0xFF])
#[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00
#[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 28 00 28 00 00 10 ff
#[01:38:14] Comm: Refresh
#55 07 ff ff 12 34 56 78 5a
action_login = [0x55, 0x07]
device_id = [0xff, 0xff]
password = [0x12, 0x34, 0x56, 0x78]

# Login cycle
#command = action_login + device_id + password
#send_command(ser, command)

# Refresh cycle
#action_refresh = [0xd2, 0x00]
#raw_command = command + [0x5a] + device_id + action_refresh
#send_raw_command(ser, raw_command)

# 55 07 ff ff 12 34 56 78 5a c2 04 ff ff a1 67
#send_raw_command(ser, [0x55,0x07,0xff,0xff,0x12,0x34,0x56,0x78,0x5a])
# 55 07 ff ff 12 34 56 78 5a d1 03 ff ff d2 00
#send_raw_command(ser, [0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78, 0x5a, 0xd1, 0x03, 0xff, 0xff, 0xd2, 0x00])
# 55 07 FF FF 12 34 56 78 5A D1 03 FF FF D2 00

# Command templates. Unless noted otherwise they carry no checksum and are
# meant to be sent through send_command(), which appends it.
login_line = [0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78]
# The last byte (0xd2) equals the XOR of the four bytes before it.
refresh_line = [0xd1, 0x03, 0xff, 0xff, 0xd2]
allon_line = [0xb1, 0x03, 0xff, 0xff]
alloff_line = [0xc1, 0x03, 0xff, 0xff]
# Index 4 is the port code; 0xa1 (port 1) is only a placeholder.
port_on_line = [0xb2, 0x04, 0xff, 0xff, 0xa1]
port_off_line = [0xc2, 0x04, 0xff, 0xff, 0xa1]
# The last byte of both interval frames equals the XOR of the preceding
# bytes, i.e. they already include their checksum.
# NOTE(review): going by the names the payloads 0xfa / 0x01 presumably mean
# 12.5 s / 0.05 s -- confirm against the device documentation.
power_on_interval_125_line = [0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28]
power_on_interval_05_line = [0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3]


def port_on_cmd(port_number):
    """Return a fresh 'port on' command (without checksum) for a port.

    Raises ValueError for a port number outside 1..24.
    """
    # Copy the template: writing into port_on_line itself would change the
    # module-level list for every later user.
    line = list(port_on_line)
    line[4] = port_to_hex(port_number)
    return line


def port_off_cmd(port_number):
    """Return a fresh 'port off' command (without checksum) for a port.

    Raises ValueError for a port number outside 1..24.
    """
    line = list(port_off_line)
    line[4] = port_to_hex(port_number)
    return line


# Note: you will need to wait at least timeout * 24 to complete
#ser.timeout = 13
#send_raw_command(ser, allon_line, 6)
#send_raw_command(ser, alloff_line,6)
#send_raw_command(ser, refresh_line, 42)
#send_raw_command(ser, power_on_interval_05_line,6)
#send_raw_command(ser,one_off_line,6)


def run_random_toggle_test(device='/dev/ttyUSB0', baudrate=19200, timeout=5):
    """Switch randomly chosen ports on and off until interrupted.

    Opens the serial ``device``, then repeats: login, switch a random port
    (1..24) on, login, switch it off again. Ctrl-C ends the loop; the serial
    port is closed on every way out.

    Raises RuntimeError when pyserial is not installed.
    """
    if serial is None:
        raise RuntimeError("pyserial is required to talk to the device")
    ser = serial.Serial(device, baudrate, timeout=timeout)
    print(ser)
    try:
        while True:
            port_number = random.randint(1, 24)
            print(port_number)
            send_command(ser, login_line, 5)
            send_command(ser, port_on_cmd(port_number), 6)
            send_command(ser, login_line, 5)
            send_command(ser, port_off_cmd(port_number), 6)
    except KeyboardInterrupt:
        # The loop has no other exit; treat Ctrl-C as a normal stop.
        pass
    finally:
        ser.close()


# ---------------------------------------------------------------------------
# Legacy command line front-end for a TCP connection.
#
# NOTE(review): this part was never reached by the script (it followed an
# unconditional sys.exit(0)). It still depends on sendCommand(), which is
# not defined anywhere in this file, so it cannot run as-is.
# ---------------------------------------------------------------------------

# XOR all the 8bit values to get a checksum
def checksum(s):
    """Return the XOR checksum of hex string ``s`` as two upper-case digits.

    ``s`` is read two hex digits (one byte) at a time.
    """
    crc = 0
    for p in range(0, len(s), 2):
        crc ^= int(s[p:p + 2], 16)
    return "%02X" % crc


passwd = 0x12345678
addr_code = 0xFFFF
verbose = False
# NOTE(review): the original never assigned defaults for host and port, so
# usage() failed with a NameError; None is a placeholder, pick real defaults.
inet_addr = None
inet_port = None

# Command name -> (command code, timeout), see getCode() / getTimeout().
line2code = {
    'login': (0x5507, 1),
    'portOn': (0xB204, 2),
    'portOff': (0xC204, 2),
    'allPortsOff': (0xC103, 6),
    'allPortsOn': (0xB104, 15),
    'status': (0xD103, 2),
    'knownError': (0xFFFF, 1),
}

# Socket used throughout the code; created in main() rather than at import
# time so that importing this module has no side effects.
s = None


def getCode(type):
    """Return the command code registered for command name ``type``."""
    return line2code[type][0]


def getTimeout(type):
    """Return the timeout registered for command name ``type``."""
    return line2code[type][1]


def dprint(msg):
    """Print ``msg`` prefixed with a timestamp when ``verbose`` is set."""
    if verbose:
        print("%s %s" % (time.strftime('[%Y-%m-%d %H:%M:%S]'), msg))


def getPortNumber(name):
    """Convert a port name ('A1'..'A8', 'B1'..'B8', 'C1'..'C8') to 1..24.

    Raises ValueError for any other name.
    """
    bank_offset = {'A': 0, 'B': 8, 'C': 16}
    try:
        offset = bank_offset[name[0]]
        port_number = int(name[1:])
    except (KeyError, IndexError, ValueError):
        raise ValueError("Port %s not defined" % name)
    if not 1 <= port_number <= 8:
        raise ValueError("Port %s not defined" % name)
    return offset + port_number


def getPortName(number):
    """Convert a port number (1..24) to its name ('A1'..'C8').

    Raises ValueError for a number outside 1..24.
    """
    if not 1 <= number <= 24:
        raise ValueError("Port %i not defined" % number)
    bank, index = divmod(number - 1, 8)
    return "%s%i" % ("ABC"[bank], index + 1)


def getPortHex(number):
    """Return the wire code of port ``number``.

    The port name doubles as the hex code: 'A1' read as hex is 0xA1.
    Raises ValueError for a number outside 1..24.
    """
    return int(getPortName(number), 16)


def generateCommand(type, arg=None):
    """Build the hex string for command ``type``, checksum included.

    The string consists of the command code, the address code, the optional
    argument ``arg`` (an integer) and the XOR checksum over all of these.
    """
    if arg is None:
        command = "%04X%04X" % (getCode(type), addr_code)
    else:
        command = "%04X%04X%02X" % (getCode(type), addr_code, arg)
    command += checksum(command)
    return command


def doCommand(type, arg=None):
    """Log in and then send command ``type``; return the command's reply.

    NOTE(review): sendCommand() is not defined in this file, so calling this
    function raises NameError until it is provided.
    """
    sendCommand('login', passwd)
    retval = sendCommand(type, arg)
    #XXX: Check if command returned successful D128FF
    return retval


def getPortState(port, raw_status):
    """Return True when ``port`` (1..24) is on according to ``raw_status``.

    Port configuration is put into 24 bits (hex digits 8..13 of
    ``raw_status``); every port has one bit representing the state, in an
    awkward order: A8,A7,A6,A5,A4,A3,A2,A1,B8,..,B1,C8,..,C1

    Raises ValueError for a port outside 1..24.
    """
    if not 1 <= port <= 24:
        raise ValueError("Port %i not defined" % port)
    # Portion of raw_status we are interested in
    ports_state = int(raw_status[8:14], 16)
    bank, index = divmod(port - 1, 8)
    # Bank A occupies the top byte, bank C the lowest; within a bank the
    # first port is the least significant bit.
    bit = (2 - bank) * 8 + index
    return bool(ports_state & (1 << bit))


def getPortStatus(i):
    """Query the device, print the state of port ``i`` and return it."""
    raw_status = doCommand('status')
    state = getPortState(i, raw_status)
    print("Port %02i [%s]: %s" % (i, getPortName(i), "1" if state else "0"))
    return state


def getStatusAll():
    """Query the device once and print the state of all 24 ports.

    NOTE(review): these statements had no enclosing ``def`` in the original
    while main() calls getStatusAll(); the name is assumed from that call.
    """
    raw_status = doCommand('status')
    for i in range(1, 25):
        state = getPortState(i, raw_status)
        print("Port %02i [%s]: %s" % (i, getPortName(i),
                                      "1" if state else "0"))


def togglePort(port):
    """Switch the port with wire code ``port`` (e.g. 0xA1) to its other state.

    Raises ValueError when ``port`` is not a valid wire code.
    """
    raw_status = doCommand('status')
    # getPortState() works on port numbers, the commands on wire codes.
    if getPortState(hex_to_port(port), raw_status):
        doCommand('portOff', port)
    else:
        doCommand('portOn', port)


def runTest():
    """Exercise the device: mass on/off, every port one by one, an error."""
    print("TEST: Start will all port Offline")
    doCommand('allPortsOff')

    print("TEST: All On and Off again, in mass execution")
    doCommand('allPortsOn', 0x01)
    doCommand('allPortsOff')

    print("TEST: Enable and disable ports one by one")
    for i in range(1, 25):
        print(getPortName(i))
        doCommand('portOn', getPortHex(i))
        doCommand('portOff', getPortHex(i))

    print("TEST: Send known error")
    doCommand('knownError')


def usage():
    """Print the command line help, including the current host and port."""
    # NOTE(review): argument placeholders appear to be missing from this text
    # ("--host=", "Note:  has different notations"); wording left unchanged.
    print("""
Usage %s arguments
Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $

Arguments:
  [-h|--help]           Reading right know
  [-v|--verbose]        Print extra communication output
  --host=               IP adress of FQDN hostname [%s]
  --port=               Port to connect to [%s]
  --password=           Password to use in hex notation [0x12345678]
  --addresscode=        Internal device number in hex notation [0xFFFF]
  [-s|--status]         Current port configuration
  [-t |--toggle=]       Toggle port(s)
  [-o |--on=]           Turn on port(s)
  [-f |--off=]          Turn off port(s)

Note:  has different notations:
  Numeric value of port         1,2,3,4,5,..
  Actual value of port          A1,..,A8,B1,..,B8,C1,..,C8
  All ports                     all
""" % (sys.argv[0], inet_addr, inet_port))


def main():
    """Parse the command line and run the requested port action.

    Exits with status 2 on a usage error. The last of the status / toggle /
    on / off options given on the command line wins.
    """
    global verbose, addr_code, inet_addr, inet_port, passwd, s
    try:
        opts, args = getopt.getopt(
            sys.argv[1:], "hf:s:t:o:v",
            ["help", "verbose", "host=", "port=", "password=",
             "addresscode=", "toggle=", "off=", "on=", "status="])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        usage()
        sys.exit(2)

    opt_port = None
    opt_action = None
    for o, a in opts:
        if o in ("-v", "--verbose"):
            verbose = True
        elif o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o == "--addresscode":
            addr_code = int(a, 16)
        elif o == "--host":
            inet_addr = a
        elif o == "--password":
            passwd = int(a, 16)
        elif o == "--port":
            inet_port = a
        elif o in ("-s", "--status"):
            opt_action = "status"
            opt_port = a
        elif o in ("-t", "--toggle"):
            opt_action = "toggle"
            opt_port = a
        elif o in ("-f", "--off"):
            opt_action = "off"
            opt_port = a
        elif o in ("-o", "--on"):
            opt_action = "on"
            opt_port = a
        else:
            assert False, "unhandled option"

    if opt_port is None or opt_action is None:
        usage()
        sys.exit(2)

    dprint('action: ' + opt_action + ' port: ' + opt_port)

    # Deferred from module level so that a plain import opens no socket.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Status needs real integers, hack
    if opt_action == "status":
        if opt_port == "all":
            getStatusAll()
        else:
            print("XXX: Implement")
        sys.exit(0)

    # Resolve port to proper number
    if opt_port == "all":
        pass  # Blank resolution, as it is done elsewhere
    elif opt_port[:1] in ("A", "B", "C"):
        # Going through the name helpers rejects names such as 'A9'.
        opt_port = getPortHex(getPortNumber(opt_port))
        dprint('Hexcode of port: %i' % opt_port)
    else:
        # Dirty hack to have conversion and checking at the same time
        opt_port = getPortHex(int(opt_port))
        dprint('Hexcode of port: %i' % opt_port)

    if opt_action == "toggle":
        if opt_port == "all":
            for i in range(1, 25):
                togglePort(getPortHex(i))
        else:
            togglePort(opt_port)
    elif opt_action == "on":
        if opt_port == "all":
            doCommand("allPortsOn", 0x01)
        else:
            doCommand("portOn", opt_port)
    elif opt_action == "off":
        if opt_port == "all":
            doCommand("allPortsOff")
        else:
            doCommand("portOff", opt_port)


if __name__ == "__main__":
    # Running the script has always meant running the serial toggle test;
    # main() above was never reached and still lacks sendCommand().
    run_random_toggle_test()
    sys.exit(0)