source: py-tcpmultiplexer/TCPMultiplexer.py@ 324

Last change on this file since 324 was 324, checked in by Rick van der Zwet, 13 years ago

Be smarter about the datasize

  • Property svn:executable set to *
File size: 8.3 KB
RevLine 
[311]1#!/usr/bin/env python
[313]2"""
[311]3# Multiplexer for HTTP streams, used for broadcast of MPJEG and WAV streams
4#
5# Licence: BSDLike
[317]6# URL: http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
[311]7#
8# Rick van der Zwet <info@rickvanderzwet.nl>
[313]9"""
10
[311]11import SocketServer
[312]12import argparse
[311]13import logging
[323]14import os
15import socket
[311]16import sys
[312]17import threading
18import time
19import urllib2
[311]20
# Tunable defaults, overridable from the command line
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 9999
DEFAULT_CONFIG = 'streams.yaml'
DEFAULT_DOCUMENTROOT = './htdocs'
DEFAULT_TIMEOUT = 2


# Built-in stream map (URL path -> upstream target), used when no
# YAML configuration file is available
DEFAULT_STREAMS = {
    '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
    '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
    }


# Shared mutable state: the receiver threads fill these per-urlid
# structures and the request handlers drain them (ring-buffer style)
urlheader = {}
dataheader = {}
urldata = {}
running = True
recv_threads = []
document_root = None

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server that services each client connection in its own thread."""
    pass
49
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.  Serves three kinds of responses: an HTML index at '/',
    a multiplexed live stream for any registered urlid, or a static
    file from the document root.
    """

    def handle(self):
        global running, dataheader, document_root, urlheader, urldata
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        request_words = self.data.split('\n')[0].split()
        if len(request_words) < 2:
            # Empty or malformed request line: nothing we can answer
            # (the old code raised IndexError here and killed the thread)
            return
        req_type = request_words[0]
        urlid = request_words[1]
        # Resolve the requested path and refuse '..' escapes from the
        # document root (closes the XXX path-traversal hole)
        abs_root = os.path.abspath(document_root)
        document_file = os.path.abspath(document_root + urlid)
        logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
        try:
            if urlid == '/':
                # Index page: list the live streams and the static files
                self.request.send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
                for urlid in sorted(urldata.keys()):
                    self.request.send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': urlid})
                self.request.send("</ul><h3>Files</h3><ul>")
                for root, dirs, files in os.walk(document_root):
                    # Please do not show any hidden files or directories.
                    # Filter via rebuilding: the old remove-while-iterating
                    # list comprehensions skipped consecutive hidden entries.
                    dirs[:] = [name for name in dirs if not name.startswith('.')]
                    files = [name for name in files if not name.startswith('.')]
                    for name in sorted(files):
                        self.request.send('<li><a href="%(item)s">%(item)s</li>' % { 'item' : os.path.join(root.replace(document_root,'',1),name) })
                self.request.send("</ul></body></html>")
            elif urlid in urldata:
                # Replay the captured upstream HTTP header plus the stream
                # prefix (e.g. WAV header), then relay live chunks
                self.request.send('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
                if req_type == 'HEAD':
                    return
                elif req_type == 'GET':
                    # Register a private queue; the receiver thread appends
                    # chunks to it, we pop and forward them
                    urldata[urlid][self] = []
                    while running:
                        if len(urldata[urlid][self]) == 0:
                            time.sleep(0.1)
                            continue
                        self.request.send(urldata[urlid][self].pop(0))
            elif document_file.startswith(abs_root) and os.path.isfile(document_file):
                # Static file from the document root; close the handle
                # instead of leaking it like the old open(...).read() did
                fh = open(document_file, 'r')
                try:
                    self.request.send('HTTP/1.1 200 OK\n\n' + fh.read())
                finally:
                    fh.close()
            else:
                self.request.send("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % urlid)
        except IOError:
            logging.info("Connection closed from '%s'", self.client_address[0])
            # Only streaming GET clients registered a queue; the old
            # unconditional del raised KeyError for everybody else
            if urlid in urldata and isinstance(urldata[urlid], dict) and self in urldata[urlid]:
                del urldata[urlid][self]
99
[323]100
101
[311]102def get_data(url, urlid, *args):
[323]103 """ Fetch the DATA from the WAV or MJPEG Stream """
[312]104 global running, dataheader, urlheader, urldata
105 logger = logging.getLogger('recv_id=' + urlid)
[311]106 # Fill buffers if needed
[312]107 logger.info("Starting thread '%s' (%s)", url, urlid)
[311]108 while running:
[323]109 urlheader[urlid] = None
110 dataheader[urlid] = None
111 urldata[urlid] = None
[311]112 try:
113 f = urllib2.urlopen(url)
[312]114 urlheader[urlid] = ''.join(f.info().headers)
[311]115 urldata[urlid] = dict()
[323]116
117 # Find datatype
[312]118 datatype = None
119 urltype = f.info().gettype()
120 logger.info("url.info().gettype() is '%s'", urltype)
121 if urltype == 'audio/x-wav':
122 datatype = 'wav'
123 elif urltype == 'multipart/x-mixed-replace':
[320]124 datatype = 'mjpeg'
[312]125
[323]126 # Be verbose to the user
[312]127 if datatype:
128 logger.info("'%s' Identified as %s", url, datatype)
[323]129 else:
130 logger.warning("Cannot find file type of '%s'", url)
[312]131
[323]132 # Set the correct data header
[312]133 if datatype == 'wav':
[318]134 # WAV header
[312]135 dataheader[urlid] = f.read(44)
[320]136 elif datatype == 'mjpeg':
[324]137 data = f.read(1024)
[320]138
[324]139 # Get the required headers and acurate datasize
[320]140 headers = []
[324]141 datasize = 1024 * 1024
[320]142 for header in data.splitlines():
[322]143 if not header.strip():
144 # Newlines in the beginning are evil
145 if headers:
146 break
147 else:
148 continue
[324]149 if header.startswith('DataLen:') or header.startswith('Content-length:'):
150 datasize = int(header.split(':')[1])
[320]151 headers.append(header)
152 boundry = headers[0]
[324]153
154 logger.info("Data Length: %s", datasize)
[320]155 logger.info("Boundry line: %s", boundry)
[318]156 logger.info("Image headers %s", headers)
[320]157
[324]158 data = data + f.read(datasize * 2)
[320]159 valid_image = boundry + data.split(boundry)[1]
160 dataheader[urlid] = valid_image + '\n'.join(headers) + '\n'
[312]161 else:
162 dataheader[urlid] = ''
163
[318]164 logger.info("Using dataheader of length %s", len(dataheader[urlid]))
165
[311]166 while running:
167 data = f.read(10000)
[323]168 # logger.debug("Received data chunk with length: %s", len(data))
[311]169 for key in urldata[urlid].keys():
170 urldata[urlid][key].append(data)
[312]171 except IOError, e:
[311]172 #Enforce a connection reset
[312]173 logger.warning("URL reset '%s' (%s)", url, e)
174 del urlheader[urlid]
175 del dataheader[urlid]
176 del urldata[urlid]
[311]177 time.sleep(1)
178 continue
[312]179 logger.info("Closing Thread '%s'", url)
[311]180
181
182if __name__ == "__main__":
[313]183 parser = argparse.ArgumentParser(description=__doc__,formatter_class=argparse.RawDescriptionHelpFormatter)
184 parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
185 parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
[323]186 parser.add_argument('--timeout', dest='timeout', default=DEFAULT_TIMEOUT, type=int, help='Default socket timeout [default: %s]' % DEFAULT_TIMEOUT)
[313]187 parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
[314]188 parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
[313]189 args = parser.parse_args()
[312]190
[314]191 document_root = os.path.abspath(os.path.join(os.path.dirname(__file__),args.document_root))
192 logging.info("Serving '/htdocs' from document_root '%s'", document_root)
193
[323]194 # Set the timeout
195 socket.setdefaulttimeout(args.timeout)
196
[313]197 # Inport streams
198 streams = DEFAULT_STREAMS
199 try:
200 import yaml
[318]201 streams.update(yaml.load(open(args.stream_cfg)))
[313]202 except (ImportError, IOError) as e:
203 logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
[312]204
205
[313]206 # Create the base server
[311]207 try:
208 while True:
209 try:
210 ThreadedTCPServer.allow_reuse_address = True
[313]211 server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
[311]212 break
213 except IOError, e:
[313]214 logging.warning('For conection %s:%s to become available (%s)', args.host, args.port , e)
[311]215 time.sleep(1)
216 except KeyboardInterrupt:
217 sys.exit(1)
218
[312]219 for urlid, url in streams.iteritems():
220 recv_threads.append(threading.Thread(target=get_data, args=(url,urlid)))
221 recv_threads[-1].setDaemon(True)
222 recv_threads[-1].start()
[311]223
224 # Activate the server; this will keep running until you
225 # interrupt the program with Ctrl-C
226 try:
[313]227 logging.info('Serving at %s:%s', args.host, args.port)
[311]228 server.serve_forever()
229 except KeyboardInterrupt, IOError:
230 logging.info('Shutting down, please wait...')
231 running = False
232 server.shutdown()
233 [thread.join() for thread in recv_threads]
234 logging.info('All done, good bye!')
235
Note: See TracBrowser for help on using the repository browser.