#!/usr/bin/env python # Serial cable sniffer for discrete (single send and single receive style) # serial communications with hex dump. Note you will need a serial cable # sniffer (DIY schemes avaiable). # HOST -> DEVICE at /dev/ttyUSB0 and DEVICE -> HOST at /dev/ttyUSB1 # # NOTE: Use Cheap USB-to-Serial devices else you might run into trouble with # 'smart' buffering and/or not clearing properly failing devices for example: # - ID 0557:2008 ATEN International Co., Ltd UC-232A Serial Port [pl2303] # # $Id: serial-sniffer.py 153 2010-07-15 20:21:48Z rick $ # Rick van der Zwet # license: BSD import time import signal import os import sys DEBUG = True # Stolen from http://snippets.dzone.com/posts/show/7682 # license: MIT License class Watchdog(Exception): def __init__(self, time=5): self.time = time def __enter__(self): signal.signal(signal.SIGALRM, self.handler) signal.alarm(self.time) def __exit__(self, type, value, traceback): signal.alarm(0) def handler(self, signum, frame): raise self def __str__(self): return "The code you executed took more than %ds to complete" % self.time send = open('/dev/ttyUSB0','rb', buffering=0) recv = open('/dev/ttyUSB1','rb', buffering=0) comment = open('/dev/stdin','r', buffering=0) def str_to_hex(s): """Hexadecimal string representation of 's'""" return " ".join(["%02x" % ord(x) for x in s]) def perror(s): sys.stderr.write("[ERROR] %s\n" % s) def pdebug(s): if DEBUG: sys.stderr.write("[DEBUG] %s\n" % s) def main(): while 1: # Async USB buffers get all data from sources send_buffer = "" recv_buffer = "" while send_buffer == "" or recv_buffer == "": if send_buffer == "": pdebug("trying to read send_buffer") while 1: try: with Watchdog(1): send_buffer += send.read(1) except Watchdog: pdebug("trying to read send_buffer") break else: pdebug("send_buffer filled %s" % str_to_hex(send_buffer)) if recv_buffer == "": pdebug("trying to read recv_buffer") while 1: try: with Watchdog(5): recv_buffer += recv.read(1) except Watchdog: break else: pdebug("recv_buffer filled %s" % str_to_hex(recv_buffer)) # Try to fetch comment if we have any comment_buffer = "" while 1: try: with Watchdog(1): comment_buffer += comment.read(1) except Watchdog: break prefix = "[%s] " % time.strftime("%H:%M:%S") print prefix + "Send: " + str_to_hex(send_buffer) print prefix + "Recv: " + str_to_hex(recv_buffer) print prefix + "Comm: " + comment_buffer send.close() recv.close() if __name__ == "__main__": # Little hack to disable output from terminal which will later on be # processed and included in variable os.system('stty -echo') try: main() except KeyboardInterrupt: pass os.system('stty echo')