source: py-tcpmultiplexer/TCPMultiplexer.py@ 317

Last change on this file since 317 was 317, checked in by Rick van der Zwet, 13 years ago

Credit foo.

  • Property svn:executable set to *
File size: 6.8 KB
Line 
1#!/usr/bin/env python
2"""
3# Multiplexer for HTTP streams, used for broadcast of MPJEG and WAV streams
4#
5# Licence: BSDLike
6# URL: http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
7#
8# Rick van der Zwet <info@rickvanderzwet.nl>
9"""
10
11import SocketServer
12import argparse
13import logging
14import sys
15import threading
16import time
17import urllib2
18import os
19
20# Some boring defaults
21DEFAULT_HOST = '0.0.0.0'
22DEFAULT_PORT = 9999
23DEFAULT_CONFIG = 'streams.yaml'
24DEFAULT_DOCUMENTROOT = './htdocs'
25
26
27# URL : TARGET
28DEFAULT_STREAMS = {
29 '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
30 '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
31 }
32
33# Global variables used as ring-buffers or shared-storage
34urlheader = dict()
35dataheader = dict()
36urldata = dict()
37running = True
38recv_threads = []
39document_root = None
40
41logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
42
43class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
44 pass
45
46
47class MyTCPHandler(SocketServer.BaseRequestHandler):
48 """
49 The RequestHandler class for our server.
50
51 It is instantiated once per connection to the server, and must
52 override the handle() method to implement communication to the
53 client.
54 """
55
56 def handle(self):
57 global running, dataheader, document_root, urlheader, urldata
58 # self.request is the TCP socket connected to the client
59 self.data = self.request.recv(1024).strip()
60 urlid = self.data.split('\n')[0].split()[1]
61 req_type = self.data.split('\n')[0].split()[0]
62 # XXX: Check for .. paths
63 document_file = document_root + urlid
64 logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
65 try:
66 if urlid == '/':
67 self.request.send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
68 for urlid in urldata.keys():
69 self.request.send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': urlid})
70 self.request.send("</ul><h3>Files</h3><ul>")
71 for root, dirs, files in os.walk(document_root):
72 for name in files:
73 self.request.send('<li><a href="%(item)s">%(item)s</li>' % { 'item' : os.path.join(root.replace(document_root,'',1),name) })
74 # Please do not show any hidden file
75 [dirs.remove(name) for name in dirs if name.startswith('.')]
76 self.request.send("</ul></body></html>")
77 elif urldata.has_key(urlid):
78 self.request.send('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
79 if req_type == 'HEAD':
80 return
81 elif req_type == 'GET':
82 urldata[urlid][self] = []
83 while running:
84 if len(urldata[urlid][self]) == 0:
85 time.sleep(0.1)
86 continue
87 self.request.send(urldata[urlid][self].pop(0))
88 elif os.path.isfile(document_file):
89 self.request.send('HTTP/1.1 200 OK\n\n' + open(document_file,'r').read())
90 else:
91 self.request.send("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % urlid)
92 except IOError:
93 pass
94 except StandardError, e:
95 logging.warning('Server has issues: %s', e)
96 del urldata[urlid][self]
97
98def get_data(url, urlid, *args):
99 global running, dataheader, urlheader, urldata
100 logger = logging.getLogger('recv_id=' + urlid)
101 # Fill buffers if needed
102 logger.info("Starting thread '%s' (%s)", url, urlid)
103 while running:
104 try:
105 f = urllib2.urlopen(url)
106 urlheader[urlid] = ''.join(f.info().headers)
107 urldata[urlid] = dict()
108 datatype = None
109 urltype = f.info().gettype()
110 logger.info("url.info().gettype() is '%s'", urltype)
111 if urltype == 'audio/x-wav':
112 datatype = 'wav'
113 elif urltype == 'multipart/x-mixed-replace':
114 datatype = 'mjeg'
115 else:
116 logger.warning("Cannot find file type of '%s'", url)
117
118 if datatype:
119 logger.info("'%s' Identified as %s", url, datatype)
120
121 if datatype == 'wav':
122 dataheader[urlid] = f.read(44)
123 elif datatype == 'mjeg':
124 dataheader[urlid] = '\n'.join(f.read(100).split('\n')[0:2])
125 else:
126 dataheader[urlid] = ''
127
128 while running:
129 data = f.read(10000)
130 for key in urldata[urlid].keys():
131 urldata[urlid][key].append(data)
132 except IOError, e:
133 #Enforce a connection reset
134 logger.warning("URL reset '%s' (%s)", url, e)
135 del urlheader[urlid]
136 del dataheader[urlid]
137 del urldata[urlid]
138 time.sleep(1)
139 continue
140 logger.info("Closing Thread '%s'", url)
141
142
143if __name__ == "__main__":
144 parser = argparse.ArgumentParser(description=__doc__,formatter_class=argparse.RawDescriptionHelpFormatter)
145 parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
146 parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
147 parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
148 parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
149 args = parser.parse_args()
150
151 document_root = os.path.abspath(os.path.join(os.path.dirname(__file__),args.document_root))
152 logging.info("Serving '/htdocs' from document_root '%s'", document_root)
153
154 # Inport streams
155 streams = DEFAULT_STREAMS
156 try:
157 import yaml
158 streams = yaml.load(open(args.stream_cfg))
159 except (ImportError, IOError) as e:
160 logging.warning("Stream config file '%s' not readable or parsable (%s)", args.stream_cfg, e)
161
162
163 # Create the base server
164 try:
165 while True:
166 try:
167 ThreadedTCPServer.allow_reuse_address = True
168 server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
169 break
170 except IOError, e:
171 logging.warning('For conection %s:%s to become available (%s)', args.host, args.port , e)
172 time.sleep(1)
173 except KeyboardInterrupt:
174 sys.exit(1)
175
176 for urlid, url in streams.iteritems():
177 recv_threads.append(threading.Thread(target=get_data, args=(url,urlid)))
178 recv_threads[-1].setDaemon(True)
179 recv_threads[-1].start()
180
181 # Activate the server; this will keep running until you
182 # interrupt the program with Ctrl-C
183 try:
184 logging.info('Serving at %s:%s', args.host, args.port)
185 server.serve_forever()
186 except KeyboardInterrupt, IOError:
187 logging.info('Shutting down, please wait...')
188 running = False
189 server.shutdown()
190 [thread.join() for thread in recv_threads]
191 logging.info('All done, good bye!')
192
Note: See TracBrowser for help on using the repository browser.