source: py-tcpmultiplexer/TCPMultiplexer.py@ 315

Last change on this file since 315 was 314, checked in by Rick van der Zwet, 13 years ago

Allow serving static files as well.

  • Property svn:executable set to *
File size: 6.6 KB
RevLine 
[311]1#!/usr/bin/env python
[313]2"""
[311]3# Multiplexer for HTTP streams, used for broadcast of MJPEG and WAV streams
4#
5# Licence: BSDLike
6#
7# Rick van der Zwet <info@rickvanderzwet.nl>
[313]8"""
9
[311]10import SocketServer
[312]11import argparse
[311]12import logging
13import sys
[312]14import threading
15import time
16import urllib2
[314]17import os
[311]18
# Fallback values for the command line options
DEFAULT_DOCUMENTROOT = './htdocs'
DEFAULT_CONFIG = 'streams.yaml'
DEFAULT_PORT = 9999
DEFAULT_HOST = '0.0.0.0'

# Built-in stream table: URL path offered to clients -> upstream URL it is
# fetched from.
DEFAULT_STREAMS = dict([
    ('/cam1/video', 'http://172.16.0.67:8080/videofeed'),
    ('/cam1/audio', 'http://172.16.0.67:8080/audio.wav'),
])
31
# State shared between the receiver threads and the request handlers.
# urlheader / dataheader / urldata are keyed by the URL path of a stream.
urldata = {}
urlheader = {}
dataheader = {}
# Receiver threads started by the main script
recv_threads = []
# Absolute directory static files are served from; set by the main script
document_root = None
# Cleared on shutdown so the receiver and handler loops terminate
running = True

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
41
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server that runs the request handler of every connection in its own thread."""

    # Allow re-binding the listening address right after a restart instead of
    # waiting for old sockets to leave TIME_WAIT.
    allow_reuse_address = True
    # A handler can sit in a blocking send towards a stalled client; as a
    # daemon thread it cannot keep the interpreter alive on shutdown.
    daemon_threads = True
44
45
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server. handle() reads one
    HTTP request line and answers it with the overview page ('/'), a proxied
    stream (any key of the module-level urldata) or a static file below the
    module-level document_root.
    """

    def handle(self):
        """Answer the single request read from self.request (the client socket)."""
        # Only the first 1024 bytes are read; the request line must be in there.
        self.data = self.request.recv(1024).strip()
        request_line = self.data.split('\n')[0].split()
        if len(request_line) < 2:
            # Empty or malformed request: no method/path pair to act on.
            logging.warning("Malformed request from '%s'", self.client_address[0])
            return
        req_type, urlid = request_line[0], request_line[1]
        logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
        try:
            if urlid == '/':
                self._send_overview()
            elif urlid in urldata:
                self._send_stream(req_type, urlid)
            else:
                self._send_file(urlid)
        except IOError:
            # The client went away; nothing worth reporting.
            pass
        except Exception as e:
            # Boundary of the handler thread: log and end this connection.
            logging.warning('Server has issues: %s', e)
        finally:
            # Drop this client's queue if one was registered. Requests for
            # '/', static files and 404s never register, and the stream may
            # already have been removed by its receiver thread.
            urldata.get(urlid, {}).pop(self, None)

    def _send_overview(self):
        """Send an HTML page linking every known stream and every static file."""
        send = self.request.sendall
        send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
        # Iterate over a snapshot; receiver threads add and remove streams.
        for stream_id in list(urldata):
            send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': stream_id})
        send("</ul><h3>Files</h3><ul>")
        if document_root:
            for root, dirs, files in os.walk(document_root):
                for name in files:
                    # Link relative to the document root.
                    item = os.path.join(root.replace(document_root, '', 1), name)
                    send('<li><a href="%(item)s">%(item)s</a></li>' % {'item': item})
        send("</ul></body></html>")

    def _send_stream(self, req_type, urlid):
        """Send the stream headers and, for GET, relay data until shutdown."""
        self.request.sendall('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
        if req_type != 'GET':
            return
        # Register a private queue; the receiver thread appends chunks to it.
        urldata[urlid][self] = []
        while running:
            # Looked up every round on purpose: when the receiver drops the
            # stream this raises KeyError and ends the connection.
            pending = urldata[urlid][self]
            if not pending:
                time.sleep(0.1)
                continue
            self.request.sendall(pending.pop(0))

    def _send_file(self, urlid):
        """Send the static file addressed by urlid, or a 404 page."""
        document_file = self._resolve_document(document_root, urlid)
        if document_file is not None and os.path.isfile(document_file):
            with open(document_file, 'rb') as handle:
                content = handle.read()
            self.request.sendall('HTTP/1.1 200 OK\n\n' + content)
        else:
            self.request.sendall("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % urlid)

    @staticmethod
    def _resolve_document(root, urlid):
        """
        Map the request path urlid onto a file system path below root.

        Returns the normalised absolute path, or None when root is unset or
        the path would leave root (for example through '..' components).
        """
        if not root:
            return None
        root = os.path.abspath(root)
        candidate = os.path.abspath(os.path.join(root, urlid.lstrip('/')))
        # Compare against root plus a separator so '/srv/htdocs-x' does not
        # pass as being inside '/srv/htdocs'.
        inside = candidate.startswith(root.rstrip(os.sep) + os.sep)
        if candidate != root and not inside:
            return None
        return candidate
94
def get_data(url, urlid, *args):
    """
    Receiver thread body: fetch url and fan its data out to the clients of urlid.

    While the module-level `running` flag is set, the upstream is opened, its
    HTTP headers and a type-specific data header are stored in urlheader and
    dataheader, and every chunk read is appended to each client queue found
    in urldata[urlid]. On an IOError (which includes a clean end of the
    upstream) the stream is withdrawn and re-opened after one second.

    url   -- upstream URL to read from
    urlid -- URL path under which the stream is offered to clients
    args  -- accepted and ignored
    """
    logger = logging.getLogger('recv_id=' + urlid)
    logger.info("Starting thread '%s' (%s)", url, urlid)
    while running:
        f = None
        try:
            f = urllib2.urlopen(url)
            urlheader[urlid] = ''.join(f.info().headers)
            urltype = f.info().gettype()
            logger.info("url.info().gettype() is '%s'", urltype)
            if urltype == 'audio/x-wav':
                datatype = 'wav'
            elif urltype == 'multipart/x-mixed-replace':
                datatype = 'mjeg'
            else:
                datatype = None
                logger.warning("Cannot find file type of '%s'", url)

            if datatype:
                logger.info("'%s' Identified as %s", url, datatype)

            # Keep the leading part of the stream so it can be replayed to
            # every client that connects later.
            if datatype == 'wav':
                # First 44 bytes of the stream (presumably the WAV header)
                dataheader[urlid] = f.read(44)
            elif datatype == 'mjeg':
                # First two lines of the first 100 bytes
                dataheader[urlid] = '\n'.join(f.read(100).split('\n')[0:2])
            else:
                dataheader[urlid] = ''

            # Publish the client registry last: handlers treat a key in
            # urldata as "stream ready" and read both headers right away.
            urldata[urlid] = dict()

            while running:
                data = f.read(10000)
                if not data:
                    # End of stream; without this the loop would spin on ''.
                    raise IOError('upstream closed the connection')
                # Snapshot the queues: handlers unregister concurrently.
                for pending in list(urldata[urlid].values()):
                    pending.append(data)
        except IOError as e:
            # Enforce a connection reset
            logger.warning("URL reset '%s' (%s)", url, e)
            # pop() instead of del: a failed first urlopen never created the
            # entries. urldata goes first so handlers stop seeing the stream.
            for table in (urldata, urlheader, dataheader):
                table.pop(urlid, None)
            time.sleep(1)
        finally:
            if f is not None:
                f.close()
    logger.info("Closing Thread '%s'", url)
[311]138
139
if __name__ == "__main__":
    # Command line handling; the module docstring doubles as the help text
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
    parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
    parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
    parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
    args = parser.parse_args()

    # A relative document root is resolved against the directory of this script
    document_root = os.path.abspath(os.path.join(os.path.dirname(__file__), args.document_root))
    logging.info("Serving '/htdocs' from document_root '%s'", document_root)

    # Import streams; fall back to the built-in table when the configuration
    # cannot be used.
    streams = DEFAULT_STREAMS
    try:
        import yaml
    except ImportError as e:
        logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
    else:
        try:
            with open(args.stream_cfg) as cfg:
                # NOTE(review): was yaml.load(); safe_load() is enough for a
                # plain URL -> target mapping and cannot construct arbitrary
                # Python objects from the file.
                loaded = yaml.safe_load(cfg)
        except (IOError, yaml.YAMLError) as e:
            logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
        else:
            if isinstance(loaded, dict):
                streams = loaded
            else:
                # For example an empty file, which loads as None
                logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, 'expected a mapping of URL to target')

    # Create the base server, retrying every second until the address can be bound
    try:
        while True:
            try:
                ThreadedTCPServer.allow_reuse_address = True
                server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
                break
            except IOError as e:
                logging.warning('For conection %s:%s to become available (%s)', args.host, args.port , e)
                time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(1)

    # One daemon receiver thread per configured stream
    for urlid, url in streams.items():
        receiver = threading.Thread(target=get_data, args=(url, urlid))
        receiver.daemon = True
        receiver.start()
        recv_threads.append(receiver)

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    try:
        logging.info('Serving at %s:%s', args.host, args.port)
        server.serve_forever()
    except (KeyboardInterrupt, IOError):
        # The tuple is required: "except KeyboardInterrupt, IOError" would
        # catch only KeyboardInterrupt and rebind the name IOError to it.
        logging.info('Shutting down, please wait...')
        running = False
        # serve_forever() has already returned in this very thread, so only
        # the listening socket is left to release; shutdown() is meant to be
        # called from another thread while the loop is still running.
        server.server_close()
        for thread in recv_threads:
            thread.join()
        logging.info('All done, good bye!')
189
Note: See TracBrowser for help on using the repository browser.