source: py-tcpmultiplexer/TCPMultiplexer.py@ 316

Last change on this file since 316 was 316, checked in by Rick van der Zwet, 13 years ago

Do not show versioning info.

  • Property svn:executable set to *
File size: 6.8 KB
Line 
1#!/usr/bin/env python
2"""
3# Multiplexer for HTTP streams, used for broadcast of MJPEG and WAV streams
4#
5# Licence: BSDLike
6#
7# Rick van der Zwet <info@rickvanderzwet.nl>
8"""
9
10import SocketServer
11import argparse
12import logging
13import sys
14import threading
15import time
16import urllib2
17import os
18
# Fallback values for the command line options
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 9999
DEFAULT_CONFIG = 'streams.yaml'
DEFAULT_DOCUMENTROOT = './htdocs'


# Built-in stream table: locally served URL path -> upstream TARGET URL
DEFAULT_STREAMS = dict([
    ('/cam1/video', 'http://172.16.0.67:8080/videofeed'),
    ('/cam1/audio', 'http://172.16.0.67:8080/audio.wav'),
])

# State shared between the receiver threads and the request handlers
# urlheader:  urlid -> upstream response headers joined into one string
urlheader = {}
# dataheader: urlid -> leading stream bytes, sent to every client of that stream
dataheader = {}
# urldata:    urlid -> {request handler: list of chunks waiting to be sent}
urldata = {}
# Polled by the receiver and streaming loops, cleared on shutdown
running = True
# Receiver threads started by the main block
recv_threads = list()
# Absolute directory of the static files, filled in by the main block
document_root = None

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG
)
41
42class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
43 pass
44
45
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server and answers a
    single HTTP-like request in handle():

      * '/' yields an HTML overview of the known streams and of the
        files found below document_root;
      * a path which is a key of urldata is answered with the stored
        upstream headers, followed (for GET) by the live stream data;
      * any other path is looked up as a file below document_root;
      * everything else results in a 404 page.
    """

    def handle(self):
        """Read one request from the client and send the matching reply."""
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        request_line = self.data.split('\n')[0].split()
        if len(request_line) < 2:
            # Empty or garbled request line: there is no path to serve
            logging.warning("Malformed request from '%s'", self.client_address[0])
            return
        req_type, urlid = request_line[0], request_line[1]
        logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
        try:
            if urlid == '/':
                self._send_overview()
            elif urlid in urldata:
                self._send_stream(urlid, req_type)
            else:
                document_file = self._resolve_document(urlid)
                if document_file is not None and os.path.isfile(document_file):
                    self._send_file(document_file)
                else:
                    self.request.sendall("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % urlid)
        except IOError:
            # Typically the client closing the connection; nothing left to do
            pass
        except StandardError as e:
            logging.warning('Server has issues: %s', e)

    def _send_overview(self):
        """Send the HTML index listing all streams and all document_root files."""
        send = self.request.sendall
        send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
        # Iterate over a snapshot, receiver threads add and remove streams
        for stream in list(urldata):
            send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': stream})
        send("</ul><h3>Files</h3><ul>")
        for root, dirs, files in os.walk(document_root):
            # Prune in place (slice assignment) such that os.walk does not
            # descend into hidden directories
            dirs[:] = [name for name in dirs if not name.startswith('.')]
            for name in files:
                item = os.path.join(root.replace(document_root, '', 1), name)
                send('<li><a href="%(item)s">%(item)s</a></li>' % {'item': item})
        send("</ul></body></html>")

    def _send_stream(self, urlid, req_type):
        """Send the stored headers of stream urlid and, for GET, relay its data."""
        self.request.sendall('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
        if req_type != 'GET':
            # HEAD (and anything else) gets the headers only
            return
        # Register a private queue; the receiver thread of this stream
        # appends every chunk it reads to all registered queues
        clients = urldata[urlid]
        queue = []
        clients[self] = queue
        try:
            while running:
                if urldata.get(urlid) is not clients:
                    # The receiver dropped or replaced the stream entry
                    # (upstream reset): this queue is no longer fed
                    break
                if not queue:
                    time.sleep(0.1)
                    continue
                self.request.sendall(queue.pop(0))
        finally:
            # Always unregister, otherwise the receiver keeps filling the
            # queue of a client which is long gone
            clients.pop(self, None)

    def _resolve_document(self, urlid):
        """Map urlid onto a path inside document_root; None when it escapes it."""
        candidate = os.path.normpath(os.path.join(document_root, urlid.lstrip('/')))
        prefix = document_root.rstrip(os.sep) + os.sep
        if candidate == document_root or candidate.startswith(prefix):
            return candidate
        # '..' components pointed outside of the document root
        return None

    def _send_file(self, path):
        """Send the content of the file at path as a 200 response."""
        # Binary mode, such that the content is passed on unaltered
        with open(path, 'rb') as document:
            content = document.read()
        self.request.sendall('HTTP/1.1 200 OK\n\n' + content)
96
def get_data(url, urlid, *args):
    """
    Receiver loop: read the upstream stream and feed all waiting clients.

    url   -- upstream address, opened using urllib2.urlopen
    urlid -- key under which the stream is published in urlheader,
             dataheader and urldata
    args  -- accepted and ignored

    Runs until the global 'running' flag is cleared. Whenever the upstream
    fails with an IOError or ends, the published entries of urlid are
    removed and a new connection is attempted one second later.
    """
    logger = logging.getLogger('recv_id=' + urlid)
    logger.info("Starting thread '%s' (%s)", url, urlid)
    while running:
        try:
            f = urllib2.urlopen(url)
            try:
                header = ''.join(f.info().headers)
                urltype = f.info().gettype()
                logger.info("url.info().gettype() is '%s'", urltype)
                if urltype == 'audio/x-wav':
                    datatype = 'wav'
                elif urltype == 'multipart/x-mixed-replace':
                    datatype = 'mjeg'
                else:
                    datatype = None
                    logger.warning("Cannot find file type of '%s'", url)

                if datatype:
                    logger.info("'%s' Identified as %s", url, datatype)

                # Leading bytes which every client receives before the live data
                if datatype == 'wav':
                    # presumably the 44 byte canonical WAV header -- TODO confirm
                    preamble = f.read(44)
                elif datatype == 'mjeg':
                    # First two lines out of the first 100 bytes
                    preamble = '\n'.join(f.read(100).split('\n')[0:2])
                else:
                    preamble = ''

                clients = dict()
                urlheader[urlid] = header
                dataheader[urlid] = preamble
                # Published last: the request handler takes the presence of
                # urlid in urldata as the sign that both headers can be read
                urldata[urlid] = clients

                while running:
                    data = f.read(10000)
                    if not data:
                        # End of stream; without this the loop would spin
                        # on empty reads forever
                        raise IOError('upstream closed the stream')
                    # Snapshot of the queues, handlers register and
                    # unregister themselves concurrently
                    for queue in list(clients.values()):
                        queue.append(data)
            finally:
                f.close()
        except IOError as e:
            # Enforce a connection reset
            logger.warning("URL reset '%s' (%s)", url, e)
            # pop() instead of del: nothing is published yet when the very
            # first connection attempt fails
            for store in (urldata, urlheader, dataheader):
                store.pop(urlid, None)
            time.sleep(1)
    logger.info("Closing Thread '%s'", url)
140
141
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
    parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
    parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
    parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
    args = parser.parse_args()

    # A relative document root is resolved against the directory of this script
    document_root = os.path.abspath(os.path.join(os.path.dirname(__file__), args.document_root))
    logging.info("Serving '/htdocs' from document_root '%s'", document_root)

    # Import streams; the built-in defaults stay in place whenever the
    # configuration cannot be used
    streams = DEFAULT_STREAMS
    try:
        import yaml
    except ImportError as e:
        logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
    else:
        try:
            with open(args.stream_cfg) as stream_cfg:
                # NOTE(review): safe_load instead of load, a plain
                # URL -> TARGET mapping needs no arbitrary object construction
                configured = yaml.safe_load(stream_cfg)
        except (IOError, yaml.YAMLError) as e:
            logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
        else:
            if isinstance(configured, dict):
                streams = configured
            else:
                # E.g. an empty file, which is loaded as None
                logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, 'no URL to TARGET mapping found')

    # Create the base server, retry every second until the address can be bound
    ThreadedTCPServer.allow_reuse_address = True
    try:
        while True:
            try:
                server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
                break
            except IOError as e:
                logging.warning('Waiting for connection %s:%s to become available (%s)', args.host, args.port, e)
                time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(1)

    # One daemon receiver thread per configured stream
    for urlid, url in streams.items():
        receiver = threading.Thread(target=get_data, args=(url, urlid))
        receiver.daemon = True
        receiver.start()
        recv_threads.append(receiver)

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    try:
        logging.info('Serving at %s:%s', args.host, args.port)
        server.serve_forever()
    except (KeyboardInterrupt, IOError):
        # The tuple is required: 'except KeyboardInterrupt, IOError' would
        # catch KeyboardInterrupt only and bind it to the name IOError
        logging.info('Shutting down, please wait...')
        # Signals the receiver and streaming loops to finish
        running = False
        server.shutdown()
        server.server_close()
        for thread in recv_threads:
            thread.join()
        logging.info('All done, good bye!')
191
Note: See TracBrowser for help on using the repository browser.