#!/usr/bin/env python
"""
# Multiplexer for HTTP streams, used for broadcast of MJPEG and WAV streams
#
# Licence:       BSDLike
# URL:           http://rickvanderzwet.nl/svn/personal/py-tcpmultiplexer/
#
# Rick van der Zwet
"""

import SocketServer
import argparse
import cgi
import collections
import logging
import os
import socket
import sys
import threading
import time
import urllib2

# Some boring defaults
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 9999
DEFAULT_CONFIG = 'streams.yaml'
DEFAULT_DOCUMENTROOT = './htdocs'
DEFAULT_TIMEOUT = None

# URL : TARGET
DEFAULT_STREAMS = {
    # '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
    # '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
}

# Upper bound (seconds) spent waiting for each receiver thread at shutdown.
# The threads are daemons, so giving up on a stalled one is harmless.
SHUTDOWN_JOIN_TIMEOUT = 5

# Global variables used as ring-buffers or shared-storage.
# All dictionaries are keyed by the local stream path (urlid).
urlheader = dict()       # upstream HTTP response headers, as one string
dataheader = dict()      # stream preamble every new client must receive
urldata = dict()         # {handler: deque of pending chunks} per stream
stream_running = dict()  # True once a stream is ready to be served
running = True           # cleared at shutdown to stop all loops
recv_threads = []
document_root = None
recv_timeout = None

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


def _resolve_document(urlid):
    """Map a request path onto a file below document_root.

    Returns the absolute, symlink-resolved path, or None when no document
    root is configured or the path would escape it (for example through
    '..' components or a symlink pointing outside).
    """
    if not document_root:
        return None
    root = os.path.realpath(document_root)
    candidate = os.path.realpath(os.path.join(root, urlid.lstrip('/')))
    if not candidate.startswith(root.rstrip(os.sep) + os.sep):
        return None
    return candidate


def _link_items(paths):
    """Render paths as HTML list items linking to themselves."""
    return ''.join(
        '<li><a href="%s">%s</a></li>' % (cgi.escape(path, True), cgi.escape(path))
        for path in paths)


def _overview_page():
    """Build the HTML body of the index page.

    NOTE(review): only the heading text ('Overview', 'Streams', 'Files') of
    the original page was available when this was rewritten; the markup and
    the two lists (running streams, files below document_root) are a
    reconstruction -- confirm against the canonical source.
    """
    streams = sorted(urlid for urlid in list(urldata) if stream_running.get(urlid))
    files = []
    if document_root and os.path.isdir(document_root):
        for folder, _subdirs, names in os.walk(document_root):
            for name in names:
                relative = os.path.relpath(os.path.join(folder, name), document_root)
                files.append('/' + relative.replace(os.sep, '/'))
    files.sort()
    return ('<h1>Overview</h1>'
            '<h3>Streams</h3><ul>%s</ul>'
            '<h3>Files</h3><ul>%s</ul>') % (_link_items(streams), _link_items(files))


