[145] | 1 | #!/usr/bin/env python
|
---|
| 2 | # Serial cable sniffer for discrete (single send and single receive style)
|
---|
| 3 | # serial communications with hex dump. Note you will need a serial cable
|
---|
| 4 | # sniffer (DIY schemes avaiable).
|
---|
| 5 | # HOST -> DEVICE at /dev/ttyUSB0 and DEVICE -> HOST at /dev/ttyUSB1
|
---|
| 6 | #
|
---|
[147] | 7 | # NOTE: Use Cheap USB-to-Serial devices else you might run into trouble with
|
---|
| 8 | # 'smart' buffering and/or not clearing properly failing devices for example:
|
---|
| 9 | # - ID 0557:2008 ATEN International Co., Ltd UC-232A Serial Port [pl2303]
|
---|
| 10 | #
|
---|
[145] | 11 | # $Id: serial-sniffer.py 153 2010-07-15 20:21:48Z rick $
|
---|
| 12 | # Rick van der Zwet <info@rickvanderzwet.nl>
|
---|
| 13 | # license: BSD
|
---|
| 14 | import time
|
---|
| 15 | import signal
|
---|
| 16 | import os
|
---|
[146] | 17 | import sys
|
---|
[145] | 18 |
|
---|
[148] | 19 | DEBUG = True
|
---|
[147] | 20 |
|
---|
[145] | 21 | # Stolen from http://snippets.dzone.com/posts/show/7682
|
---|
| 22 | # license: MIT License
|
---|
| 23 | class Watchdog(Exception):
|
---|
| 24 | def __init__(self, time=5):
|
---|
| 25 | self.time = time
|
---|
| 26 |
|
---|
| 27 | def __enter__(self):
|
---|
| 28 | signal.signal(signal.SIGALRM, self.handler)
|
---|
| 29 | signal.alarm(self.time)
|
---|
| 30 |
|
---|
| 31 | def __exit__(self, type, value, traceback):
|
---|
| 32 | signal.alarm(0)
|
---|
| 33 |
|
---|
| 34 | def handler(self, signum, frame):
|
---|
| 35 | raise self
|
---|
| 36 |
|
---|
| 37 | def __str__(self):
|
---|
| 38 | return "The code you executed took more than %ds to complete" % self.time
|
---|
| 39 |
|
---|
| 40 |
|
---|
[153] | 41 | send = open('/dev/ttyUSB0','rb', buffering=0)
|
---|
| 42 | recv = open('/dev/ttyUSB1','rb', buffering=0)
|
---|
[145] | 43 | comment = open('/dev/stdin','r', buffering=0)
|
---|
| 44 |
|
---|
| 45 | def str_to_hex(s):
|
---|
| 46 | """Hexadecimal string representation of 's'"""
|
---|
| 47 | return " ".join(["%02x" % ord(x) for x in s])
|
---|
| 48 |
|
---|
[147] | 49 | def perror(s):
|
---|
| 50 | sys.stderr.write("[ERROR] %s\n" % s)
|
---|
| 51 |
|
---|
| 52 | def pdebug(s):
|
---|
| 53 | if DEBUG:
|
---|
| 54 | sys.stderr.write("[DEBUG] %s\n" % s)
|
---|
| 55 |
|
---|
[145] | 56 | def main():
|
---|
| 57 | while 1:
|
---|
| 58 |
|
---|
| 59 | # Async USB buffers get all data from sources
|
---|
| 60 | send_buffer = ""
|
---|
| 61 | recv_buffer = ""
|
---|
| 62 | while send_buffer == "" or recv_buffer == "":
|
---|
| 63 | if send_buffer == "":
|
---|
[153] | 64 | pdebug("trying to read send_buffer")
|
---|
[145] | 65 | while 1:
|
---|
| 66 | try:
|
---|
[147] | 67 | with Watchdog(1):
|
---|
[145] | 68 | send_buffer += send.read(1)
|
---|
| 69 | except Watchdog:
|
---|
[153] | 70 | pdebug("trying to read send_buffer")
|
---|
[145] | 71 | break
|
---|
| 72 | else:
|
---|
[147] | 73 | pdebug("send_buffer filled %s" % str_to_hex(send_buffer))
|
---|
[145] | 74 |
|
---|
| 75 | if recv_buffer == "":
|
---|
[153] | 76 | pdebug("trying to read recv_buffer")
|
---|
[145] | 77 | while 1:
|
---|
| 78 | try:
|
---|
[148] | 79 | with Watchdog(5):
|
---|
[145] | 80 | recv_buffer += recv.read(1)
|
---|
| 81 | except Watchdog:
|
---|
| 82 | break
|
---|
| 83 | else:
|
---|
[147] | 84 | pdebug("recv_buffer filled %s" % str_to_hex(recv_buffer))
|
---|
[145] | 85 |
|
---|
| 86 | # Try to fetch comment if we have any
|
---|
| 87 | comment_buffer = ""
|
---|
| 88 | while 1:
|
---|
| 89 | try:
|
---|
| 90 | with Watchdog(1):
|
---|
| 91 | comment_buffer += comment.read(1)
|
---|
| 92 | except Watchdog:
|
---|
| 93 | break
|
---|
| 94 |
|
---|
| 95 | prefix = "[%s] " % time.strftime("%H:%M:%S")
|
---|
| 96 | print prefix + "Send: " + str_to_hex(send_buffer)
|
---|
| 97 | print prefix + "Recv: " + str_to_hex(recv_buffer)
|
---|
| 98 | print prefix + "Comm: " + comment_buffer
|
---|
| 99 |
|
---|
| 100 | send.close()
|
---|
| 101 | recv.close()
|
---|
| 102 |
|
---|
| 103 | if __name__ == "__main__":
|
---|
| 104 | # Little hack to disable output from terminal which will later on be
|
---|
| 105 | # processed and included in variable
|
---|
| 106 | os.system('stty -echo')
|
---|
| 107 | try:
|
---|
| 108 | main()
|
---|
[146] | 109 | except KeyboardInterrupt:
|
---|
[145] | 110 | pass
|
---|
| 111 | os.system('stty echo')
|
---|