Commit ccbea2e3 authored by Jason Madden's avatar Jason Madden

Fix #660 by being careful about the lifetime of objects exposed as attributes.

Needs a test case.
parent b6135d65
...@@ -7,7 +7,10 @@ ...@@ -7,7 +7,10 @@
1.1b6 (unreleased) 1.1b6 (unreleased)
================== ==================
- TDB - PyPy: Fix a memory leak for code that allocated and disposed of many
``gevent.lock.Semaphore`` subclasses. If monkey-patched, this could
also apply to ``threading.Semaphore`` objects. Reported in
:issue:`660` by Jay Oster.
1.1b5 (Sep 18, 2015) 1.1b5 (Sep 18, 2015)
==================== ====================
......
...@@ -8,6 +8,8 @@ cdef class Semaphore: ...@@ -8,6 +8,8 @@ cdef class Semaphore:
cpdef int release(self) except -1000 cpdef int release(self) except -1000
cpdef rawlink(self, object callback) cpdef rawlink(self, object callback)
cpdef unlink(self, object callback) cpdef unlink(self, object callback)
cpdef _start_notify(self)
cdef _notify_links(self)
cpdef int wait(self, object timeout=*) except -1000 cpdef int wait(self, object timeout=*) except -1000
cpdef bint acquire(self, int blocking=*, object timeout=*) except -1000 cpdef bint acquire(self, int blocking=*, object timeout=*) except -1000
cpdef __enter__(self) cpdef __enter__(self)
......
...@@ -6,9 +6,10 @@ from gevent.timeout import Timeout ...@@ -6,9 +6,10 @@ from gevent.timeout import Timeout
__all__ = ['Semaphore', 'BoundedSemaphore'] __all__ = ['Semaphore', 'BoundedSemaphore']
class Semaphore(object): class Semaphore(object):
""" """
Semaphore(value=1) -> Semaphore
A semaphore manages a counter representing the number of release() A semaphore manages a counter representing the number of release()
calls minus the number of acquire() calls, plus an initial value. calls minus the number of acquire() calls, plus an initial value.
The acquire() method blocks if necessary until it can return The acquire() method blocks if necessary until it can return
...@@ -22,15 +23,30 @@ class Semaphore(object): ...@@ -22,15 +23,30 @@ class Semaphore(object):
def __init__(self, value=1): def __init__(self, value=1):
if value < 0: if value < 0:
raise ValueError("semaphore initial value must be >= 0") raise ValueError("semaphore initial value must be >= 0")
self._links = []
self.counter = value self.counter = value
self._notifier = None
self._dirty = False self._dirty = False
# we don't want to do get_hub() here to allow module-level locks # In PyPy 2.6.1 with Cython 0.23, `cdef public` or `cdef
# readonly` attributes of type `object` can appear to leak if
# a Python subclass is used (this is visible simply
# instantiating this subclass if _links=[]). Our _links and
# _notifier are such attributes, and gevent.thread subclasses
# this class. Thus, we carefully manage the lifetime of the
# objects we put in these attributes so that, in the normal
# case of a semaphore used correctly (dealloced when it's not
# locked and no one is waiting), the leak goes away (because
# these objects are back to None). This can also be solved on PyPy
# by simply not declaring these objects in the pxd file, but that doesn't work for
# CPython ("No attribute..."); it might be possible to simply `cdef` them,
# but that's a minor backwards incompatibility (because they'd be inaccessible
# to python)
# See https://github.com/gevent/gevent/issues/660
self._links = None
self._notifier = None
# we don't want to do get_hub() here to allow defining module-level locks
# without initializing the hub # without initializing the hub
def __str__(self): def __str__(self):
params = (self.__class__.__name__, self.counter, len(self._links)) params = (self.__class__.__name__, self.counter, len(self._links) if self._links else 0)
return '<%s counter=%s _links[%s]>' % params return '<%s counter=%s _links[%s]>' % params
def locked(self): def locked(self):
...@@ -45,11 +61,37 @@ class Semaphore(object): ...@@ -45,11 +61,37 @@ class Semaphore(object):
def _start_notify(self): def _start_notify(self):
if self._links and self.counter > 0 and not self._notifier: if self._links and self.counter > 0 and not self._notifier:
self._notifier = get_hub().loop.run_callback(self._notify_links) # We create a new self._notifier each time through the loop,
# if needed. (it has a __bool__ method that tells whether it has
# been run; once it's run once---at the end of the loop---it becomes
# false.)
# Note that we pass Semaphore.__notify_links and not self._notify_links.
# This avoids having Cython create a bound method, which on PyPy 2.6.1,
# shows up as a leak, at least in the short term (possibly due to the cycles?)
# Simply passing it to Python, where Python doesn't keep a reference to it,
# shows as a leak.
# Note2: The method and tuple objects still show as a leak until the event
# loop is allowed to run, e.g., with gevent.sleep(). Manually running the callbacks
# isn't enough.
self._notifier = get_hub().loop.run_callback(Semaphore._notify_links, self)
def _notify_links(self): def _notify_links(self):
# Subclasses CANNOT override. This is a cdef method.
# We release self._notifier here. We are called by it
# at the end of the loop, and it is now false in a boolean way (as soon
# as this method returns).
# If we get acquired/released again, we will create a new one, but there's
# no need to keep it around until that point (making it potentially climb
# into older GC generations, notably on PyPy)
notifier = self._notifier
try:
while True: while True:
self._dirty = False self._dirty = False
if not self._links:
# In case we were manually unlinked before
# the callback. Which shouldn't happen
return
for link in self._links: for link in self._links:
if self.counter <= 0: if self.counter <= 0:
return return
...@@ -58,38 +100,60 @@ class Semaphore(object): ...@@ -58,38 +100,60 @@ class Semaphore(object):
except: except:
getcurrent().handle_error((link, self), *sys.exc_info()) getcurrent().handle_error((link, self), *sys.exc_info())
if self._dirty: if self._dirty:
# We mutated self._links so we need to start over
break break
if not self._dirty: if not self._dirty:
return return
finally:
# We should not have created a new notifier even if callbacks
# released us because we loop through *all* of our links on the
# same callback while self._notifier is still true.
assert self._notifier is notifier
self._notifier = None
def rawlink(self, callback): def rawlink(self, callback):
"""Register a callback to call when a counter is more than zero. """
rawlink(callback) -> None
Register a callback to call when a counter is more than zero.
*callback* will be called in the :class:`Hub <gevent.hub.Hub>`, so it must not use blocking gevent API. *callback* will be called in the :class:`Hub <gevent.hub.Hub>`, so it must not use blocking gevent API.
*callback* will be passed one argument: this instance. *callback* will be passed one argument: this instance.
""" """
if not callable(callback): if not callable(callback):
raise TypeError('Expected callable: %r' % (callback, )) raise TypeError('Expected callable:', callback)
if self._links is None:
self._links = [callback]
else:
self._links.append(callback) self._links.append(callback)
self._dirty = True self._dirty = True
def unlink(self, callback): def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`""" """
unlink(callback) -> None
Remove the callback set by :meth:`rawlink`
"""
try: try:
self._links.remove(callback) self._links.remove(callback)
self._dirty = True self._dirty = True
except ValueError: except (ValueError, AttributeError):
pass pass
if not self._links:
self._links = None
# TODO: Cancel a notifier if there are no links?
def wait(self, timeout=None): def wait(self, timeout=None):
""" """
wait(timeout=None) -> int
Wait until it is possible to acquire this semaphore, or until the optional Wait until it is possible to acquire this semaphore, or until the optional
*timeout* elapses. *timeout* elapses.
.. warning:: If this semaphore was initialized with a size of 0, .. warning:: If this semaphore was initialized with a size of 0,
this method will block forever if no timeout is given. this method will block forever if no timeout is given.
:param float timeout: If given, specifies the maximum amount of seconds :keyword float timeout: If given, specifies the maximum amount of seconds
this method will block. this method will block.
:return: A number indicating how many times the semaphore can be acquired :return: A number indicating how many times the semaphore can be acquired
before blocking. before blocking.
...@@ -117,6 +181,8 @@ class Semaphore(object): ...@@ -117,6 +181,8 @@ class Semaphore(object):
def acquire(self, blocking=True, timeout=None): def acquire(self, blocking=True, timeout=None):
""" """
acquire(blocking=True, timeout=None) -> bool
Acquire the semaphore. Acquire the semaphore.
.. warning:: If this semaphore was initialized with a size of 0, .. warning:: If this semaphore was initialized with a size of 0,
...@@ -171,6 +237,7 @@ class Semaphore(object): ...@@ -171,6 +237,7 @@ class Semaphore(object):
def __exit__(self, t, v, tb): def __exit__(self, t, v, tb):
self.release() self.release()
class BoundedSemaphore(Semaphore): class BoundedSemaphore(Semaphore):
""" """
A bounded semaphore checks to make sure its current value doesn't A bounded semaphore checks to make sure its current value doesn't
......
...@@ -614,6 +614,7 @@ class loop(object): ...@@ -614,6 +614,7 @@ class loop(object):
callback = cb.callback callback = cb.callback
args = cb.args args = cb.args
if callback is None or args is None: if callback is None or args is None:
# it's been stopped
continue continue
cb.callback = None cb.callback = None
...@@ -799,6 +800,7 @@ class loop(object): ...@@ -799,6 +800,7 @@ class loop(object):
cb = callback(func, args) cb = callback(func, args)
self._callbacks.append(cb) self._callbacks.append(cb)
self.ref() self.ref()
return cb return cb
def _format(self): def _format(self):
...@@ -838,6 +840,11 @@ class loop(object): ...@@ -838,6 +840,11 @@ class loop(object):
raise ValueError('operation on destroyed loop') raise ValueError('operation on destroyed loop')
return self._ptr.activecnt return self._ptr.activecnt
# For times when *args is captured but often not passed (empty),
# we can avoid keeping the new tuple that was created for *args
# around by using a constant.
_NOARGS = ()
class callback(object): class callback(object):
...@@ -845,7 +852,7 @@ class callback(object): ...@@ -845,7 +852,7 @@ class callback(object):
def __init__(self, callback, args): def __init__(self, callback, args):
self.callback = callback self.callback = callback
self.args = args self.args = args or _NOARGS
def stop(self): def stop(self):
self.callback = None self.callback = None
...@@ -883,7 +890,7 @@ class callback(object): ...@@ -883,7 +890,7 @@ class callback(object):
class watcher(object): class watcher(object):
def __init__(self, _loop, ref=True, priority=None, args=tuple()): def __init__(self, _loop, ref=True, priority=None, args=_NOARGS):
self.loop = _loop self.loop = _loop
if ref: if ref:
self._flags = 0 self._flags = 0
...@@ -892,6 +899,7 @@ class watcher(object): ...@@ -892,6 +899,7 @@ class watcher(object):
self.args = None self.args = None
self._callback = None self._callback = None
self._handle = ffi.new_handle(self) self._handle = ffi.new_handle(self)
# XXX This parses the C every time. Is that expensive? Can we do better?
self._gwatcher = ffi.new('struct gevent_' + self._watcher_type + '*') self._gwatcher = ffi.new('struct gevent_' + self._watcher_type + '*')
self._watcher = ffi.addressof(self._gwatcher.watcher) self._watcher = ffi.addressof(self._gwatcher.watcher)
self._gwatcher.handle = self._handle self._gwatcher.handle = self._handle
...@@ -959,7 +967,7 @@ class watcher(object): ...@@ -959,7 +967,7 @@ class watcher(object):
if callback is None: if callback is None:
raise TypeError('callback must be callable, not None') raise TypeError('callback must be callable, not None')
self.callback = callback self.callback = callback
self.args = args self.args = args or _NOARGS
self._libev_unref() self._libev_unref()
self._watcher_start(self.loop._ptr, self._watcher) self._watcher_start(self.loop._ptr, self._watcher)
self.loop._keepaliveset.add(self) self.loop._keepaliveset.add(self)
...@@ -985,7 +993,7 @@ class watcher(object): ...@@ -985,7 +993,7 @@ class watcher(object):
def feed(self, revents, callback, *args): def feed(self, revents, callback, *args):
self.callback = callback self.callback = callback
self.args = args self.args = args or _NOARGS
if self._flags & 6 == 4: if self._flags & 6 == 4:
self.loop.unref() self.loop.unref()
self._flags |= 2 self._flags |= 2
...@@ -1017,6 +1025,7 @@ class io(watcher): ...@@ -1017,6 +1025,7 @@ class io(watcher):
watcher.__init__(self, loop, ref=ref, priority=priority, args=(fd, events)) watcher.__init__(self, loop, ref=ref, priority=priority, args=(fd, events))
def start(self, callback, *args, **kwargs): def start(self, callback, *args, **kwargs):
args = args or _NOARGS
if kwargs.get('pass_events'): if kwargs.get('pass_events'):
args = (GEVENT_CORE_EVENTS, ) + args args = (GEVENT_CORE_EVENTS, ) + args
super(io, self).start(callback, *args) super(io, self).start(callback, *args)
...@@ -1067,7 +1076,7 @@ class timer(watcher): ...@@ -1067,7 +1076,7 @@ class timer(watcher):
raise TypeError('callback must be callable, not None') raise TypeError('callback must be callable, not None')
update = kw.get("update", True) update = kw.get("update", True)
self.callback = callback self.callback = callback
self.args = args self.args = args or _NOARGS
self._libev_unref() # LIBEV_UNREF self._libev_unref() # LIBEV_UNREF
...@@ -1083,7 +1092,7 @@ class timer(watcher): ...@@ -1083,7 +1092,7 @@ class timer(watcher):
def again(self, callback, *args, **kw): def again(self, callback, *args, **kw):
update = kw.get("update", True) update = kw.get("update", True)
self.callback = callback self.callback = callback
self.args = args self.args = args or _NOARGS
self._libev_unref() self._libev_unref()
if update: if update:
libev.ev_now_update(self.loop._ptr) libev.ev_now_update(self.loop._ptr)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment