Commit d080e4f8 authored by Jason Madden's avatar Jason Madden

fix the refcycle in FOThread by denormalizing.

parent 1858944e
...@@ -75,6 +75,10 @@ class FileObjectThread(FileObjectBase): ...@@ -75,6 +75,10 @@ class FileObjectThread(FileObjectBase):
A file-like object wrapping another file-like object, performing all blocking A file-like object wrapping another file-like object, performing all blocking
operations on that object in a background thread. operations on that object in a background thread.
.. caution::
Attempting to change the threadpool or lock of an existing FileObjectThread
has undefined consequences.
.. versionchanged:: 1.1b1 .. versionchanged:: 1.1b1
The file object is closed using the threadpool. Note that whether or The file object is closed using the threadpool. Note that whether or
not this action is synchronous or asynchronous is not documented. not this action is synchronous or asynchronous is not documented.
...@@ -113,24 +117,23 @@ class FileObjectThread(FileObjectBase): ...@@ -113,24 +117,23 @@ class FileObjectThread(FileObjectBase):
else: else:
fobj = os.fdopen(fobj, mode, bufsize) fobj = os.fdopen(fobj, mode, bufsize)
self.__io_holder = [fobj] # signal for _wrap_method
super(FileObjectThread, self).__init__(fobj, closefd) super(FileObjectThread, self).__init__(fobj, closefd)
def _apply(self, func, args=None, kwargs=None):
with self.lock:
return self.threadpool.apply(func, args, kwargs)
def _do_close(self, fobj, closefd): def _do_close(self, fobj, closefd):
self.__io_holder[0] = None # for _wrap_method
try: try:
self._apply(fobj.flush) with self.lock:
self.threadpool.apply(fobj.flush)
finally: finally:
if closefd: if closefd:
# Note that we're not using self._apply; older code # Note that we're not taking the lock; older code
# did fobj.close() without going through the threadpool at all, # did fobj.close() without going through the threadpool at all,
# so acquiring the lock could potentially introduce deadlocks # so acquiring the lock could potentially introduce deadlocks
# that weren't present before. Avoiding the lock doesn't make # that weren't present before. Avoiding the lock doesn't make
# the existing race condition any worse. # the existing race condition any worse.
# We wrap the close in an exception handler and re-raise directly # We wrap the close in an exception handler and re-raise directly
# to avoid the (common, expected) IOError from being logged # to avoid the (common, expected) IOError from being logged by the pool
def close(): def close():
try: try:
fobj.close() fobj.close()
...@@ -144,6 +147,7 @@ class FileObjectThread(FileObjectBase): ...@@ -144,6 +147,7 @@ class FileObjectThread(FileObjectBase):
super(FileObjectThread, self)._do_delegate_methods() super(FileObjectThread, self)._do_delegate_methods()
if not hasattr(self, 'read1') and 'r' in getattr(self._io, 'mode', ''): if not hasattr(self, 'read1') and 'r' in getattr(self._io, 'mode', ''):
self.read1 = self.read self.read1 = self.read
self.__io_holder[0] = self._io
def _extra_repr(self): def _extra_repr(self):
return ' threadpool=%r' % (self.threadpool,) return ' threadpool=%r' % (self.threadpool,)
...@@ -159,19 +163,22 @@ class FileObjectThread(FileObjectBase): ...@@ -159,19 +163,22 @@ class FileObjectThread(FileObjectBase):
__next__ = next __next__ = next
def _wrap_method(self, method): def _wrap_method(self, method):
# NOTE: This introduces a refcycle within self: # NOTE: We are careful to avoid introducing a refcycle
# self.__dict__ has methods that directly refer to self. # within self. Our wrapper cannot refer to self.
# Options to eliminate this are weakrefs, using __getattribute__ to io_holder = self.__io_holder
# fake a method descriptor, other? They all seem more costly than lock = self.lock
# the refcycle. threadpool = self.threadpool
@functools.wraps(method) @functools.wraps(method)
def thread_method(*args, **kwargs): def thread_method(*args, **kwargs):
if self._io is None: if io_holder[0] is None:
# This is different than FileObjectPosix, etc, # This is different than FileObjectPosix, etc,
# because we want to save the expensive trip through # because we want to save the expensive trip through
# the threadpool. # the threadpool.
raise FileObjectClosed() raise FileObjectClosed()
return self._apply(method, args, kwargs) with lock:
return threadpool.apply(method, args, kwargs)
return thread_method return thread_method
......
...@@ -52,8 +52,6 @@ class Test(greentest.TestCase): ...@@ -52,8 +52,6 @@ class Test(greentest.TestCase):
raise AssertionError('Expected OSError: [Errno 2] No such file or directory') raise AssertionError('Expected OSError: [Errno 2] No such file or directory')
def test_leak(self): def test_leak(self):
from gevent.fileobject import FileObject, FileObjectThread
num_before = greentest.get_number_open_files() num_before = greentest.get_number_open_files()
p = subprocess.Popen([sys.executable, "-c", "print()"], p = subprocess.Popen([sys.executable, "-c", "print()"],
stdout=subprocess.PIPE) stdout=subprocess.PIPE)
...@@ -64,13 +62,6 @@ class Test(greentest.TestCase): ...@@ -64,13 +62,6 @@ class Test(greentest.TestCase):
gc.collect() gc.collect()
num_after = greentest.get_number_open_files() num_after = greentest.get_number_open_files()
if FileObject is FileObjectThread:
# There is (deliberately) a small refcycle within
# the instance itself
self.assertNotEqual(num_before, num_after)
gc.collect() # This produces a ResourceWarning
num_after = greentest.get_number_open_files()
self.assertEqual(num_before, num_after) self.assertEqual(num_before, num_after)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment