1 | #!/usr/bin/env python
|
---|
2 | # Serial cable sniffer for discrete (single send and single receive style)
|
---|
3 | # serial communications with hex dump. Note you will need a serial cable
|
---|
4 | # sniffer (DIY schemes available).
|
---|
5 | # HOST -> DEVICE at /dev/ttyUSB0 and DEVICE -> HOST at /dev/ttyUSB1
|
---|
6 | #
|
---|
7 | # $Id: serial-sniffer.py 145 2010-07-14 22:32:18Z rick $
|
---|
8 | # Rick van der Zwet <info@rickvanderzwet.nl>
|
---|
9 | # license: BSD
|
---|
10 | import time
|
---|
11 | import signal
|
---|
12 | import os
|
---|
13 |
|
---|
14 | # Stolen from http://snippets.dzone.com/posts/show/7682
|
---|
15 | # license: MIT License
|
---|
class Watchdog(Exception):
    """Context manager that raises itself when its body runs too long.

    On entry a SIGALRM handler is installed and an alarm is armed for
    ``time`` seconds.  If the alarm fires before the ``with`` body finishes,
    the handler raises this very instance, so callers can write
    ``except Watchdog:`` around the ``with`` statement.  On exit the alarm is
    cancelled and the SIGALRM handler that was active before entry is put
    back, so the watchdog does not leak its handler into unrelated code.

    Relies on ``signal.alarm``/``SIGALRM``, so the timeout has whole-second
    resolution.
    """

    def __init__(self, time=5):
        # Timeout in seconds handed to signal.alarm().
        self.time = time
        # Handler that was installed before __enter__; restored in __exit__.
        self._previous_handler = None

    def __enter__(self):
        # signal.signal() returns the handler it replaces; keep it so it can
        # be restored once the guarded body is done.
        self._previous_handler = signal.signal(signal.SIGALRM, self.handler)
        signal.alarm(self.time)
        return self

    def __exit__(self, type, value, traceback):
        # Cancel any pending alarm first so it cannot fire after the body.
        signal.alarm(0)
        # signal.signal() reports None for handlers not installed from
        # Python; those cannot be re-installed, so leave things untouched.
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)
            self._previous_handler = None

    def handler(self, signum, frame):
        """SIGALRM callback: abort the guarded body by raising this instance."""
        raise self

    def __str__(self):
        return "The code you executed took more than %ds to complete" % self.time
|
---|
32 |
|
---|
33 |
|
---|
# Unbuffered (third argument 0) read handles used by the capture loop:
#   send    - /dev/ttyUSB0
#   recv    - /dev/ttyUSB1
#   comment - /dev/stdin
send = open('/dev/ttyUSB0', 'r', 0)
recv = open('/dev/ttyUSB1', 'r', 0)
comment = open('/dev/stdin', 'r', 0)
|
---|
37 |
|
---|
def str_to_hex(s):
    """Return the bytes of 's' as space-separated two-digit lowercase hex.

    's' may be a character string or a bytes/bytearray object.  Iterating
    the latter can yield integers instead of one-character strings, so
    integers are formatted directly and everything else goes through ord().
    An empty input gives an empty string.
    """
    return " ".join(
        "%02x" % (item if isinstance(item, int) else ord(item)) for item in s
    )
|
---|
41 |
|
---|
def _read_until_timeout(stream, timeout):
    """Collect single-byte reads from 'stream' until one of them stalls.

    Every read(1) call is guarded by Watchdog(timeout); as soon as a read
    does not finish within 'timeout' seconds the data gathered so far is
    returned (possibly the empty string).
    """
    data = ""
    while True:
        try:
            with Watchdog(timeout):
                data += stream.read(1)
        except Watchdog:
            return data


def main(data_timeout=3, comment_timeout=1):
    """Dump send/receive exchanges from the two serial taps, forever.

    Each cycle waits until both the 'send' and the 'recv' stream have
    produced data, then picks up whatever was typed on 'comment' and prints
    the three as timestamped lines (hex for the serial data, raw text for
    the comment).

    data_timeout    -- seconds a read on 'send'/'recv' may stall before the
                       buffer is considered complete
    comment_timeout -- same, for the 'comment' stream

    The loop has no normal exit; it ends when an exception (for example
    KeyboardInterrupt) propagates, at which point 'send' and 'recv' are
    closed.
    """
    try:
        while True:
            # Async USB buffers get all data from sources
            send_buffer = ""
            recv_buffer = ""
            while send_buffer == "" or recv_buffer == "":
                if send_buffer == "":
                    send_buffer = _read_until_timeout(send, data_timeout)
                else:
                    # Already captured; still waiting for the other side.
                    print("# send_buffer filled %s" % str_to_hex(send_buffer))

                if recv_buffer == "":
                    recv_buffer = _read_until_timeout(recv, data_timeout)
                else:
                    # Already captured; still waiting for the other side.
                    print("# recv_buffer filled %s" % str_to_hex(recv_buffer))

            # Try to fetch comment if we have any
            comment_buffer = _read_until_timeout(comment, comment_timeout)

            prefix = "[%s] " % time.strftime("%H:%M:%S")
            print(prefix + "Send: " + str_to_hex(send_buffer))
            print(prefix + "Recv: " + str_to_hex(recv_buffer))
            print(prefix + "Comm: " + comment_buffer)
    finally:
        # Placed in 'finally' because the capture loop never ends on its
        # own; code after it would be unreachable.
        send.close()
        recv.close()
|
---|
85 |
|
---|
if __name__ == "__main__":
    # Turn off terminal echo: what the operator types is read from
    # /dev/stdin by the capture loop and printed as part of its own output,
    # so it should not also be echoed by the terminal.
    os.system('stty -echo')
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the sniffer; exit quietly.
        pass
    finally:
        # Always restore echo, including when main() fails with a real
        # error, which is now reported instead of being silently dropped.
        os.system('stty echo')
|
---|