Commit 096909b8 authored by Denis Bilenko's avatar Denis Bilenko

fileobject: use __del__ implemented in Cython so that it cannot be interrupted...

fileobject: use __del__ implemented in Cython so that it cannot be interrupted asynchronously; multiple bugfixes

 - add 'close' argument to SocketAdapter
 - handle EINTR
 - remove _flushlock
 - remove closedfileobject
parent 80720f69
from python cimport * from python cimport *
import os
__all__ = ['set_exc_info'] __all__ = ['set_exc_info']
...@@ -20,3 +21,18 @@ def set_exc_info(object type, object value): ...@@ -20,3 +21,18 @@ def set_exc_info(object type, object value):
Py_INCREF(<PyObjectPtr>value) Py_INCREF(<PyObjectPtr>value)
tstate.exc_value = <PyObjectPtr>value tstate.exc_value = <PyObjectPtr>value
tstate.exc_traceback = NULL tstate.exc_traceback = NULL
# We implement __del__s in Cython so that they are safe against signals
def SocketAdapter__del__(self, close=os.close):
fileno = self._fileno
if fileno is not None:
self._fileno = None
if self._close:
close(fileno)
def noop(self):
pass
import sys import sys
import os import os
from gevent.hub import get_hub from gevent.hub import get_hub
from gevent.lock import RLock
from gevent.socket import EBADF from gevent.socket import EBADF
...@@ -22,9 +21,26 @@ if fcntl is None: ...@@ -22,9 +21,26 @@ if fcntl is None:
else: else:
from gevent.socket import _fileobject, EAGAIN from gevent.socket import _fileobject, EAGAIN, _get_memory
from errno import EINTR
cancel_wait_ex = IOError(EBADF, 'File descriptor was closed in another greenlet') cancel_wait_ex = IOError(EBADF, 'File descriptor was closed in another greenlet')
try:
from gevent._util import SocketAdapter__del__, noop
except ImportError:
SocketAdapter__del__ = None
noop = None
from types import UnboundMethodType
class NA(object):
def __repr__(self):
return 'N/A'
NA = NA()
class SocketAdapter(object): class SocketAdapter(object):
"""Socket-like API on top of a file descriptor. """Socket-like API on top of a file descriptor.
...@@ -33,12 +49,13 @@ else: ...@@ -33,12 +49,13 @@ else:
from file descriptors on POSIX platforms. from file descriptors on POSIX platforms.
""" """
def __init__(self, fileno, mode=None): def __init__(self, fileno, mode=None, close=True):
if not isinstance(fileno, (int, long)): if not isinstance(fileno, (int, long)):
raise TypeError('fileno must be int: %r' % fileno) raise TypeError('fileno must be int: %r' % fileno)
self._fileno = fileno self._fileno = fileno
self._mode = mode or 'rb' self._mode = mode or 'rb'
self._translate = 'U' in mode self._close = close
self._translate = 'U' in self._mode
fcntl(fileno, F_SETFL, os.O_NONBLOCK) fcntl(fileno, F_SETFL, os.O_NONBLOCK)
self._eat_newline = False self._eat_newline = False
self.hub = get_hub() self.hub = get_hub()
...@@ -48,9 +65,10 @@ else: ...@@ -48,9 +65,10 @@ else:
def __repr__(self): def __repr__(self):
if self._fileno is None: if self._fileno is None:
return '<%s closed>' % (self.__class__.__name__, ) return '<%s at 0x%x closed>' % (self.__class__.__name__, id(self))
else: else:
return '%s(%r, %r)' % (self.__class__.__name__, self._fileno, self._mode) args = (self.__class__.__name__, id(self), getattr(self, '_fileno', NA), getattr(self, '_mode', NA))
return '<%s at 0x%x (%r, %r)>' % args
def makefile(self, *args, **kwargs): def makefile(self, *args, **kwargs):
return _fileobject(self, *args, **kwargs) return _fileobject(self, *args, **kwargs)
...@@ -61,13 +79,19 @@ else: ...@@ -61,13 +79,19 @@ else:
raise IOError(EBADF, 'Bad file descriptor (%s object is closed)' % self.__class__.__name) raise IOError(EBADF, 'Bad file descriptor (%s object is closed)' % self.__class__.__name)
return result return result
def detach(self):
x = self._fileno
self._fileno = None
return x
def close(self): def close(self):
self.hub.cancel_wait(self._read_event, cancel_wait_ex) self.hub.cancel_wait(self._read_event, cancel_wait_ex)
self.hub.cancel_wait(self._write_event, cancel_wait_ex) self.hub.cancel_wait(self._write_event, cancel_wait_ex)
fileno = self._fileno fileno = self._fileno
if fileno is not None: if fileno is not None:
self._fileno = None self._fileno = None
os.close(fileno) if self._close:
os.close(fileno)
def sendall(self, data): def sendall(self, data):
fileno = self.fileno() fileno = self.fileno()
...@@ -91,10 +115,13 @@ else: ...@@ -91,10 +115,13 @@ else:
try: try:
data = os.read(self.fileno(), size) data = os.read(self.fileno(), size)
except (IOError, OSError): except (IOError, OSError):
ex = sys.exc_info()[1] code = sys.exc_info()[1].args[0]
if ex.args[0] == EBADF: if code == EBADF:
return '' return ''
if ex.args[0] != EAGAIN: elif code == EINTR:
sys.exc_clear()
continue
elif code != EAGAIN:
raise raise
sys.exc_clear() sys.exc_clear()
else: else:
...@@ -122,6 +149,16 @@ else: ...@@ -122,6 +149,16 @@ else:
data = data.replace("\r", "\n") data = data.replace("\r", "\n")
return data return data
if not SocketAdapter__del__:
def __del__(self, close=os.close):
fileno = self._fileno
if fileno is not None:
close(fileno)
if SocketAdapter__del__:
SocketAdapter.__del__ = UnboundMethodType(SocketAdapter__del__, None, SocketAdapter)
class FileObjectPosix(_fileobject): class FileObjectPosix(_fileobject):
...@@ -131,9 +168,9 @@ else: ...@@ -131,9 +168,9 @@ else:
fobj = None fobj = None
else: else:
fileno = fobj.fileno() fileno = fobj.fileno()
sock = SocketAdapter(fileno, mode) sock = SocketAdapter(fileno, mode, close=close)
self._fobj = fobj self._fobj = fobj
self._flushlock = RLock() self._closed = False
_fileobject.__init__(self, sock, mode=mode, bufsize=bufsize, close=close) _fileobject.__init__(self, sock, mode=mode, bufsize=bufsize, close=close)
def __repr__(self): def __repr__(self):
...@@ -145,27 +182,36 @@ else: ...@@ -145,27 +182,36 @@ else:
return '<%s %s _fobj=%r>' % (self.__class__.__name__, self._sock, self._fobj) return '<%s %s _fobj=%r>' % (self.__class__.__name__, self._sock, self._fobj)
def close(self): def close(self):
if self._closed:
# make sure close() is only ran once when called concurrently
# cannot rely on self._sock for this because we need to keep that until flush() is done
return
self._closed = True
sock = self._sock sock = self._sock
if sock is None: if sock is None:
return return
try: try:
self.flush() self.flush()
finally: finally:
if self._fobj is not None or not self._close:
sock.detach()
self._sock = None self._sock = None
if self._close:
sock.close()
self._fobj = None self._fobj = None
# what if fobj has its own close() in __del__ like fdopen?
def flush(self):
# the reason we make flush() greenlet-safe is to make close() greenlet-safe
with self._flushlock:
return _fileobject.flush(self)
def __getattr__(self, item): def __getattr__(self, item):
assert item != '_fobj' assert item != '_fobj'
if self._fobj is None:
raise FileObjectClosed
return getattr(self._fobj, item) return getattr(self._fobj, item)
if not noop:
def __del__(self):
pass
if noop:
FileObjectPosix.__del__ = UnboundMethodType(FileObjectPosix, None, noop)
class FileObjectThreadPool(object): class FileObjectThreadPool(object):
...@@ -177,47 +223,42 @@ class FileObjectThreadPool(object): ...@@ -177,47 +223,42 @@ class FileObjectThreadPool(object):
if threadpool is None: if threadpool is None:
threadpool = get_hub().threadpool threadpool = get_hub().threadpool
self.threadpool = threadpool self.threadpool = threadpool
self._flushlock = RLock()
def close(self): def close(self):
fobj = self._fobj fobj = self._fobj
if fobj is closedfileobject: if fobj is None:
return return
self._fobj = None
try: try:
self.flush() self.flush(__fobj=fobj)
finally: finally:
self._fobj = closedfileobject
if self._close: if self._close:
fobj.close() fobj.close()
def flush(self): def flush(self, __fobj=None):
# the reason we make flush() greenlet-safe is to make close() greenlet-safe if __fobj is not None:
fobj = self._fobj fobj = __fobj
if fobj is closedfileobject: else:
raise closedfileobject fobj = self._fobj
with self._flushlock: if fobj is None:
return self.threadpool.apply_e(BaseException, self._fobj.flush) raise FileObjectClosed
return self.threadpool.apply_e(BaseException, fobj.flush)
def __del__(self):
try:
self.close()
except:
# close() may fail if __init__ didn't complete
pass
def __repr__(self): def __repr__(self):
return '<%s _fobj=%r threadpool=%r>' % (self.__class__.__name__, self._fobj, self.threadpool) return '<%s _fobj=%r threadpool=%r>' % (self.__class__.__name__, self._fobj, self.threadpool)
def __getattr__(self, item): def __getattr__(self, item):
assert item != '_fobj' assert item != '_fobj'
if self._fobj is None:
raise FileObjectClosed
return getattr(self._fobj, item) return getattr(self._fobj, item)
for method in ['read', 'readinto', 'readline', 'readlines', 'write', 'writelines', 'xreadlines']: for method in ['read', 'readinto', 'readline', 'readlines', 'write', 'writelines', 'xreadlines']:
exec '''def %s(self, *args, **kwargs): exec '''def %s(self, *args, **kwargs):
fobj = self._fobj fobj = self._fobj
if fobj is closedfileobject: if fobj is None:
raise closedfileobject raise FileObjectClosed
return self.threadpool.apply_e(BaseException, fobj.%s, args, kwargs)''' % (method, method) return self.threadpool.apply_e(BaseException, fobj.%s, args, kwargs)''' % (method, method)
def __iter__(self): def __iter__(self):
...@@ -230,18 +271,7 @@ class FileObjectThreadPool(object): ...@@ -230,18 +271,7 @@ class FileObjectThreadPool(object):
raise StopIteration raise StopIteration
class closedfileobject(IOError): FileObjectClosed = IOError(EBADF, 'Bad file descriptor (FileObject was closed)')
def __init__(self):
IOError.__init__(self, EBADF, 'Bad file descriptor (FileObjectThreadPool was closed)')
def _dummy(self, *args, **kwargs):
raise self
__getattr__ = _dummy
__call__ = _dummy
closedfileobject = closedfileobject()
try: try:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment