source: py-tcpmultiplexer/TCPMultiplexer.py@ 323

Last change on this file since 323 was 323, checked in by Rick van der Zwet, 13 years ago

Get Socket Timeout to make sure to continue quickly.

  • Property svn:executable set to *
File size: 8.1 KB
RevLine 
[311]1#!/usr/bin/env python
[313]2"""
[311]3# Multiplexer for HTTP streams, used for broadcast of MJPEG and WAV streams
4#
5# Licence: BSDLike
[317]6# URL: http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
[311]7#
8# Rick van der Zwet <info@rickvanderzwet.nl>
[313]9"""
10
[311]11import SocketServer
[312]12import argparse
[311]13import logging
[323]14import os
15import socket
[311]16import sys
[312]17import threading
18import time
19import urllib2
[311]20
# Fallback values for the command line options
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 9999
DEFAULT_CONFIG = 'streams.yaml'
DEFAULT_DOCUMENTROOT = './htdocs'
DEFAULT_TIMEOUT = 2


# Built-in streams: path served by this proxy -> upstream URL
DEFAULT_STREAMS = {
    '/cam1/video': 'http://172.16.0.67:8080/videofeed',
    '/cam1/audio': 'http://172.16.0.67:8080/audio.wav',
}


# State shared between the receiver threads and the request handlers.
# All three dictionaries are keyed by the served path (urlid).
urlheader = {}      # upstream HTTP response headers joined into one string
dataheader = {}     # stream preamble sent to a client before the live data
urldata = {}        # per client (handler instance) list of pending chunks
running = True      # cleared on shutdown to make the worker loops stop
recv_threads = []   # receiver threads, one per configured stream
document_root = None  # directory with static files, set by the main block

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
45
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server which hands every accepted connection to its own thread."""
48
49
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server. Depending on the
    requested path the client receives:

      * '/'                  an HTML overview of all streams and static files
      * a key of ``urldata`` the proxied stream (headers only for HEAD)
      * anything else        the matching file below ``document_root`` or a
                             404 page when no such file exists

    The handler only reads the module level state (``running``,
    ``document_root``, ``urlheader``, ``dataheader``, ``urldata``); it
    registers itself in ``urldata[urlid]`` while it is streaming.
    """

    def handle(self):
        """Read the request line and dispatch to the matching responder."""
        try:
            # self.request is the TCP socket connected to the client
            self.data = self.request.recv(1024).strip()
            request_line = self.data.split('\n')[0].split()
            if len(request_line) < 2:
                # Empty or truncated request: there is no path to serve
                logging.warning("Malformed request from '%s'", self.client_address[0])
                self.request.sendall("HTTP/1.1 400 BAD REQUEST\nContent-Type: text/html\n\n<h1>400</h1>Malformed request")
                return
            req_type, urlid = request_line[0], request_line[1]
            logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)

            if urlid == '/':
                self._send_index()
            elif urlid in urldata:
                self._serve_stream(req_type, urlid)
            else:
                document_file = self._resolve_document(urlid)
                if document_file is not None and os.path.isfile(document_file):
                    self._send_file(document_file)
                else:
                    self.request.sendall("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % self._escape(urlid))
        except IOError:
            # Raised by the socket calls once the client went away or timed out
            logging.info("Connection closed from '%s'", self.client_address[0])

    @staticmethod
    def _escape(text):
        """Escape the characters which have a special meaning in HTML."""
        return (text.replace('&', '&amp;').replace('<', '&lt;')
                    .replace('>', '&gt;').replace('"', '&quot;'))

    def _resolve_document(self, urlid):
        """Map urlid to a path below document_root, None when it escapes it."""
        root = os.path.abspath(document_root)
        candidate = os.path.abspath(os.path.join(root, urlid.lstrip('/')))
        # Normalising first makes '..' components visible to the prefix check
        prefix = root.rstrip(os.sep) + os.sep
        if candidate != root and not candidate.startswith(prefix):
            return None
        return candidate

    def _send_index(self):
        """Send the HTML overview listing all streams and static files."""
        send = self.request.sendall
        send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
        for stream_id in sorted(urldata.keys()):
            send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': self._escape(stream_id)})
        send("</ul><h3>Files</h3><ul>")
        for root, dirs, files in os.walk(document_root):
            # Please do not show any hidden files or directories. The slice
            # assignment prunes the list os.walk() keeps descending into.
            dirs[:] = [name for name in dirs if not name.startswith('.')]
            for name in sorted(name for name in files if not name.startswith('.')):
                item = os.path.join(root.replace(document_root, '', 1), name)
                send('<li><a href="%(item)s">%(item)s</a></li>' % {'item': self._escape(item)})
        send("</ul></body></html>")

    def _serve_stream(self, req_type, urlid):
        """Send the stream headers and, for GET, relay chunks until shutdown."""
        clients = urldata.get(urlid)
        header = urlheader.get(urlid)
        preamble = dataheader.get(urlid)
        if clients is None or header is None or preamble is None:
            # The receiver thread has not (re)connected to the upstream yet
            self.request.sendall("HTTP/1.1 503 SERVICE UNAVAILABLE\nContent-Type: text/html\n\n<h1>503</h1>The stream '%s' is not available yet" % self._escape(urlid))
            return

        self.request.sendall('HTTP/1.1 200 OK\n' + header + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + preamble)
        if req_type != 'GET':
            # HEAD (and anything else) stops after the headers
            return

        # The receiver thread appends every chunk to the lists in 'clients'
        queue = []
        clients[self] = queue
        try:
            # Stop as soon as the receiver replaced its client registry,
            # otherwise this queue would never be filled again.
            while running and urldata.get(urlid) is clients:
                if not queue:
                    time.sleep(0.1)
                    continue
                self.request.sendall(queue.pop(0))
        finally:
            clients.pop(self, None)

    def _send_file(self, document_file):
        """Send a static file from the document root."""
        with open(document_file, 'rb') as source:
            content = source.read()
        self.request.sendall('HTTP/1.1 200 OK\n\n')
        self.request.sendall(content)
99
[323]100
101
def _mjpeg_preamble(data, logger):
    """Build the data header sent to new clients of an MJPEG stream.

    The first non-blank line of ``data`` is taken as the boundary line and
    the following lines, up to the next blank line, as the part headers.
    The result is the boundary line plus everything up to the next
    occurrence of that line, followed by the collected header lines.
    Returns '' when ``data`` holds no non-blank line at all.
    """
    headers = []
    for header in data.splitlines():
        if not header.strip():
            # Newlines in the beginning are evil
            if headers:
                break
            continue
        headers.append(header)
    if not headers:
        logger.warning("No boundary line found in the first %s bytes", len(data))
        return ''

    boundry = headers[0]
    logger.info("Boundry line: %s", boundry)
    logger.info("Image headers %s", headers)

    # 'boundry' was taken from 'data', so the split has at least two parts
    valid_image = boundry + data.split(boundry)[1]
    return valid_image + '\n'.join(headers) + '\n'


def get_data(url, urlid, *args):
    """ Fetch the DATA from the WAV or MJPEG Stream

    Runs until the global ``running`` flag is cleared. Every round opens
    ``url``, publishes the upstream headers in ``urlheader[urlid]`` and the
    stream preamble in ``dataheader[urlid]`` and then appends each chunk
    read to all client queues registered in ``urldata[urlid]``. On an
    IOError (which includes the upstream closing the stream) the three
    entries are removed and the connection is retried after one second.

    url    -- upstream URL to read from
    urlid  -- path under which the stream is served, key of the shared dicts
    args   -- ignored
    """
    global running, dataheader, urlheader, urldata
    logger = logging.getLogger('recv_id=' + urlid)
    # Fill buffers if needed
    logger.info("Starting thread '%s' (%s)", url, urlid)
    while running:
        urlheader[urlid] = None
        dataheader[urlid] = None
        urldata[urlid] = None
        f = None
        failed = False
        try:
            f = urllib2.urlopen(url)
            urlheader[urlid] = ''.join(f.info().headers)
            urldata[urlid] = dict()

            # Find datatype
            datatype = None
            urltype = f.info().gettype()
            logger.info("url.info().gettype() is '%s'", urltype)
            if urltype == 'audio/x-wav':
                datatype = 'wav'
            elif urltype == 'multipart/x-mixed-replace':
                datatype = 'mjpeg'

            # Be verbose to the user
            if datatype:
                logger.info("'%s' Identified as %s", url, datatype)
            else:
                logger.warning("Cannot find file type of '%s'", url)

            # Set the correct data header
            if datatype == 'wav':
                # WAV header
                dataheader[urlid] = f.read(44)
            elif datatype == 'mjpeg':
                dataheader[urlid] = _mjpeg_preamble(f.read(1024 * 1024), logger)
            else:
                dataheader[urlid] = ''

            logger.info("Using dataheader of length %s", len(dataheader[urlid]))

            while running:
                data = f.read(10000)
                if not data:
                    # An empty read means the upstream closed the stream;
                    # without this check the loop would spin on '' forever.
                    raise IOError("stream ended")
                # Snapshot of the queues: handlers add and remove their
                # entry from other threads while this loop is running.
                for queue in list(urldata[urlid].values()):
                    queue.append(data)
        except IOError as e:
            # Enforce a connection reset
            logger.warning("URL reset '%s' (%s)", url, e)
            urlheader.pop(urlid, None)
            dataheader.pop(urlid, None)
            urldata.pop(urlid, None)
            failed = True
        finally:
            # Do not leak one upstream connection per reconnect
            if f is not None:
                f.close()
        if failed:
            time.sleep(1)
    logger.info("Closing Thread '%s'", url)
[311]174
175
if __name__ == "__main__":
    # Command line handling; the module docstring doubles as help text
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
    parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
    parser.add_argument('--timeout', dest='timeout', default=DEFAULT_TIMEOUT, type=int, help='Default socket timeout [default: %s]' % DEFAULT_TIMEOUT)
    parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
    parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
    args = parser.parse_args()

    # A relative document root is resolved against the location of this file
    document_root = os.path.abspath(os.path.join(os.path.dirname(__file__), args.document_root))
    logging.info("Serving '/htdocs' from document_root '%s'", document_root)

    # Set the timeout
    socket.setdefaulttimeout(args.timeout)

    # Import streams; work on a copy so DEFAULT_STREAMS itself stays untouched
    streams = dict(DEFAULT_STREAMS)
    try:
        import yaml
    except ImportError as e:
        logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
    else:
        try:
            with open(args.stream_cfg) as stream_cfg:
                # NOTE(review): yaml.load() can construct arbitrary objects;
                # consider yaml.safe_load() if the file is not fully trusted.
                configured = yaml.load(stream_cfg)
        except (IOError, yaml.YAMLError) as e:
            logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
        else:
            if isinstance(configured, dict):
                streams.update(configured)
            elif configured is not None:
                # An empty file yields None; anything else is a wrong layout
                logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, 'expected a mapping of URL to TARGET')

    # Create the base server, retrying until the address can be bound
    try:
        while True:
            try:
                ThreadedTCPServer.allow_reuse_address = True
                server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
                break
            except IOError as e:
                logging.warning('For conection %s:%s to become available (%s)', args.host, args.port, e)
                time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(1)

    # One daemon receiver thread per configured stream
    for urlid, url in streams.items():
        receiver = threading.Thread(target=get_data, args=(url, urlid))
        receiver.setDaemon(True)
        recv_threads.append(receiver)
        receiver.start()

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    try:
        logging.info('Serving at %s:%s', args.host, args.port)
        server.serve_forever()
    except (KeyboardInterrupt, IOError):
        # The tuple is required: "except KeyboardInterrupt, IOError" would
        # catch KeyboardInterrupt only and rebind the name IOError to it.
        logging.info('Shutting down, please wait...')
        running = False
        server.shutdown()
        for thread in recv_threads:
            thread.join()
        logging.info('All done, good bye!')
229
Note: See TracBrowser for help on using the repository browser.