Ignore:
Timestamp:
Jul 19, 2011, 11:24:53 PM (13 years ago)
Author:
Rick van der Zwet
Message:

Acive flags & some doc detiasl

File:
1 edited

Legend:

Unmodified
Added
Removed
  • py-tcpmultiplexer/TCPMultiplexer.py

    r324 r325  
    2424DEFAULT_CONFIG = 'streams.yaml'
    2525DEFAULT_DOCUMENTROOT = './htdocs'
    26 DEFAULT_TIMEOUT = 2
     26DEFAULT_TIMEOUT = None
    2727
    2828
    2929# URL : TARGET
    3030DEFAULT_STREAMS = {
    31   '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
    32   '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
     31#  '/cam1/video' : 'http://172.16.0.67:8080/videofeed',
     32#  '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',
    3333  }
    3434
     
    3838dataheader = dict()
    3939urldata = dict()
     40stream_running = dict()
    4041running = True
    4142recv_threads = []
    4243document_root = None
    43 
    44 logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
     44recv_timeout = None
     45
     46logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    4547
    4648class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
     
    5860
    5961    def handle(self):
    60         global running, dataheader, document_root, urlheader, urldata
     62        global running, dataheader, document_root, stream_running, urlheader, urldata
    6163        # self.request is the TCP socket connected to the client
    6264        self.data = self.request.recv(1024).strip()
     
    6971          if urlid == '/':
    7072            self.request.send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>")
    71             for urlid in sorted(urldata.keys()):
     73            for urlid in filter(lambda x: stream_running[x], stream_running.keys()):
    7274              self.request.send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': urlid})
    7375            self.request.send("</ul><h3>Files</h3><ul>")
     
    9193                self.request.send(urldata[urlid][self].pop(0))
    9294          elif os.path.isfile(document_file):
    93             self.request.send('HTTP/1.1 200 OK\n\n' + open(document_file,'r').read())
     95            data = open(document_file,'r').read()
     96            self.request.send('HTTP/1.1 200 OK\nContent-Length: %s\n\n%s' % (len(data),data))
    9497          else:
    9598            self.request.send("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % urlid)
     
    102105def get_data(url, urlid, *args):
    103106  """ Fetch the DATA from the WAV or MJPEG Stream """
    104   global running, dataheader, urlheader, urldata
     107  global running, dataheader, urlheader, urldata, stream_running, recv_timeout
    105108  logger = logging.getLogger('recv_id=' + urlid)
    106109  # Fill buffers if needed
    107110  logger.info("Starting thread '%s' (%s)", url, urlid)
    108111  while running:
     112    stream_running[urlid] = False
    109113    urlheader[urlid] = None
    110114    dataheader[urlid] = None
    111115    urldata[urlid] = None
    112116    try:
    113       f = urllib2.urlopen(url)
     117      f = urllib2.urlopen(url,timeout=recv_timeout)
    114118      urlheader[urlid] = ''.join(f.info().headers)
    115119      urldata[urlid] = dict()
     
    161165      else:
    162166        dataheader[urlid] = ''
    163 
    164167      logger.info("Using dataheader of length %s", len(dataheader[urlid]))
    165168
     169      # Main data loader
     170      logger.info("Stream ready to serve")
     171      stream_running[urlid] = True
     172      recv_buffer_size = 1024 * 8
    166173      while running:
    167         data = f.read(10000)
    168         # logger.debug("Received data chunk with length: %s", len(data))
     174        data = f.read(recv_buffer_size)
     175        if not len(data) == recv_buffer_size:
     176          raise IOError("Connection corrupted, got '%s' instead of '%s'" % (len(data), recv_buffer_size))
     177        logger.debug("Received data chunk with length: %s", len(data))
    169178        for key in urldata[urlid].keys():
    170179          urldata[urlid][key].append(data)
     
    172181      #Enforce a connection reset
    173182      logger.warning("URL reset '%s' (%s)", url, e)
    174       del urlheader[urlid]
    175       del dataheader[urlid]
    176       del urldata[urlid]
     183      stream_running[urlid] = False
    177184      time.sleep(1)
    178       continue
     185      pass
    179186  logger.info("Closing Thread '%s'", url)
    180187
     
    193200
    194201  # Set the timeout
    195   socket.setdefaulttimeout(args.timeout)
     202  logging.info("Changing socket timeout from '%s' to '%s'", socket.getdefaulttimeout(), args.timeout)
     203  recv_timeout = args.timeout
    196204
    197205  # Inport streams
Note: See TracChangeset for help on using the changeset viewer.