source: py-tcpmultiplexer/TCPMultiplexer.py@324

Last change on this file since 324 was 324, checked in by Rick van der Zwet, 13 years ago

Be smarter about the datasize

  • Property svn:executable set to *
File size: 8.3 KB
#!/usr/bin/env python
"""
# Multiplexer for HTTP streams, used for broadcast of MJPEG and WAV streams
#
# Licence: BSDLike
# URL: http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
#
# Rick van der Zwet <info@rickvanderzwet.nl>
"""

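# A minimal usage sketch (hypothetical invocation, matching the defaults below):
#
#   python TCPMultiplexer.py --host 0.0.0.0 --port 9999 --stream-cfg streams.yaml
#
# Clients can then request http://<host>:9999/ for an HTML overview, or one of
# the configured stream paths (e.g. /cam1/video) to receive the relayed stream.
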
import SocketServer
import argparse
import logging
import os
import socket
import sys
import threading
import time
import urllib2

# Some boring defaults
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 9999
DEFAULT_CONFIG = 'streams.yaml'
DEFAULT_DOCUMENTROOT = './htdocs'
DEFAULT_TIMEOUT = 2


# URL : TARGET
DEFAULT_STREAMS = {
    '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
    '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
    }
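
# The optional streams.yaml (see --stream-cfg) is expected to hold the same
# URL : TARGET mapping and overrides these defaults; a hypothetical example:
#
#   /cam1/video: 'http://172.16.0.67:8080/videofeed'
#   /cam1/audio: 'http://172.16.0.67:8080/audio.wav'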


# Global variables used as ring-buffers or shared-storage
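#   urlheader[urlid]  - raw HTTP header block received from the upstream URL
#   dataheader[urlid] - stream prefix (a WAV header or one MJPEG frame) that is
#                       replayed to every newly connected client
#   urldata[urlid]    - dict mapping each connected handler to its list of
#                       pending data chunks (a simple per-client queue)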
urlheader = dict()
dataheader = dict()
urldata = dict()
running = True
recv_threads = []
document_root = None

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    pass


class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """

    def handle(self):
        global running, dataheader, document_root, urlheader, urldata
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        urlid = self.data.split('\n')[0].split()[1]
        req_type = self.data.split('\n')[0].split()[0]
        # XXX: Check for .. paths
        document_file = document_root + urlid
        logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
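        # Dispatch on the requested path: '/' returns an HTML overview, a known
        # stream path replays its header and then live chunks, an existing file
        # under the document root is served as-is, anything else gets a 404.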
        try:
            if urlid == '/':
                self.request.send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
                for urlid in sorted(urldata.keys()):
                    self.request.send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': urlid})
                self.request.send("</ul><h3>Files</h3><ul>")
                for root, dirs, files in os.walk(document_root):
                    # Please do not show any hidden files or directories
                    dirs[:] = [name for name in dirs if not name.startswith('.')]
                    files[:] = [name for name in files if not name.startswith('.')]
                    for name in sorted(files):
                        self.request.send('<li><a href="%(item)s">%(item)s</a></li>' % {'item': os.path.join(root.replace(document_root, '', 1), name)})
                self.request.send("</ul></body></html>")
            elif urldata.has_key(urlid):
                self.request.send('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
                if req_type == 'HEAD':
                    return
                elif req_type == 'GET':
                    urldata[urlid][self] = []
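                    # The matching get_data() thread appends new chunks to the
                    # list registered above; pop and forward them until the
                    # server stops or the client goes away.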
                    while running:
                        if len(urldata[urlid][self]) == 0:
                            time.sleep(0.1)
                            continue
                        self.request.send(urldata[urlid][self].pop(0))
            elif os.path.isfile(document_file):
                self.request.send('HTTP/1.1 200 OK\n\n' + open(document_file, 'r').read())
            else:
                self.request.send("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exist" % urlid)
        except IOError:
            logging.info("Connection closed from '%s'", self.client_address[0])
            del urldata[urlid][self]



def get_data(url, urlid, *args):
    """ Fetch the DATA from the WAV or MJPEG Stream """
    global running, dataheader, urlheader, urldata
    logger = logging.getLogger('recv_id=' + urlid)
    # Fill buffers if needed
    logger.info("Starting thread '%s' (%s)", url, urlid)
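    # Outer loop (re)connects to the upstream URL; the shared buffers are reset
    # on every pass and, after an IOError, the stream entries are dropped and
    # the fetch is retried a second later.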
    while running:
        urlheader[urlid] = None
        dataheader[urlid] = None
        urldata[urlid] = None
        try:
            f = urllib2.urlopen(url)
            urlheader[urlid] = ''.join(f.info().headers)
            urldata[urlid] = dict()

            # Find datatype
            datatype = None
            urltype = f.info().gettype()
            logger.info("url.info().gettype() is '%s'", urltype)
            if urltype == 'audio/x-wav':
                datatype = 'wav'
            elif urltype == 'multipart/x-mixed-replace':
                datatype = 'mjpeg'

            # Be verbose to the user
            if datatype:
                logger.info("'%s' identified as %s", url, datatype)
            else:
                logger.warning("Cannot find file type of '%s'", url)

            # Set the correct data header
            if datatype == 'wav':
                # WAV header (the first 44 bytes of the stream)
                dataheader[urlid] = f.read(44)
            elif datatype == 'mjpeg':
                data = f.read(1024)

                # Get the required headers and accurate datasize
                headers = []
                datasize = 1024 * 1024  # fallback when no length header is found
                for header in data.splitlines():
                    if not header.strip():
                        # Newlines in the beginning are evil
                        if headers:
                            break
                        else:
                            continue
                    if header.startswith('DataLen:') or header.startswith('Content-length:'):
                        datasize = int(header.split(':')[1])
                    headers.append(header)
                boundary = headers[0]

                logger.info("Data length: %s", datasize)
                logger.info("Boundary line: %s", boundary)
                logger.info("Image headers: %s", headers)

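                # Read well past the advertised frame size so at least one
                # complete boundary-delimited image is captured; that image
                # (plus the part headers) becomes the prefix for new clients.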
                data = data + f.read(datasize * 2)
                valid_image = boundary + data.split(boundary)[1]
                dataheader[urlid] = valid_image + '\n'.join(headers) + '\n'
            else:
                dataheader[urlid] = ''

            logger.info("Using dataheader of length %s", len(dataheader[urlid]))

            while running:
                data = f.read(10000)
                # logger.debug("Received data chunk with length: %s", len(data))
                for key in urldata[urlid].keys():
                    urldata[urlid][key].append(data)
        except IOError as e:
            # Enforce a connection reset
            logger.warning("URL reset '%s' (%s)", url, e)
            del urlheader[urlid]
            del dataheader[urlid]
            del urldata[urlid]
            time.sleep(1)
            continue
    logger.info("Closing thread '%s'", url)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
    parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
    parser.add_argument('--timeout', dest='timeout', default=DEFAULT_TIMEOUT, type=int, help='Default socket timeout [default: %s]' % DEFAULT_TIMEOUT)
    parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML stream configuration [default: %s]' % DEFAULT_CONFIG)
    parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document root file directory [default: %s]' % DEFAULT_DOCUMENTROOT)
    args = parser.parse_args()

    document_root = os.path.abspath(os.path.join(os.path.dirname(__file__), args.document_root))
    logging.info("Serving '/htdocs' from document_root '%s'", document_root)

    # Set the timeout
    socket.setdefaulttimeout(args.timeout)

    # Import streams
    streams = DEFAULT_STREAMS
    try:
        import yaml
        streams.update(yaml.load(open(args.stream_cfg)))
    except (ImportError, IOError) as e:
        logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)


    # Create the base server
    try:
        while True:
            try:
                ThreadedTCPServer.allow_reuse_address = True
                server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
                break
            except IOError as e:
                logging.warning('Waiting for connection %s:%s to become available (%s)', args.host, args.port, e)
                time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(1)

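    # Start one receiver thread per configured stream; the threads are
    # daemonised so they never keep the interpreter alive at shutdown.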
    for urlid, url in streams.iteritems():
        recv_threads.append(threading.Thread(target=get_data, args=(url, urlid)))
        recv_threads[-1].setDaemon(True)
        recv_threads[-1].start()

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    try:
        logging.info('Serving at %s:%s', args.host, args.port)
        server.serve_forever()
    except (KeyboardInterrupt, IOError):
        logging.info('Shutting down, please wait...')
        running = False
        server.shutdown()
        [thread.join() for thread in recv_threads]
        logging.info('All done, good bye!')