source: py-tcpmultiplexer/TCPMultiplexer.py@ 330

Last change on this file since 330 was 325, checked in by Rick van der Zwet, 13 years ago

Acive flags & some doc detiasl

  • Property svn:executable set to *
File size: 8.9 KB
RevLine 
[311]1#!/usr/bin/env python
[313]2"""
[311]3# Multiplexer for HTTP streams, used for broadcast of MPJEG and WAV streams
4#
5# Licence: BSDLike
[317]6# URL: http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
[311]7#
8# Rick van der Zwet <info@rickvanderzwet.nl>
[313]9"""
10
[311]11import SocketServer
[312]12import argparse
[311]13import logging
[323]14import os
15import socket
[311]16import sys
[312]17import threading
18import time
19import urllib2
[311]20
[312]21# Some boring defaults
22DEFAULT_HOST = '0.0.0.0'
[313]23DEFAULT_PORT = 9999
24DEFAULT_CONFIG = 'streams.yaml'
[314]25DEFAULT_DOCUMENTROOT = './htdocs'
[325]26DEFAULT_TIMEOUT = None
[312]27
[314]28
[313]29# URL : TARGET
30DEFAULT_STREAMS = {
[325]31# '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
32# '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
[313]33 }
34
[323]35
[312]36# Global variables used as ring-buffers or shared-storage
37urlheader = dict()
38dataheader = dict()
[311]39urldata = dict()
[325]40stream_running = dict()
[311]41running = True
42recv_threads = []
[314]43document_root = None
[325]44recv_timeout = None
[311]45
[325]46logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
[311]47
48class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
49 pass
50
51
52class MyTCPHandler(SocketServer.BaseRequestHandler):
53 """
54 The RequestHandler class for our server.
55
56 It is instantiated once per connection to the server, and must
57 override the handle() method to implement communication to the
58 client.
59 """
60
61 def handle(self):
[325]62 global running, dataheader, document_root, stream_running, urlheader, urldata
[311]63 # self.request is the TCP socket connected to the client
64 self.data = self.request.recv(1024).strip()
65 urlid = self.data.split('\n')[0].split()[1]
66 req_type = self.data.split('\n')[0].split()[0]
[314]67 # XXX: Check for .. paths
68 document_file = document_root + urlid
[311]69 logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
70 try:
[314]71 if urlid == '/':
72 self.request.send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
[325]73 for urlid in filter(lambda x: stream_running[x], stream_running.keys()):
[314]74 self.request.send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': urlid})
75 self.request.send("</ul><h3>Files</h3><ul>")
76 for root, dirs, files in os.walk(document_root):
[318]77 # Please do not show any hidden files or directories
78 [dirs.remove(name) for name in dirs if name.startswith('.')]
79 [files.remove(name) for name in files if name.startswith('.')]
[322]80 for name in sorted(files):
[314]81 self.request.send('<li><a href="%(item)s">%(item)s</li>' % { 'item' : os.path.join(root.replace(document_root,'',1),name) })
82 self.request.send("</ul></body></html>")
83 elif urldata.has_key(urlid):
[312]84 self.request.send('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
85 if req_type == 'HEAD':
86 return
87 elif req_type == 'GET':
88 urldata[urlid][self] = []
89 while running:
90 if len(urldata[urlid][self]) == 0:
91 time.sleep(0.1)
92 continue
93 self.request.send(urldata[urlid][self].pop(0))
[314]94 elif os.path.isfile(document_file):
[325]95 data = open(document_file,'r').read()
96 self.request.send('HTTP/1.1 200 OK\nContent-Length: %s\n\n%s' % (len(data),data))
[314]97 else:
98 self.request.send("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % urlid)
[311]99 except IOError:
[322]100 logging.info("Connection closed from '%s'", self.client_address[0])
[311]101 del urldata[urlid][self]
102
[323]103
104
[311]105def get_data(url, urlid, *args):
[323]106 """ Fetch the DATA from the WAV or MJPEG Stream """
[325]107 global running, dataheader, urlheader, urldata, stream_running, recv_timeout
[312]108 logger = logging.getLogger('recv_id=' + urlid)
[311]109 # Fill buffers if needed
[312]110 logger.info("Starting thread '%s' (%s)", url, urlid)
[311]111 while running:
[325]112 stream_running[urlid] = False
[323]113 urlheader[urlid] = None
114 dataheader[urlid] = None
115 urldata[urlid] = None
[311]116 try:
[325]117 f = urllib2.urlopen(url,timeout=recv_timeout)
[312]118 urlheader[urlid] = ''.join(f.info().headers)
[311]119 urldata[urlid] = dict()
[323]120
121 # Find datatype
[312]122 datatype = None
123 urltype = f.info().gettype()
124 logger.info("url.info().gettype() is '%s'", urltype)
125 if urltype == 'audio/x-wav':
126 datatype = 'wav'
127 elif urltype == 'multipart/x-mixed-replace':
[320]128 datatype = 'mjpeg'
[312]129
[323]130 # Be verbose to the user
[312]131 if datatype:
132 logger.info("'%s' Identified as %s", url, datatype)
[323]133 else:
134 logger.warning("Cannot find file type of '%s'", url)
[312]135
[323]136 # Set the correct data header
[312]137 if datatype == 'wav':
[318]138 # WAV header
[312]139 dataheader[urlid] = f.read(44)
[320]140 elif datatype == 'mjpeg':
[324]141 data = f.read(1024)
[320]142
[324]143 # Get the required headers and acurate datasize
[320]144 headers = []
[324]145 datasize = 1024 * 1024
[320]146 for header in data.splitlines():
[322]147 if not header.strip():
148 # Newlines in the beginning are evil
149 if headers:
150 break
151 else:
152 continue
[324]153 if header.startswith('DataLen:') or header.startswith('Content-length:'):
154 datasize = int(header.split(':')[1])
[320]155 headers.append(header)
156 boundry = headers[0]
[324]157
158 logger.info("Data Length: %s", datasize)
[320]159 logger.info("Boundry line: %s", boundry)
[318]160 logger.info("Image headers %s", headers)
[320]161
[324]162 data = data + f.read(datasize * 2)
[320]163 valid_image = boundry + data.split(boundry)[1]
164 dataheader[urlid] = valid_image + '\n'.join(headers) + '\n'
[312]165 else:
166 dataheader[urlid] = ''
[318]167 logger.info("Using dataheader of length %s", len(dataheader[urlid]))
168
[325]169 # Main data loader
170 logger.info("Stream ready to serve")
171 stream_running[urlid] = True
172 recv_buffer_size = 1024 * 8
[311]173 while running:
[325]174 data = f.read(recv_buffer_size)
175 if not len(data) == recv_buffer_size:
176 raise IOError("Connection corrupted, got '%s' instead of '%s'" % (len(data), recv_buffer_size))
177 logger.debug("Received data chunk with length: %s", len(data))
[311]178 for key in urldata[urlid].keys():
179 urldata[urlid][key].append(data)
[312]180 except IOError, e:
[311]181 #Enforce a connection reset
[312]182 logger.warning("URL reset '%s' (%s)", url, e)
[325]183 stream_running[urlid] = False
[311]184 time.sleep(1)
[325]185 pass
[312]186 logger.info("Closing Thread '%s'", url)
[311]187
188
189if __name__ == "__main__":
[313]190 parser = argparse.ArgumentParser(description=__doc__,formatter_class=argparse.RawDescriptionHelpFormatter)
191 parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
192 parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
[323]193 parser.add_argument('--timeout', dest='timeout', default=DEFAULT_TIMEOUT, type=int, help='Default socket timeout [default: %s]' % DEFAULT_TIMEOUT)
[313]194 parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
[314]195 parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
[313]196 args = parser.parse_args()
[312]197
[314]198 document_root = os.path.abspath(os.path.join(os.path.dirname(__file__),args.document_root))
199 logging.info("Serving '/htdocs' from document_root '%s'", document_root)
200
[323]201 # Set the timeout
[325]202 logging.info("Changing socket timeout from '%s' to '%s'", socket.getdefaulttimeout(), args.timeout)
203 recv_timeout = args.timeout
[323]204
[313]205 # Inport streams
206 streams = DEFAULT_STREAMS
207 try:
208 import yaml
[318]209 streams.update(yaml.load(open(args.stream_cfg)))
[313]210 except (ImportError, IOError) as e:
211 logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
[312]212
213
[313]214 # Create the base server
[311]215 try:
216 while True:
217 try:
218 ThreadedTCPServer.allow_reuse_address = True
[313]219 server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
[311]220 break
221 except IOError, e:
[313]222 logging.warning('For conection %s:%s to become available (%s)', args.host, args.port , e)
[311]223 time.sleep(1)
224 except KeyboardInterrupt:
225 sys.exit(1)
226
[312]227 for urlid, url in streams.iteritems():
228 recv_threads.append(threading.Thread(target=get_data, args=(url,urlid)))
229 recv_threads[-1].setDaemon(True)
230 recv_threads[-1].start()
[311]231
232 # Activate the server; this will keep running until you
233 # interrupt the program with Ctrl-C
234 try:
[313]235 logging.info('Serving at %s:%s', args.host, args.port)
[311]236 server.serve_forever()
237 except KeyboardInterrupt, IOError:
238 logging.info('Shutting down, please wait...')
239 running = False
240 server.shutdown()
241 [thread.join() for thread in recv_threads]
242 logging.info('All done, good bye!')
243
Note: See TracBrowser for help on using the repository browser.