source: py-tcpmultiplexer/TCPMultiplexer.py@ 313

Last change on this file since 313 was 313, checked in by Rick van der Zwet, 13 years ago

Use argparse for argument parsing instead of some random hacks

  • Property svn:executable set to *
File size: 5.5 KB
Line 
1#!/usr/bin/env python
2"""
3# Multiplexer for HTTP streams, used for broadcast of MJPEG and WAV streams
4#
5# Licence: BSDLike
6#
7# Rick van der Zwet <info@rickvanderzwet.nl>
8"""
9
import SocketServer
import argparse
import collections
import logging
import sys
import threading
import time
import urllib2
17
# Defaults for the listen address, the listen port and the stream
# configuration file; the command line options --host, --port and
# --stream-cfg fall back to these.
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 9999
DEFAULT_CONFIG = 'streams.yaml'

# URL : TARGET
# Built-in stream table used when no stream configuration can be loaded.
# The key is the URL path requested by clients, the value is the upstream
# URL the data is fetched from.
DEFAULT_STREAMS = {
    '/cam1/video': 'http://172.16.0.67:8080/videofeed',
    '/cam1/audio': 'http://172.16.0.67:8080/audio.wav',
}

# Global variables used as ring-buffers or shared-storage between the
# receiver threads and the request handlers. All three dicts are keyed by
# the URL path of a stream:
#   urlheader  - HTTP header lines received from the upstream server
#   dataheader - stream preamble that is sent to every new client
#   urldata    - per-stream dict mapping a connected handler to its queue
urlheader = {}
dataheader = {}
urldata = {}
# Cleared on shutdown; every worker loop polls it to know when to stop.
running = True
# Receiver threads, kept so they can be joined on shutdown.
recv_threads = []

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
37
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server that serves every client connection in its own thread."""

    # Allow the listening address to be bound again right after a restart.
    allow_reuse_address = True
    # A handler thread can block for a long time while sending to a stalled
    # client; running handlers as daemon threads keeps such a thread from
    # preventing the interpreter from exiting on shutdown.
    daemon_threads = True
40
41
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server. The first line of
    the request selects a stream by its URL path:

    * unknown path       -> 404 response
    * HEAD               -> response headers and stream preamble only
    * GET                -> headers, preamble, then live stream data until
                            the server stops, the upstream stream is reset
                            or the client goes away
    * any other method   -> 405 response
    * unparsable request -> 400 response
    """

    @staticmethod
    def _parse_request_line(data):
        """Return (method, path) of the first request line, or None if malformed."""
        lines = data.splitlines()
        fields = lines[0].split() if lines else []
        if len(fields) < 2:
            return None
        return fields[0], fields[1]

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        logging.debug("Request: %r", self.data)
        parsed = self._parse_request_line(self.data)

        # Stays None until this handler has registered a queue in urldata,
        # so the cleanup below only removes what was actually added.
        queue = None
        urlid = None
        try:
            if parsed is None:
                self.request.sendall("HTTP/1.1 400 BAD REQUEST\n\nMalformed request line")
                return
            req_type, urlid = parsed
            logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)

            if urlid not in urldata:
                self.request.sendall("HTTP/1.1 404 NOT FOUND\n\nThe page '%s' does not exists" % urlid)
                return
            if req_type not in ('GET', 'HEAD'):
                # Reject before any 200 OK has been sent to the client.
                self.request.sendall("HTTP/1.1 405 METHOD NOT ALLOWED\nAllow: GET, HEAD\n\n")
                return

            # sendall() keeps writing until the whole buffer is out; plain
            # send() may transmit only part of it and corrupt the stream.
            self.request.sendall('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
            if req_type == 'HEAD':
                return

            # Register a private queue; the receiver thread of this stream
            # appends every chunk it reads to all registered queues.
            queue = collections.deque()
            urldata[urlid][self] = queue
            while running:
                # The receiver drops the whole client table when the
                # upstream connection resets. The stream then restarts with
                # a fresh preamble, so this connection cannot be continued.
                if urldata.get(urlid, {}).get(self) is not queue:
                    logging.info("Stream '%s' was reset, closing connection from '%s'", urlid, self.client_address[0])
                    break
                if not queue:
                    time.sleep(0.1)
                    continue
                self.request.sendall(queue.popleft())
        except IOError:
            # Client went away; nothing left to do but clean up.
            pass
        except Exception as e:
            logging.warning('Server has issues: %s', e)
        finally:
            if queue is not None:
                # Tolerate the stream table having been reset meanwhile.
                urldata.get(urlid, {}).pop(self, None)
80
def get_data(url, urlid, *args):
    """
    Receiver thread body: fetch one upstream stream and fan it out.

    Opens `url`, stores its HTTP headers and stream preamble under `urlid`
    in the shared `urlheader` / `dataheader` dicts, and then appends every
    chunk read to the queue of each client registered in `urldata[urlid]`.
    On an I/O error, or when the upstream closes the connection, the shared
    entries for `urlid` are dropped and the connection is retried after one
    second. Runs until the global `running` flag is cleared.

    url    -- upstream URL to fetch
    urlid  -- URL path under which the stream is offered to clients
    args   -- accepted for call compatibility, not used
    """
    global running, dataheader, urlheader, urldata
    logger = logging.getLogger('recv_id=' + urlid)
    # Fill buffers if needed
    logger.info("Starting thread '%s' (%s)", url, urlid)
    while running:
        f = None
        try:
            f = urllib2.urlopen(url)
            header = ''.join(f.info().headers)
            urltype = f.info().gettype()
            logger.info("url.info().gettype() is '%s'", urltype)
            if urltype == 'audio/x-wav':
                datatype = 'wav'
            elif urltype == 'multipart/x-mixed-replace':
                datatype = 'mjeg'
            else:
                datatype = None
                logger.warning("Cannot find file type of '%s'", url)

            if datatype:
                logger.info("'%s' Identified as %s", url, datatype)

            if datatype == 'wav':
                # NOTE(review): 44 bytes matches a canonical PCM WAV header;
                # assumes the upstream sends no extra chunks - confirm.
                preamble = f.read(44)
            elif datatype == 'mjeg':
                # Keep the first two lines of the first 100 bytes;
                # presumably the multipart boundary - confirm with upstream.
                preamble = '\n'.join(f.read(100).split('\n')[0:2])
            else:
                preamble = ''

            # Publish the client table last: request handlers test for
            # `urlid in urldata` and then read both header dicts, so those
            # have to be filled in first.
            clients = dict()
            urlheader[urlid] = header
            dataheader[urlid] = preamble
            urldata[urlid] = clients

            while running:
                data = f.read(10000)
                if not data:
                    # End of stream: reconnect instead of spinning on
                    # empty reads and flooding clients with empty chunks.
                    raise IOError('upstream closed the connection')
                # Snapshot the queues themselves rather than the keys: a
                # handler may remove its entry at any moment, and appending
                # to an already removed queue is harmless.
                for queue in list(clients.values()):
                    queue.append(data)
        except IOError as e:
            # Enforce a connection reset
            logger.warning("URL reset '%s' (%s)", url, e)
            # pop() instead of del: the entries do not exist yet when the
            # very first urlopen() fails.
            urldata.pop(urlid, None)
            urlheader.pop(urlid, None)
            dataheader.pop(urlid, None)
            time.sleep(1)
        finally:
            if f is not None:
                f.close()
    logger.info("Closing Thread '%s'", url)
124
125
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
    parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
    parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
    args = parser.parse_args()

    # Import streams; keep the built-in defaults when the configuration
    # cannot be loaded.
    streams = DEFAULT_STREAMS
    try:
        import yaml
    except ImportError as e:
        logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
    else:
        try:
            with open(args.stream_cfg) as cfg:
                # NOTE: safe_load instead of load - the configuration is a
                # plain URL : TARGET mapping, and plain yaml.load can build
                # arbitrary Python objects from the file.
                loaded = yaml.safe_load(cfg)
        except (IOError, yaml.YAMLError) as e:
            logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
        else:
            if isinstance(loaded, dict):
                streams = loaded
            else:
                # For example an empty file, which loads as None.
                logging.warning("Stream config file '%s' does not contain a URL : TARGET mapping, using defaults", args.stream_cfg)

    # Create the base server, retrying until the address can be bound
    try:
        while True:
            try:
                ThreadedTCPServer.allow_reuse_address = True
                server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
                break
            except IOError as e:
                logging.warning('Waiting for connection %s:%s to become available (%s)', args.host, args.port, e)
                time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(1)

    # One daemon receiver thread per configured stream
    for urlid, url in streams.items():
        thread = threading.Thread(target=get_data, args=(url, urlid))
        thread.setDaemon(True)
        thread.start()
        recv_threads.append(thread)

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    try:
        logging.info('Serving at %s:%s', args.host, args.port)
        server.serve_forever()
    # The tuple is required: "except KeyboardInterrupt, IOError" would catch
    # only KeyboardInterrupt and bind the exception to the name IOError.
    except (KeyboardInterrupt, IOError):
        logging.info('Shutting down, please wait...')
        # Tells the receiver threads and the request handlers to stop.
        running = False
        server.shutdown()
        # Release the listening socket.
        server.server_close()
        for thread in recv_threads:
            thread.join()
        logging.info('All done, good bye!')
171
Note: See TracBrowser for help on using the repository browser.