Changeset 325 for py-tcpmultiplexer/TCPMultiplexer.py
- Timestamp:
- Jul 19, 2011, 11:24:53 PM (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
py-tcpmultiplexer/TCPMultiplexer.py
r324 r325 24 24 DEFAULT_CONFIG = 'streams.yaml' 25 25 DEFAULT_DOCUMENTROOT = './htdocs' 26 DEFAULT_TIMEOUT = 226 DEFAULT_TIMEOUT = None 27 27 28 28 29 29 # URL : TARGET 30 30 DEFAULT_STREAMS = { 31 '/cam1/video' : 'http://172.16.0.67:8080/videofeed',32 '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav',31 # '/cam1/video' : 'http://172.16.0.67:8080/videofeed', 32 # '/cam1/audio' : 'http://172.16.0.67:8080/audio.wav', 33 33 } 34 34 … … 38 38 dataheader = dict() 39 39 urldata = dict() 40 stream_running = dict() 40 41 running = True 41 42 recv_threads = [] 42 43 document_root = None 43 44 logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') 44 recv_timeout = None 45 46 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') 45 47 46 48 class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): … … 58 60 59 61 def handle(self): 60 global running, dataheader, document_root, urlheader, urldata62 global running, dataheader, document_root, stream_running, urlheader, urldata 61 63 # self.request is the TCP socket connected to the client 62 64 self.data = self.request.recv(1024).strip() … … 69 71 if urlid == '/': 70 72 self.request.send("HTTP/1.1 200 OK\n\n<html><head><title>Overview</title></head><body><h3>Streams</h3><ul>") 71 for urlid in sorted(urldata.keys()):73 for urlid in filter(lambda x: stream_running[x], stream_running.keys()): 72 74 self.request.send('<li><a href="%(urlid)s">%(urlid)s</a></li>' % {'urlid': urlid}) 73 75 self.request.send("</ul><h3>Files</h3><ul>") … … 91 93 self.request.send(urldata[urlid][self].pop(0)) 92 94 elif os.path.isfile(document_file): 93 self.request.send('HTTP/1.1 200 OK\n\n' + open(document_file,'r').read()) 95 data = open(document_file,'r').read() 96 self.request.send('HTTP/1.1 200 OK\nContent-Length: %s\n\n%s' % (len(data),data)) 94 97 else: 95 98 self.request.send("HTTP/1.1 404 NOT FOUND\nContent-Type: text/html\n\n<h1>404</h1>The page '%s' does not exists" % urlid) … … 102 105 def get_data(url, urlid, *args): 103 106 """ Fetch the DATA from the WAV or MJPEG Stream """ 104 global running, dataheader, urlheader, urldata 107 global running, dataheader, urlheader, urldata, stream_running, recv_timeout 105 108 logger = logging.getLogger('recv_id=' + urlid) 106 109 # Fill buffers if needed 107 110 logger.info("Starting thread '%s' (%s)", url, urlid) 108 111 while running: 112 stream_running[urlid] = False 109 113 urlheader[urlid] = None 110 114 dataheader[urlid] = None 111 115 urldata[urlid] = None 112 116 try: 113 f = urllib2.urlopen(url )117 f = urllib2.urlopen(url,timeout=recv_timeout) 114 118 urlheader[urlid] = ''.join(f.info().headers) 115 119 urldata[urlid] = dict() … … 161 165 else: 162 166 dataheader[urlid] = '' 163 164 167 logger.info("Using dataheader of length %s", len(dataheader[urlid])) 165 168 169 # Main data loader 170 logger.info("Stream ready to serve") 171 stream_running[urlid] = True 172 recv_buffer_size = 1024 * 8 166 173 while running: 167 data = f.read(10000) 168 # logger.debug("Received data chunk with length: %s", len(data)) 174 data = f.read(recv_buffer_size) 175 if not len(data) == recv_buffer_size: 176 raise IOError("Connection corrupted, got '%s' instead of '%s'" % (len(data), recv_buffer_size)) 177 logger.debug("Received data chunk with length: %s", len(data)) 169 178 for key in urldata[urlid].keys(): 170 179 urldata[urlid][key].append(data) … … 172 181 #Enforce a connection reset 173 182 logger.warning("URL reset '%s' (%s)", url, e) 174 del urlheader[urlid] 175 del dataheader[urlid] 176 del urldata[urlid] 183 stream_running[urlid] = False 177 184 time.sleep(1) 178 continue185 pass 179 186 logger.info("Closing Thread '%s'", url) 180 187 … … 193 200 194 201 # Set the timeout 195 socket.setdefaulttimeout(args.timeout) 202 logging.info("Changing socket timeout from '%s' to '%s'", socket.getdefaulttimeout(), args.timeout) 203 recv_timeout = args.timeout 196 204 197 205 # Inport streams
Note:
See TracChangeset
for help on using the changeset viewer.