source: py-tcpmultiplexer/TCPMultiplexer.py@ 322

Last change on this file since 322 was 322, checked in by Rick van der Zwet, 13 years ago

VLC webcam stream

  • Property svn:executable set to *
File size: 7.5 KB
Line 
1#!/usr/bin/env python
2"""
3# Multiplexer for HTTP streams, used for broadcast of MJPEG and WAV streams
4#
5# Licence: BSDLike
6# URL: http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
7#
8# Rick van der Zwet <info@rickvanderzwet.nl>
9"""
10
11import SocketServer
12import argparse
13import logging
14import sys
15import threading
16import time
17import urllib2
18import os
19
# Fallback values for the command line options
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 9999
DEFAULT_CONFIG = 'streams.yaml'
DEFAULT_DOCUMENTROOT = './htdocs'


# Built-in stream table: served URL path -> upstream URL to fetch it from
DEFAULT_STREAMS = {
    '/cam1/video': 'http://172.16.0.67:8080/videofeed',
    '/cam1/audio': 'http://172.16.0.67:8080/audio.wav',
}

# State shared between the receiver threads and the request handlers
urlheader = {}        # urlid -> joined header lines of the upstream response
dataheader = {}       # urlid -> stream preamble sent to every new client
urldata = {}          # urlid -> {handler: list of chunks waiting to be sent}
running = True        # set to False on shutdown to end the worker loops
recv_threads = []     # receiver threads, one per configured stream
document_root = None  # directory of the static files, set by the main script

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
42
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server that handles every connection in a thread of its own."""

    # Allow re-binding the listen address right after a restart instead of
    # waiting for old connections in TIME_WAIT to expire.
    allow_reuse_address = True

    # Handler threads must not keep the process alive on shutdown: a client
    # that connected but never sent a request leaves its handler blocked in
    # recv() indefinitely.
    daemon_threads = True
45
46
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.

    The second word of the request line selects the response:

    * '/'                    HTML overview of all streams and document files
    * a key of `urldata`     upstream headers plus stream preamble, followed
                             (for GET) by the live stream data
    * a file below the       the content of that file
      document root
    * anything else          a 404 page
    """

    def handle(self):
        """Read one request from the client and answer it."""
        try:
            # self.request is the TCP socket connected to the client
            self.data = self.request.recv(1024).strip()
            request_line = self.data.split('\n')[0].split()
            if len(request_line) < 2:
                # Nothing usable was sent (empty read or no path on the
                # request line); answer explicitly instead of crashing.
                logging.info("Malformed request from '%s'", self.client_address[0])
                self.request.sendall("HTTP/1.1 400 BAD REQUEST\n\n")
                return
            req_type, urlid = request_line[0], request_line[1]
            logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)

            if urlid == '/':
                self._send_overview()
            elif urlid in urldata:
                self._send_stream(req_type, urlid)
            else:
                self._send_document(urlid)
        except IOError:
            logging.info("Connection closed from '%s'", self.client_address[0])

    @staticmethod
    def _escape(text):
        """Return text with the HTML special characters & < > " escaped."""
        return (text.replace('&', '&amp;').replace('<', '&lt;')
                .replace('>', '&gt;').replace('"', '&quot;'))

    @staticmethod
    def _document_path(urlid):
        """Map urlid to a path below document_root, or None if it escapes it."""
        root = os.path.abspath(document_root)
        candidate = os.path.abspath(os.path.join(root, urlid.lstrip('/')))
        # abspath() collapses '..' components, so a plain prefix check is
        # enough to reject requests such as '/../../etc/passwd'.
        if not candidate.startswith(root.rstrip(os.sep) + os.sep):
            return None
        return candidate

    def _send_overview(self):
        """Send the HTML index listing all streams and all visible files."""
        send = self.request.sendall
        send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
        for stream_id in sorted(urldata.keys()):
            send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': self._escape(stream_id)})
        send("</ul><h3>Files</h3><ul>")
        for root, dirs, files in os.walk(document_root):
            # Please do not show any hidden files or directories. The slice
            # assignment prunes `dirs` in place, which is what makes os.walk
            # skip them; removing entries while looping over the same list
            # would silently miss some of them.
            dirs[:] = [name for name in dirs if not name.startswith('.')]
            for name in sorted(name for name in files if not name.startswith('.')):
                item = os.path.join(root.replace(document_root, '', 1), name)
                send('<li><a href="%(item)s">%(item)s</a></li>' % {'item': self._escape(item)})
        send("</ul></body></html>")

    def _send_stream(self, req_type, urlid):
        """Send the stream headers and, for GET, relay the stream data."""
        try:
            header = urlheader[urlid]
            preamble = dataheader[urlid]
        except KeyError:
            # The receiver thread registers the stream before it has read
            # the stream preamble, and drops everything on a reset.
            self.request.sendall("HTTP/1.1 503 SERVICE UNAVAILABLE\n\n")
            return

        self.request.sendall('HTTP/1.1 200 OK\n' + header + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n')
        if req_type == 'HEAD':
            # A HEAD response carries the headers only
            return
        self.request.sendall(preamble)
        if req_type != 'GET':
            return

        clients = urldata.get(urlid)
        if clients is None:
            return
        # The receiver thread appends every chunk it reads to this list
        queue = []
        clients[self] = queue
        try:
            # Stop when the server shuts down or when the receiver thread
            # replaced the client table, i.e. the upstream was reset and
            # nothing will ever be appended to this queue again.
            while running and urldata.get(urlid) is clients:
                if not queue:
                    time.sleep(0.1)
                    continue
                self.request.sendall(queue.pop(0))
        finally:
            # Always unregister, otherwise the receiver keeps buffering
            # data for a client that is gone.
            clients.pop(self, None)

    def _send_document(self, urlid):
        """Send the requested file from the document root, or a 404 page."""
        document_file = self._document_path(urlid)
        if document_file is None or not os.path.isfile(document_file):
            self.request.sendall("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % self._escape(urlid))
            return
        # Binary mode, so the file is sent exactly as stored
        with open(document_file, 'rb') as document:
            content = document.read()
        self.request.sendall('HTTP/1.1 200 OK\n\n' + content)
96
def _mjpeg_dataheader(data, logger):
    """Build the preamble for new MJPEG clients from the first chunk read.

    The first non-blank line of `data` is taken as the multipart boundary
    and the lines up to the next blank line as the part headers. The result
    is the boundary, the data up to the next boundary, and those headers.

    Raises IOError when `data` contains no non-blank line at all.
    """
    headers = []
    for header in data.splitlines():
        if not header.strip():
            # Newlines in the beginning are evil
            if headers:
                break
            continue
        headers.append(header)
    if not headers:
        raise IOError("no MJPEG boundary found in stream")
    boundry = headers[0]
    logger.info("Boundry line: %s", boundry)
    logger.info("Image headers %s", headers)

    valid_image = boundry + data.split(boundry)[1]
    return valid_image + '\n'.join(headers) + '\n'


def get_data(url, urlid, *args):
    """Receive the stream at `url` and fan it out to the clients of `urlid`.

    Runs until the global `running` flag is cleared. While connected it
    publishes the upstream response headers in `urlheader[urlid]`, the
    stream preamble in `dataheader[urlid]`, and appends every chunk read to
    each client queue registered in `urldata[urlid]`. On an IOError, or when
    the upstream ends, the three entries are removed and the connection is
    retried after one second.

    url   -- upstream URL to read from
    urlid -- URL path under which the stream is served
    args  -- ignored
    """
    logger = logging.getLogger('recv_id=' + urlid)
    # Fill buffers if needed
    logger.info("Starting thread '%s' (%s)", url, urlid)
    while running:
        try:
            f = urllib2.urlopen(url)
            try:
                urlheader[urlid] = ''.join(f.info().headers)
                urldata[urlid] = dict()
                datatype = None
                urltype = f.info().gettype()
                logger.info("url.info().gettype() is '%s'", urltype)
                if urltype == 'audio/x-wav':
                    datatype = 'wav'
                elif urltype == 'multipart/x-mixed-replace':
                    datatype = 'mjpeg'
                else:
                    logger.warning("Cannot find file type of '%s'", url)

                if datatype:
                    logger.info("'%s' Identified as %s", url, datatype)

                if datatype == 'wav':
                    # WAV header
                    dataheader[urlid] = f.read(44)
                elif datatype == 'mjpeg':
                    dataheader[urlid] = _mjpeg_dataheader(f.read(1024 * 1024), logger)
                else:
                    dataheader[urlid] = ''

                logger.info("Using dataheader of length %s", len(dataheader[urlid]))

                while running:
                    data = f.read(10000)
                    if not data:
                        # An empty read means the upstream ended; without
                        # this the loop would spin and flood the queues
                        # with empty chunks.
                        raise IOError("upstream closed the connection")
                    # Snapshot the queues themselves: a client may unregister
                    # at any moment, and looking its key up again afterwards
                    # would raise KeyError and kill this thread.
                    for queue in list(urldata[urlid].values()):
                        queue.append(data)
            finally:
                f.close()
        except IOError as e:
            # Enforce a connection reset. pop() instead of del, because the
            # entries do not exist yet when the very first urlopen() fails.
            logger.warning("URL reset '%s' (%s)", url, e)
            urlheader.pop(urlid, None)
            dataheader.pop(urlid, None)
            urldata.pop(urlid, None)
            time.sleep(1)
    logger.info("Closing Thread '%s'", url)
160
161
if __name__ == "__main__":
    # Script entry point: parse the options, load the stream table, bind the
    # server, start one receiver thread per stream and serve until Ctrl-C.
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
    parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
    parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
    parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
    args = parser.parse_args()

    document_root = os.path.abspath(os.path.join(os.path.dirname(__file__), args.document_root))
    logging.info("Serving '/htdocs' from document_root '%s'", document_root)

    # Import streams. Work on a copy so the built-in defaults stay untouched.
    streams = dict(DEFAULT_STREAMS)
    try:
        import yaml
    except ImportError as e:
        logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
    else:
        try:
            with open(args.stream_cfg) as stream_cfg:
                # NOTE(security): deliberately safe_load() instead of load();
                # load() can construct arbitrary Python objects from the
                # file, and a plain path -> URL mapping does not need that.
                configured = yaml.safe_load(stream_cfg)
            # An empty file parses to None
            streams.update(configured or {})
        except (IOError, yaml.YAMLError, TypeError, ValueError) as e:
            logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)

    # Create the base server, retrying until the address can be bound
    try:
        while True:
            try:
                ThreadedTCPServer.allow_reuse_address = True
                server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
                break
            except IOError as e:
                logging.warning('For conection %s:%s to become available (%s)', args.host, args.port, e)
                time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(1)

    for urlid, url in streams.items():
        receiver = threading.Thread(target=get_data, args=(url, urlid))
        receiver.setDaemon(True)
        receiver.start()
        recv_threads.append(receiver)

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    try:
        logging.info('Serving at %s:%s', args.host, args.port)
        server.serve_forever()
    # The parentheses matter: `except KeyboardInterrupt, IOError:` would catch
    # KeyboardInterrupt only and bind the exception to the name IOError.
    except (KeyboardInterrupt, IOError):
        logging.info('Shutting down, please wait...')
        running = False
        server.shutdown()
        server.server_close()
        for thread in recv_threads:
            # Bounded wait: a receiver may be blocked reading from a stalled
            # upstream; the threads are daemonic, so they cannot keep the
            # process alive afterwards.
            thread.join(5)
        logging.info('All done, good bye!')
211
Note: See TracBrowser for help on using the repository browser.