source: py-tcpmultiplexer/TCPMultiplexer.py@ 311

Last change on this file since 311 was 311, checked in by Rick van der Zwet, 13 years ago

Initial commit for webcam redirect streamers

  • Property svn:executable set to *
File size: 3.4 KB
RevLine 
[311]1#!/usr/bin/env python
2#
3# Multiplexer for HTTP streams, used for broadcast of MJPEG and WAV streams
4#
5# Licence: BSDLike
6#
7# Rick van der Zwet <info@rickvanderzwet.nl>
8#
9import threading
10import urllib2
11import time
12import SocketServer
13import logging
14import sys
15
# Shared state between the upstream receiver threads and the client handlers.
# urlheaders: stream id -> raw upstream HTTP header block (one string)
# urldata:    stream id -> {handler instance: list of pending data chunks}
urlheaders = {}
urldata = {}
# Cleared on shutdown so that every worker loop terminates.
running = True
# Receiver threads, kept so they can be joined on exit.
recv_threads = []

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
22
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server that handles every client connection in its own thread."""
25
26
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.

    A client asks for a stream by its id (the request path).  For a GET
    request the handler registers a private chunk queue under
    urldata[urlid][self] and forwards every chunk placed in it to the
    client until the server stops, the client disconnects or the queue
    disappears from urldata.
    """

    def handle(self):
        """Answer one HEAD or GET request for a multiplexed stream."""
        global running, urlheaders, urldata
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        logging.debug("Request: %r", self.data)

        # The request line looks like 'GET /video1 HTTP/1.1'; an empty or
        # truncated request must not crash the handler thread.
        request_line = self.data.split('\n')[0].split()
        if len(request_line) < 2:
            logging.warning("Malformed request from '%s'", self.client_address[0])
            return
        req_type, urlid = request_line[0], request_line[1]
        logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)

        try:
            # Validate before answering, so that no '200 OK' is sent for a
            # request which cannot be served.
            if req_type not in ('GET', 'HEAD'):
                self.request.sendall('HTTP/1.1 405 Method Not Allowed\r\nAllow: GET, HEAD\r\n\r\n')
                return
            if urlid not in urlheaders:
                self.request.sendall('HTTP/1.1 404 Not Found\r\n\r\n')
                return

            # sendall() keeps writing until the whole buffer is out; a plain
            # send() may stop half-way and corrupt the stream.
            self.request.sendall('HTTP/1.1 200 OK\r\n' + urlheaders[urlid] + '\r\n')
            if req_type == 'HEAD':
                return

            urldata[urlid][self] = []
            while running:
                # Look the queue up on every pass: a KeyError here means the
                # entry was removed from urldata and the client must go.
                pending = urldata[urlid][self]
                if not pending:
                    time.sleep(0.1)
                    continue
                self.request.sendall(pending.pop(0))
        except IOError:
            # Client went away; nothing left to do but clean up.
            pass
        except KeyError:
            logging.info("Stream '%s' is no longer available, dropping client '%s'",
                         urlid, self.client_address[0])
        except Exception as e:
            logging.warning('Server has issues: %s' % e)
        finally:
            # Deregister without assuming the registration ever happened
            # (HEAD requests, unknown streams, removed entries).
            urldata.get(urlid, {}).pop(self, None)
62
def get_data(url, urlid, *args):
    """
    Receive the upstream stream and fan it out to all registered clients.

    Runs until the global 'running' flag is cleared.  The response headers
    are published in urlheaders[urlid]; every chunk read is appended to
    each client queue found in urldata[urlid].  When the upstream fails or
    ends, both entries are removed and the connection is retried after one
    second.

    url   -- address of the upstream HTTP stream
    urlid -- id under which the stream is offered to clients
    args  -- ignored
    """
    global running, urlheaders, urldata
    logging.info("Starting thread '%s' (%s)" % (url, urlid))
    while running:
        f = None
        try:
            f = urllib2.urlopen(url)
            urlheaders[urlid] = ''.join(f.info().headers)
            urldata[urlid] = dict()
            while running:
                data = f.read(10000)
                if not data:
                    # End of stream: reconnect instead of spinning on
                    # empty reads and feeding empty chunks to clients.
                    raise IOError('upstream closed the stream')
                # Iterate over a snapshot of the queues, clients register
                # and deregister from other threads at any time.
                for queue in list(urldata[urlid].values()):
                    queue.append(data)
        except IOError as e:
            # urllib2.URLError and socket errors are IOError subclasses.
            # Enforce a connection reset
            logging.warning("URL reset '%s' (%s)", url, e)
            urlheaders.pop(urlid, None)
            urldata.pop(urlid, None)
            time.sleep(1)
        finally:
            if f is not None:
                f.close()
    logging.info("Closing Thread '%s'" % url)
84
85
if __name__ == "__main__":
    HOST, PORT = "0.0.0.0", 9999

    # Create the server, retrying every second until the port can be bound.
    # Ctrl-C while waiting aborts the program.
    ThreadedTCPServer.allow_reuse_address = True
    try:
        while True:
            try:
                server = ThreadedTCPServer((HOST, PORT), MyTCPHandler)
                break
            except IOError as e:
                logging.warning('For conection %s:%s to become available (%s)' % (HOST, PORT, e))
                time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(1)

    # Start the receiver feeding the '/video1' stream.  It is a daemon
    # thread, so a blocked upstream read cannot keep the process alive.
    url = 'http://172.16.0.67:8080/videofeed'
    receiver = threading.Thread(target=get_data, args=(url, '/video1'))
    receiver.setDaemon(True)
    recv_threads.append(receiver)
    receiver.start()

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    try:
        logging.info('Serving at %s:%s' % (HOST, PORT))
        server.serve_forever()
    except (KeyboardInterrupt, IOError):
        # A tuple is required here: 'except KeyboardInterrupt, IOError'
        # catches only KeyboardInterrupt and binds it to the name IOError.
        logging.info('Shutting down, please wait...')
        running = False
        # serve_forever() has already returned in this very thread, so only
        # the listening socket is left to release.  shutdown() is meant to
        # be called from another thread and may deadlock here.
        server.server_close()
        for thread in recv_threads:
            thread.join()
        logging.info('All done, good bye!')
117
Note: See TracBrowser for help on using the repository browser.