source: py-tcpmultiplexer/TCPMultiplexer.py@ 321

Last change on this file since 321 was 320, checked in by Rick van der Zwet, 13 years ago

Making the boundry work

  • Property svn:executable set to *
File size: 7.4 KB
Line 
1#!/usr/bin/env python
2"""
3# Multiplexer for HTTP streams, used for broadcast of MPJEG and WAV streams
4#
5# Licence: BSDLike
6# URL: http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
7#
8# Rick van der Zwet <info@rickvanderzwet.nl>
9"""
10
11import SocketServer
12import argparse
13import logging
14import sys
15import threading
16import time
17import urllib2
18import os
19
20# Some boring defaults
21DEFAULT_HOST = '0.0.0.0'
22DEFAULT_PORT = 9999
23DEFAULT_CONFIG = 'streams.yaml'
24DEFAULT_DOCUMENTROOT = './htdocs'
25
26
27# URL : TARGET
28DEFAULT_STREAMS = {
29 '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
30 '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
31 }
32
33# Global variables used as ring-buffers or shared-storage
34urlheader = dict()
35dataheader = dict()
36urldata = dict()
37running = True
38recv_threads = []
39document_root = None
40
41logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
42
43class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
44 pass
45
46
47class MyTCPHandler(SocketServer.BaseRequestHandler):
48 """
49 The RequestHandler class for our server.
50
51 It is instantiated once per connection to the server, and must
52 override the handle() method to implement communication to the
53 client.
54 """
55
56 def handle(self):
57 global running, dataheader, document_root, urlheader, urldata
58 # self.request is the TCP socket connected to the client
59 self.data = self.request.recv(1024).strip()
60 urlid = self.data.split('\n')[0].split()[1]
61 req_type = self.data.split('\n')[0].split()[0]
62 # XXX: Check for .. paths
63 document_file = document_root + urlid
64 logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
65 try:
66 if urlid == '/':
67 self.request.send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
68 for urlid in urldata.keys():
69 self.request.send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': urlid})
70 self.request.send("</ul><h3>Files</h3><ul>")
71 for root, dirs, files in os.walk(document_root):
72 # Please do not show any hidden files or directories
73 [dirs.remove(name) for name in dirs if name.startswith('.')]
74 [files.remove(name) for name in files if name.startswith('.')]
75 for name in files:
76 self.request.send('<li><a href="%(item)s">%(item)s</li>' % { 'item' : os.path.join(root.replace(document_root,'',1),name) })
77 self.request.send("</ul></body></html>")
78 elif urldata.has_key(urlid):
79 self.request.send('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
80 if req_type == 'HEAD':
81 return
82 elif req_type == 'GET':
83 urldata[urlid][self] = []
84 while running:
85 if len(urldata[urlid][self]) == 0:
86 time.sleep(0.1)
87 continue
88 self.request.send(urldata[urlid][self].pop(0))
89 elif os.path.isfile(document_file):
90 self.request.send('HTTP/1.1 200 OK\n\n' + open(document_file,'r').read())
91 else:
92 self.request.send("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % urlid)
93 except IOError:
94 pass
95 except StandardError, e:
96 logging.warning('Server has issues: %s', e)
97 del urldata[urlid][self]
98
99def get_data(url, urlid, *args):
100 global running, dataheader, urlheader, urldata
101 logger = logging.getLogger('recv_id=' + urlid)
102 # Fill buffers if needed
103 logger.info("Starting thread '%s' (%s)", url, urlid)
104 while running:
105 try:
106 f = urllib2.urlopen(url)
107 urlheader[urlid] = ''.join(f.info().headers)
108 urldata[urlid] = dict()
109 datatype = None
110 urltype = f.info().gettype()
111 logger.info("url.info().gettype() is '%s'", urltype)
112 if urltype == 'audio/x-wav':
113 datatype = 'wav'
114 elif urltype == 'multipart/x-mixed-replace':
115 datatype = 'mjpeg'
116 else:
117 logger.warning("Cannot find file type of '%s'", url)
118
119 if datatype:
120 logger.info("'%s' Identified as %s", url, datatype)
121
122 if datatype == 'wav':
123 # WAV header
124 dataheader[urlid] = f.read(44)
125 elif datatype == 'mjpeg':
126 data = f.read(1024 * 1024)
127
128 # Get the required headers
129 headers = []
130 for header in data.splitlines():
131 if not header.strip(): break
132 headers.append(header)
133 boundry = headers[0]
134 logger.info("Boundry line: %s", boundry)
135 logger.info("Image headers %s", headers)
136
137 valid_image = boundry + data.split(boundry)[1]
138 dataheader[urlid] = valid_image + '\n'.join(headers) + '\n'
139 else:
140 dataheader[urlid] = ''
141
142 logger.info("Using dataheader of length %s", len(dataheader[urlid]))
143
144 while running:
145 data = f.read(10000)
146 for key in urldata[urlid].keys():
147 urldata[urlid][key].append(data)
148 except IOError, e:
149 #Enforce a connection reset
150 logger.warning("URL reset '%s' (%s)", url, e)
151 del urlheader[urlid]
152 del dataheader[urlid]
153 del urldata[urlid]
154 time.sleep(1)
155 continue
156 logger.info("Closing Thread '%s'", url)
157
158
159if __name__ == "__main__":
160 parser = argparse.ArgumentParser(description=__doc__,formatter_class=argparse.RawDescriptionHelpFormatter)
161 parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
162 parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
163 parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
164 parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
165 args = parser.parse_args()
166
167 document_root = os.path.abspath(os.path.join(os.path.dirname(__file__),args.document_root))
168 logging.info("Serving '/htdocs' from document_root '%s'", document_root)
169
170 # Inport streams
171 streams = DEFAULT_STREAMS
172 try:
173 import yaml
174 streams.update(yaml.load(open(args.stream_cfg)))
175 except (ImportError, IOError) as e:
176 logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
177
178
179 # Create the base server
180 try:
181 while True:
182 try:
183 ThreadedTCPServer.allow_reuse_address = True
184 server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
185 break
186 except IOError, e:
187 logging.warning('For conection %s:%s to become available (%s)', args.host, args.port , e)
188 time.sleep(1)
189 except KeyboardInterrupt:
190 sys.exit(1)
191
192 for urlid, url in streams.iteritems():
193 recv_threads.append(threading.Thread(target=get_data, args=(url,urlid)))
194 recv_threads[-1].setDaemon(True)
195 recv_threads[-1].start()
196
197 # Activate the server; this will keep running until you
198 # interrupt the program with Ctrl-C
199 try:
200 logging.info('Serving at %s:%s', args.host, args.port)
201 server.serve_forever()
202 except KeyboardInterrupt, IOError:
203 logging.info('Shutting down, please wait...')
204 running = False
205 server.shutdown()
206 [thread.join() for thread in recv_threads]
207 logging.info('All done, good bye!')
208
Note: See TracBrowser for help on using the repository browser.