source: py-tcpmultiplexer/TCPMultiplexer.py@ 319

Last change on this file since 319 was 318, checked in by Rick van der Zwet, 13 years ago

First picture needs to be valid for the stream to be parsed correctly.

  • Property svn:executable set to *
File size: 7.3 KB
RevLine 
[311]1#!/usr/bin/env python
[313]2"""
[311]3# Multiplexer for HTTP streams, used for broadcast of MPJEG and WAV streams
4#
5# Licence: BSDLike
[317]6# URL: http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
[311]7#
8# Rick van der Zwet <info@rickvanderzwet.nl>
[313]9"""
10
[311]11import SocketServer
[312]12import argparse
[311]13import logging
14import sys
[312]15import threading
16import time
17import urllib2
[314]18import os
[311]19
# Some boring defaults
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 9999
DEFAULT_CONFIG = 'streams.yaml'
DEFAULT_DOCUMENTROOT = './htdocs'


# URL : TARGET
DEFAULT_STREAMS = {
    '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
    '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
    }

# Global variables used as ring-buffers or shared-storage
# urlheader:  urlid -> raw upstream HTTP response headers (replayed to clients)
# dataheader: urlid -> cached stream preamble (e.g. WAV header / first MJPEG
#             frame) so a client joining mid-stream can still sync its decoder
# urldata:    urlid -> {handler: [chunk, ...]} one buffer list per connected client
urlheader = dict()
dataheader = dict()
urldata = dict()
# Cooperative shutdown flag polled by all receiver and handler loops
running = True
# Receiver threads, one per configured stream (joined on shutdown)
recv_threads = []
# Absolute path of the static-file root; assigned from CLI args in __main__
document_root = None

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
42
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    # ThreadingMixIn dispatches each incoming connection to its own thread,
    # so many clients can consume streams concurrently.
    pass
