#!/usr/bin/env python # NPM 4000 powerbar managment script, to be used instead of the windows # application. # # TODO: Not all features are ported yet (like amp monitoring) # TODO: Error handling not implemented # # Licence: BSD # Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $ # Rick van der Zwet import serial import sys import time import random import getopt # Set to true to enable verbose communication aka debugging DEBUG = False # Default for options opt_serial_port = '/dev/ttyUSB0' opt_password = 0x12345678 opt_address_code = 0xFFFF opt_baudrate = 19200 opt_delay = 0.0 # Serial connection port status is cached globally to avoid overhead ser = None port_status_synced = False ports_status = None ports_ampere = None # Segment A, B, C grid_status = None def make_checksum(command): """ Generate CRC checksum using XOR on all bytes """ crc = 0 for item in command: crc ^= item return crc def debug(msg): """ Print debug statement if DEBUG is set """ if DEBUG: print msg def hex_to_str(command): """ Human readable representation of command """ return " ".join(["%02x" % item for item in command]) def str_to_hex(s): """ Hexadecimal string representation of 's' """ return [ord(x) for x in s] def port_to_hex(port_number): """ Convert integer port number to hexadecimal presentation as internal location """ if port_number < 1: assert False, "Invalid port port_number (%i)" % port_number if port_number <= 8: port = 0xa0 + port_number elif port_number <= 16: port = 0xb0 + (port_number - 8) elif port_number <= 24: port = 0xc0 + (port_number - 16) else: assert False, "Invalid port port_number (%i)" % port_number debug("%i - %02x" % (port_number, port)) return port def hex_to_port(port): """ Convert hexadecimal port to human port number """ base = port & 0xf0 index = port & 0x0f if (base ^ 0xa0) == 0: port_number = index + 0 elif (base ^ 0xb0) == 0: port_number = index + 8 elif (base ^ 0xc0) == 0: port_number = index + 16 else: assert False, "Invalid port (%02x)" % port debug("%02x - %i" % (port, port_number)) return port_number def send_raw_command(raw_command, response_size=1024): """ Send raw command to serial device and wait for response """ debug("Going to send: " + hex_to_str(raw_command)) send_line = "".join([chr(item) for item in raw_command]) ser.write(send_line) recv_line = ser.read(response_size) recv_command = str_to_hex(recv_line) debug("Received: %s (%i)" % (hex_to_str(recv_command), len(recv_line))) return(recv_command) def address_to_num(address): """ Convert internal address representation to integer """ return (address[0] << 8) ^ address[1] def num_to_address(npm_number): """ Convert address number to internal representation """ return [npm_number >> 8, npm_number & 0x00ff] def bin_reverse(number,width=8): """Little hacking using string logic to binary reverse number""" return int(bin(number)[2:].zfill(width)[::-1],2) # Login cycle #command = action_login + device_id + password #send_command(ser, command) # Reference implementation lines # A = action, B = address, C = password, P = port # Mostly of type [A, A, B, B, C, C, C, C] or [A, A, B, B] or [A, A, B, B, P] # (command, return_type, timeout) line = dict() line['login'] = ([0x55, 0x07, 0xff, 0xff, 0x12, 0x34, 0x56, 0x78], 5, 1) line['status'] = ([0xd1, 0x03, 0xff, 0xff], 42, 1) line['allon'] = ([0xb1, 0x03, 0xff, 0xff], 6, 13) line['alloff'] = ([0xc1, 0x03, 0xff, 0xff], 6, 13) line['port_on'] = ([0xb2, 0x04, 0xff, 0xff, 0xa1], 6, 1) line['port_off'] = ([0xc2, 0x04, 0xff,0xff, 0xa1], 6, 1) line['power_on_interval_125'] = ([0xd6, 0x04, 0xff, 0xff, 0xfa, 0x28], 5, 1) line['power_on_interval_05'] = ([0xd6, 0x04, 0xff, 0xff, 0x01, 0xd3], 5, 1) line['change_address_code'] = ([0x05, 0x00, 0xff, 0xff, 0x00, 0x04], 5, 1) line['modify_password'] = ([0xd3, 0x07, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11], 5, 1) def num_to_hex(number): """ Number to internal hexadecimal representation """ if number == None: return [] length = len(hex(number)[2:]) + (len(hex(number)[2:]) % 2) return [int(hex(number)[2:].zfill(length)[x:x+2],16) for x in range(0,length,2)] #print hex_to_str(num_to_hex(opt_password)) #exit(0) def send_command(action, argument=[]): """ Send CRC computed command to serial device and wait for response """ (command, response_size, timeout) = line[action] if not isinstance(argument, list): argument = num_to_hex(argument) command = command[0:2] + num_to_hex(opt_address_code) + argument serial.timeout = timeout raw_command = command + [make_checksum(command)] return send_raw_command(raw_command, response_size) def action_login(): """ Login to device """ return send_command('login', opt_password) def action_status(): """ Get port status from device """ action_login() return send_command('status') def action_port_on(port): """ Enable port on device """ global port_status_synced port_status_synced = False action_login() return send_command('port_on', port_to_hex(port)) def action_port_off(port): """ Disable port on device """ global port_status_synced port_status_synced = False action_login() return send_command('port_off', port_to_hex(port)) def action_toggle_port(port): """ Toggle port state """ if get_port_status(port): action_port_off(port) else: action_port_on(port) def get_ports_status(): global ports_status, port_status_synced, ports_ampere, grid_status ports_status_synced = False # Example port 1 is off # d1 28 ff ff fe ff ff # ^^ ^^ ^^ # TODO: Implement Ampere monitoring #[01:38:14] Send: 55 07 ff ff 12 34 56 78 5a ff ff d2 00 #[01:38:14] Recv: ff ff a9 d1 28 ff ff 00 07 00 11 00 00 00 00 00 00 00 00 # 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 # 28 00 28 00 00 10 ff #[01:38:14] Comm: Refresh port_array = [False] * 25 ports_ampere = [0] * 25 grid_status = [0] * 3 action_login() retval = send_command('status') status_array = bin_reverse((bin_reverse(retval[4]) << 16) ^ (bin_reverse(retval[5]) << 8) ^ bin_reverse(retval[6]),24) for port in range(0,24): if (status_array & (1 << port)) > 0: port_array[port+1] = True # retval[7] is the sensor state? ports_ampere = [0] + retval[8:16] + retval[17:25] + retval[26:34] grid_status = [retval[16], retval[25], retval[34]] # Update global state ports_status = port_array port_status_synced = True return port_array def get_port_status(port): """ Get specific port status """ global ports_status, port_status_synced if not port_status_synced: get_ports_status() return ports_status[port] def get_port_ampere(port): """ Get specific port ampere usage """ global ports_ampere, port_status_synced if not port_status_synced: get_ports_status() return ports_ampere[port] def get_grid_status(): """ Get grid ampere usage """ global grid_status, ports_status_synced if not port_status_synced: get_ports_status() return grid_status def bool_to_str(boolean, raw=False): if not raw: return str(boolean) elif boolean: return "1" else: return "0" def usage(msg="",exitcode=None): if msg: msg = "[ERROR] %s" % msg print """%(msg)s Usage %(argv)s arguments Version: $Id: npm4000.py 750 2009-09-27 14:34:55Z rick $ Arguments: [-h|--help] Reading right know [-a|--ampere] Display ampere readings [-d|--debug] Print extra communication output [-r|--raw] Status(es) is bits like output --serialport= Serial Port to connect to [%(serialport)s] --password= Password to use in hex notation [%(password)s] --addresscode= Internal device number in hex notation [%(addresscode)s] --delay= Delay used between port operations [%(delay)s] --baudrate= Bautrate used for communication (19200,9600) [%(baudrate)s] [-s |--status=] Current port(s) configuration [-t |--toggle=] Toggle port(s) [-o |--on=] Turn on port(s) [-f |--off=] Turn off port(s) --allon Turn all ports on using internal safety [TODO: Implement] --alloff Turn all ports off using internal safety [TODO: Implement] --changepassword= Change password [TODO: Implement] --changeaddresscode= Change addresscode [TODO: Implement] --changetimerdelay= Change internal timer delay [TODO: Implement] --pinballtest= Randomly toggle ports for number of times] --wheel_of_fortune Wheel of fortune implementation [-p |--port=] Ports needed to be used Note: [TODO: Implement] bit codes are in the source code, feel free to drop me an email if you really to need to be in there. Note: has different notations: Numeric value of port 1,2,3,4,5,.. Actual value of port A1,..,A8,B1,..,B8,C1,..,C8 All ports all %(msg)s """ % { 'argv' : sys.argv[0], 'msg' : msg, 'serialport' : opt_serial_port, 'password' : opt_password, 'addresscode' : opt_address_code, 'delay' : opt_delay, 'baudrate' : opt_baudrate, } if exitcode != None: sys.exit(exitcode) def main(): global DEBUG, ser, opt_serial_port, opt_password, opt_address_code, opt_baudrate, opt_delay, opt_pinballtest try: opts, args = getopt.getopt(sys.argv[1:], "adhf:s:t:ro:p:v", ["ampere", "debug", "delay=", "help", "verbose", "serialport=", "port=", "password=", "addresscode=","toggle=","off=", "on=", "status=", "buadrate=", "raw=", "pinballtest=", "wheel_of_fortune"]) except getopt.GetoptError, err: usage(str(err),2) opt_port = None opt_action = None opt_raw = False opt_ampere = False opt_pinballtest = None for o, a in opts: debug("%s : %s" % (o, a)) if o in ["-a", "--ampere"]: opt_ampere = True elif o in ["-d", "--debug"]: DEBUG = True elif o in ["--delay"]: opt_delay = float(a) elif o in ["-h", "--help"]: usage("",0) elif o in ["--addresscode"]: opt_address_code = int(a,16) elif o in ["--password"]: opt_passwd = int(a,16) elif o in ["-p","--port"]: opt_port = a elif o in ["--pinballtest"]: opt_action = 'pinballtest' opt_pinballtest = int(a) elif o in ["--buadrate"]: opt_baudrate = a elif o in ["--serialport"]: opt_serial_port = a elif o in ["-s", "--status"]: opt_action = "status" opt_port = a elif o in ["-t","--toggle"]: opt_action = "toggle" opt_port = a elif o in ["-f","--off"]: opt_action = "off" opt_port = a elif o in ["-r","--raw"]: opt_raw = True elif o in ["-o","--on"]: opt_action = "on" opt_port = a elif o in ["--wheel_of_fortune"]: opt_action = 'wheel_of_fortune' opt_port = "all" else: assert False, "unhandled option" if (opt_port == None): usage("No port defined",2) elif (opt_action == None): usage("No action defined",2) # Resolve port to proper numbers array ports = [] for port in opt_port.split(','): debug("Raw port: %s" % port) if port == "all": ports.extend(range(1,25)) elif port[0] in "ABCabc": print hex_to_port(int(port,16)) ports.append(hex_to_port(int(port,16))) else: ports.append(int(port)) debug("Operating on ports " + str(ports)) # Open serial port ser = serial.Serial(opt_serial_port, opt_baudrate, timeout=5) debug(serial) if opt_action == 'pinballtest': for count in range(0,opt_pinballtest): port = random.choice(ports) print "[%04i] Toggle port %02i" % (count, port) action_toggle_port(port) # Backoff time time.sleep(opt_delay) sys.exit(0) elif opt_action == 'wheel_of_fortune': # First turn all ports off for port in ports: action_port_off(port) port = random.choice(ports) total_time = 0.0 for c in range(1,random.randint(10,200)): # Not all should be evaluated (50%) if random.randint(0,200) > 100: continue action_port_on(port) sleep_time = float(c) / 1000 total_time += sleep_time time.sleep(sleep_time) action_port_off(port) print "[%03i] Port %i (%f)" % (c, port, sleep_time) port = ((port + 1) % 25) if port == 0: port += 1 print "Total time: %f" % total_time # Initial result action_port_on(port) sys.exit(0) # Status needs real integers, hack for port in ports: if opt_action == "status": if opt_raw: print bool_to_str(get_port_status(port),opt_raw), else: ampere_str = "" if opt_ampere: ampere_str = "[%s]" % get_port_ampere(port) print "Port %02i : %s %s" % (port, get_port_status(port), ampere_str) elif opt_action == "toggle": action_toggle_port(port) elif opt_action == "on": action_port_on(port) elif opt_action == "off": action_port_off(port) else: assert False, "Option '%s' invalid" % opt_action # Backoff if we need to be patient time.sleep(opt_delay) sys.stdout.flush() # Be nice and close correctly ser.close() if opt_ampere: if opt_raw: print " ".join(map(str, get_grid_status())) else: print "Grid A: %s" % grid_status[0] print "Grid B: %s" % grid_status[1] print "Grid C: %s" % grid_status[2] if __name__ == "__main__": main()