Commit a1815c04 authored by Jason Madden's avatar Jason Madden

Fix libuv crashes if the default loop is destroyed and then used again.

Fixes #1580
parent 9535660a
Fix destroying the libuv default loop and then using the default loop
again.
......@@ -397,10 +397,8 @@ class AbstractLoop(object):
def _init_loop_and_aux_watchers(self, flags=None, default=None):
self._ptr = self._init_loop(flags, default)
# self._check is a watcher that runs in each iteration of the
# mainloop, just after the blocking call. It's point is to handle
# signals. It doesn't run watchers or callbacks, it just exists to give
......@@ -541,12 +539,13 @@ class AbstractLoop(object):
raise NotImplementedError()
def destroy(self):
if self._ptr:
ptr = self.ptr
if ptr:
try:
if not self._can_destroy_loop(self._ptr):
if not self._can_destroy_loop(ptr):
return False
self._stop_aux_watchers()
self._destroy_loop(self._ptr)
self._destroy_loop(ptr)
finally:
# not ffi.NULL, we don't want something that can be
# passed to C and crash later. This will create nice friendly
......@@ -566,6 +565,7 @@ class AbstractLoop(object):
@property
def ptr(self):
# Use this when you need to be sure the pointer is valid.
return self._ptr
@property
......@@ -650,7 +650,7 @@ class AbstractLoop(object):
@property
def default(self):
return self._default if self._ptr else False
return self._default if self.ptr else False
@property
def iteration(self):
......@@ -730,9 +730,11 @@ class AbstractLoop(object):
return cb
def _format(self):
if not self._ptr:
ptr = self.ptr
if not ptr:
return 'destroyed'
msg = self.backend
msg = "backend=" + self.backend
msg += ' ptr=' + str(ptr)
if self.default:
msg += ' default'
msg += ' pending=%s' % self.pendingcnt
......@@ -759,6 +761,6 @@ class AbstractLoop(object):
@property
def activecnt(self):
if not self._ptr:
if not self.ptr:
raise ValueError('operation on destroyed loop')
return 0
......@@ -268,7 +268,7 @@ class watcher(object):
raise NotImplementedError()
def _watcher_ffi_stop(self):
self._watcher_stop(self.loop._ptr, self._watcher)
self._watcher_stop(self.loop.ptr, self._watcher)
def _watcher_ffi_ref(self):
raise NotImplementedError()
......
......@@ -100,14 +100,15 @@ static void _gevent_fs_poll_callback3(void* handlep, int status, const uv_stat_t
static void gevent_uv_walk_callback_close(uv_handle_t* handle, void* arg)
{
if( handle && !uv_is_closing(handle) ) {
uv_close(handle, NULL);
uv_close(handle, NULL);
handle->data = NULL;
}
}
static void gevent_close_all_handles(uv_loop_t* loop)
{
if (loop) {
uv_walk(loop, gevent_uv_walk_callback_close, NULL);
uv_walk(loop, gevent_uv_walk_callback_close, NULL);
}
}
......@@ -155,7 +156,7 @@ static void* _gevent_uv_calloc(size_t count, size_t size)
void* result;
result = _gevent_uv_malloc(count * size);
if(result) {
memset(result, 0, count * size);
memset(result, 0, count * size);
}
return result;
}
......@@ -163,9 +164,9 @@ static void* _gevent_uv_calloc(size_t count, size_t size)
static void gevent_set_uv_alloc()
{
uv_replace_allocator(_gevent_uv_malloc,
_gevent_uv_realloc,
_gevent_uv_calloc,
_gevent_uv_free);
_gevent_uv_realloc,
_gevent_uv_calloc,
_gevent_uv_free);
}
#ifdef __clang__
......
......@@ -117,7 +117,7 @@ class loop(AbstractLoop):
self._io_watchers = dict()
self._fork_watchers = set()
self._pid = os.getpid()
self._default = self._ptr == libuv.uv_default_loop()
self._default = (self._ptr == libuv.uv_default_loop())
self._queued_callbacks = []
def _queue_callback(self, watcher_ptr, revents):
......@@ -148,8 +148,18 @@ class loop(AbstractLoop):
_signal_idle = None
@property
def ptr(self):
if not self._ptr:
return None
if self._ptr and not self._ptr.data:
# Another instance of the Python loop destroyed
# the C loop. It was probably the default.
self._ptr = None
return self._ptr
def _init_and_start_check(self):
libuv.uv_check_init(self._ptr, self._check)
libuv.uv_check_init(self.ptr, self._check)
libuv.uv_check_start(self._check, libuv.python_check_callback)
libuv.uv_unref(self._check)
......@@ -167,7 +177,7 @@ class loop(AbstractLoop):
# scheduled, this should also be the same and unnecessary?
# libev does takes this basic approach on Windows.
self._signal_idle = ffi.new("uv_timer_t*")
libuv.uv_timer_init(self._ptr, self._signal_idle)
libuv.uv_timer_init(self.ptr, self._signal_idle)
self._signal_idle.data = self._handle_to_self
sig_cb = ffi.cast('void(*)(uv_timer_t*)', libuv.python_check_callback)
libuv.uv_timer_start(self._signal_idle,
......@@ -210,12 +220,12 @@ class loop(AbstractLoop):
super(loop, self)._run_callbacks()
def _init_and_start_prepare(self):
libuv.uv_prepare_init(self._ptr, self._prepare)
libuv.uv_prepare_init(self.ptr, self._prepare)
libuv.uv_prepare_start(self._prepare, libuv.python_prepare_callback)
libuv.uv_unref(self._prepare)
def _init_callback_timer(self):
libuv.uv_check_init(self._ptr, self._timer0)
libuv.uv_check_init(self.ptr, self._timer0)
def _stop_callback_timer(self):
libuv.uv_check_stop(self._timer0)
......@@ -328,51 +338,64 @@ class loop(AbstractLoop):
self._start_callback_timer()
libuv.uv_ref(self._timer0)
def _can_destroy_loop(self, ptr):
# We're being asked to destroy a loop that's,
# at the time it was constructed, was the default loop.
# If loop objects were constructed more than once,
# it may have already been destroyed, though.
# We track this in the data member.
return ptr.data
return ptr
def _destroy_loop(self, ptr):
ptr.data = ffi.NULL
libuv.uv_stop(ptr)
def __close_loop(self, ptr):
closed_failed = 1
libuv.gevent_close_all_handles(ptr)
while closed_failed:
closed_failed = libuv.uv_loop_close(ptr)
if not closed_failed:
break
closed_failed = libuv.uv_loop_close(ptr)
if closed_failed:
assert closed_failed == libuv.UV_EBUSY
if closed_failed != libuv.UV_EBUSY:
raise SystemError("Unknown close failure reason", closed_failed)
# We already closed all the handles. Run the loop
# once to let them be cut off from the loop.
ran_has_more_callbacks = libuv.uv_run(ptr, libuv.UV_RUN_ONCE)
if ran_has_more_callbacks:
libuv.uv_run(ptr, libuv.UV_RUN_NOWAIT)
closed_failed = libuv.uv_loop_close(ptr)
assert closed_failed == 0, closed_failed
# Destroy the native resources *after* we have closed
# the loop. If we do it before, walking the handles
# attached to the loop is likely to segfault.
libuv.gevent_zero_check(self._check)
libuv.gevent_zero_check(self._timer0)
libuv.gevent_zero_prepare(self._prepare)
libuv.gevent_zero_timer(self._signal_idle)
del self._check
del self._prepare
del self._signal_idle
del self._timer0
def _destroy_loop(self, ptr):
# We're being asked to destroy a loop that's, potentially, at
# the time it was constructed, was the default loop. If loop
# objects were constructed more than once, it may have already
# been destroyed, though. We track this in the data member.
data = ptr.data
ptr.data = ffi.NULL
try:
if data:
libuv.uv_stop(ptr)
libuv.gevent_close_all_handles(ptr)
finally:
ptr.data = ffi.NULL
try:
if data:
self.__close_loop(ptr)
finally:
# Destroy the native resources *after* we have closed
# the loop. If we do it before, walking the handles
# attached to the loop is likely to segfault.
# Note that these may have been closed already if the default loop was shared.
if data:
libuv.gevent_zero_check(self._check)
libuv.gevent_zero_check(self._timer0)
libuv.gevent_zero_prepare(self._prepare)
libuv.gevent_zero_timer(self._signal_idle)
libuv.gevent_zero_loop(ptr)
libuv.gevent_zero_loop(ptr)
del self._check
del self._prepare
del self._signal_idle
del self._timer0
# Destroy any watchers we're still holding on to.
del self._io_watchers
del self._fork_watchers
del self._child_watchers
# Destroy any watchers we're still holding on to.
del self._io_watchers
del self._fork_watchers
del self._child_watchers
def debug(self):
......@@ -402,7 +425,7 @@ class loop(AbstractLoop):
libuv.uv_is_active(handle),
libuv.uv_is_closing(handle)))
libuv.uv_walk(self._ptr,
libuv.uv_walk(self.ptr,
ffi.callback("void(*)(uv_handle_t*,void*)",
walk),
ffi.NULL)
......@@ -416,7 +439,7 @@ class loop(AbstractLoop):
pass
def break_(self, how=None):
libuv.uv_stop(self._ptr)
libuv.uv_stop(self.ptr)
def reinit(self):
# TODO: How to implement? We probably have to simply
......@@ -440,7 +463,7 @@ class loop(AbstractLoop):
# (https://github.com/joyent/libuv/issues/1405)
# In 1.12, the uv_loop_fork function was added (by gevent!)
libuv.uv_loop_fork(self._ptr)
libuv.uv_loop_fork(self.ptr)
_prepare_ran_callbacks = False
......@@ -540,14 +563,14 @@ class loop(AbstractLoop):
# libuv's now is expressed as an integer number of
# milliseconds, so to get it compatible with time.time units
# that this method is supposed to return, we have to divide by 1000.0
now = libuv.uv_now(self._ptr)
now = libuv.uv_now(self.ptr)
return now / 1000.0
def update_now(self):
libuv.uv_update_time(self._ptr)
libuv.uv_update_time(self.ptr)
def fileno(self):
if self._ptr:
if self.ptr:
fd = libuv.uv_backend_fd(self._ptr)
if fd >= 0:
return fd
......@@ -563,7 +586,7 @@ class loop(AbstractLoop):
return
self._sigchld_watcher = ffi.new('uv_signal_t*')
libuv.uv_signal_init(self._ptr, self._sigchld_watcher)
libuv.uv_signal_init(self.ptr, self._sigchld_watcher)
self._sigchld_watcher.data = self._handle_to_self
libuv.uv_signal_start(self._sigchld_watcher,
......
......@@ -619,7 +619,7 @@ class timer(_base.TimerMixin, watcher):
_again = False
def _watcher_ffi_init(self, args):
self._watcher_init(self.loop._ptr, self._watcher)
self._watcher_init(self.loop.ptr, self._watcher)
self._after, self._repeat = args
if self._after and self._after < 0.001:
import warnings
......@@ -674,7 +674,7 @@ class stat(_base.StatMixin, watcher):
return data
def _watcher_ffi_init(self, args):
return self._watcher_init(self.loop._ptr, self._watcher)
return self._watcher_init(self.loop.ptr, self._watcher)
MIN_STAT_INTERVAL = 0.1074891 # match libev; 0.0 is default
......@@ -707,7 +707,7 @@ class signal(_base.SignalMixin, watcher):
_watcher_callback_name = '_gevent_signal_callback1'
def _watcher_ffi_init(self, args):
self._watcher_init(self.loop._ptr, self._watcher)
self._watcher_init(self.loop.ptr, self._watcher)
self.ref = False # libev doesn't ref these by default
......
......@@ -17,7 +17,7 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import print_function
from functools import wraps
......@@ -29,7 +29,6 @@ def wrap_error_fatal(method):
def wrapper(self, *args, **kwargs):
# XXX should also be able to do gevent.SYSTEM_ERROR = object
# which is a global default to all hubs
gevent.get_hub().SYSTEM_ERROR = object
try:
return method(self, *args, **kwargs)
......
......@@ -44,6 +44,3 @@ class QuietHub(Hub):
# see handle_error
return
return Hub.print_exception(self, context, t, v, tb)
def destroy(self, destroy_loop=None):
raise AssertionError("Do not destroy the hub in a unittest")
......@@ -29,6 +29,7 @@ import gevent
from gevent._util import LazyOnClass
from gevent._compat import perf_counter
from gevent._compat import get_clock_info
from gevent._hub_local import get_hub_if_exists
from . import sysinfo
from . import params
......@@ -277,7 +278,9 @@ class TestCase(TestCaseMetaClass("NewBase",
# so that doesn't always happen. test__pool.py:TestPoolYYY.test_async
# tends to show timeouts that are too short if we don't.
# XXX: Should some core part of the loop call this?
gevent.get_hub().loop.update_now()
hub = get_hub_if_exists()
if hub and hub.loop:
hub.loop.update_now()
self.close_on_teardown = []
self.addCleanup(self._tearDownCloseOnTearDown)
......
......@@ -261,14 +261,12 @@ class Discovery(object):
ignored=(),
coverage=False,
package=None,
configured_ignore_coverage=(),
configured_test_options=None,
config=None,
):
# pylint:disable=too-many-locals,too-many-branches
self.configured_test_options = configured_test_options or {}
self.config = config or {}
self.ignore = set(ignored or ())
self.tests = tests
self.configured_test_options = configured_test_options
self.configured_test_options = config.get('TEST_FILE_OPTIONS', set())
if ignore_files:
ignore_files = ignore_files.split(',')
......@@ -276,14 +274,17 @@ class Discovery(object):
self.ignore.update(set(load_list_from_file(f, package)))
if coverage:
self.ignore.update(configured_ignore_coverage)
self.ignore.update(config.get('IGNORE_COVERAGE', set()))
if package:
self.package = package
self.package_dir = _dir_from_package_name(package)
class Discovered(object):
def __init__(self, package, configured_test_options, ignore):
def __init__(self, package, configured_test_options, ignore, config):
self.orig_dir = os.getcwd()
self.configured_run_alone = config['RUN_ALONE']
self.configured_failing_tests = config['FAILING_TESTS']
self.package = package
self.configured_test_options = configured_test_options
self.ignore = ignore
......@@ -332,9 +333,17 @@ class Discovery(object):
or (_import_main.search(contents) and _main.search(contents))
)
def __has_config(self, filename):
return (
RUN_LEAKCHECKS
or filename in self.configured_test_options
or filename in self.configured_run_alone
or matches(self.configured_failing_tests, filename)
)
def __can_monkey_combine(self, filename, contents):
return (
filename not in self.configured_test_options
not self.__has_config(filename)
and self.__makes_simple_monkey_patch(contents)
and self.__file_allows_monkey_combine(contents)
and self.__file_allows_combine(contents)
......@@ -347,7 +356,7 @@ class Discovery(object):
def __can_nonmonkey_combine(self, filename, contents):
return (
filename not in self.configured_test_options
not self.__has_config(filename)
and self.__makes_no_monkey_patch(contents)
and self.__file_allows_combine(contents)
and self.__calls_unittest_main_toplevel(contents)
......@@ -367,7 +376,6 @@ class Discovery(object):
# XXX: Rework this to allow test combining (it could write the files out and return
# them directly; we would use 'python -m gevent.monkey --module unittest ...)
self.to_import.append(qualified_name)
# TODO: I'm pretty sure combining breaks keeping particular tests standalone
elif self.__can_monkey_combine(filename, contents):
self.std_monkey_patch_files.append(qualified_name if self.package else filename)
elif self.__can_nonmonkey_combine(filename, contents):
......@@ -457,15 +465,17 @@ class Discovery(object):
def visit_files(self, filenames):
for filename in filenames:
self.visit_file(filename)
self.__expand_imports()
with Discovery._in_dir(self.orig_dir):
self.__expand_imports()
self.__combine_commands(self.std_monkey_patch_files)
self.__combine_commands(self.no_monkey_patch_files)
@staticmethod
@contextmanager
def _in_package_dir(self):
def _in_dir(package_dir):
olddir = os.getcwd()
if self.package_dir:
os.chdir(self.package_dir)
if package_dir:
os.chdir(package_dir)
try:
yield
finally:
......@@ -474,10 +484,11 @@ class Discovery(object):
@Lazy
def discovered(self):
tests = self.tests
discovered = self.Discovered(self.package, self.configured_test_options, self.ignore)
discovered = self.Discovered(self.package, self.configured_test_options,
self.ignore, self.config)
# We need to glob relative names, our config is based on filenames still
with self._in_package_dir():
with self._in_dir(self.package_dir):
if not tests:
tests = set(glob.glob('test_*.py')) - set(['test_support.py'])
else:
......@@ -716,7 +727,6 @@ def main():
FAILING_TESTS = []
IGNORED_TESTS = []
RUN_ALONE = []
TEST_FILE_OPTIONS = {}
coverage = False
if options.coverage or os.environ.get("GEVENTTEST_COVERAGE"):
......@@ -753,9 +763,6 @@ def main():
FAILING_TESTS = config['FAILING_TESTS']
IGNORED_TESTS = config['IGNORED_TESTS']
RUN_ALONE = config['RUN_ALONE']
TEST_FILE_OPTIONS = config['TEST_FILE_OPTIONS']
IGNORE_COVERAGE = config['IGNORE_COVERAGE']
tests = Discovery(
options.tests,
......@@ -763,8 +770,7 @@ def main():
ignored=IGNORED_TESTS,
coverage=coverage,
package=options.package,
configured_ignore_coverage=IGNORE_COVERAGE,
configured_test_options=TEST_FILE_OPTIONS,
config=config,
)
if options.discover:
for cmd, options in tests:
......
......@@ -364,7 +364,6 @@ class Definitions(DefinitionsBase):
it seems highly load dependent. Observed with both libev and libuv.
""",
when=TRAVIS & (PYPY | OSX),
run_alone=ALWAYS,
# This often takes much longer on PyPy on CI.
options={'timeout': (CI & PYPY, 180)},
)
......@@ -438,11 +437,11 @@ class Definitions(DefinitionsBase):
""",
)
test__pool = test__queue = RunAlone(
test__pool = RunAlone(
"""
On a heavily loaded box, these can all take upwards of 200s.
""",
when=CI & LEAKTEST | PY3
when=CI & LEAKTEST
)
test_socket = RunAlone(
......
......@@ -110,7 +110,6 @@ class AbstractTestMixin(object):
except ImportError:
if modname in modules.OPTIONAL_MODULES:
msg = "Unable to import %s" % modname
warnings.warn(msg) # make the testrunner print it
raise unittest.SkipTest(msg)
raise
......
......@@ -5,6 +5,20 @@ import unittest
class TestDestroyDefaultLoop(unittest.TestCase):
def tearDown(self):
self._reset_hub()
super(TestDestroyDefaultLoop, self).tearDown()
def _reset_hub(self):
from gevent._hub_local import set_hub
from gevent._hub_local import set_loop
from gevent._hub_local import get_hub_if_exists
hub = get_hub_if_exists()
if hub is not None:
hub.destroy(destroy_loop=True)
set_hub(None)
set_loop(None)
def test_destroy_gc(self):
# Issue 1098: destroying the default loop
# while using the C extension could crash
......@@ -19,6 +33,7 @@ class TestDestroyDefaultLoop(unittest.TestCase):
loop = gevent.config.loop(default=True)
self.assertTrue(loop.default)
# Destroy it
loop.destroy()
# It no longer claims to be the default
self.assertFalse(loop.default)
......@@ -31,10 +46,7 @@ class TestDestroyDefaultLoop(unittest.TestCase):
# crash only happened when that greenlet object
# was collected at exit time, which was most common
# in CPython 3.5)
from gevent._hub_local import set_hub
set_hub(None)
self._reset_hub()
def test_destroy_two(self):
# Get two new loop object, but using the default
......@@ -50,6 +62,10 @@ class TestDestroyDefaultLoop(unittest.TestCase):
# Destroy the second. This doesn't crash.
loop2.destroy()
self.assertFalse(loop2.default)
self.assertFalse(loop2.ptr)
self._reset_hub()
self.assertTrue(gevent.get_hub().loop.ptr)
if __name__ == '__main__':
......
......@@ -30,4 +30,6 @@ class Test_udp_client(util.TestServer):
if __name__ == '__main__':
main()
# Running this following test__example_portforwarder on Appveyor
# doesn't work in the same process for some reason.
main() # pragma: testrunner-no-combine
......@@ -86,7 +86,8 @@ class TestTree(greentest.TestCase):
# so perhaps we need a GC?
for _ in range(3):
gc.collect()
gevent.get_hub().resolver = None # Reset resolver, don't need to see it
gevent.get_hub().threadpool = None # ditto the pool
glets = []
l = MyLocal(42)
assert l
......@@ -135,6 +136,10 @@ class TestTree(greentest.TestCase):
def _normalize_tree_format(self, value):
import re
hexobj = re.compile('0x[0123456789abcdef]+L?', re.I)
hub_repr = repr(gevent.get_hub())
value = value.replace(hub_repr, "<HUB>")
value = hexobj.sub('X', value)
value = value.replace('epoll', 'select')
value = value.replace('select', 'default')
......@@ -160,26 +165,26 @@ class TestTree(greentest.TestCase):
: Greenlet Locals:
: Local <class '__main__.MyLocal'> at X
: {'foo': 42}
+--- <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <HUB>
: Parent: <greenlet.greenlet object at X>
+--- <Greenlet "Greenlet-1" at X: t2>; finished with value <Greenlet "CustomName-0" at 0x
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
| +--- <Greenlet "CustomName-0" at X: t1>; finished with exception ExpectedException()
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
+--- <Greenlet "Greenlet-2" at X: t2>; finished with value <Greenlet "CustomName-4" at 0x
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
| +--- <Greenlet "CustomName-4" at X: t1>; finished with exception ExpectedException()
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
+--- <Greenlet "Greenlet-3" at X: t3>; finished with value <Greenlet "Greenlet-5" at X
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
: Spawn Tree Locals
: {'stl': 'STL'}
| +--- <Greenlet "Greenlet-5" at X: t2>; finished with value <Greenlet "CustomName-6" at 0x
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
| +--- <Greenlet "CustomName-6" at X: t1>; finished with exception ExpectedException()
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
+--- <Greenlet "Greenlet-7" at X: <bound method GreenletTree.current_tree of <class 'gevent.util.GreenletTree'>>>; finished with value <gevent.util.GreenletTree obje
Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
Parent: <HUB>
""".strip()
self.assertEqual(expected, value)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment