source: misc/serial-sniffer.py@ 150

Last change on this file since 150 was 148, checked in by Rick van der Zwet, 14 years ago

Script production ready

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 2.9 KB
Line 
1#!/usr/bin/env python
2# Serial cable sniffer for discrete (single send and single receive style)
3# serial communications with hex dump. Note you will need a serial cable
4# sniffer (DIY schemes available).
5# HOST -> DEVICE at /dev/ttyUSB0 and DEVICE -> HOST at /dev/ttyUSB1
6#
7# NOTE: Use Cheap USB-to-Serial devices else you might run into trouble with
8# 'smart' buffering and/or not clearing properly failing devices for example:
9# - ID 0557:2008 ATEN International Co., Ltd UC-232A Serial Port [pl2303]
10#
11# $Id: serial-sniffer.py 148 2010-07-14 23:39:59Z rick $
12# Rick van der Zwet <info@rickvanderzwet.nl>
13# license: BSD
14import time
15import signal
16import os
17import sys
18
19DEBUG = True
20
21# Stolen from http://snippets.dzone.com/posts/show/7682
22# license: MIT License
class Watchdog(Exception):
    """Context manager that aborts its body after 'time' seconds.

    Entering the context installs a SIGALRM handler and arms an alarm.  If
    the alarm fires before the body finishes, this very instance is raised
    as the exception, so callers catch it with ``except Watchdog:``.
    Leaving the context cancels the alarm and puts back the SIGALRM handler
    that was active before entry.

    Only the handler is saved and restored, not a previously pending alarm,
    so nesting one Watchdog inside another cancels the outer alarm.
    """

    # SIGALRM handler that was active before __enter__; None while the
    # watchdog is not armed (or when the old handler is not restorable).
    _previous_handler = None

    def __init__(self, time=5):
        # Seconds until the alarm fires; handed to signal.alarm() as is.
        self.time = time

    def __enter__(self):
        # signal.signal() returns the handler it replaces; keep it so that
        # __exit__ can undo this process-wide change.
        self._previous_handler = signal.signal(signal.SIGALRM, self.handler)
        signal.alarm(self.time)
        return self

    def __exit__(self, type, value, traceback):
        # Cancel first so the alarm cannot fire while handlers are swapped.
        signal.alarm(0)
        # signal.signal() reports None for a handler that was not installed
        # from Python; that value cannot be passed back in, so skip it.
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)
            self._previous_handler = None

    def handler(self, signum, frame):
        # Invoked on SIGALRM: the watchdog itself is the exception.
        raise self

    def __str__(self):
        return "The code you executed took more than %ds to complete" % self.time
39
40
# Unbuffered read handles, opened in this order:
#   send    - HOST -> DEVICE traffic, tapped on /dev/ttyUSB0
#   recv    - DEVICE -> HOST traffic, tapped on /dev/ttyUSB1
#   comment - free-text annotations read from standard input
send, recv, comment = (
    open(path, 'r', buffering=0)
    for path in ('/dev/ttyUSB0', '/dev/ttyUSB1', '/dev/stdin')
)
44
def str_to_hex(s):
    """Hexadecimal string representation of 's'.

    Each item of 's' becomes a lowercase hex number padded to at least two
    digits; the numbers are joined by single spaces.  's' may be a
    character string (items are converted with ord()) or a bytes/bytearray
    object whose items are already integers.  Empty input gives "".
    """
    return " ".join(
        "%02x" % (item if isinstance(item, int) else ord(item))
        for item in s
    )
48
def perror(s):
    """Write 's' to standard error as one line tagged with [ERROR]."""
    line = "[ERROR] %s\n" % s
    sys.stderr.write(line)
51
def pdebug(s):
    """Write 's' to standard error tagged with [DEBUG]; no-op unless DEBUG is set."""
    if not DEBUG:
        return
    line = "[DEBUG] %s\n" % s
    sys.stderr.write(line)
55
def _read_until_quiet(stream, timeout):
    """Read 'stream' one byte at a time until 'timeout' seconds pass without data."""
    data = ""
    while 1:
        try:
            # The watchdog is re-armed for every byte, so 'timeout' bounds
            # the gap between two bytes, not the total time spent reading.
            with Watchdog(timeout):
                data += stream.read(1)
        except Watchdog:
            return data


def main():
    """Log send/receive exchanges from the two serial taps, forever.

    Each round waits until both the 'send' and the 'recv' stream have
    produced data, then picks up an optional comment from 'comment' and
    writes three timestamped lines (Send, Recv, Comm) to standard output.
    The loop has no exit condition and only ends through an exception
    such as KeyboardInterrupt; 'send' and 'recv' are closed on the way out.
    """
    try:
        while 1:
            # Async USB buffers get all data from sources
            send_buffer = ""
            recv_buffer = ""
            # NOTE(review): indentation was lost in the copy this was
            # reviewed from; each 'else' is read as pairing with its
            # 'if ..._buffer == ""' - confirm against the repository.
            while send_buffer == "" or recv_buffer == "":
                if send_buffer == "":
                    send_buffer = _read_until_quiet(send, 1)
                else:
                    pdebug("send_buffer filled %s" % str_to_hex(send_buffer))

                if recv_buffer == "":
                    recv_buffer = _read_until_quiet(recv, 5)
                else:
                    pdebug("recv_buffer filled %s" % str_to_hex(recv_buffer))

            # Try to fetch comment if we have any
            comment_buffer = _read_until_quiet(comment, 1)

            # sys.stdout.write instead of the print statement: same output,
            # and the function parses on both Python 2 and Python 3.
            prefix = "[%s] " % time.strftime("%H:%M:%S")
            sys.stdout.write(prefix + "Send: " + str_to_hex(send_buffer) + "\n")
            sys.stdout.write(prefix + "Recv: " + str_to_hex(recv_buffer) + "\n")
            sys.stdout.write(prefix + "Comm: " + comment_buffer + "\n")
    finally:
        # Previously these calls followed the endless loop and never ran.
        send.close()
        recv.close()
101
if __name__ == "__main__":
    # Little hack to disable output from terminal which will later on be
    # processed and included in variable
    os.system('stty -echo')
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the sniffer; exit quietly.
        pass
    finally:
        # Turn echo back on even when main() fails with some other error,
        # otherwise the user is left with a terminal that hides typing.
        os.system('stty echo')
Note: See TracBrowser for help on using the repository browser.