1 | #!/usr/bin/env python
|
---|
2 | # Serial cable sniffer for discrete (single send and single receive style)
|
---|
3 | # serial communications with hex dump. Note you will need a serial cable
|
---|
4 | # sniffer (DIY schemes avaiable).
|
---|
5 | # HOST -> DEVICE at /dev/ttyUSB0 and DEVICE -> HOST at /dev/ttyUSB1
|
---|
6 | #
|
---|
7 | # NOTE: Use Cheap USB-to-Serial devices else you might run into trouble with
|
---|
8 | # 'smart' buffering and/or not clearing properly failing devices for example:
|
---|
9 | # - ID 0557:2008 ATEN International Co., Ltd UC-232A Serial Port [pl2303]
|
---|
10 | #
|
---|
11 | # $Id: serial-sniffer.py 153 2010-07-15 20:21:48Z rick $
|
---|
12 | # Rick van der Zwet <info@rickvanderzwet.nl>
|
---|
13 | # license: BSD
|
---|
14 | import time
|
---|
15 | import signal
|
---|
16 | import os
|
---|
17 | import sys
|
---|
18 |
|
---|
19 | DEBUG = True
|
---|
20 |
|
---|
21 | # Stolen from http://snippets.dzone.com/posts/show/7682
|
---|
22 | # license: MIT License
|
---|
23 | class Watchdog(Exception):
|
---|
24 | def __init__(self, time=5):
|
---|
25 | self.time = time
|
---|
26 |
|
---|
27 | def __enter__(self):
|
---|
28 | signal.signal(signal.SIGALRM, self.handler)
|
---|
29 | signal.alarm(self.time)
|
---|
30 |
|
---|
31 | def __exit__(self, type, value, traceback):
|
---|
32 | signal.alarm(0)
|
---|
33 |
|
---|
34 | def handler(self, signum, frame):
|
---|
35 | raise self
|
---|
36 |
|
---|
37 | def __str__(self):
|
---|
38 | return "The code you executed took more than %ds to complete" % self.time
|
---|
39 |
|
---|
40 |
|
---|
41 | send = open('/dev/ttyUSB0','rb', buffering=0)
|
---|
42 | recv = open('/dev/ttyUSB1','rb', buffering=0)
|
---|
43 | comment = open('/dev/stdin','r', buffering=0)
|
---|
44 |
|
---|
45 | def str_to_hex(s):
|
---|
46 | """Hexadecimal string representation of 's'"""
|
---|
47 | return " ".join(["%02x" % ord(x) for x in s])
|
---|
48 |
|
---|
49 | def perror(s):
|
---|
50 | sys.stderr.write("[ERROR] %s\n" % s)
|
---|
51 |
|
---|
52 | def pdebug(s):
|
---|
53 | if DEBUG:
|
---|
54 | sys.stderr.write("[DEBUG] %s\n" % s)
|
---|
55 |
|
---|
56 | def main():
|
---|
57 | while 1:
|
---|
58 |
|
---|
59 | # Async USB buffers get all data from sources
|
---|
60 | send_buffer = ""
|
---|
61 | recv_buffer = ""
|
---|
62 | while send_buffer == "" or recv_buffer == "":
|
---|
63 | if send_buffer == "":
|
---|
64 | pdebug("trying to read send_buffer")
|
---|
65 | while 1:
|
---|
66 | try:
|
---|
67 | with Watchdog(1):
|
---|
68 | send_buffer += send.read(1)
|
---|
69 | except Watchdog:
|
---|
70 | pdebug("trying to read send_buffer")
|
---|
71 | break
|
---|
72 | else:
|
---|
73 | pdebug("send_buffer filled %s" % str_to_hex(send_buffer))
|
---|
74 |
|
---|
75 | if recv_buffer == "":
|
---|
76 | pdebug("trying to read recv_buffer")
|
---|
77 | while 1:
|
---|
78 | try:
|
---|
79 | with Watchdog(5):
|
---|
80 | recv_buffer += recv.read(1)
|
---|
81 | except Watchdog:
|
---|
82 | break
|
---|
83 | else:
|
---|
84 | pdebug("recv_buffer filled %s" % str_to_hex(recv_buffer))
|
---|
85 |
|
---|
86 | # Try to fetch comment if we have any
|
---|
87 | comment_buffer = ""
|
---|
88 | while 1:
|
---|
89 | try:
|
---|
90 | with Watchdog(1):
|
---|
91 | comment_buffer += comment.read(1)
|
---|
92 | except Watchdog:
|
---|
93 | break
|
---|
94 |
|
---|
95 | prefix = "[%s] " % time.strftime("%H:%M:%S")
|
---|
96 | print prefix + "Send: " + str_to_hex(send_buffer)
|
---|
97 | print prefix + "Recv: " + str_to_hex(recv_buffer)
|
---|
98 | print prefix + "Comm: " + comment_buffer
|
---|
99 |
|
---|
100 | send.close()
|
---|
101 | recv.close()
|
---|
102 |
|
---|
103 | if __name__ == "__main__":
|
---|
104 | # Little hack to disable output from terminal which will later on be
|
---|
105 | # processed and included in variable
|
---|
106 | os.system('stty -echo')
|
---|
107 | try:
|
---|
108 | main()
|
---|
109 | except KeyboardInterrupt:
|
---|
110 | pass
|
---|
111 | os.system('stty echo')
|
---|