Commit 282eae48 authored by Jason Madden's avatar Jason Madden

Make loops automatically break when their owning hub is destroyed.

And stop running callbacks.

This fixes #1686 and fixes #1669.
parent a403cb7a
Make destroying a hub try harder to more forcibly stop loop processing
when there are outstanding callbacks or IO operations scheduled.
Thanks to Josh Snyder (:issue:`1686`) and Jan-Philip Gehrcke
(:issue:`1669`).
...@@ -14,6 +14,7 @@ from gevent._ffi import GEVENT_DEBUG_LEVEL ...@@ -14,6 +14,7 @@ from gevent._ffi import GEVENT_DEBUG_LEVEL
from gevent._ffi import TRACE from gevent._ffi import TRACE
from gevent._ffi.callback import callback from gevent._ffi.callback import callback
from gevent._compat import PYPY from gevent._compat import PYPY
from gevent.exceptions import HubDestroyed
from gevent import getswitchinterval from gevent import getswitchinterval
...@@ -601,6 +602,11 @@ class AbstractLoop(object): ...@@ -601,6 +602,11 @@ class AbstractLoop(object):
self.handle_error(None, SystemError, SystemError(message), None) self.handle_error(None, SystemError, SystemError(message), None)
def handle_error(self, context, type, value, tb): def handle_error(self, context, type, value, tb):
if type is HubDestroyed:
self._callbacks.clear()
self.break_()
return
handle_error = None handle_error = None
error_handler = self.error_handler error_handler = self.error_handler
if error_handler is not None: if error_handler is not None:
......
...@@ -10,6 +10,7 @@ from __future__ import absolute_import ...@@ -10,6 +10,7 @@ from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
from greenlet import GreenletExit
__all__ = [ __all__ = [
'LoopExit', 'LoopExit',
...@@ -116,3 +117,20 @@ class InvalidThreadUseError(RuntimeError): ...@@ -116,3 +117,20 @@ class InvalidThreadUseError(RuntimeError):
.. versionadded:: 1.5a3 .. versionadded:: 1.5a3
""" """
class HubDestroyed(GreenletExit):
"""
Internal exception, raised when we're trying to destroy the
hub and we want the loop to stop running callbacks now.
This must not be subclassed; the type is tested by identity.
Clients outside of gevent must not raise this exception.
.. versionadded:: NEXT
"""
def __init__(self, destroy_loop):
GreenletExit.__init__(self, destroy_loop)
self.destroy_loop = destroy_loop
...@@ -31,6 +31,7 @@ __all__ = [ ...@@ -31,6 +31,7 @@ __all__ = [
from gevent._config import config as GEVENT_CONFIG from gevent._config import config as GEVENT_CONFIG
from gevent._compat import thread_mod_name from gevent._compat import thread_mod_name
from gevent._compat import reraise
from gevent._util import readproperty from gevent._util import readproperty
from gevent._util import Lazy from gevent._util import Lazy
from gevent._util import gmctime from gevent._util import gmctime
...@@ -54,6 +55,7 @@ iwait = _hub_primitives.iwait_on_objects ...@@ -54,6 +55,7 @@ iwait = _hub_primitives.iwait_on_objects
from gevent.exceptions import LoopExit from gevent.exceptions import LoopExit
from gevent.exceptions import HubDestroyed
from gevent._waiter import Waiter from gevent._waiter import Waiter
...@@ -64,6 +66,7 @@ get_thread_ident = __import__(thread_mod_name).get_ident ...@@ -64,6 +66,7 @@ get_thread_ident = __import__(thread_mod_name).get_ident
MAIN_THREAD_IDENT = get_thread_ident() # XXX: Assuming import is done on the main thread. MAIN_THREAD_IDENT = get_thread_ident() # XXX: Assuming import is done on the main thread.
def spawn_raw(function, *args, **kwargs): def spawn_raw(function, *args, **kwargs):
""" """
Create a new :class:`greenlet.greenlet` object and schedule it to Create a new :class:`greenlet.greenlet` object and schedule it to
...@@ -529,6 +532,11 @@ class Hub(WaitOperationsGreenlet): ...@@ -529,6 +532,11 @@ class Hub(WaitOperationsGreenlet):
""" """
type, value, tb = self._normalize_exception(type, value, tb) type, value, tb = self._normalize_exception(type, value, tb)
if type is HubDestroyed:
# We must continue propagating this for it to properly
# exit.
reraise(type, value, tb)
if not issubclass(type, self.NOT_ERROR): if not issubclass(type, self.NOT_ERROR):
self.print_exception(context, type, value, tb) self.print_exception(context, type, value, tb)
if context is None or issubclass(type, self.SYSTEM_ERROR): if context is None or issubclass(type, self.SYSTEM_ERROR):
...@@ -773,6 +781,9 @@ class Hub(WaitOperationsGreenlet): ...@@ -773,6 +781,9 @@ class Hub(WaitOperationsGreenlet):
is running in. If the hub is destroyed by a different thread is running in. If the hub is destroyed by a different thread
after a ``fork()``, for example, expect some garbage to leak. after a ``fork()``, for example, expect some garbage to leak.
""" """
if destroy_loop is None:
destroy_loop = not self.loop.default
if self.periodic_monitoring_thread is not None: if self.periodic_monitoring_thread is not None:
self.periodic_monitoring_thread.kill() self.periodic_monitoring_thread.kill()
self.periodic_monitoring_thread = None self.periodic_monitoring_thread = None
...@@ -791,7 +802,7 @@ class Hub(WaitOperationsGreenlet): ...@@ -791,7 +802,7 @@ class Hub(WaitOperationsGreenlet):
# loop; if we destroy the loop and then switch into the hub, # loop; if we destroy the loop and then switch into the hub,
# things will go VERY, VERY wrong. # things will go VERY, VERY wrong.
try: try:
self.throw(GreenletExit) self.throw(HubDestroyed(destroy_loop))
except LoopExit: except LoopExit:
# Expected. # Expected.
pass pass
...@@ -801,8 +812,6 @@ class Hub(WaitOperationsGreenlet): ...@@ -801,8 +812,6 @@ class Hub(WaitOperationsGreenlet):
# in this case. # in this case.
pass pass
if destroy_loop is None:
destroy_loop = not self.loop.default
if destroy_loop: if destroy_loop:
if get_loop() is self.loop: if get_loop() is self.loop:
# Don't let anyone try to reuse this # Don't let anyone try to reuse this
......
...@@ -41,6 +41,7 @@ import os ...@@ -41,6 +41,7 @@ import os
import traceback import traceback
import signal as signalmodule import signal as signalmodule
from gevent import getswitchinterval from gevent import getswitchinterval
from gevent.exceptions import HubDestroyed
__all__ = ['get_version', __all__ = ['get_version',
...@@ -334,6 +335,10 @@ cdef class CallbackFIFO(object): ...@@ -334,6 +335,10 @@ cdef class CallbackFIFO(object):
self.head = None self.head = None
self.tail = None self.tail = None
cdef inline clear(self):
self.head = None
self.tail = None
cdef inline callback popleft(self): cdef inline callback popleft(self):
cdef callback head = self.head cdef callback head = self.head
self.head = head.next self.head = head.next
...@@ -342,7 +347,6 @@ cdef class CallbackFIFO(object): ...@@ -342,7 +347,6 @@ cdef class CallbackFIFO(object):
head.next = None head.next = None
return head return head
cdef inline append(self, callback new_tail): cdef inline append(self, callback new_tail):
assert not new_tail.next assert not new_tail.next
if self.tail is None: if self.tail is None:
...@@ -353,7 +357,6 @@ cdef class CallbackFIFO(object): ...@@ -353,7 +357,6 @@ cdef class CallbackFIFO(object):
return return
self.tail = self.head self.tail = self.head
assert self.head is not None assert self.head is not None
old_tail = self.tail old_tail = self.tail
old_tail.next = new_tail old_tail.next = new_tail
...@@ -560,6 +563,11 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]: ...@@ -560,6 +563,11 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
cpdef handle_error(self, context, type, value, tb): cpdef handle_error(self, context, type, value, tb):
cdef object handle_error cdef object handle_error
cdef object error_handler = self.error_handler cdef object error_handler = self.error_handler
if type is HubDestroyed:
self._callbacks.clear()
self.break_()
return
if error_handler is not None: if error_handler is not None:
# we do want to do getattr every time so that setting Hub.handle_error property just works # we do want to do getattr every time so that setting Hub.handle_error property just works
handle_error = getattr(error_handler, 'handle_error', error_handler) handle_error = getattr(error_handler, 'handle_error', error_handler)
......
# -*- coding: utf-8 -*-
"""
Tests for https://github.com/gevent/gevent/issues/1686
which is about destroying a hub when there are active
callbacks or IO in operation.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import unittest
from gevent import testing as greentest
# Don't let the testrunner put us in a process with other
# tests; we are strict on the state of the hub and greenlets.
# pragma: testrunner-no-combine
@greentest.skipOnWindows("Uses os.fork")
class TestDestroyInChildWithActiveSpawn(unittest.TestCase):
def test(self): # pylint:disable=too-many-locals
# If this test is broken, there are a few failure modes.
# - In the original examples, the parent process just hangs, because the
# child has raced ahead, spawned the greenlet and read the data. When the
# greenlet goes to read in the parent, it blocks, and the hub and loop
# wait for it.
# - Here, our child detects the greenlet ran when it shouldn't and
# raises an error, which translates to a non-zero exit status,
# which the parent checks for and fails by raising an exception before
# returning control to the hub. We can replicate the hang by removing the
# assertion in the child.
from time import sleep as hang
from gevent import get_hub
from gevent import spawn
from gevent.socket import wait_read
from gevent.os import nb_read
from gevent.os import nb_write
from gevent.os import make_nonblocking
from gevent.os import fork
from gevent.os import waitpid
pipe_read_fd, pipe_write_fd = os.pipe()
make_nonblocking(pipe_read_fd)
make_nonblocking(pipe_write_fd)
run = []
def reader():
run.append(1)
return nb_read(pipe_read_fd, 4096)
# Put data in the pipe
DATA = b'test'
nb_write(pipe_write_fd, DATA)
# Make sure we're ready to read it
wait_read(pipe_read_fd)
# Schedule a greenlet to start
reader = spawn(reader)
hub = get_hub()
pid = fork()
if pid == 0:
# Child destroys the hub. The reader should not have run.
hub.destroy(destroy_loop=True)
self.assertFalse(run)
os._exit(0)
return
# The parent.
# Briefly prevent us from spinning our event loop.
hang(0.5)
wait_child_result = waitpid(pid, 0)
self.assertEqual(wait_child_result, (pid, 0))
# We should get the data; the greenlet only runs in the parent.
data = reader.get()
self.assertEqual(run, [1])
self.assertEqual(data, DATA)
if __name__ == '__main__':
greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment