Commit aa2e1af6 authored by Amos Latteier's avatar Amos Latteier

Continued to tweak resource freeing code which should reduce leakage when errors occur.

parent 045432f2
...@@ -117,6 +117,7 @@ import DebugLogger ...@@ -117,6 +117,7 @@ import DebugLogger
from cStringIO import StringIO from cStringIO import StringIO
from tempfile import TemporaryFile from tempfile import TemporaryFile
import socket, string, os, sys, time import socket, string, os, sys, time
from types import StringType
tz_for_log=compute_timezone_for_log() tz_for_log=compute_timezone_for_log()
...@@ -124,6 +125,8 @@ class PCGIChannel(asynchat.async_chat): ...@@ -124,6 +125,8 @@ class PCGIChannel(asynchat.async_chat):
"""Processes a PCGI request by collecting the env and stdin and """Processes a PCGI request by collecting the env and stdin and
then passing them to ZPublisher. The result is wrapped in a then passing them to ZPublisher. The result is wrapped in a
producer and sent back.""" producer and sent back."""
closed=0
def __init__(self,server,sock,addr): def __init__(self,server,sock,addr):
self.server = server self.server = server
...@@ -175,9 +178,10 @@ class PCGIChannel(asynchat.async_chat): ...@@ -175,9 +178,10 @@ class PCGIChannel(asynchat.async_chat):
string.strip(self.env['PATH_INFO']),'/')) string.strip(self.env['PATH_INFO']),'/'))
self.env['PATH_INFO'] = '/' + string.join(path[len(script):],'/') self.env['PATH_INFO'] = '/' + string.join(path[len(script):],'/')
self.data=StringIO() self.data=StringIO()
DebugLogger.log('B', id(self), DebugLogger.log('B', id(self),
'%s %s' % (self.env['REQUEST_METHOD'], self.env['PATH_INFO'])) '%s %s' % (self.env['REQUEST_METHOD'],
self.env.get('PATH_INFO' ,'/')))
# now read the next size header # now read the next size header
self.set_terminator(10) self.set_terminator(10)
...@@ -249,31 +253,14 @@ class PCGIChannel(asynchat.async_chat): ...@@ -249,31 +253,14 @@ class PCGIChannel(asynchat.async_chat):
def __repr__(self): def __repr__(self):
return "<PCGIChannel at %x>" % id(self) return "<PCGIChannel at %x>" % id(self)
def handle_close(self): def close(self):
self.closed=1
while self.producer_fifo: while self.producer_fifo:
p=self.producer_fifo.first()
if p is not None and type(p) != StringType:
p.more() # free up resources held by producer
self.producer_fifo.pop() self.producer_fifo.pop()
self.close() asyncore.dispatcher.close(self)
def handle_error (self):
(file,fun,line), t, v, tbinfo = compact_traceback()
# sometimes a user repr method will crash.
try:
self_repr = repr (self)
except:
self_repr = '<__repr__ (self) failed for object at %0x>' % id(self)
self.log_info (
'uncaptured python exception, closing channel %s (%s:%s %s)' % (
self_repr,
t,
v,
tbinfo
),
'error'
)
self.handle_close()
class PCGIServer(asyncore.dispatcher): class PCGIServer(asyncore.dispatcher):
...@@ -405,7 +392,7 @@ class PCGIResponse(HTTPResponse): ...@@ -405,7 +392,7 @@ class PCGIResponse(HTTPResponse):
self.stdout.write(str(self)) self.stdout.write(str(self))
self._wrote=1 self._wrote=1
self.stdout.write(data) self.stdout.write(data)
def _finish(self): def _finish(self):
self.stdout.finish(self) self.stdout.finish(self)
self.stdout.close() self.stdout.close()
...@@ -434,27 +421,25 @@ class PCGIPipe: ...@@ -434,27 +421,25 @@ class PCGIPipe:
self._data.write(text) self._data.write(text)
def close(self): def close(self):
data=self._data.getvalue() if not self._channel.closed:
l=len(data) data=self._data.getvalue()
l=len(data)
DebugLogger.log('A', id(self._channel), DebugLogger.log('A', id(self._channel),
'%s %s' % (self._channel.reply_code, l)) '%s %s' % (self._channel.reply_code, l))
self._channel.push('%010d%s%010d' % (l, data, 0), 0)
self._channel.push('%010d%s%010d' % (l, data, 0), 0) self._channel.push(LoggingProducer(self._channel, l, 'log_request'), 0)
self._channel.push(LoggingProducer(self._channel, l, 'log_request'), 0) self._channel.push(CallbackProducer(
lambda t=('E', id(self._channel)): apply(DebugLogger.log,t)))
self._channel.push(CallbackProducer(
lambda t=('E', id(self._channel)): apply(DebugLogger.log,t))) if self._shutdown:
try: r=self._shutdown[0]
if self._shutdown: except: r=0
try: r=self._shutdown[0] sys.ZServerExitCode=r
except: r=0 self._channel.push(ShutdownProducer(), 0)
sys.ZServerExitCode=r Wakeup(lambda: asyncore.close_all())
self._channel.push(ShutdownProducer(), 0) else:
Wakeup(lambda: asyncore.close_all()) self._channel.push(None, 0)
else: Wakeup()
self._channel.push(None, 0)
Wakeup()
self._data=None self._data=None
self._channel=None self._channel=None
......
...@@ -117,6 +117,7 @@ import DebugLogger ...@@ -117,6 +117,7 @@ import DebugLogger
from cStringIO import StringIO from cStringIO import StringIO
from tempfile import TemporaryFile from tempfile import TemporaryFile
import socket, string, os, sys, time import socket, string, os, sys, time
from types import StringType
tz_for_log=compute_timezone_for_log() tz_for_log=compute_timezone_for_log()
...@@ -124,6 +125,8 @@ class PCGIChannel(asynchat.async_chat): ...@@ -124,6 +125,8 @@ class PCGIChannel(asynchat.async_chat):
"""Processes a PCGI request by collecting the env and stdin and """Processes a PCGI request by collecting the env and stdin and
then passing them to ZPublisher. The result is wrapped in a then passing them to ZPublisher. The result is wrapped in a
producer and sent back.""" producer and sent back."""
closed=0
def __init__(self,server,sock,addr): def __init__(self,server,sock,addr):
self.server = server self.server = server
...@@ -175,9 +178,10 @@ class PCGIChannel(asynchat.async_chat): ...@@ -175,9 +178,10 @@ class PCGIChannel(asynchat.async_chat):
string.strip(self.env['PATH_INFO']),'/')) string.strip(self.env['PATH_INFO']),'/'))
self.env['PATH_INFO'] = '/' + string.join(path[len(script):],'/') self.env['PATH_INFO'] = '/' + string.join(path[len(script):],'/')
self.data=StringIO() self.data=StringIO()
DebugLogger.log('B', id(self), DebugLogger.log('B', id(self),
'%s %s' % (self.env['REQUEST_METHOD'], self.env['PATH_INFO'])) '%s %s' % (self.env['REQUEST_METHOD'],
self.env.get('PATH_INFO' ,'/')))
# now read the next size header # now read the next size header
self.set_terminator(10) self.set_terminator(10)
...@@ -249,31 +253,14 @@ class PCGIChannel(asynchat.async_chat): ...@@ -249,31 +253,14 @@ class PCGIChannel(asynchat.async_chat):
def __repr__(self): def __repr__(self):
return "<PCGIChannel at %x>" % id(self) return "<PCGIChannel at %x>" % id(self)
def handle_close(self): def close(self):
self.closed=1
while self.producer_fifo: while self.producer_fifo:
p=self.producer_fifo.first()
if p is not None and type(p) != StringType:
p.more() # free up resources held by producer
self.producer_fifo.pop() self.producer_fifo.pop()
self.close() asyncore.dispatcher.close(self)
def handle_error (self):
(file,fun,line), t, v, tbinfo = compact_traceback()
# sometimes a user repr method will crash.
try:
self_repr = repr (self)
except:
self_repr = '<__repr__ (self) failed for object at %0x>' % id(self)
self.log_info (
'uncaptured python exception, closing channel %s (%s:%s %s)' % (
self_repr,
t,
v,
tbinfo
),
'error'
)
self.handle_close()
class PCGIServer(asyncore.dispatcher): class PCGIServer(asyncore.dispatcher):
...@@ -405,7 +392,7 @@ class PCGIResponse(HTTPResponse): ...@@ -405,7 +392,7 @@ class PCGIResponse(HTTPResponse):
self.stdout.write(str(self)) self.stdout.write(str(self))
self._wrote=1 self._wrote=1
self.stdout.write(data) self.stdout.write(data)
def _finish(self): def _finish(self):
self.stdout.finish(self) self.stdout.finish(self)
self.stdout.close() self.stdout.close()
...@@ -434,27 +421,25 @@ class PCGIPipe: ...@@ -434,27 +421,25 @@ class PCGIPipe:
self._data.write(text) self._data.write(text)
def close(self): def close(self):
data=self._data.getvalue() if not self._channel.closed:
l=len(data) data=self._data.getvalue()
l=len(data)
DebugLogger.log('A', id(self._channel), DebugLogger.log('A', id(self._channel),
'%s %s' % (self._channel.reply_code, l)) '%s %s' % (self._channel.reply_code, l))
self._channel.push('%010d%s%010d' % (l, data, 0), 0)
self._channel.push('%010d%s%010d' % (l, data, 0), 0) self._channel.push(LoggingProducer(self._channel, l, 'log_request'), 0)
self._channel.push(LoggingProducer(self._channel, l, 'log_request'), 0) self._channel.push(CallbackProducer(
lambda t=('E', id(self._channel)): apply(DebugLogger.log,t)))
self._channel.push(CallbackProducer(
lambda t=('E', id(self._channel)): apply(DebugLogger.log,t))) if self._shutdown:
try: r=self._shutdown[0]
if self._shutdown: except: r=0
try: r=self._shutdown[0] sys.ZServerExitCode=r
except: r=0 self._channel.push(ShutdownProducer(), 0)
sys.ZServerExitCode=r Wakeup(lambda: asyncore.close_all())
self._channel.push(ShutdownProducer(), 0) else:
Wakeup(lambda: asyncore.close_all()) self._channel.push(None, 0)
else: Wakeup()
self._channel.push(None, 0)
Wakeup()
self._data=None self._data=None
self._channel=None self._channel=None
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment