source: py-tcpmultiplexer/TCPMultiplexer.py@ 350

Last change on this file since 350 was 338, checked in by Rick van der Zwet, 13 years ago

Only Serve Stream which is fully started.

  • Property svn:executable set to *
File size: 9.3 KB
RevLine 
[311]1#!/usr/bin/env python
[313]2"""
[311]3# Multiplexer for HTTP streams, used for broadcast of MPJEG and WAV streams
4#
5# Licence: BSDLike
[317]6# URL: http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
[311]7#
8# Rick van der Zwet <info@rickvanderzwet.nl>
[313]9"""
10
[311]11import SocketServer
[312]12import argparse
[311]13import logging
[331]14import logging.handlers
[323]15import os
16import socket
[311]17import sys
[312]18import threading
19import time
20import urllib2
[311]21
[312]22# Some boring defaults
23DEFAULT_HOST = '0.0.0.0'
[313]24DEFAULT_PORT = 9999
25DEFAULT_CONFIG = 'streams.yaml'
[314]26DEFAULT_DOCUMENTROOT = './htdocs'
[325]27DEFAULT_TIMEOUT = None
[331]28DEFAULT_LOGFILE = 'py-tcpmultiplexer.log'
[312]29
[314]30
[313]31# URL : TARGET
32DEFAULT_STREAMS = {
[325]33# '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
34# '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
[313]35 }
36
[323]37
[312]38# Global variables used as ring-buffers or shared-storage
39urlheader = dict()
40dataheader = dict()
[311]41urldata = dict()
[325]42stream_running = dict()
[311]43running = True
44recv_threads = []
[314]45document_root = None
[325]46recv_timeout = None
[311]47
[331]48logger = logging.getLogger(__name__)
49logger.setLevel(logging.INFO)
50formatter = logging.Formatter('%(asctime)s - %(threadName)s - %(levelname)s - %(message)s')
[311]51
[331]52ch = logging.StreamHandler()
53ch.setFormatter(formatter)
54
55logger.addHandler(ch)
56
[311]57class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
58 pass
59
60
61class MyTCPHandler(SocketServer.BaseRequestHandler):
62 """
63 The RequestHandler class for our server.
64
65 It is instantiated once per connection to the server, and must
66 override the handle() method to implement communication to the
67 client.
68 """
69
70 def handle(self):
[325]71 global running, dataheader, document_root, stream_running, urlheader, urldata
[311]72 # self.request is the TCP socket connected to the client
73 self.data = self.request.recv(1024).strip()
74 urlid = self.data.split('\n')[0].split()[1]
75 req_type = self.data.split('\n')[0].split()[0]
[314]76 # XXX: Check for .. paths
77 document_file = document_root + urlid
[311]78 logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
79 try:
[314]80 if urlid == '/':
81 self.request.send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
[325]82 for urlid in filter(lambda x: stream_running[x], stream_running.keys()):
[314]83 self.request.send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': urlid})
84 self.request.send("</ul><h3>Files</h3><ul>")
85 for root, dirs, files in os.walk(document_root):
[318]86 # Please do not show any hidden files or directories
87 [dirs.remove(name) for name in dirs if name.startswith('.')]
88 [files.remove(name) for name in files if name.startswith('.')]
[322]89 for name in sorted(files):
[314]90 self.request.send('<li><a href="%(item)s">%(item)s</li>' % { 'item' : os.path.join(root.replace(document_root,'',1),name) })
91 self.request.send("</ul></body></html>")
[338]92 elif stream_running[urlid]:
[312]93 self.request.send('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
94 if req_type == 'HEAD':
95 return
96 elif req_type == 'GET':
97 urldata[urlid][self] = []
98 while running:
99 if len(urldata[urlid][self]) == 0:
100 time.sleep(0.1)
101 continue
102 self.request.send(urldata[urlid][self].pop(0))
[314]103 elif os.path.isfile(document_file):
[325]104 data = open(document_file,'r').read()
105 self.request.send('HTTP/1.1 200 OK\nContent-Length: %s\n\n%s' % (len(data),data))
[314]106 else:
107 self.request.send("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % urlid)
[311]108 except IOError:
[322]109 logging.info("Connection closed from '%s'", self.client_address[0])
[311]110 del urldata[urlid][self]
111
[323]112
113
[311]114def get_data(url, urlid, *args):
[323]115 """ Fetch the DATA from the WAV or MJPEG Stream """
[325]116 global running, dataheader, urlheader, urldata, stream_running, recv_timeout
[311]117 # Fill buffers if needed
[312]118 logger.info("Starting thread '%s' (%s)", url, urlid)
[311]119 while running:
[325]120 stream_running[urlid] = False
[323]121 urlheader[urlid] = None
122 dataheader[urlid] = None
123 urldata[urlid] = None
[311]124 try:
[325]125 f = urllib2.urlopen(url,timeout=recv_timeout)
[312]126 urlheader[urlid] = ''.join(f.info().headers)
[311]127 urldata[urlid] = dict()
[323]128
129 # Find datatype
[312]130 datatype = None
131 urltype = f.info().gettype()
132 logger.info("url.info().gettype() is '%s'", urltype)
133 if urltype == 'audio/x-wav':
134 datatype = 'wav'
135 elif urltype == 'multipart/x-mixed-replace':
[320]136 datatype = 'mjpeg'
[312]137
[323]138 # Be verbose to the user
[312]139 if datatype:
140 logger.info("'%s' Identified as %s", url, datatype)
[323]141 else:
142 logger.warning("Cannot find file type of '%s'", url)
[312]143
[323]144 # Set the correct data header
[312]145 if datatype == 'wav':
[318]146 # WAV header
[312]147 dataheader[urlid] = f.read(44)
[320]148 elif datatype == 'mjpeg':
[324]149 data = f.read(1024)
[320]150
[324]151 # Get the required headers and acurate datasize
[320]152 headers = []
[324]153 datasize = 1024 * 1024
[320]154 for header in data.splitlines():
[322]155 if not header.strip():
156 # Newlines in the beginning are evil
157 if headers:
158 break
159 else:
160 continue
[324]161 if header.startswith('DataLen:') or header.startswith('Content-length:'):
162 datasize = int(header.split(':')[1])
[320]163 headers.append(header)
164 boundry = headers[0]
[324]165
166 logger.info("Data Length: %s", datasize)
[320]167 logger.info("Boundry line: %s", boundry)
[318]168 logger.info("Image headers %s", headers)
[320]169
[324]170 data = data + f.read(datasize * 2)
[320]171 valid_image = boundry + data.split(boundry)[1]
172 dataheader[urlid] = valid_image + '\n'.join(headers) + '\n'
[312]173 else:
174 dataheader[urlid] = ''
[318]175 logger.info("Using dataheader of length %s", len(dataheader[urlid]))
176
[325]177 # Main data loader
178 logger.info("Stream ready to serve")
179 stream_running[urlid] = True
180 recv_buffer_size = 1024 * 8
[311]181 while running:
[325]182 data = f.read(recv_buffer_size)
183 if not len(data) == recv_buffer_size:
184 raise IOError("Connection corrupted, got '%s' instead of '%s'" % (len(data), recv_buffer_size))
185 logger.debug("Received data chunk with length: %s", len(data))
[311]186 for key in urldata[urlid].keys():
187 urldata[urlid][key].append(data)
[331]188 except (urllib2.URLError, IOError) as e:
[311]189 #Enforce a connection reset
[312]190 logger.warning("URL reset '%s' (%s)", url, e)
[325]191 stream_running[urlid] = False
[311]192 time.sleep(1)
[325]193 pass
[312]194 logger.info("Closing Thread '%s'", url)
[311]195
196
197if __name__ == "__main__":
[313]198 parser = argparse.ArgumentParser(description=__doc__,formatter_class=argparse.RawDescriptionHelpFormatter)
199 parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
200 parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
[331]201 parser.add_argument('--logfile', dest='logfile', default=DEFAULT_LOGFILE, help='File to write logfiles to [default: %s]' % DEFAULT_LOGFILE)
[323]202 parser.add_argument('--timeout', dest='timeout', default=DEFAULT_TIMEOUT, type=int, help='Default socket timeout [default: %s]' % DEFAULT_TIMEOUT)
[313]203 parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
[314]204 parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
[313]205 args = parser.parse_args()
[312]206
[331]207 # Add file logger
208 ch = logging.handlers.WatchedFileHandler(args.logfile)
209 ch.setFormatter(formatter)
210 logger.addHandler(ch)
211
[314]212 document_root = os.path.abspath(os.path.join(os.path.dirname(__file__),args.document_root))
[331]213 logger.info("Serving '/htdocs' from document_root '%s'", document_root)
[314]214
[323]215 # Set the timeout
[331]216 logger.info("Changing socket timeout from '%s' to '%s'", socket.getdefaulttimeout(), args.timeout)
[325]217 recv_timeout = args.timeout
[323]218
[331]219
[313]220 # Inport streams
221 streams = DEFAULT_STREAMS
222 try:
223 import yaml
[318]224 streams.update(yaml.load(open(args.stream_cfg)))
[313]225 except (ImportError, IOError) as e:
[331]226 logger.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
[312]227
228
[313]229 # Create the base server
[311]230 try:
231 while True:
232 try:
233 ThreadedTCPServer.allow_reuse_address = True
[313]234 server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
[311]235 break
236 except IOError, e:
[331]237 logger.warning('For conection %s:%s to become available (%s)', args.host, args.port , e)
[311]238 time.sleep(1)
239 except KeyboardInterrupt:
240 sys.exit(1)
241
[312]242 for urlid, url in streams.iteritems():
[331]243 recv_threads.append(threading.Thread(name=urlid, target=get_data, args=(url,urlid)))
[312]244 recv_threads[-1].setDaemon(True)
245 recv_threads[-1].start()
[311]246
247 # Activate the server; this will keep running until you
248 # interrupt the program with Ctrl-C
249 try:
[331]250 logger.info('Serving at %s:%s', args.host, args.port)
[311]251 server.serve_forever()
252 except KeyboardInterrupt, IOError:
[331]253 logger.info('Shutting down, please wait...')
[311]254 running = False
255 server.shutdown()
256 [thread.join() for thread in recv_threads]
[331]257 logger.info('All done, good bye!')
[311]258
Note: See TracBrowser for help on using the repository browser.