source: py-tcpmultiplexer/TCPMultiplexer.py@ 323

Last change on this file since 323 was 323, checked in by Rick van der Zwet, 13 years ago

Get Socket Timeout to make sure to continue quickly.

  • Property svn:executable set to *
File size: 8.1 KB
Line 
1#!/usr/bin/env python
2"""
3# Multiplexer for HTTP streams, used for broadcast of MPJEG and WAV streams
4#
5# Licence: BSDLike
6# URL: http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
7#
8# Rick van der Zwet <info@rickvanderzwet.nl>
9"""
10
11import SocketServer
12import argparse
13import logging
14import os
15import socket
16import sys
17import threading
18import time
19import urllib2
20
# Some boring defaults
DEFAULT_HOST = '0.0.0.0'           # bind to all interfaces
DEFAULT_PORT = 9999
DEFAULT_CONFIG = 'streams.yaml'    # optional YAML file mapping URL paths to upstream stream URLs
DEFAULT_DOCUMENTROOT = './htdocs'  # static files served alongside the streams
DEFAULT_TIMEOUT = 2                # default socket timeout (seconds), applied via socket.setdefaulttimeout


# URL : TARGET
DEFAULT_STREAMS = {
    '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
    '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
    }


# Global variables used as ring-buffers or shared-storage
# urlheader[urlid]  -> raw upstream HTTP header string (None while (re)connecting)
# dataheader[urlid] -> stream preamble each new client must receive first
# urldata[urlid]    -> dict mapping a client handler to its pending-chunk list
urlheader = dict()
dataheader = dict()
urldata = dict()
running = True          # cleared on shutdown so receiver threads and client loops exit
recv_threads = []       # one daemon thread per configured stream
document_root = None    # absolute path, set from --document-root in __main__

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
45
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server that dispatches every client connection to its own thread."""
48
49
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """

    def handle(self):
        """Serve one HTTP request: index page, live stream, or static file."""
        global running, dataheader, document_root, urlheader, urldata
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        request_line = self.data.split('\n')[0].split()
        if len(request_line) < 2:
            # Empty or malformed request line; nothing sensible to answer.
            return
        req_type = request_line[0]
        urlid = request_line[1]
        # Normalize the path so '..' components cannot escape document_root.
        document_file = os.path.abspath(document_root + urlid)
        logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
        try:
            if urlid == '/':
                # Index page: list the live streams and the static files.
                self.request.send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
                for urlid in sorted(urldata.keys()):
                    self.request.send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': urlid})
                self.request.send("</ul><h3>Files</h3><ul>")
                for root, dirs, files in os.walk(document_root):
                    # Please do not show any hidden files or directories
                    [dirs.remove(name) for name in dirs if name.startswith('.')]
                    [files.remove(name) for name in files if name.startswith('.')]
                    for name in sorted(files):
                        self.request.send('<li><a href="%(item)s">%(item)s</li>' % { 'item' : os.path.join(root.replace(document_root,'',1),name) })
                self.request.send("</ul></body></html>")
            elif urlid in urldata:
                # Known stream: replay the upstream headers plus the stream
                # preamble, then feed chunks from our per-client buffer.
                self.request.send('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
                if req_type == 'HEAD':
                    return
                elif req_type == 'GET':
                    # Register our buffer; the receiver thread appends chunks.
                    urldata[urlid][self] = []
                    while running:
                        if len(urldata[urlid][self]) == 0:
                            # Nothing pending yet; poll briefly.
                            time.sleep(0.1)
                            continue
                        self.request.send(urldata[urlid][self].pop(0))
            elif os.path.isfile(document_file) and document_file.startswith(document_root):
                # Static file inside document_root only (traversal-safe).
                with open(document_file, 'r') as fh:
                    self.request.send('HTTP/1.1 200 OK\n\n' + fh.read())
            else:
                self.request.send("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % urlid)
        except IOError:
            logging.info("Connection closed from '%s'", self.client_address[0])
            # Only GET stream clients register a buffer; remove it if present.
            # (The original unconditional del raised for non-stream requests.)
            buffers = urldata.get(urlid)
            if buffers and self in buffers:
                del buffers[self]
99
100
101
def get_data(url, urlid, *args):
    """ Fetch the DATA from the WAV or MJPEG Stream

    Runs in its own daemon thread per stream. Keeps (re)connecting to `url`
    while `running` is set, publishes the upstream headers and a stream
    preamble into the shared dicts, then fans incoming chunks out to every
    registered client buffer in urldata[urlid].
    """
    global running, dataheader, urlheader, urldata
    logger = logging.getLogger('recv_id=' + urlid)
    # Fill buffers if needed
    logger.info("Starting thread '%s' (%s)", url, urlid)
    while running:
        # Reset shared state: clients see None until the stream is (re)opened.
        urlheader[urlid] = None
        dataheader[urlid] = None
        urldata[urlid] = None
        try:
            f = urllib2.urlopen(url)
            urlheader[urlid] = ''.join(f.info().headers)
            urldata[urlid] = dict()

            # Find datatype from the upstream Content-Type
            datatype = None
            urltype = f.info().gettype()
            logger.info("url.info().gettype() is '%s'", urltype)
            if urltype == 'audio/x-wav':
                datatype = 'wav'
            elif urltype == 'multipart/x-mixed-replace':
                datatype = 'mjpeg'

            # Be verbose to the user
            if datatype:
                logger.info("'%s' Identified as %s", url, datatype)
            else:
                logger.warning("Cannot find file type of '%s'", url)

            # Build the preamble every new client needs before raw chunks.
            if datatype == 'wav':
                # Canonical 44-byte WAV header
                dataheader[urlid] = f.read(44)
            elif datatype == 'mjpeg':
                data = f.read(1024 * 1024)

                # Get the required headers (boundary line + part headers)
                headers = []
                for header in data.splitlines():
                    if not header.strip():
                        # Newlines in the beginning are evil
                        if headers:
                            break
                        else:
                            continue
                    headers.append(header)
                boundry = headers[0]
                logger.info("Boundry line: %s", boundry)
                logger.info("Image headers %s", headers)

                # One complete image plus headers so a new client can sync up.
                valid_image = boundry + data.split(boundry)[1]
                dataheader[urlid] = valid_image + '\n'.join(headers) + '\n'
            else:
                dataheader[urlid] = ''

            logger.info("Using dataheader of length %s", len(dataheader[urlid]))

            while running:
                data = f.read(10000)
                if not data:
                    # EOF: upstream closed the stream. Without this check the
                    # loop would spin forever appending empty chunks instead
                    # of reconnecting.
                    raise IOError('remote end closed stream')
                # logger.debug("Received data chunk with length: %s", len(data))
                for key in urldata[urlid].keys():
                    urldata[urlid][key].append(data)
        except IOError as e:
            # Enforce a connection reset
            logger.warning("URL reset '%s' (%s)", url, e)
            del urlheader[urlid]
            del dataheader[urlid]
            del urldata[urlid]
            time.sleep(1)
            continue
    logger.info("Closing Thread '%s'", url)
174
175
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__,formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
    parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
    parser.add_argument('--timeout', dest='timeout', default=DEFAULT_TIMEOUT, type=int, help='Default socket timeout [default: %s]' % DEFAULT_TIMEOUT)
    parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
    parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
    args = parser.parse_args()

    document_root = os.path.abspath(os.path.join(os.path.dirname(__file__),args.document_root))
    logging.info("Serving '/htdocs' from document_root '%s'", document_root)

    # Set the timeout so blocked reads recover quickly
    socket.setdefaulttimeout(args.timeout)

    # Import streams configuration (falls back to built-in defaults)
    streams = DEFAULT_STREAMS
    try:
        import yaml
        # safe_load: the config only needs plain mappings, and plain
        # yaml.load would execute arbitrary python tags.
        streams.update(yaml.safe_load(open(args.stream_cfg)))
    except (ImportError, IOError) as e:
        logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)


    # Create the base server; retry until the address becomes available.
    try:
        while True:
            try:
                ThreadedTCPServer.allow_reuse_address = True
                server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
                break
            except IOError as e:
                logging.warning('For conection %s:%s to become available (%s)', args.host, args.port , e)
                time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(1)

    # One daemon receiver thread per configured stream.
    for urlid, url in streams.iteritems():
        recv_threads.append(threading.Thread(target=get_data, args=(url,urlid)))
        recv_threads[-1].setDaemon(True)
        recv_threads[-1].start()

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    try:
        logging.info('Serving at %s:%s', args.host, args.port)
        server.serve_forever()
    except (KeyboardInterrupt, IOError):
        # NOTE: must be a tuple — the original `except KeyboardInterrupt,
        # IOError:` only caught KeyboardInterrupt and rebound the caught
        # instance to the name IOError.
        logging.info('Shutting down, please wait...')
        running = False
        server.shutdown()
        [thread.join() for thread in recv_threads]
        logging.info('All done, good bye!')
229
Note: See TracBrowser for help on using the repository browser.