source: py-tcpmultiplexer/TCPMultiplexer.py@ 312

Last change on this file since 312 was 312, checked in by Rick van der Zwet, 13 years ago

Making sure we have valid starting headers...

  • Property svn:executable set to *
File size: 5.0 KB
Line 
1#!/usr/bin/env python
2#
3# Multiplexer for HTTP streams, used for broadcast of MJPEG and WAV streams
4#
5# Licence: BSDLike
6#
7# Rick van der Zwet <info@rickvanderzwet.nl>
8#
9import SocketServer
10import argparse
11import logging
12import sys
13import threading
14import time
15import urllib2
16
# Fallback listen address and port
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = '9999'

# State shared between the receiver threads and the request handlers.
# urlheader / dataheader / urldata are all keyed by url id; urldata[urlid]
# in turn maps each connected handler to its list of pending chunks.
urlheader = {}
dataheader = {}
urldata = {}
# Cleared on shutdown so that every worker loop terminates
running = True
# Receiver threads started by the main script
recv_threads = list()

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
29
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server combined with ThreadingMixIn; adds no behaviour of its own."""
32
33
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server. handle() parses
    the request line, replies with the headers cached for the requested
    stream and, for GET requests, forwards every chunk that the receiver
    thread appends to this handler's queue in urldata[urlid][self].
    """

    def handle(self):
        """Serve one HEAD or GET request for a configured stream id.

        Responses:
          * unknown stream id     -> 404
          * method not HEAD/GET   -> 405
          * HEAD                  -> status line and headers only
          * GET                   -> headers, stream preamble, then live data
                                     until the client disconnects, the
                                     upstream is reset or the server stops.
        """
        global running, dataheader, urlheader, urldata
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        logging.debug("Request from '%s': %r", self.client_address[0], self.data)

        request_line = self.data.split('\n')[0].split()
        if len(request_line) < 2:
            # Empty or truncated request line: there is no url id to serve.
            logging.warning("Malformed request from '%s': %r",
                            self.client_address[0], self.data)
            return
        req_type, urlid = request_line[0], request_line[1]
        logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)

        try:
            if urlid not in urldata:
                self.request.sendall("HTTP/1.1 404 NOT FOUND\n\nThe page '%s' does not exists" % urlid)
                return
            if req_type not in ('HEAD', 'GET'):
                # Reject before any 200 status line has been written.
                logging.warning("Invalid request method '%s' from '%s'",
                                req_type, self.client_address[0])
                self.request.sendall('HTTP/1.1 405 METHOD NOT ALLOWED\n\n')
                return

            response_head = ('HTTP/1.1 200 OK\n' + urlheader[urlid] +
                             'X-Proxy-Server: TCPMultiplexer.py\n' + '\n')
            if req_type == 'HEAD':
                # A HEAD response carries no body, so skip the preamble.
                self.request.sendall(response_head)
                return

            # sendall(): plain send() may transmit only part of the buffer,
            # which would silently corrupt the stream.
            self.request.sendall(response_head + dataheader[urlid])

            # Register our queue; the receiver thread appends chunks to it.
            queue = []
            urldata[urlid][self] = queue
            while running:
                # The receiver thread drops / replaces urldata[urlid] when
                # the upstream resets; our queue will then never be fed
                # again, so end the connection instead of idling forever.
                if urldata.get(urlid, {}).get(self) is not queue:
                    logging.info("Stream '%s' was reset, closing client '%s'",
                                 urlid, self.client_address[0])
                    break
                if not queue:
                    time.sleep(0.1)
                    continue
                self.request.sendall(queue.pop(0))
        except IOError as e:
            # Socket errors here normally just mean the client went away.
            logging.debug("Client '%s' disconnected (%s)", self.client_address[0], e)
        except Exception as e:
            logging.warning('Server has issues: %s', e)
        finally:
            # The entry may not exist (404, HEAD, upstream reset), so remove
            # it tolerantly instead of using a bare `del`.
            urldata.get(urlid, {}).pop(self, None)
72
def get_data(url, urlid, *args):
    """Receiver thread body: read `url` and fan its data out to all clients.

    While the global `running` flag is set, the upstream URL is opened and
    its response headers plus a stream preamble are published in
    urlheader[urlid] / dataheader[urlid]. Every chunk read afterwards is
    appended to the queue of each handler registered in urldata[urlid].
    On an IOError (or when the upstream ends) the published state is
    removed and the connection is retried after one second.

    Arguments:
      url   -- upstream stream URL to read from
      urlid -- key under which the stream is published to the handlers
      *args -- accepted for call compatibility, unused
    """
    global running, dataheader, urlheader, urldata
    logger = logging.getLogger('recv_id=' + urlid)
    logger.info("Starting thread '%s' (%s)", url, urlid)
    while running:
        f = None
        try:
            f = urllib2.urlopen(url)
            headers = ''.join(f.info().headers)
            urltype = f.info().gettype()
            logger.info("url.info().gettype() is '%s'", urltype)
            if urltype == 'audio/x-wav':
                datatype = 'wav'
            elif urltype == 'multipart/x-mixed-replace':
                datatype = 'mjeg'
            else:
                datatype = None
                logger.warning("Cannot find file type of '%s'", url)

            if datatype:
                logger.info("'%s' Identified as %s", url, datatype)

            # Preamble that every new client receives before live data.
            if datatype == 'wav':
                # First 44 bytes of the stream (presumably the WAV header).
                preamble = f.read(44)
            elif datatype == 'mjeg':
                # First two lines of the first 100 bytes of the stream.
                preamble = '\n'.join(f.read(100).split('\n')[0:2])
            else:
                preamble = ''

            # Publish urldata last: handlers treat its presence as "stream
            # available" and then read the two header dictionaries.
            urlheader[urlid] = headers
            dataheader[urlid] = preamble
            urldata[urlid] = dict()

            while running:
                data = f.read(10000)
                if not data:
                    # End of stream: reconnect instead of spinning on
                    # empty reads and queueing empty chunks.
                    raise IOError('upstream closed the connection')
                # Snapshot the queues: handlers add and remove themselves
                # from this dictionary concurrently.
                for queue in list(urldata[urlid].values()):
                    queue.append(data)
        except IOError as e:
            # Enforce a connection reset
            logger.warning("URL reset '%s' (%s)", url, e)
            # pop() rather than del: nothing has been published yet when
            # urlopen() itself is what failed.
            urlheader.pop(urlid, None)
            dataheader.pop(urlid, None)
            urldata.pop(urlid, None)
            time.sleep(1)
        finally:
            if f is not None:
                f.close()
    logger.info("Closing Thread '%s'", url)
116
117
if __name__ == "__main__":
    # Command line: optional YAML config file plus listen address overrides.
    parser = argparse.ArgumentParser(
        description='Multiplexer for HTTP streams (MJPEG and WAV).')
    parser.add_argument('config', nargs='?', default=None,
                        help='YAML file mapping url ids to upstream URLs')
    parser.add_argument('--host', default=DEFAULT_HOST,
                        help='address to listen on (default: %(default)s)')
    parser.add_argument('--port', type=int, default=int(DEFAULT_PORT),
                        help='port to listen on (default: %(default)s)')
    options = parser.parse_args()

    # URL : TARGET
    DEFAULT_STREAMS = {
        '/cam1/video': 'http://172.16.0.67:8080/videofeed',
        '/cam1/audio': 'http://172.16.0.67:8080/audio.wav',
    }
    # Used unless a config file is given on the command line
    streams = DEFAULT_STREAMS

    # Flexible enough to specify using yaml file as well
    if options.config:
        config_file = options.config
        try:
            import yaml
        except ImportError as e:
            logging.error("Config file '%s' not readable or parsable (%s)", config_file, e)
            sys.exit(1)
        try:
            with open(config_file) as config_fh:
                # NOTE: safe_load instead of load -- the config only needs
                # a plain mapping, and load() can construct arbitrary
                # Python objects from the file.
                streams = yaml.safe_load(config_fh)
        except (IOError, yaml.YAMLError) as e:
            logging.error("Config file '%s' not readable or parsable (%s)", config_file, e)
            sys.exit(1)
        if not isinstance(streams, dict):
            logging.error("Config file '%s' does not contain a mapping of url id to URL", config_file)
            sys.exit(1)

    HOST, PORT = options.host, options.port
    # Create the server, retrying until the address can be bound
    ThreadedTCPServer.allow_reuse_address = True
    try:
        while True:
            try:
                server = ThreadedTCPServer((HOST, PORT), MyTCPHandler)
                break
            except IOError as e:
                logging.warning('Waiting for connection %s:%s to become available (%s)', HOST, PORT, e)
                time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(1)

    # One daemon receiver thread per configured stream
    for urlid, url in streams.items():
        thread = threading.Thread(target=get_data, args=(url, urlid))
        thread.daemon = True
        thread.start()
        recv_threads.append(thread)

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    try:
        logging.info('Serving at %s:%s', HOST, PORT)
        server.serve_forever()
    except (KeyboardInterrupt, IOError):
        logging.info('Shutting down, please wait...')
        # Module-level flag: tells handler and receiver loops to stop
        running = False
        server.shutdown()
        server.server_close()
        # Receiver threads are daemons and may be blocked on a stalled
        # upstream, so do not wait for them indefinitely.
        for thread in recv_threads:
            thread.join(5)
        logging.info('All done, good bye!')
166
Note: See TracBrowser for help on using the repository browser.