45
46
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.

    Serves three kinds of requests:
      * '/'              -> HTML index of live streams and static files
      * a stream urlid   -> cached stream header followed by live data
      * anything else    -> a static file below document_root, or a 404
    """

    def handle(self):
        global running, dataheader, document_root, urlheader, urldata
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        request_line = self.data.split('\n')[0].split()
        req_type = request_line[0]
        urlid = request_line[1]
        # Resolve the static-file candidate; normalize and refuse any path
        # escaping the document root via '..' (resolves the original XXX).
        document_file = os.path.normpath(document_root + urlid)
        if not document_file.startswith(document_root):
            document_file = None
        logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
        subscribed = False
        try:
            if urlid == '/':
                self.request.send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
                for urlid in urldata.keys():
                    self.request.send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': urlid})
                self.request.send("</ul><h3>Files</h3><ul>")
                for root, dirs, files in os.walk(document_root):
                    # Please do not show any hidden files or directories.
                    # Rebuild the lists in place: mutating 'dirs' prunes the
                    # walk, and filtering a copy fixes the remove-while-
                    # iterating bug that skipped adjacent hidden entries.
                    dirs[:] = [name for name in dirs if not name.startswith('.')]
                    files[:] = [name for name in files if not name.startswith('.')]
                    for name in files:
                        self.request.send('<li><a href="%(item)s">%(item)s</li>' % { 'item' : os.path.join(root.replace(document_root,'',1),name) })
                self.request.send("</ul></body></html>")
            elif urlid in urldata:
                # Replay the upstream HTTP headers plus the cached stream
                # preamble so the client decoder can sync mid-stream.
                self.request.send('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
                if req_type == 'HEAD':
                    return
                elif req_type == 'GET':
                    # Register our private buffer; the receiver thread
                    # appends chunks, we drain them to the socket.
                    urldata[urlid][self] = []
                    subscribed = True
                    while running:
                        if len(urldata[urlid][self]) == 0:
                            time.sleep(0.1)
                            continue
                        self.request.send(urldata[urlid][self].pop(0))
            elif document_file and os.path.isfile(document_file):
                # 'with' closes the handle deterministically (the original
                # leaked it until garbage collection).
                with open(document_file, 'r') as fh:
                    self.request.send('HTTP/1.1 200 OK\n\n' + fh.read())
            else:
                self.request.send("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % urlid)
        except IOError:
            # Client disconnected; nothing worth reporting.
            pass
        except StandardError as e:
            logging.warning('Server has issues: %s', e)
        finally:
            # Only drop the buffer we actually registered; the original
            # unconditional del raised KeyError for '/', static and 404
            # requests (and for streams when no GET subscription happened).
            if subscribed:
                del urldata[urlid][self]
98
def get_data(url, urlid, *args):
    """Receiver thread body: continuously read the stream at `url` and fan
    each chunk out to every client buffer registered in urldata[urlid].

    On any IOError the shared state for this urlid is dropped and the
    connection is re-established after a short pause, until the global
    `running` flag is cleared.
    """
    global running, dataheader, urlheader, urldata
    logger = logging.getLogger('recv_id=' + urlid)
    # Fill buffers if needed
    logger.info("Starting thread '%s' (%s)", url, urlid)
    while running:
        try:
            f = urllib2.urlopen(url)
            urlheader[urlid] = ''.join(f.info().headers)
            urldata[urlid] = dict()
            datatype = None
            urltype = f.info().gettype()
            logger.info("url.info().gettype() is '%s'", urltype)
            if urltype == 'audio/x-wav':
                datatype = 'wav'
            elif urltype == 'multipart/x-mixed-replace':
                datatype = 'mjeg'
            else:
                logger.warning("Cannot find file type of '%s'", url)

            if datatype:
                logger.info("'%s' Identified as %s", url, datatype)

            if datatype == 'wav':
                # Canonical RIFF/WAVE header is 44 bytes; cache it so clients
                # joining mid-stream still get a decodable stream.
                dataheader[urlid] = f.read(44)
            elif datatype == 'mjeg':
                # Get rid of Content-Length as not all pictures suits this setup
                headers = f.read(100).splitlines()[0:3]
                logger.info("Image headers %s", headers)
                # First picture needs to be valid for the stream to parse;
                # resync on the boundary marker (headers[0]).
                valid_image = headers[0] + f.read(100000).split(headers[0])[1]
                dataheader[urlid] = valid_image + '\n'.join([h for h in headers if not h.startswith('Content-length:')]) + '\n'
            else:
                dataheader[urlid] = ''

            logger.info("Using dataheader of length %s", len(dataheader[urlid]))

            while running:
                data = f.read(10000)
                for key in urldata[urlid].keys():
                    urldata[urlid][key].append(data)
        except IOError as e:
            # Enforce a connection reset. Use pop(key, None) instead of del:
            # if urlopen itself failed, these keys were never set and the
            # original `del` raised an uncaught KeyError that killed this
            # thread instead of retrying.
            logger.warning("URL reset '%s' (%s)", url, e)
            urlheader.pop(urlid, None)
            dataheader.pop(urlid, None)
            urldata.pop(urlid, None)
            time.sleep(1)
            continue
    logger.info("Closing Thread '%s'", url)
[311]149
150
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__,formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
    parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
    parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
    parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
    args = parser.parse_args()

    document_root = os.path.abspath(os.path.join(os.path.dirname(__file__),args.document_root))
    logging.info("Serving '/htdocs' from document_root '%s'", document_root)

    # Import stream definitions; fall back to built-in defaults when the
    # YAML module or the config file is unavailable.
    streams = DEFAULT_STREAMS
    try:
        import yaml
        # NOTE(review): yaml.load is unsafe on untrusted input -- switch to
        # yaml.safe_load if the config file is not fully trusted.
        streams.update(yaml.load(open(args.stream_cfg)))
    except (ImportError, IOError) as e:
        logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)


    # Create the base server, retrying until the address becomes available.
    try:
        while True:
            try:
                ThreadedTCPServer.allow_reuse_address = True
                server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
                break
            except IOError as e:
                logging.warning('For conection %s:%s to become available (%s)', args.host, args.port , e)
                time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(1)

    # One daemon receiver thread per stream keeps the shared buffers fed.
    for urlid, url in streams.iteritems():
        recv_threads.append(threading.Thread(target=get_data, args=(url,urlid)))
        recv_threads[-1].setDaemon(True)
        recv_threads[-1].start()

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    try:
        logging.info('Serving at %s:%s', args.host, args.port)
        server.serve_forever()
    except (KeyboardInterrupt, IOError):
        # BUGFIX: the original 'except KeyboardInterrupt, IOError:' used the
        # Python 2 bind-to-name form -- it caught only KeyboardInterrupt and
        # bound the exception to the name IOError. The tuple catches both.
        logging.info('Shutting down, please wait...')
        running = False
        server.shutdown()
        [thread.join() for thread in recv_threads]
        logging.info('All done, good bye!')
200
Note: See TracBrowser for help on using the repository browser.