Changeset 286 for misc/tailtime
- Timestamp:
- Mar 10, 2011, 8:44:46 AM (14 years ago)
- File:
-
- 1 copied
Legend:
- Unmodified
- Added
- Removed
-
misc/tailtime
r242 r286 1 1 #!/usr/bin/env python 2 # Serial cable sniffer for discrete (single send and single receive style) 3 # serial communications with hex dump. Note you will need a serial cable 4 # sniffer (DIY schemes avaiable). 5 # HOST -> DEVICE at /dev/ttyUSB0 and DEVICE -> HOST at /dev/ttyUSB1 6 # 7 # NOTE: Use Cheap USB-to-Serial devices else you might run into trouble with 8 # 'smart' buffering and/or not clearing properly failing devices for example: 9 # - ID 0557:2008 ATEN International Co., Ltd UC-232A Serial Port [pl2303] 10 # 2 # 3 # Prefix your (log) files with a timestamp 4 # 11 5 # $Id$ 12 6 # Rick van der Zwet <info@rickvanderzwet.nl> 13 7 # license: BSD 14 8 import time 15 import s ignal9 import sys 16 10 import os 17 import sys18 11 19 DEBUG = True 20 21 # Stolen from http://snippets.dzone.com/posts/show/7682 22 # license: MIT License 23 class Watchdog(Exception): 24 def __init__(self, time=5): 25 self.time = time 26 27 def __enter__(self): 28 signal.signal(signal.SIGALRM, self.handler) 29 signal.alarm(self.time) 30 31 def __exit__(self, type, value, traceback): 32 signal.alarm(0) 33 34 def handler(self, signum, frame): 35 raise self 36 37 def __str__(self): 38 return "The code you executed took more than %ds to complete" % self.time 39 40 41 send = open('/dev/ttyUSB0','rb', buffering=0) 42 recv = open('/dev/ttyUSB1','rb', buffering=0) 43 comment = open('/dev/stdin','r', buffering=0) 44 45 def str_to_hex(s): 46 """Hexadecimal string representation of 's'""" 47 return " ".join(["%02x" % ord(x) for x in s]) 48 49 def perror(s): 50 sys.stderr.write("[ERROR] %s\n" % s) 51 52 def pdebug(s): 53 if DEBUG: 54 sys.stderr.write("[DEBUG] %s\n" % s) 55 56 def main(): 57 while 1: 58 59 # Async USB buffers get all data from sources 60 send_buffer = "" 61 recv_buffer = "" 62 while send_buffer == "" or recv_buffer == "": 63 if send_buffer == "": 64 pdebug("trying to read send_buffer") 65 while 1: 66 try: 67 with Watchdog(1): 68 send_buffer += send.read(1) 69 except Watchdog: 70 pdebug("trying to read send_buffer") 71 break 72 else: 73 pdebug("send_buffer filled %s" % str_to_hex(send_buffer)) 74 75 if recv_buffer == "": 76 pdebug("trying to read recv_buffer") 77 while 1: 78 try: 79 with Watchdog(5): 80 recv_buffer += recv.read(1) 81 except Watchdog: 82 break 83 else: 84 pdebug("recv_buffer filled %s" % str_to_hex(recv_buffer)) 85 86 # Try to fetch comment if we have any 87 comment_buffer = "" 88 while 1: 89 try: 90 with Watchdog(1): 91 comment_buffer += comment.read(1) 92 except Watchdog: 93 break 94 95 prefix = "[%s] " % time.strftime("%H:%M:%S") 96 print prefix + "Send: " + str_to_hex(send_buffer) 97 print prefix + "Recv: " + str_to_hex(recv_buffer) 98 print prefix + "Comm: " + comment_buffer 99 100 send.close() 101 recv.close() 12 # Really make it unbuffered 13 sys.stdin = os.fdopen(sys.stdin.fileno(), 'r', 0) 14 sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) 102 15 103 16 if __name__ == "__main__": 104 # Little hack to disable output from terminal which will later on be105 # processed and included in variable106 os.system('stty -echo')107 17 try: 108 main() 109 except KeyboardInterrupt: 110 pass 111 os.system('stty echo') 18 strftime = sys.argv[1] 19 except IndexError,ValueError: 20 strftime = '%H:%M:%S' 21 try: 22 while 1: 23 line = sys.stdin.readline() 24 #XXX: Not some kind of smarter way to detect EOF? 25 if len(line) == 0: 26 break 27 prefix = "[%s] " % time.strftime(strftime) 28 print prefix + line, 29 except KeyboardInterrupt: 30 pass
Note:
See TracChangeset
for help on using the changeset viewer.