source: py-tcpmultiplexer/TCPMultiplexer.py@ 345

Last change on this file since 345 was 338, checked in by Rick van der Zwet, 13 years ago

Only Serve Stream which is fully started.

  • Property svn:executable set to *
File size: 9.3 KB
Line 
1#!/usr/bin/env python
2"""
3# Multiplexer for HTTP streams, used for broadcast of MPJEG and WAV streams
4#
5# Licence: BSDLike
6# URL: http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
7#
8# Rick van der Zwet <info@rickvanderzwet.nl>
9"""
10
11import SocketServer
12import argparse
13import logging
14import logging.handlers
15import os
16import socket
17import sys
18import threading
19import time
20import urllib2
21
22# Some boring defaults
23DEFAULT_HOST = '0.0.0.0'
24DEFAULT_PORT = 9999
25DEFAULT_CONFIG = 'streams.yaml'
26DEFAULT_DOCUMENTROOT = './htdocs'
27DEFAULT_TIMEOUT = None
28DEFAULT_LOGFILE = 'py-tcpmultiplexer.log'
29
30
31# URL : TARGET
32DEFAULT_STREAMS = {
33# '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
34# '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
35 }
36
37
38# Global variables used as ring-buffers or shared-storage
39urlheader = dict()
40dataheader = dict()
41urldata = dict()
42stream_running = dict()
43running = True
44recv_threads = []
45document_root = None
46recv_timeout = None
47
48logger = logging.getLogger(__name__)
49logger.setLevel(logging.INFO)
50formatter = logging.Formatter('%(asctime)s - %(threadName)s - %(levelname)s - %(message)s')
51
52ch = logging.StreamHandler()
53ch.setFormatter(formatter)
54
55logger.addHandler(ch)
56
57class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
58 pass
59
60
61class MyTCPHandler(SocketServer.BaseRequestHandler):
62 """
63 The RequestHandler class for our server.
64
65 It is instantiated once per connection to the server, and must
66 override the handle() method to implement communication to the
67 client.
68 """
69
70 def handle(self):
71 global running, dataheader, document_root, stream_running, urlheader, urldata
72 # self.request is the TCP socket connected to the client
73 self.data = self.request.recv(1024).strip()
74 urlid = self.data.split('\n')[0].split()[1]
75 req_type = self.data.split('\n')[0].split()[0]
76 # XXX: Check for .. paths
77 document_file = document_root + urlid
78 logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
79 try:
80 if urlid == '/':
81 self.request.send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
82 for urlid in filter(lambda x: stream_running[x], stream_running.keys()):
83 self.request.send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': urlid})
84 self.request.send("</ul><h3>Files</h3><ul>")
85 for root, dirs, files in os.walk(document_root):
86 # Please do not show any hidden files or directories
87 [dirs.remove(name) for name in dirs if name.startswith('.')]
88 [files.remove(name) for name in files if name.startswith('.')]
89 for name in sorted(files):
90 self.request.send('<li><a href="%(item)s">%(item)s</li>' % { 'item' : os.path.join(root.replace(document_root,'',1),name) })
91 self.request.send("</ul></body></html>")
92 elif stream_running[urlid]:
93 self.request.send('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
94 if req_type == 'HEAD':
95 return
96 elif req_type == 'GET':
97 urldata[urlid][self] = []
98 while running:
99 if len(urldata[urlid][self]) == 0:
100 time.sleep(0.1)
101 continue
102 self.request.send(urldata[urlid][self].pop(0))
103 elif os.path.isfile(document_file):
104 data = open(document_file,'r').read()
105 self.request.send('HTTP/1.1 200 OK\nContent-Length: %s\n\n%s' % (len(data),data))
106 else:
107 self.request.send("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % urlid)
108 except IOError:
109 logging.info("Connection closed from '%s'", self.client_address[0])
110 del urldata[urlid][self]
111
112
113
114def get_data(url, urlid, *args):
115 """ Fetch the DATA from the WAV or MJPEG Stream """
116 global running, dataheader, urlheader, urldata, stream_running, recv_timeout
117 # Fill buffers if needed
118 logger.info("Starting thread '%s' (%s)", url, urlid)
119 while running:
120 stream_running[urlid] = False
121 urlheader[urlid] = None
122 dataheader[urlid] = None
123 urldata[urlid] = None
124 try:
125 f = urllib2.urlopen(url,timeout=recv_timeout)
126 urlheader[urlid] = ''.join(f.info().headers)
127 urldata[urlid] = dict()
128
129 # Find datatype
130 datatype = None
131 urltype = f.info().gettype()
132 logger.info("url.info().gettype() is '%s'", urltype)
133 if urltype == 'audio/x-wav':
134 datatype = 'wav'
135 elif urltype == 'multipart/x-mixed-replace':
136 datatype = 'mjpeg'
137
138 # Be verbose to the user
139 if datatype:
140 logger.info("'%s' Identified as %s", url, datatype)
141 else:
142 logger.warning("Cannot find file type of '%s'", url)
143
144 # Set the correct data header
145 if datatype == 'wav':
146 # WAV header
147 dataheader[urlid] = f.read(44)
148 elif datatype == 'mjpeg':
149 data = f.read(1024)
150
151 # Get the required headers and acurate datasize
152 headers = []
153 datasize = 1024 * 1024
154 for header in data.splitlines():
155 if not header.strip():
156 # Newlines in the beginning are evil
157 if headers:
158 break
159 else:
160 continue
161 if header.startswith('DataLen:') or header.startswith('Content-length:'):
162 datasize = int(header.split(':')[1])
163 headers.append(header)
164 boundry = headers[0]
165
166 logger.info("Data Length: %s", datasize)
167 logger.info("Boundry line: %s", boundry)
168 logger.info("Image headers %s", headers)
169
170 data = data + f.read(datasize * 2)
171 valid_image = boundry + data.split(boundry)[1]
172 dataheader[urlid] = valid_image + '\n'.join(headers) + '\n'
173 else:
174 dataheader[urlid] = ''
175 logger.info("Using dataheader of length %s", len(dataheader[urlid]))
176
177 # Main data loader
178 logger.info("Stream ready to serve")
179 stream_running[urlid] = True
180 recv_buffer_size = 1024 * 8
181 while running:
182 data = f.read(recv_buffer_size)
183 if not len(data) == recv_buffer_size:
184 raise IOError("Connection corrupted, got '%s' instead of '%s'" % (len(data), recv_buffer_size))
185 logger.debug("Received data chunk with length: %s", len(data))
186 for key in urldata[urlid].keys():
187 urldata[urlid][key].append(data)
188 except (urllib2.URLError, IOError) as e:
189 #Enforce a connection reset
190 logger.warning("URL reset '%s' (%s)", url, e)
191 stream_running[urlid] = False
192 time.sleep(1)
193 pass
194 logger.info("Closing Thread '%s'", url)
195
196
197if __name__ == "__main__":
198 parser = argparse.ArgumentParser(description=__doc__,formatter_class=argparse.RawDescriptionHelpFormatter)
199 parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
200 parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
201 parser.add_argument('--logfile', dest='logfile', default=DEFAULT_LOGFILE, help='File to write logfiles to [default: %s]' % DEFAULT_LOGFILE)
202 parser.add_argument('--timeout', dest='timeout', default=DEFAULT_TIMEOUT, type=int, help='Default socket timeout [default: %s]' % DEFAULT_TIMEOUT)
203 parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
204 parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
205 args = parser.parse_args()
206
207 # Add file logger
208 ch = logging.handlers.WatchedFileHandler(args.logfile)
209 ch.setFormatter(formatter)
210 logger.addHandler(ch)
211
212 document_root = os.path.abspath(os.path.join(os.path.dirname(__file__),args.document_root))
213 logger.info("Serving '/htdocs' from document_root '%s'", document_root)
214
215 # Set the timeout
216 logger.info("Changing socket timeout from '%s' to '%s'", socket.getdefaulttimeout(), args.timeout)
217 recv_timeout = args.timeout
218
219
220 # Inport streams
221 streams = DEFAULT_STREAMS
222 try:
223 import yaml
224 streams.update(yaml.load(open(args.stream_cfg)))
225 except (ImportError, IOError) as e:
226 logger.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
227
228
229 # Create the base server
230 try:
231 while True:
232 try:
233 ThreadedTCPServer.allow_reuse_address = True
234 server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
235 break
236 except IOError, e:
237 logger.warning('For conection %s:%s to become available (%s)', args.host, args.port , e)
238 time.sleep(1)
239 except KeyboardInterrupt:
240 sys.exit(1)
241
242 for urlid, url in streams.iteritems():
243 recv_threads.append(threading.Thread(name=urlid, target=get_data, args=(url,urlid)))
244 recv_threads[-1].setDaemon(True)
245 recv_threads[-1].start()
246
247 # Activate the server; this will keep running until you
248 # interrupt the program with Ctrl-C
249 try:
250 logger.info('Serving at %s:%s', args.host, args.port)
251 server.serve_forever()
252 except KeyboardInterrupt, IOError:
253 logger.info('Shutting down, please wait...')
254 running = False
255 server.shutdown()
256 [thread.join() for thread in recv_threads]
257 logger.info('All done, good bye!')
258
Note: See TracBrowser for help on using the repository browser.