1 | #!/usr/bin/env python
2 | #
3 | # Multiplexer for HTTP streams, used for broadcast of MPJEG and WAV streams
4 | #
5 | # Licence: BSDLike
6 | #
7 | # Rick van der Zwet <info@rickvanderzwet.nl>
8 | #
9 | import threading
10 | import urllib2
11 | import time
12 | import SocketServer
13 | import logging
14 | import sys
15 |
16 | urlheaders = dict()
17 | urldata = dict()
18 | running = True
19 | recv_threads = []
20 |
21 | logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
22 |
23 | class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
24 | pass
25 |
26 |
27 | class MyTCPHandler(SocketServer.BaseRequestHandler):
28 | """
29 | The RequestHandler class for our server.
30 |
31 | It is instantiated once per connection to the server, and must
32 | override the handle() method to implement communication to the
33 | client.
34 | """
35 |
36 | def handle(self):
37 | global running, urlheaders, urldata
38 | # self.request is the TCP socket connected to the client
39 | self.data = self.request.recv(1024).strip()
40 | print self.data
41 | urlid = self.data.split('\n')[0].split()[1]
42 | req_type = self.data.split('\n')[0].split()[0]
43 | logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
44 | try:
45 | self.request.send('HTTP/1.1 200 OK\n' + urlheaders[urlid] + '\n')
46 | if req_type == 'HEAD':
47 | return
48 | elif req_type == 'GET':
49 | urldata[urlid][self] = []
50 | while running:
51 | if len(urldata[urlid][self]) == 0:
52 | time.sleep(0.1)
53 | continue
54 | self.request.send(urldata[urlid][self].pop(0))
55 | else:
56 | assert False, "Invalid request"
57 | except IOError:
58 | pass
59 | except StandardError, e:
60 | logging.warning('Server has issues: %s' % e)
61 | del urldata[urlid][self]
62 |
63 | def get_data(url, urlid, *args):
64 | global running, urlheaders, urldata
65 | # Fill buffers if needed
66 | logging.info("Starting thread '%s' (%s)" % (url, urlid))
67 | while running:
68 | try:
69 | f = urllib2.urlopen(url)
70 | urlheaders[urlid] = ''.join(f.info().headers)
71 | urldata[urlid] = dict()
72 | while running:
73 | data = f.read(10000)
74 | for key in urldata[urlid].keys():
75 | urldata[urlid][key].append(data)
76 | except UrlError:
77 | #Enforce a connection reset
78 | logging.warning("URL reset '$s'" % url)
79 | del urlheaders[key]
80 | del urldata[key]
81 | time.sleep(1)
82 | continue
83 | logging.info("Closing Thread '%s'" % url)
84 |
85 |
86 | if __name__ == "__main__":
87 | HOST, PORT = "", 9999
88 | # Create the server
89 | try:
90 | while True:
91 | try:
92 | ThreadedTCPServer.allow_reuse_address = True
93 | server = ThreadedTCPServer((HOST, PORT), MyTCPHandler)
94 | break
95 | except IOError, e:
96 | logging.warning('For conection %s:%s to become available (%s)' % (HOST, PORT, e))
97 | time.sleep(1)
98 | except KeyboardInterrupt:
99 | sys.exit(1)
100 |
101 | url = ''
102 | recv_threads.append(threading.Thread(target=get_data, args=(url,'/video1')))
103 | recv_threads[-1].setDaemon(True)
104 | recv_threads[-1].start()
105 |
106 | # Activate the server; this will keep running until you
107 | # interrupt the program with Ctrl-C
108 | try:
109 | logging.info('Serving at %s:%s' % (HOST,PORT))
110 | server.serve_forever()
111 | except KeyboardInterrupt, IOError:
112 | logging.info('Shutting down, please wait...')
113 | running = False
114 | server.shutdown()
115 | [thread.join() for thread in recv_threads]
116 | logging.info('All done, good bye!')
117 |