source: misc/serial-sniffer.py@ 270

Last change on this file since 270 was 153, checked in by Rick van der Zwet, 14 years ago

Some hacks to make it work better

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 2.9 KB
RevLine 
[145]1#!/usr/bin/env python
2# Serial cable sniffer for discrete (single send and single receive style)
3# serial communications with hex dump. Note you will need a serial cable
4# sniffer (DIY schemes available).
5# HOST -> DEVICE at /dev/ttyUSB0 and DEVICE -> HOST at /dev/ttyUSB1
6#
[147]7# NOTE: Use Cheap USB-to-Serial devices else you might run into trouble with
8# 'smart' buffering and/or not clearing properly failing devices for example:
9# - ID 0557:2008 ATEN International Co., Ltd UC-232A Serial Port [pl2303]
10#
[145]11# $Id: serial-sniffer.py 153 2010-07-15 20:21:48Z rick $
12# Rick van der Zwet <info@rickvanderzwet.nl>
13# license: BSD
14import time
15import signal
16import os
[146]17import sys
[145]18
# Set to False to silence the "[DEBUG]" lines that pdebug() writes to stderr.
DEBUG = True
[147]20
[145]21# Stolen from http://snippets.dzone.com/posts/show/7682
22# license: MIT License
class Watchdog(Exception):
    """Context manager that raises itself when its body runs too long.

    Entering the context arms a SIGALRM timer for 'time' seconds. If the
    timer fires before the body finishes, the signal handler raises this
    very instance, so callers can wrap the 'with' statement in
    'except Watchdog:'. Leaving the context cancels the timer and puts
    back the SIGALRM handler that was installed before.

    time -- timeout in seconds (passed to signal.alarm), default 5
    """

    def __init__(self, time=5):
        self.time = time
        # Handler that was active before __enter__, restored in __exit__.
        self._previous_handler = None

    def __enter__(self):
        # signal.signal() returns the handler it replaces.
        self._previous_handler = signal.signal(signal.SIGALRM, self.handler)
        signal.alarm(self.time)

    def __exit__(self, type, value, traceback):
        # Cancel any pending alarm first so it cannot fire while the
        # handler is being swapped back.
        signal.alarm(0)
        # signal.signal() reports None for a handler that was not installed
        # from Python; that value cannot be passed back in, so skip it.
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)

    def handler(self, signum, frame):
        """SIGALRM callback: abort the guarded code by raising this instance."""
        raise self

    def __str__(self):
        return "The code you executed took more than %ds to complete" % self.time
39
40
# Tap on the HOST -> DEVICE line. Opened unbuffered because main() pulls
# the data in one byte at a time.
send = open('/dev/ttyUSB0', 'rb', buffering=0)
# Tap on the DEVICE -> HOST line.
recv = open('/dev/ttyUSB1', 'rb', buffering=0)
# Free-form comment text, read from standard input.
comment = open('/dev/stdin', 'r', buffering=0)
44
def str_to_hex(s):
    """Hexadecimal string representation of 's'.

    Every element of 's' becomes a two-digit lowercase hex number; the
    numbers are separated by single spaces. An empty 's' gives "".

    's' may be a string (elements are characters, converted with ord())
    or a bytes-like sequence whose iteration already yields integers.
    """
    return " ".join(
        "%02x" % (x if isinstance(x, int) else ord(x)) for x in s
    )
48
def perror(s):
    """Write 's' to stderr as one line prefixed with "[ERROR] "."""
    sys.stderr.write("[ERROR] %s\n" % s)
51
def pdebug(s):
    """Write 's' to stderr as one line prefixed with "[DEBUG] ".

    Does nothing unless the module-level DEBUG flag is true.
    """
    if DEBUG:
        sys.stderr.write("[DEBUG] %s\n" % s)
55
def _read_until_timeout(stream, timeout):
    """Collect data from 'stream' until a single read stalls for 'timeout' seconds.

    Returns whatever was read so far, which is "" when nothing arrived.
    """
    data = ""
    while True:
        try:
            # Every one-byte read gets a fresh alarm, so the loop only ends
            # after the stream has been quiet for a full 'timeout' seconds.
            with Watchdog(timeout):
                data += stream.read(1)
        except Watchdog:
            return data


def main():
    """Print send/receive/comment triplets captured from the sniffer, forever.

    Each round waits until both the 'send' and the 'recv' stream have
    delivered at least one byte, then picks up whatever comment text was
    entered on stdin and prints three timestamped lines: the two hex dumps
    and the comment.

    The loop only ends through an exception (for instance
    KeyboardInterrupt); 'send' and 'recv' are closed on the way out.
    """
    try:
        while True:
            # Async USB buffers get all data from sources
            send_buffer = ""
            recv_buffer = ""
            while send_buffer == "" or recv_buffer == "":
                if send_buffer == "":
                    pdebug("trying to read send_buffer")
                    send_buffer = _read_until_timeout(send, 1)
                    # Logged once more when the read attempt has timed out.
                    pdebug("trying to read send_buffer")
                else:
                    pdebug("send_buffer filled %s" % str_to_hex(send_buffer))

                if recv_buffer == "":
                    pdebug("trying to read recv_buffer")
                    recv_buffer = _read_until_timeout(recv, 5)
                else:
                    pdebug("recv_buffer filled %s" % str_to_hex(recv_buffer))

            # Try to fetch comment if we have any
            comment_buffer = _read_until_timeout(comment, 1)

            prefix = "[%s] " % time.strftime("%H:%M:%S")
            # Parenthesized single-argument form: prints the same text
            # whether 'print' is a statement or a function.
            print(prefix + "Send: " + str_to_hex(send_buffer))
            print(prefix + "Recv: " + str_to_hex(recv_buffer))
            print(prefix + "Comm: " + comment_buffer)
    finally:
        # The loop above never ends normally, so the close calls have to
        # live in 'finally' to be reached at all.
        send.close()
        recv.close()
102
if __name__ == "__main__":
    # Little hack: switch off terminal echo, because the typed text is
    # read back from stdin by main() and printed from there as the comment.
    os.system('stty -echo')
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the sniffer.
        pass
    finally:
        # Re-enable echo even when main() fails with some other error;
        # otherwise the shell is left without visible input.
        os.system('stty echo')
Note: See TracBrowser for help on using the repository browser.