Changeset 312
- Timestamp:
- Jul 19, 2011, 12:38:06 PM (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
py-tcpmultiplexer/TCPMultiplexer.py
r311 r312 7 7 # Rick van der Zwet <info@rickvanderzwet.nl> 8 8 # 9 import threading10 import urllib211 import time12 9 import SocketServer 10 import argparse 13 11 import logging 14 12 import sys 13 import threading 14 import time 15 import urllib2 15 16 16 urlheaders = dict() 17 # Some boring defaults 18 DEFAULT_HOST = '0.0.0.0' 19 DEFAULT_PORT = '9999' 20 21 # Global variables used as ring-buffers or shared-storage 22 urlheader = dict() 23 dataheader = dict() 17 24 urldata = dict() 18 25 running = True … … 35 42 36 43 def handle(self): 37 global running, urlheaders, urldata44 global running, dataheader, urlheader, urldata 38 45 # self.request is the TCP socket connected to the client 39 46 self.data = self.request.recv(1024).strip() … … 43 50 logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid) 44 51 try: 45 self.request.send('HTTP/1.1 200 OK\n' + urlheaders[urlid] + '\n') 46 if req_type == 'HEAD': 47 return 48 elif req_type == 'GET': 49 urldata[urlid][self] = [] 50 while running: 51 if len(urldata[urlid][self]) == 0: 52 time.sleep(0.1) 53 continue 54 self.request.send(urldata[urlid][self].pop(0)) 52 if not urldata.has_key(urlid): 53 self.request.send("HTTP/1.1 404 NOT FOUND\n\nThe page '%s' does not exists" % urlid) 55 54 else: 56 assert False, "Invalid request" 55 self.request.send('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid]) 56 if req_type == 'HEAD': 57 return 58 elif req_type == 'GET': 59 urldata[urlid][self] = [] 60 while running: 61 if len(urldata[urlid][self]) == 0: 62 time.sleep(0.1) 63 continue 64 self.request.send(urldata[urlid][self].pop(0)) 65 else: 66 assert False, "Invalid request" 57 67 except IOError: 58 68 pass 59 69 except StandardError, e: 60 logging.warning('Server has issues: %s' %e)70 logging.warning('Server has issues: %s', e) 61 71 del urldata[urlid][self] 62 72 63 73 def get_data(url, urlid, *args): 64 global running, urlheaders, urldata 74 global running, dataheader, urlheader, urldata 75 logger = logging.getLogger('recv_id=' + urlid) 65 76 # Fill buffers if needed 66 logg ing.info("Starting thread '%s' (%s)" % (url, urlid))77 logger.info("Starting thread '%s' (%s)", url, urlid) 67 78 while running: 68 79 try: 69 80 f = urllib2.urlopen(url) 70 urlheader s[urlid] = ''.join(f.info().headers)81 urlheader[urlid] = ''.join(f.info().headers) 71 82 urldata[urlid] = dict() 83 datatype = None 84 urltype = f.info().gettype() 85 logger.info("url.info().gettype() is '%s'", urltype) 86 if urltype == 'audio/x-wav': 87 datatype = 'wav' 88 elif urltype == 'multipart/x-mixed-replace': 89 datatype = 'mjeg' 90 else: 91 logger.warning("Cannot find file type of '%s'", url) 92 93 if datatype: 94 logger.info("'%s' Identified as %s", url, datatype) 95 96 if datatype == 'wav': 97 dataheader[urlid] = f.read(44) 98 elif datatype == 'mjeg': 99 dataheader[urlid] = '\n'.join(f.read(100).split('\n')[0:2]) 100 else: 101 dataheader[urlid] = '' 102 72 103 while running: 73 104 data = f.read(10000) 74 105 for key in urldata[urlid].keys(): 75 106 urldata[urlid][key].append(data) 76 except UrlError:107 except IOError, e: 77 108 #Enforce a connection reset 78 logging.warning("URL reset '$s'" % url) 79 del urlheaders[key] 80 del urldata[key] 109 logger.warning("URL reset '%s' (%s)", url, e) 110 del urlheader[urlid] 111 del dataheader[urlid] 112 del urldata[urlid] 81 113 time.sleep(1) 82 114 continue 83 logg ing.info("Closing Thread '%s'" %url)115 logger.info("Closing Thread '%s'", url) 84 116 85 117 86 118 if __name__ == "__main__": 119 parser = argparse.ArgumentParser(description='Process some integers.') 120 121 # URL : TARGET 122 DEFAULT_STREAMS = { 123 '/cam1/video' : 'http://172.16.0.67:8080/videofeed', 124 '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav', 125 } 126 # Flexible enough to specify using yaml file (sys.argv) as well 127 if len(sys.argv) > 1: 128 config_file = sys.argv[1] 129 try: 130 import yaml 131 streams = yaml.load(open(config_file)) 132 except (ImportError, IOError) as e: 133 logging.error("Config file '%s' not readable or parsable (%s)", config_file, e) 134 135 87 136 HOST, PORT = "0.0.0.0", 9999 88 137 # Create the server … … 94 143 break 95 144 except IOError, e: 96 logging.warning('For conection %s:%s to become available (%s)' % (HOST, PORT, e))145 logging.warning('For conection %s:%s to become available (%s)', HOST, PORT, e) 97 146 time.sleep(1) 98 147 except KeyboardInterrupt: 99 148 sys.exit(1) 100 149 101 url = 'http://172.16.0.67:8080/videofeed'102 recv_threads.append(threading.Thread(target=get_data, args=(url,'/video1')))103 recv_threads[-1].setDaemon(True)104 recv_threads[-1].start()150 for urlid, url in streams.iteritems(): 151 recv_threads.append(threading.Thread(target=get_data, args=(url,urlid))) 152 recv_threads[-1].setDaemon(True) 153 recv_threads[-1].start() 105 154 106 155 # Activate the server; this will keep running until you 107 156 # interrupt the program with Ctrl-C 108 157 try: 109 logging.info('Serving at %s:%s' % (HOST,PORT))158 logging.info('Serving at %s:%s', HOST,PORT) 110 159 server.serve_forever() 111 160 except KeyboardInterrupt, IOError:
Note:
See TracChangeset
for help on using the changeset viewer.