source: py-tcpmultiplexer/TCPMultiplexer.py@ 329

Last change on this file since 329 was 325, checked in by Rick van der Zwet, 13 years ago

Active flags & some doc details

  • Property svn:executable set to *
File size: 8.9 KB
Line 
1#!/usr/bin/env python
2"""
3# Multiplexer for HTTP streams, used for broadcast of MJPEG and WAV streams
4#
5# Licence: BSDLike
6# URL: http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
7#
8# Rick van der Zwet <info@rickvanderzwet.nl>
9"""
10
11import SocketServer
12import argparse
13import logging
14import os
15import socket
16import sys
17import threading
18import time
19import urllib2
20
# Fallback values for the command line options
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 9999
DEFAULT_CONFIG = 'streams.yaml'
DEFAULT_DOCUMENTROOT = './htdocs'
DEFAULT_TIMEOUT = None


# Built-in streams, written as   served URL : upstream TARGET
DEFAULT_STREAMS = {
    # '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
    # '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
}


# State shared between the receiver threads (get_data) and the request
# handlers (MyTCPHandler); every dictionary is keyed by the served URL.
urlheader = {}        # HTTP headers as received from the upstream source
dataheader = {}       # data every new client gets before the live stream
urldata = {}          # per stream: connected handler -> list of pending chunks
stream_running = {}   # True while the upstream source is being received
running = True        # cleared on shutdown to stop all loops
recv_threads = []     # the receiver threads, joined on shutdown
document_root = None  # filled in from the command line
recv_timeout = None   # filled in from the command line

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
47
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server which handles every client connection in its own thread."""

    # Allow binding the listening port again right after a restart instead of
    # failing while sockets of the previous run are still lingering.
    allow_reuse_address = True

    # A client thread can sit in a blocking socket call for a long time; as
    # daemon threads they cannot keep the process alive after the main thread
    # has finished its shutdown.
    daemon_threads = True
50
51
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server and answers one
    request in handle():

      '/'              HTML overview of the running streams and of the
                       files below the document root
      a stream URL     the upstream headers plus data header (HEAD), followed
                       by the live data chunks (GET)
      anything else    a file below the document root, or a 404 page

    Data read from or written to the socket is spelled with bytes literals;
    on Python 2 these are ordinary str objects.
    """

    @staticmethod
    def _html_escape(text):
        """Return text with the HTML special characters replaced by entities."""
        for raw, entity in ((b'&', b'&amp;'), (b'<', b'&lt;'),
                            (b'>', b'&gt;'), (b'"', b'&quot;')):
            text = text.replace(raw, entity)
        return text

    @staticmethod
    def _resolve_document(root, urlid):
        """Return the absolute path of urlid below root, or None if it escapes root."""
        candidate = os.path.abspath(os.path.join(root, urlid.lstrip('/')))
        # abspath() has collapsed all '..' parts, so a plain prefix check is
        # enough to reject requests like '/../../etc/passwd'.
        if not candidate.startswith(root.rstrip(os.sep) + os.sep):
            return None
        return candidate

    def _send_overview(self):
        """Send the HTML index listing running streams and document root files."""
        send = self.request.sendall
        send(b"HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
        for stream_id in sorted(sid for sid in list(stream_running) if stream_running.get(sid)):
            link = self._html_escape(stream_id)
            send(b'<li><a href="%s">%s</a></li>' % (link, link))
        send(b"</ul><h3>Files</h3><ul>")
        for root, dirs, files in os.walk(document_root):
            # Please do not show any hidden files or directories. The slice
            # assignment prunes os.walk() in place without skipping entries.
            dirs[:] = [name for name in dirs if not name.startswith('.')]
            for name in sorted(files):
                if name.startswith('.'):
                    continue
                link = self._html_escape(os.path.join(root.replace(document_root, '', 1), name))
                send(b'<li><a href="%s">%s</a></li>' % (link, link))
        send(b"</ul></body></html>")

    def handle(self):
        """
        Answer the request of the connected client.

        A malformed request gets a 400, an unknown URL a 404 and a stream
        which is currently not being received a 503. A socket error ends the
        connection; the client's chunk queue is always deregistered again.
        """
        global running, dataheader, document_root, stream_running, urlheader, urldata
        clients = None  # client map of the stream this handler registered with
        try:
            # self.request is the TCP socket connected to the client
            self.data = self.request.recv(1024).strip()
            request_line = self.data.splitlines()[0].split() if self.data else []
            if len(request_line) < 2:
                logging.info("Malformed request from '%s'", self.client_address[0])
                self.request.sendall(b"HTTP/1.1 400 BAD REQUEST\nContent-Type: text/html\n\n<h1>400</h1>Malformed request")
                return
            req_type, urlid = request_line[0], request_line[1]
            logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)

            if urlid == b'/':
                self._send_overview()
            elif urlid in urldata:
                # Take one snapshot; the receiver thread resets these entries
                # to None whenever it (re)connects to the upstream source.
                header = urlheader.get(urlid)
                preamble = dataheader.get(urlid)
                stream_clients = urldata.get(urlid)
                if (not stream_running.get(urlid) or header is None
                        or preamble is None or stream_clients is None):
                    self.request.sendall(b"HTTP/1.1 503 SERVICE UNAVAILABLE\nContent-Type: text/html\n\n<h1>503</h1>The stream '%s' is currently not available" % self._html_escape(urlid))
                    return
                self.request.sendall(b'HTTP/1.1 200 OK\n' + header + b'X-Proxy-Server: TCPMultiplexer.py\n' + b'\n' + preamble)
                if req_type == b'HEAD':
                    return
                elif req_type == b'GET':
                    queue = []
                    stream_clients[self] = queue
                    clients = stream_clients
                    while running:
                        if urldata.get(urlid) is not stream_clients:
                            # Upstream was reset and our queue is no longer
                            # fed; close so the client can reconnect.
                            break
                        if not queue:
                            time.sleep(0.1)
                            continue
                        self.request.sendall(queue.pop(0))
            else:
                document_file = self._resolve_document(document_root, urlid)
                if document_file is not None and os.path.isfile(document_file):
                    with open(document_file, 'rb') as source:
                        data = source.read()
                    self.request.sendall(b'HTTP/1.1 200 OK\nContent-Length: %d\n\n' % len(data))
                    self.request.sendall(data)
                else:
                    self.request.sendall(b"HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % self._html_escape(urlid))
        except IOError:
            logging.info("Connection closed from '%s'", self.client_address[0])
        finally:
            if clients is not None:
                clients.pop(self, None)
102
103
104
def _mjpeg_dataheader(f, logger):
    """Read the start of the MJPEG stream f and return the data header for new clients."""
    data = f.read(1024)

    # Get the required headers and accurate datasize
    headers = []
    datasize = 1024 * 1024
    for header in data.splitlines():
        if not header.strip():
            # Newlines in the beginning are evil
            if headers:
                break
            continue
        # Header names are matched case-insensitively
        if header.lower().startswith(('datalen:', 'content-length:')):
            try:
                datasize = int(header.split(':', 1)[1])
            except ValueError:
                raise IOError("Invalid length header '%s'" % header)
        headers.append(header)
    if not headers:
        # Raised as IOError so the caller reconnects instead of the thread dying
        raise IOError("No boundary line found at the start of the stream")
    boundry = headers[0]

    logger.info("Data Length: %s", datasize)
    logger.info("Boundry line: %s", boundry)
    logger.info("Image headers %s", headers)

    data = data + f.read(datasize * 2)
    # First complete part of the stream, followed by the headers of a next part
    valid_image = boundry + data.split(boundry)[1]
    return valid_image + '\n'.join(headers) + '\n'


def get_data(url, urlid, *args):
    """
    Fetch the DATA from the WAV or MJPEG Stream

    Runs until the global 'running' flag is cleared. Every round resets the
    shared entries of urlid, opens url, stores the upstream headers and the
    data header and then appends every received chunk to the queue of each
    connected client. On an IOError the stream is marked as not running and
    the connection is retried after one second.

    url    -- location of the upstream source
    urlid  -- key of this stream in the shared dictionaries
    args   -- ignored
    """
    global running, dataheader, urlheader, urldata, stream_running, recv_timeout
    logger = logging.getLogger('recv_id=' + urlid)
    recv_buffer_size = 1024 * 8
    logger.info("Starting thread '%s' (%s)", url, urlid)
    while running:
        stream_running[urlid] = False
        urlheader[urlid] = None
        dataheader[urlid] = None
        urldata[urlid] = None
        try:
            f = urllib2.urlopen(url, timeout=recv_timeout)
            try:
                urlheader[urlid] = ''.join(f.info().headers)
                urldata[urlid] = dict()

                # Find datatype
                datatype = None
                urltype = f.info().gettype()
                logger.info("url.info().gettype() is '%s'", urltype)
                if urltype == 'audio/x-wav':
                    datatype = 'wav'
                elif urltype == 'multipart/x-mixed-replace':
                    datatype = 'mjpeg'

                # Be verbose to the user
                if datatype:
                    logger.info("'%s' Identified as %s", url, datatype)
                else:
                    logger.warning("Cannot find file type of '%s'", url)

                # Set the correct data header
                if datatype == 'wav':
                    # WAV header
                    dataheader[urlid] = f.read(44)
                elif datatype == 'mjpeg':
                    dataheader[urlid] = _mjpeg_dataheader(f, logger)
                else:
                    dataheader[urlid] = ''
                logger.info("Using dataheader of length %s", len(dataheader[urlid]))

                # Main data loader
                logger.info("Stream ready to serve")
                stream_running[urlid] = True
                while running:
                    data = f.read(recv_buffer_size)
                    if len(data) != recv_buffer_size:
                        raise IOError("Connection corrupted, got '%s' instead of '%s'" % (len(data), recv_buffer_size))
                    logger.debug("Received data chunk with length: %s", len(data))
                    # Iterate over a copy of the queues; a client may
                    # deregister itself while the chunk is handed out.
                    for queue in list(urldata[urlid].values()):
                        queue.append(data)
            finally:
                # Release the upstream connection before any reconnect
                f.close()
        except IOError as e:
            # Enforce a connection reset
            logger.warning("URL reset '%s' (%s)", url, e)
            stream_running[urlid] = False
            time.sleep(1)
    logger.info("Closing Thread '%s'", url)
187
188
if __name__ == "__main__":
    # Script entry point: parse the command line, load the stream
    # configuration, start one receiver thread per stream and serve clients
    # until interrupted.
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
    parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
    parser.add_argument('--timeout', dest='timeout', default=DEFAULT_TIMEOUT, type=int, help='Default socket timeout [default: %s]' % DEFAULT_TIMEOUT)
    parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
    parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
    args = parser.parse_args()

    document_root = os.path.abspath(os.path.join(os.path.dirname(__file__), args.document_root))
    logging.info("Serving '/htdocs' from document_root '%s'", document_root)

    # Set the timeout
    logging.info("Changing socket timeout from '%s' to '%s'", socket.getdefaulttimeout(), args.timeout)
    recv_timeout = args.timeout

    # Import streams; work on a copy so the defaults stay untouched
    streams = dict(DEFAULT_STREAMS)
    try:
        import yaml
    except ImportError as e:
        logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
    else:
        try:
            with open(args.stream_cfg) as stream_cfg:
                # safe_load() only builds plain data types, which is all a
                # 'URL: TARGET' mapping needs; an empty file yields None.
                streams.update(yaml.safe_load(stream_cfg) or {})
        except (IOError, TypeError, ValueError, yaml.YAMLError) as e:
            logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)

    # Create the base server, retrying while the address is not yet available
    try:
        while True:
            try:
                ThreadedTCPServer.allow_reuse_address = True
                server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
                break
            except IOError as e:
                logging.warning('Waiting for connection %s:%s to become available (%s)', args.host, args.port, e)
                time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(1)

    for urlid, url in streams.items():
        receiver = threading.Thread(target=get_data, args=(url, urlid))
        receiver.daemon = True
        recv_threads.append(receiver)
        receiver.start()

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    try:
        logging.info('Serving at %s:%s', args.host, args.port)
        server.serve_forever()
    except (KeyboardInterrupt, IOError):
        # The tuple is required: 'except KeyboardInterrupt, IOError' would
        # only catch KeyboardInterrupt and rebind the name IOError.
        logging.info('Shutting down, please wait...')
        running = False
        server.shutdown()
        server.server_close()
        # A receiver may be blocked in a read on a stalled source; these are
        # daemon threads, so wait only a bounded time for each of them.
        join_timeout = recv_timeout + 2 if recv_timeout else 5
        for thread in recv_threads:
            thread.join(join_timeout)
        logging.info('All done, good bye!')
243
Note: See TracBrowser for help on using the repository browser.