#!/usr/bin/env python # NPM 4000 powerbar managment script, to be used instead of the windows # application. # # TODO: Not all features are ported yet (like amp monitoring) # TODO: Error handling not implemented # # Licence: BSD # Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $ # Rick van der Zwet import serial import sys import time import random import getopt # Set to true to enable verbose communication aka debugging DEBUG = False # Default for options opt_serial_port = '/dev/ttyUSB0' opt_password = 0x1245678 opt_address_code = 0xFFFF opt_baudrate = 19200 # Serial connection port status is cached globally to avoid overhead serial = None port_status_synced = False ports_status = None def make_checksum(command): """ Generate CRC checksum using XOR on all bytes """ crc = 0 for item in command: crc ^= item return crc def debug(msg): """ Print debug statement if DEBUG is set """ if DEBUG: print msg def hex_to_str(command): """ Human readable representation of command """ return " ".join(["%02x" % item for item in command]) def str_to_hex(s): """ Hexadecimal string representation of 's' """ return [ord(x) for x in s] def port_to_hex(port_number): """ Convert integer port number to hexadecimal presentation as internal location """ if port_number < 1: assert False, "Invalid port port_number (%i)" % port_number if port_number <= 8: port = 0xa0 + port_number elif port_number <= 16: port = 0xb0 + (port_number - 8) elif port_number <= 24: port = 0xc0 + (port_number - 16) else: assert False, "Invalid port port_number (%i)" % port_number debug("%i - %02x" % (port_number, port)) return port def hex_to_port(port): """ Convert hexadecimal port to human port number """ base = port & 0xf0 index = port & 0x0f if (base ^ 0xa0) == 0: port_number = index + 0 elif (base ^ 0xb0) == 0: port_number = index + 8 elif (base ^ 0xc0) == 0: port_number = index + 16 else: assert False, "Invalid port (%02x)" % port debug("%02x - %i" % (port, port_number)) return port_number def send_raw_command(raw_command, response_size=1024): """ Send raw command to serial device and wait for response """ print "Going to send: " + hex_to_str(raw_command) send_line = "".join([chr(item) for item in raw_command]) serial.write(send_line) recv_line = serial.read(response_size) recv_command = str_to_hex(recv_line) print "Received: %s (%i)" % (hex_to_str(recv_command), len(recv_line)) return(recv_command) def address_to_num(address): """ Convert internal address representation to integer """ return (address[0] << 8) ^ address[1] def num_to_address(npm_number): """ Convert address number to internal representation """ return [npm_number >> 8, npm_number & 0x00ff] def bin_reverse(number,width=8): """Little hacking using string logic to binary reverse number""" return int(bin(number)[2:].zfill(width)[::-1],2) #send_command(ser, login_line, 5) # Note: you will need to wait at least timeout * 24 to complete #ser.timeout = 13 #send_command(ser, allon_line, 6) #send_command(ser, alloff_line,6) #send_command(ser, power_on_interval_05_line,6) #send_command(ser,port_on_cmd(24),6) #send_command(ser,port_off_cmd(24),6) #retval = send_command(ser, refresh_line, 42) #print get_port_status(retval) #while True: # port_number = random.randint(1,24) # print port_number # send_command(ser, login_line, 5) # send_command(ser, port_on_cmd(port_number),6) # send_command(ser, login_line, 5) # send_command(ser, port_off_cmd(port_number),6) # Login cycle #command = action_login + device_id + password #send_command(ser, command) # Reference implementation lines # A = action, B = address, C = password, P = port # Mostly of type [A, A, B, B, C, C, C, C] or [A, A, B, B] or [A, A, B, B, P] # (command, return_type, timeout) line = dict() line['login'] = ([0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78], 5, 1) line['status'] = ([0xd1, 0x03, 0xff, 0xff], 42, 1) line['allon'] = ([0xb1, 0x03, 0xff, 0xff], 6, 13) line['alloff'] = ([0xc1, 0x03, 0xff, 0xff], 6, 13) line['port_on'] = ([0xb2, 0x04, 0xff, 0xff, 0xa1], 6, 1) line['port_off'] = ([0xc2, 0x04, 0xff,0xff, 0xa1], 6, 1) line['power_on_interval_125'] = ([0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28], 5, 1) line['power_on_interval_05'] = ([0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3], 5, 1) line['change_address_code'] = ([0x05, 0x00, 0xff, 0xff, 0x00, 0x04], 5, 1) line['modify_password'] = ([0xd3, 0x07, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11], 5, 1) def num_to_hex(number): """ Number to internal hexadecimal representation """ if number == None: return [] length = len(hex(number)[2:]) + (len(hex(number)[2:]) % 2) return [hex(number)[2:].zfill(length)[x:x+2] for x in range(0,length,2)] def send_command(action, argument=None): """ Send CRC computed command to serial device and wait for response """ (command, response_size, timeout) = line[action] command = command[0:1] + num_to_hex(opt_address_code) + num_to_hex(argument) serial.timeout = timeout raw_command = command + [make_checksum(command)] return send_raw_command(raw_command, response_size) def action_login(): """ Login to device """ return send_command('login', opt_password) def action_status(): """ Get port status from device """ action_login() return send_command('status') def action_port_on(port): """ Enable port on device """ action_login() port_status_synced = False return send_command('port_on', port) def action_port_off(port): """ Disable port on device """ action_login() port_status_synced = False return send_command('port_off', port) def get_ports_status(): # Example port 1 is off # d1 28 ff ff fe ff ff # ^^ ^^ ^^ # TODO: Implement Ampere monitoring #[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00 #[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00 # 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 # 28 00 28 00 00 10 ff #[01:38:14] Comm: Refresh port_array = [False] * 25 retval = send_command('status') status_array = bin_reverse((bin_reverse(retval[4]) << 16) ^ (bin_reverse(retval[5]) << 8) ^ bin_reverse(retval[6]),24) for port in range(0,24): if (status_array & (1 << port)) > 0: port_array[port+1] = True # Update global state ports_status = port_array port_status_synced = True return port_array def get_port_status(port): """ Get specific port status """ if port_status_synced: return ports_status[port] else: return get_ports_status()[port] def runTest(): print "TEST: Start will all port Offline" doCommand('allPortsOff') print "TEST: All On and Off again, in mass execution" doCommand('allPortsOn',0x01) doCommand('allPortsOff') print "TEST: Enable and disable ports one by one" for i in range(1,25): print getPortName(i) doCommand('portOn', getPortHex(i)) doCommand('portOff', getPortHex(i)) print "TEST: Send known error" doCommand('knownError') def usage(msg="",exitcode=None): print """%s Usage %s arguments Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $ Arguments: [-h|--help] Reading right know [-d|--debug] Print extra communication output --serialport= Serial Port to connect to [%s] --password= Password to use in hex notation [%s] --addresscode= Internal device number in hex notation [%s] [-s |--status=] Current port(s) configuration [-t |--toggle=] Toggle port(s) [-o |--on=] Turn on port(s) [-f |--off=] Turn off port(s) --allon Turn all ports on using internal safety [TODO: Implement] --alloff Turn all ports off using internal safety [TODO: Implement] --changepassword= Change password [TODO: Implement] --changeaddresscode= Change addresscode [TODO: Implement] --changetimerdelay= Change internal timer delay [TODO: Implement] --delay= Delay used between port operations [TODO: Implement] --baudrate= Bautrate used for communication (19200,9600) [%s] Note: has different notations: Numeric value of port 1,2,3,4,5,.. Actual value of port A1,..,A8,B1,..,B8,C1,..,C8 All ports all """ % (msg, sys.argv[0], opt_serial_port, hex(opt_password), hex(opt_address_code), opt_baudrate) if exitcode: sys.exit(exitcode) def main(): try: opts, args = getopt.getopt(sys.argv[1:], "df:s:t:o:v", ["debug","verbose", "serialport=", "password=", "addresscode=","toggle=","off=", "on=", "status=", "buadrate="]) except getopt.GetoptError, err: usage(str(err),2) opt_port = None opt_action = None for o, a in opts: if o in ("-d", "--debug"): DEBUG = True elif o in ("-h", "--help"): usage("",0) elif o in ("--addresscode"): opt_address_code = int(a,16) elif o in ("--password"): opt_passwd = int(a,16) elif o in ("--buadrate"): opt_baudrate = a elif o in ("--serialport"): opt_serial_port = a elif o in ("-s", "--status"): opt_action = "status" opt_port = a elif o in ("-t","--toggle"): opt_action = "toggle" opt_port = a elif o in ("-f","--off"): opt_action = "off" opt_port = a elif o in ("-o","--on"): opt_action = "on" opt_port = a else: assert False, "unhandled option" if (opt_port == None or opt_action == None): usage("[ERROR] No port or action defined",2) # Resolve port to proper numbers array ports = [] for port in opt_port.split(','): if port == "all": ports.extend(range(1,25)) elif port[0] in ("A","B","C"): ports.extend(int(port,16)) else: ports.extend(int(port)) debug("Operating on ports " + ",".join(port)) serial = serial.Serial(opt_serial_port, opt_baudrate, timeout=5) debug(serial) # Status needs real integers, hack for port in ports: if opt_action == "status": action_status(port) elif opt_action == "toggle": if get_port_status(port): action_port_off(port) else: action_port_on(port) elif opt_action == "on": action_port_on(port) elif opt_action == "off": action_port_off(port) else: assert False, "Option '%s' invalid" % opt_action serial.close() if __name__ == "__main__": main()