Changeset 312 for py-tcpmultiplexer


Ignore:
Timestamp:
Jul 19, 2011, 12:38:06 PM (13 years ago)
Author:
Rick van der Zwet
Message:

Making sure we have valid starting headers...

File:
1 edited

Legend:

Unmodified
Added
Removed
  • py-tcpmultiplexer/TCPMultiplexer.py

    r311 r312  
    77# Rick van der Zwet <info@rickvanderzwet.nl>
    88#
    9 import threading
    10 import urllib2
    11 import time
    129import SocketServer
     10import argparse
    1311import logging
    1412import sys
     13import threading
     14import time
     15import urllib2
    1516
    16 urlheaders = dict()
     17# Some boring defaults
     18DEFAULT_HOST = '0.0.0.0'
     19DEFAULT_PORT = '9999'
     20
     21# Global variables used as ring-buffers or shared-storage
     22urlheader = dict()
     23dataheader = dict()
    1724urldata = dict()
    1825running = True
     
    3542
    3643    def handle(self):
    37         global running, urlheaders, urldata
     44        global running, dataheader, urlheader, urldata
    3845        # self.request is the TCP socket connected to the client
    3946        self.data = self.request.recv(1024).strip()
     
    4350        logging.info("Connection %s from '%s' for '%s'", req_type, self.client_address[0], urlid)
    4451        try:
    45           self.request.send('HTTP/1.1 200 OK\n' + urlheaders[urlid] + '\n')
    46           if req_type == 'HEAD':
    47             return
    48           elif req_type == 'GET':
    49             urldata[urlid][self] = []
    50             while running:
    51               if len(urldata[urlid][self]) == 0:
    52                 time.sleep(0.1)
    53                 continue
    54               self.request.send(urldata[urlid][self].pop(0))
     52          if not urldata.has_key(urlid):
     53            self.request.send("HTTP/1.1 404 NOT FOUND\n\nThe page '%s' does not exists" % urlid)
    5554          else:
    56              assert False, "Invalid request"
     55            self.request.send('HTTP/1.1 200 OK\n' + urlheader[urlid] + 'X-Proxy-Server: TCPMultiplexer.py\n' + '\n' + dataheader[urlid])
     56            if req_type == 'HEAD':
     57              return
     58            elif req_type == 'GET':
     59              urldata[urlid][self] = []
     60              while running:
     61                if len(urldata[urlid][self]) == 0:
     62                  time.sleep(0.1)
     63                  continue
     64                self.request.send(urldata[urlid][self].pop(0))
     65            else:
     66               assert False, "Invalid request"
    5767        except IOError:
    5868          pass
    5969        except StandardError, e:
    60           logging.warning('Server has issues: %s' % e)
     70          logging.warning('Server has issues: %s', e)
    6171          del urldata[urlid][self]
    6272
    6373def get_data(url, urlid, *args):
    64   global running, urlheaders, urldata
     74  global running, dataheader, urlheader, urldata
     75  logger = logging.getLogger('recv_id=' + urlid)
    6576  # Fill buffers if needed
    66   logging.info("Starting thread '%s' (%s)" % (url, urlid))
     77  logger.info("Starting thread '%s' (%s)", url, urlid)
    6778  while running:
    6879    try:
    6980      f = urllib2.urlopen(url)
    70       urlheaders[urlid] = ''.join(f.info().headers)
     81      urlheader[urlid] = ''.join(f.info().headers)
    7182      urldata[urlid] = dict()
     83      datatype = None
     84      urltype = f.info().gettype()
     85      logger.info("url.info().gettype() is '%s'", urltype)
     86      if urltype == 'audio/x-wav':
     87        datatype = 'wav'
     88      elif urltype == 'multipart/x-mixed-replace':
     89        datatype = 'mjeg'
     90      else:
     91        logger.warning("Cannot find file type of '%s'", url)
     92
     93      if datatype:
     94        logger.info("'%s' Identified as %s", url, datatype)
     95
     96      if datatype == 'wav':
     97        dataheader[urlid] = f.read(44)
     98      elif datatype == 'mjeg':
     99        dataheader[urlid] = '\n'.join(f.read(100).split('\n')[0:2])
     100      else:
     101        dataheader[urlid] = ''
     102
    72103      while running:
    73104        data = f.read(10000)
    74105        for key in urldata[urlid].keys():
    75106          urldata[urlid][key].append(data)
    76     except UrlError:
     107    except IOError, e:
    77108      #Enforce a connection reset
    78       logging.warning("URL reset '$s'" % url)
    79       del urlheaders[key]
    80       del urldata[key]
     109      logger.warning("URL reset '%s' (%s)", url, e)
     110      del urlheader[urlid]
     111      del dataheader[urlid]
     112      del urldata[urlid]
    81113      time.sleep(1)
    82114      continue
    83   logging.info("Closing Thread '%s'" % url)
     115  logger.info("Closing Thread '%s'", url)
    84116
    85117
    86118if __name__ == "__main__":
     119  parser = argparse.ArgumentParser(description='Process some integers.')
     120
     121  # URL : TARGET
     122  DEFAULT_STREAMS = {
     123    '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
     124    '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
     125    }
     126  # Flexible enough to specify using yaml file (sys.argv) as well
     127  if len(sys.argv) > 1:
     128    config_file = sys.argv[1]
     129    try:
     130      import yaml
     131      streams = yaml.load(open(config_file))
     132    except (ImportError, IOError) as e:
     133      logging.error("Config file '%s' not readable or parsable (%s)", config_file, e)
     134
     135
    87136  HOST, PORT = "0.0.0.0", 9999
    88137  # Create the server
     
    94143        break
    95144      except IOError, e:
    96         logging.warning('For conection %s:%s to become available (%s)' % (HOST, PORT, e))
     145        logging.warning('For conection %s:%s to become available (%s)', HOST, PORT, e)
    97146        time.sleep(1)
    98147  except KeyboardInterrupt:
    99148    sys.exit(1)
    100149
    101   url = 'http://172.16.0.67:8080/videofeed'
    102   recv_threads.append(threading.Thread(target=get_data, args=(url,'/video1')))
    103   recv_threads[-1].setDaemon(True)
    104   recv_threads[-1].start()
     150  for urlid, url in streams.iteritems():
     151    recv_threads.append(threading.Thread(target=get_data, args=(url,urlid)))
     152    recv_threads[-1].setDaemon(True)
     153    recv_threads[-1].start()
    105154
    106155  # Activate the server; this will keep running until you
    107156  # interrupt the program with Ctrl-C
    108157  try:
    109     logging.info('Serving at %s:%s' % (HOST,PORT))
     158    logging.info('Serving at %s:%s', HOST,PORT)
    110159    server.serve_forever()
    111160  except KeyboardInterrupt, IOError:
Note: See TracChangeset for help on using the changeset viewer.