source: py-tcpmultiplexer/TCPMultiplexer.py@ 318

Last change on this file since 318 was 318, checked in by Rick van der Zwet, 13 years ago

First picture needs to be valid for the stream to be parsed correctly.

  • Property svn:executable set to *
File size: 7.3 KB
Line 
1#!/usr/bin/env python
2"""
3# Multiplexer for HTTP streams, used for broadcast of MJPEG and WAV streams
4#
5# Licence: BSDLike
6# URL: http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
7#
8# Rick van der Zwet <info@rickvanderzwet.nl>
9"""
10
11import SocketServer
12import argparse
13import logging
14import sys
15import threading
16import time
17import urllib2
18import os
19
# Fallback values for the command-line options
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 9999
DEFAULT_CONFIG = 'streams.yaml'
DEFAULT_DOCUMENTROOT = './htdocs'


# Built-in stream table, keyed by the URL path offered to clients and
# pointing at the upstream TARGET that is fetched for it
DEFAULT_STREAMS = {
    '/cam1/video': 'http://172.16.0.67:8080/videofeed',
    '/cam1/audio': 'http://172.16.0.67:8080/audio.wav',
}

# State shared between the receiver threads and the request handlers
urlheader = {}        # urlid -> upstream HTTP response headers as one string
dataheader = {}       # urlid -> leading stream bytes sent to every new client
urldata = {}          # urlid -> {request handler: list of chunks still to send}
running = True        # cleared on shutdown so the worker loops terminate
recv_threads = []     # receiver threads, one per configured stream
document_root = None  # directory used for static files; set at start-up

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
42
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server which, through ThreadingMixIn, serves each connection in its own thread."""
45
46
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server. handle() reads the
    HTTP request line and answers with one of:

    * ``/``             -- an HTML overview of the known streams and of the
                           files below ``document_root``;
    * a known stream    -- the cached upstream headers and data header,
                           followed (for GET) by the chunks that the receiver
                           thread queues for this client in ``urldata``;
    * an existing file  -- its content, served from below ``document_root``;
    * anything else     -- a 404 page.
    """

    @staticmethod
    def _parse_request_line(data):
        """
        Split the first line of a request into ``(method, path)``.

        Returns None when that line holds fewer than two fields, e.g. when a
        client connects and closes without sending anything.
        """
        lines = data.splitlines()
        if not lines:
            return None
        fields = lines[0].split()
        if len(fields) < 2:
            return None
        return fields[0], fields[1]

    @staticmethod
    def _resolve_document(root, urlid):
        """
        Map the request path ``urlid`` onto a file name below ``root``.

        The path is normalised lexically; None is returned when ``root`` is
        unset or when the result would lie outside ``root`` (``..`` tricks).
        """
        if not root:
            return None
        root = os.path.abspath(root)
        candidate = os.path.abspath(os.path.join(root, urlid.lstrip('/')))
        inside = candidate == root or candidate.startswith(root.rstrip(os.sep) + os.sep)
        return candidate if inside else None

    def _send_overview(self):
        """Send the HTML index page listing all streams and visible files."""
        send = self.request.sendall
        send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
        # list() takes a snapshot: receiver threads may add/remove streams
        for stream_id in list(urldata):
            send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': stream_id})
        send("</ul><h3>Files</h3><ul>")
        for root, dirs, files in os.walk(document_root):
            # Please do not show any hidden files or directories. Assign in
            # place so os.walk() does not descend into the hidden directories.
            dirs[:] = [name for name in dirs if not name.startswith('.')]
            for name in files:
                if name.startswith('.'):
                    continue
                send('<li><a href="%(item)s">%(item)s</a></li>' % {'item': os.path.join(root.replace(document_root, '', 1), name)})
        send("</ul></body></html>")

    def _send_stream(self, urlid, req_type):
        """Send the stream headers and, for GET, relay queued data until shutdown."""
        self.request.sendall('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
        if req_type != 'GET':
            return
        # Registering the queue makes the receiver thread start filling it
        urldata[urlid][self] = []
        while running:
            # Look the queue up every round: when the receiver resets the
            # stream the entry vanishes and the KeyError ends this connection.
            pending = urldata[urlid][self]
            if not pending:
                time.sleep(0.1)
                continue
            self.request.sendall(pending.pop(0))

    def _send_document(self, urlid):
        """Send the static file for ``urlid``, or a 404 page if there is none."""
        document_file = self._resolve_document(document_root, urlid)
        if document_file is not None and os.path.isfile(document_file):
            # Binary mode: documents may be images or other non-text data
            with open(document_file, 'rb') as document:
                self.request.sendall('HTTP/1.1 200 OK\n\n' + document.read())
        else:
            self.request.sendall("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % urlid)

    def handle(self):
        """
        Serve a single client connection.

        Socket errors (typically the client disconnecting) end the connection
        quietly; any other error is logged. In every case the client's queue,
        if one was registered, is removed again so that the receiver thread
        stops collecting data for a connection that no longer exists.
        """
        urlid = None
        try:
            # self.request is the TCP socket connected to the client
            self.data = self.request.recv(1024).strip()
            request_line = self._parse_request_line(self.data)
            if request_line is None:
                logging.warning("Malformed request from '%s'", self.client_address[0])
                self.request.sendall("HTTP/1.1 400 BAD REQUEST\n\n")
                return
            req_type, urlid = request_line
            logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
            if urlid == '/':
                self._send_overview()
            elif urlid in urldata:
                self._send_stream(urlid, req_type)
            else:
                self._send_document(urlid)
        except IOError as e:
            # Best effort: a client that went away is not worth a warning
            logging.debug("Connection from '%s' closed (%s)", self.client_address[0], e)
        except Exception as e:
            logging.warning('Server has issues: %s', e)
        finally:
            # Safe even when the stream or the queue was never registered
            urldata.get(urlid, {}).pop(self, None)
98
def get_data(url, urlid, *args):
    """
    Receive one upstream stream and distribute its data to all clients.

    Connects to ``url``, stores the upstream response headers in
    ``urlheader[urlid]`` and the leading stream bytes every new client needs
    in ``dataheader[urlid]``, then appends every chunk read to each client
    queue found in ``urldata[urlid]``. On an I/O problem the three entries
    are dropped and the connection is retried after one second. The loop
    ends once the module-level ``running`` flag is cleared.

    url    -- upstream address handed to urllib2.urlopen()
    urlid  -- key under which the stream is published to the handlers
    *args  -- accepted for call compatibility, not used
    """
    logger = logging.getLogger('recv_id=' + urlid)
    # Fill buffers if needed
    logger.info("Starting thread '%s' (%s)", url, urlid)
    while running:
        f = None
        try:
            f = urllib2.urlopen(url)
            header = ''.join(f.info().headers)
            datatype = None
            urltype = f.info().gettype()
            logger.info("url.info().gettype() is '%s'", urltype)
            if urltype == 'audio/x-wav':
                datatype = 'wav'
            elif urltype == 'multipart/x-mixed-replace':
                datatype = 'mjeg'
            else:
                logger.warning("Cannot find file type of '%s'", url)

            if datatype:
                logger.info("'%s' Identified as %s", url, datatype)

            if datatype == 'wav':
                # WAV header
                first_data = f.read(44)
            elif datatype == 'mjeg':
                # Get rid of Content-Length as not all pictures suits this setup
                headers = f.read(100).splitlines()[0:3]
                logger.info("Image headers %s", headers)
                # headers[0] is used as the separator between pictures; an
                # empty one cannot be split on, so treat it like a bad stream.
                if not headers or not headers[0]:
                    raise IOError('No picture separator at start of stream')
                pictures = f.read(100000).split(headers[0])
                if len(pictures) < 2:
                    raise IOError('No complete picture at start of stream')
                valid_image = headers[0] + pictures[1]
                # Header names are case-insensitive, so compare in lower case
                first_data = valid_image + '\n'.join([h for h in headers if not h.lower().startswith('content-length:')]) + '\n'
            else:
                first_data = ''

            # Publish urldata last: handlers treat its presence as "stream is
            # available" and then read the two header entries.
            urlheader[urlid] = header
            dataheader[urlid] = first_data
            urldata[urlid] = dict()

            logger.info("Using dataheader of length %s", len(first_data))

            while running:
                data = f.read(10000)
                if not data:
                    # End of stream; without this the loop would spin and
                    # queue empty chunks forever.
                    raise IOError('Upstream closed the connection')
                # Iterate over a snapshot of the queues themselves so that a
                # client removed in the meantime cannot cause a KeyError.
                for queue in list(urldata[urlid].values()):
                    queue.append(data)
        except IOError as e:
            # Enforce a connection reset; pop() tolerates entries that were
            # never created because the connection attempt itself failed.
            logger.warning("URL reset '%s' (%s)", url, e)
            urlheader.pop(urlid, None)
            dataheader.pop(urlid, None)
            urldata.pop(urlid, None)
            time.sleep(1)
        finally:
            if f is not None:
                f.close()
    logger.info("Closing Thread '%s'", url)
149
150
if __name__ == "__main__":
    # Script entry point: parse options, load the stream table, bind the
    # server socket, start one receiver thread per stream and serve until
    # interrupted.
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
    parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
    parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
    parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
    args = parser.parse_args()

    document_root = os.path.abspath(os.path.join(os.path.dirname(__file__), args.document_root))
    logging.info("Serving '/htdocs' from document_root '%s'", document_root)

    # Import streams; work on a copy so the built-in table stays untouched
    streams = dict(DEFAULT_STREAMS)
    try:
        import yaml
    except ImportError as e:
        logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
    else:
        try:
            with open(args.stream_cfg) as stream_cfg:
                # NOTE(review): safe_load is used instead of yaml.load so the
                # config file cannot construct arbitrary Python objects; the
                # stream table only needs a plain mapping of strings.
                configured = yaml.safe_load(stream_cfg)
            # An empty file loads as None: nothing to merge in that case
            if configured:
                streams.update(configured)
        except (IOError, yaml.YAMLError, TypeError, ValueError) as e:
            logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)

    # Create the base server, retrying until the address can be bound
    ThreadedTCPServer.allow_reuse_address = True
    try:
        while True:
            try:
                server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
                break
            except IOError as e:
                logging.warning('For conection %s:%s to become available (%s)', args.host, args.port, e)
                time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(1)

    for urlid, url in streams.items():
        receiver = threading.Thread(target=get_data, args=(url, urlid))
        receiver.setDaemon(True)
        recv_threads.append(receiver)
        receiver.start()

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    try:
        logging.info('Serving at %s:%s', args.host, args.port)
        server.serve_forever()
    except (KeyboardInterrupt, IOError):
        # The tuple is required: "except KeyboardInterrupt, IOError" would
        # catch only KeyboardInterrupt and bind it to the name IOError.
        logging.info('Shutting down, please wait...')
        # Tells the receiver threads and the stream handlers to stop
        running = False
        server.shutdown()
        server.server_close()
        for receiver in recv_threads:
            receiver.join()
        logging.info('All done, good bye!')
200
Note: See TracBrowser for help on using the repository browser.