class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server that handles every client connection in its own thread."""

    # Allow a quick restart while old connections linger in TIME_WAIT.
    allow_reuse_address = True
    # A handler blocked on a stalled client must not keep the process alive.
    daemon_threads = True


class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """

    def handle(self):
        """Serve one request: the index page, a stream, or a static file."""
        client = self.client_address[0]
        try:
            # self.request is the TCP socket connected to the client
            self.data = self.request.recv(1024).strip()
            request_line = self.data.split('\n')[0].split()
            if len(request_line) < 2:
                # Empty read or garbage: there is no path to dispatch on.
                logging.info("Malformed request from '%s'", client)
                self.request.sendall('HTTP/1.1 400 BAD REQUEST\n\n')
                return
            req_type, urlid = request_line[0], request_line[1]
            logging.info("Connection %s from '%s' for '%s'", req_type, client, urlid)

            if urlid == '/':
                self.request.sendall('HTTP/1.1 200 OK\nContent-Type: text/html\n\n' + _overview_page())
            elif urlid in urldata:
                self._serve_stream(req_type, urlid)
            else:
                self._serve_file(urlid)
        except IOError:
            logging.info("Connection closed from '%s'", client)

    def _serve_stream(self, req_type, urlid):
        """Send the stream headers and, for GET, relay chunks until done."""
        header = urlheader.get(urlid)
        preamble = dataheader.get(urlid)
        clients = urldata.get(urlid)
        if (not stream_running.get(urlid) or header is None
                or preamble is None or clients is None):
            # The receiver thread is (re)connecting; nothing to relay yet.
            self.request.sendall('HTTP/1.1 503 SERVICE UNAVAILABLE\n\n')
            return

        self.request.sendall('HTTP/1.1 200 OK\n' + header + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + preamble)
        if req_type != 'GET':
            return

        # NOTE: the queue is unbounded; a client slower than the stream
        # accumulates chunks for as long as it stays connected.
        pending = collections.deque()
        clients[self] = pending
        try:
            # Stop when the receiver replaced the client table (upstream
            # reset): this queue would never be filled again and the client
            # needs a fresh stream preamble anyway.
            while running and urldata.get(urlid) is clients:
                if not pending:
                    time.sleep(0.1)
                    continue
                self.request.sendall(pending.popleft())
        finally:
            clients.pop(self, None)

    def _serve_file(self, urlid):
        """Send a file from the document root, or a 404 page."""
        document_file = _resolve_document(urlid)
        if document_file and os.path.isfile(document_file):
            with open(document_file, 'rb') as source:
                data = source.read()
            self.request.sendall('HTTP/1.1 200 OK\nContent-Length: %s\n\n%s' % (len(data), data))
        else:
            # The path is client-controlled, so escape it before echoing.
            self.request.sendall("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % cgi.escape(urlid, True))


def _read_mjpeg_preamble(f, logger):
    """Read the first multipart section of an MJPEG stream.

    Returns the data new clients must receive first: the first complete
    part followed by the part headers. Raises IOError when the stream
    start contains no header lines, so the caller reconnects.
    """
    data = f.read(1024)

    # Get the required headers and accurate datasize
    headers = []
    datasize = 1024 * 1024
    for header in data.splitlines():
        if not header.strip():
            # Newlines in the beginning are evil
            if headers:
                break
            continue
        if header.lower().startswith(('datalen:', 'content-length:')):
            try:
                datasize = int(header.split(':')[1])
            except ValueError:
                logger.warning("Ignoring unparsable length header '%s'", header)
        headers.append(header)

    if not headers:
        raise IOError("No multipart headers found at start of stream")

    boundry = headers[0]
    logger.info("Data Length: %s", datasize)
    logger.info("Boundry line: %s", boundry)
    logger.info("Image headers %s", headers)

    # Two frames' worth of data contains at least one complete image.
    data = data + f.read(datasize * 2)
    valid_image = boundry + data.split(boundry)[1]
    return valid_image + '\n'.join(headers) + '\n'


def get_data(url, urlid, *args):
    """Fetch the DATA from the WAV or MJPEG Stream.

    Runs until the global 'running' flag is cleared. Connects to 'url',
    publishes its headers and preamble under 'urlid', then appends every
    received chunk to the queue of each connected client. On IOError the
    stream is marked as not running and the connection is retried after
    one second.
    """
    logger = logging.getLogger('recv_id=' + urlid)
    recv_buffer_size = 1024 * 8

    logger.info("Starting thread '%s' (%s)", url, urlid)
    while running:
        # Reset the shared state; handlers notice the replaced client table
        # and drop their connections.
        stream_running[urlid] = False
        urlheader[urlid] = None
        dataheader[urlid] = None
        urldata[urlid] = None
        f = None
        try:
            f = urllib2.urlopen(url, timeout=recv_timeout)
            info = f.info()
            urlheader[urlid] = ''.join(info.headers)
            clients = urldata[urlid] = dict()

            # Find datatype
            urltype = info.gettype()
            logger.info("url.info().gettype() is '%s'", urltype)
            datatype = {
                'audio/x-wav': 'wav',
                'multipart/x-mixed-replace': 'mjpeg',
            }.get(urltype)

            # Be verbose to the user
            if datatype:
                logger.info("'%s' Identified as %s", url, datatype)
            else:
                logger.warning("Cannot find file type of '%s'", url)

            # Set the correct data header
            if datatype == 'wav':
                # WAV header
                dataheader[urlid] = f.read(44)
            elif datatype == 'mjpeg':
                dataheader[urlid] = _read_mjpeg_preamble(f, logger)
            else:
                dataheader[urlid] = ''
            logger.info("Using dataheader of length %s", len(dataheader[urlid]))

            # Main data loader
            logger.info("Stream ready to serve")
            stream_running[urlid] = True
            while running:
                data = f.read(recv_buffer_size)
                if len(data) != recv_buffer_size:
                    raise IOError("Connection corrupted, got '%s' instead of '%s'" % (len(data), recv_buffer_size))
                logger.debug("Received data chunk with length: %s", len(data))
                # Snapshot the queues: handlers add and remove entries
                # concurrently.
                for pending in list(clients.values()):
                    pending.append(data)
        except IOError as e:
            # Enforce a connection reset
            logger.warning("URL reset '%s' (%s)", url, e)
            stream_running[urlid] = False
            time.sleep(1)
        finally:
            if f is not None:
                f.close()
    logger.info("Closing Thread '%s'", url)


def _load_streams(stream_cfg):
    """Return DEFAULT_STREAMS merged with the YAML stream configuration."""
    streams = dict(DEFAULT_STREAMS)
    try:
        import yaml
    except ImportError as e:
        logging.warning("Stream config file '%s' not readable or parsable (%s)", stream_cfg, e)
        return streams
    try:
        with open(stream_cfg) as cfg:
            # safe_load instead of load: the file only needs to hold a plain
            # mapping, and yaml.load can construct arbitrary Python objects.
            streams.update(yaml.safe_load(cfg) or {})
    except (IOError, yaml.YAMLError) as e:
        logging.warning("Stream config file '%s' not readable or parsable (%s)", stream_cfg, e)
    return streams


def main():
    """Parse the command line, start the receiver threads and serve."""
    global document_root, recv_timeout, running

    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--host', dest='host', default=DEFAULT_HOST, help='Listen to IP [default: %s]' % DEFAULT_HOST)
    parser.add_argument('--port', dest='port', default=DEFAULT_PORT, type=int, help='Listen to PORT [default: %s]' % DEFAULT_PORT)
    parser.add_argument('--timeout', dest='timeout', default=DEFAULT_TIMEOUT, type=int, help='Default socket timeout [default: %s]' % DEFAULT_TIMEOUT)
    parser.add_argument('--stream-cfg', dest='stream_cfg', default=DEFAULT_CONFIG, help='YAML Stream configuration [default: %s]' % DEFAULT_CONFIG)
    parser.add_argument('--document-root', dest='document_root', default=DEFAULT_DOCUMENTROOT, help='Document Root File Directory [default: %s]' % DEFAULT_DOCUMENTROOT)
    args = parser.parse_args()

    document_root = os.path.abspath(os.path.join(os.path.dirname(__file__), args.document_root))
    logging.info("Serving '/htdocs' from document_root '%s'", document_root)

    # Set the timeout
    logging.info("Changing socket timeout from '%s' to '%s'", socket.getdefaulttimeout(), args.timeout)
    recv_timeout = args.timeout

    # Import streams
    streams = _load_streams(args.stream_cfg)

    # Create the base server, retrying until the address becomes available
    server = None
    try:
        while server is None:
            try:
                server = ThreadedTCPServer((args.host, args.port), MyTCPHandler)
            except IOError as e:
                logging.warning('For conection %s:%s to become available (%s)', args.host, args.port, e)
                time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(1)

    for urlid, url in streams.items():
        thread = threading.Thread(target=get_data, args=(url, urlid))
        thread.setDaemon(True)
        thread.start()
        recv_threads.append(thread)

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    try:
        logging.info('Serving at %s:%s', args.host, args.port)
        server.serve_forever()
    except (KeyboardInterrupt, IOError):
        logging.info('Shutting down, please wait...')
        running = False
        server.shutdown()
        server.server_close()
        for thread in recv_threads:
            # Bounded: a receiver blocked on a silent upstream would
            # otherwise hang the shutdown forever.
            thread.join(SHUTDOWN_JOIN_TIMEOUT)
        logging.info('All done, good bye!')


if __name__ == "__main__":
    main()