Commit b4ae48f1 authored by Jason Madden's avatar Jason Madden

Fix an interaction between the switch interval and libuv that could introduce...

Fix an interaction between the switch interval and libuv that could introduce delays processing large batches of callbacks.

Specifically, a .3 (idle signal timer) delay as UV_RUN_ONCE would pause if there was no other active timer or IO watcher.

Now, if there are still batches of callbacks to run, we explicitly use UV_RUN_NOWAIT to only poll for IO and queue callbacks without waiting at all.

This gets the time for the synthetic greenlet-launching benchmarks to match libev-cffi and be close to libuv-cext.

Fixes #1493
parent 3c453c72
......@@ -62,6 +62,16 @@
- Remove some undocumented, deprecated functions from the threadpool module.
- libuv: Fix a perceived slowness spawning many greenlets at the same time
without yielding to the event loop while having no active IO
watchers or timers. If the time spent launching greenlets exceeded
the switch interval and there were no other active watchers, then
the default IO poll time of about .3s would elapse between spawning
batches. This could theoretically apply for any non-switching
callbacks. This can be produced in synthetic benchmarks and other
special circumstances, but real applications are unlikely to be
affected. See :issue:`1493`.
1.5a2 (2019-10-21)
==================
......
......@@ -106,7 +106,7 @@ def bench_none(options):
def bench_gevent(options):
from gevent import spawn, sleep, get_hub
from gevent import spawn, sleep
return test(spawn, sleep, options)
......@@ -225,5 +225,6 @@ def main(argv=None):
Options(sleep=False, join=False, foo=1, bar='hello'),
inner_loops=N)
if __name__ == '__main__':
main()
......@@ -285,6 +285,7 @@ class AbstractCallbacks(object):
def assign_standard_callbacks(ffi, lib, callbacks_class, extras=()): # pylint:disable=unused-argument
# callbacks keeps these cdata objects alive at the python level
callbacks = callbacks_class(ffi)
extras = [extra if len(extra) == 2 else (extra, None) for extra in extras]
extras = tuple([(getattr(callbacks, name), error) for name, error in extras])
for (func, error_func) in ((callbacks.python_callback, None),
(callbacks.python_handle_error, None),
......@@ -321,11 +322,14 @@ else:
_NOARGS = ()
CALLBACK_CHECK_COUNT = 50
class AbstractLoop(object):
# pylint:disable=too-many-public-methods,too-many-instance-attributes
# How many callbacks we should run between checking against the
# switch interval.
CALLBACK_CHECK_COUNT = 50
error_handler = None
_CHECK_POINTER = None
......@@ -428,7 +432,7 @@ class AbstractLoop(object):
# moment there.
self.starting_timer_may_update_loop_time = True
try:
count = CALLBACK_CHECK_COUNT
count = self.CALLBACK_CHECK_COUNT
now = self.now()
expiration = now + getswitchinterval()
self._stop_callback_timer()
......@@ -479,7 +483,7 @@ class AbstractLoop(object):
# but we may have more, so before looping check our
# switch interval.
if count == 0 and self._callbacks:
count = CALLBACK_CHECK_COUNT
count = self.CALLBACK_CHECK_COUNT
self.update_now()
if self.now() >= expiration:
now = 0
......
......@@ -354,22 +354,20 @@ void* memset(void *b, int c, size_t len);
// call them
typedef void* GeventWatcherObject;
extern "Python" {
// Standard gevent._ffi.loop callbacks.
int python_callback(GeventWatcherObject handle, int revents);
void python_handle_error(GeventWatcherObject handle, int revents);
void python_stop(GeventWatcherObject handle);
void python_check_callback(uv_check_t* handle);
void python_prepare_callback(uv_prepare_t* handle);
void python_timer0_callback(uv_check_t* handle);
// libuv specific callback
void _uv_close_callback(uv_handle_t* handle);
void python_sigchld_callback(uv_signal_t* handle, int signum);
void python_queue_callback(uv_handle_t* handle, int revents);
// Standard gevent._ffi.loop callbacks.
int python_callback(GeventWatcherObject handle, int revents);
void python_handle_error(GeventWatcherObject handle, int revents);
void python_stop(GeventWatcherObject handle);
void python_check_callback(uv_check_t* handle);
void python_prepare_callback(uv_prepare_t* handle);
void python_timer0_callback(uv_check_t* handle);
// libuv specific callback
void _uv_close_callback(uv_handle_t* handle);
void python_sigchld_callback(uv_signal_t* handle, int signum);
void python_queue_callback(uv_handle_t* handle, int revents);
}
// A variable we fill in.
static void (*gevent_noop)(void* handle);
static void _gevent_signal_callback1(uv_signal_t* handle, int arg);
static void _gevent_async_callback0(uv_async_t* handle);
......
#include <string.h>
#include <assert.h>
#include "uv.h"
typedef void* GeventWatcherObject;
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused"
#pragma clang diagnostic ignored "-Wunused-parameter"
#pragma clang diagnostic ignored "-Wundefined-internal"
#endif
static int python_callback(GeventWatcherObject handle, int revents);
static void python_queue_callback(uv_handle_t* watcher_ptr, int revents);
static void python_handle_error(GeventWatcherObject handle, int revents);
static void python_stop(GeventWatcherObject handle);
static void _gevent_noop(void* handle) {}
static void (*gevent_noop)(void* handle) = &_gevent_noop;
static void _gevent_generic_callback1_unused(uv_handle_t* watcher, int arg)
{
// Python code may set this to NULL or even change it
// out from under us, which would tend to break things.
GeventWatcherObject handle = watcher->data;
const int cb_result = python_callback(handle, arg);
switch(cb_result) {
case -1:
// in case of exception, call self.loop.handle_error;
// this function is also responsible for stopping the watcher
// and allowing memory to be freed
python_handle_error(handle, arg);
break;
case 1:
// Code to stop the event IF NEEDED. Note that if python_callback
// has disposed of the last reference to the handle,
// `watcher` could now be invalid/disposed memory!
if (!uv_is_active(watcher)) {
if (watcher->data != handle) {
if (watcher->data) {
// If Python set the data to NULL, then they
// expected to be stopped. That's fine.
// Otherwise, something weird happened.
fprintf(stderr,
"WARNING: gevent: watcher handle changed in callback "
"from %p to %p for watcher at %p of type %d\n",
handle, watcher->data, watcher, watcher->type);
// There's a very good chance that the object the
// handle referred to has been changed and/or the
// old handle has been deallocated (most common), so
// passing the old handle will crash. Instead we
// pass a sigil to let python distinguish this case.
python_stop(NULL);
}
}
else {
python_stop(handle);
}
}
break;
case 2:
// watcher is already stopped and dead, nothing to do.
break;
default:
fprintf(stderr,
"WARNING: gevent: Unexpected return value %d from Python callback "
"for watcher %p (of type %d) and handle %p\n",
cb_result,
watcher, watcher->type, handle);
// XXX: Possible leaking of resources here? Should we be
// closing the watcher?
}
}
static void _gevent_generic_callback1(uv_handle_t* watcher, int arg)
{
python_queue_callback(watcher, arg);
python_queue_callback(watcher, arg);
}
static void _gevent_generic_callback0(uv_handle_t* handle)
......@@ -181,3 +129,11 @@ static void gevent_zero_loop(uv_loop_t* handle)
{
memset(handle, 0, sizeof(uv_loop_t));
}
#ifdef __clang__
#pragma clang diagnostic pop
#endif
/* Local Variables: */
/* flycheck-clang-include-path: ("../../../deps/libuv/include") */
/* End: */
......@@ -43,12 +43,19 @@ class _Callbacks(AbstractCallbacks):
the_watcher.loop._queue_callback(watcher_ptr, revents)
def __loop_from_loop_ptr(self, loop_ptr):
loop_handle = loop_ptr.data
return self.from_handle(loop_handle)
_callbacks = assign_standard_callbacks(
ffi, libuv, _Callbacks,
[('python_sigchld_callback', None),
('python_timer0_callback', None),
('python_queue_callback', None)])
[
'python_sigchld_callback',
'python_timer0_callback',
'python_queue_callback',
]
)
from gevent._ffi.loop import EVENTS
GEVENT_CORE_EVENTS = EVENTS # export
......@@ -84,6 +91,14 @@ class loop(AbstractLoop):
# (+- 0.000036s)
approx_timer_resolution = 0.001 # 1ms
# It's relatively more expensive to break from the callback loop
# because we don't do it "inline" from C, we're looping in Python
CALLBACK_CHECK_COUNT = max(AbstractLoop.CALLBACK_CHECK_COUNT, 100)
# Defines the maximum amount of time the loop will sleep waiting for IO,
# which is also the interval at which signals are checked and handled.
SIGNAL_CHECK_INTERVAL_MS = 300
error_handler = None
_CHECK_POINTER = 'uv_check_t *'
......@@ -126,7 +141,7 @@ class loop(AbstractLoop):
# Track whether or not any object has destroyed
# this loop. See _can_destroy_default_loop
ptr.data = ptr
ptr.data = self._handle_to_self
return ptr
_signal_idle = None
......@@ -155,8 +170,8 @@ class loop(AbstractLoop):
sig_cb = ffi.cast('void(*)(uv_timer_t*)', libuv.python_check_callback)
libuv.uv_timer_start(self._signal_idle,
sig_cb,
300,
300)
self.SIGNAL_CHECK_INTERVAL_MS,
self.SIGNAL_CHECK_INTERVAL_MS)
libuv.uv_unref(self._signal_idle)
def _run_callbacks(self):
......@@ -464,13 +479,34 @@ class loop(AbstractLoop):
if mode == libuv.UV_RUN_DEFAULT:
while self._ptr and self._ptr.data:
# This is here to better preserve order guarantees. See _run_callbacks
# for details.
# It may get run again from the prepare watcher, so potentially we
# could take twice as long as the switch interval.
# This is here to better preserve order guarantees.
# See _run_callbacks for details.
# It may get run again from the prepare watcher, so
# potentially we could take twice as long as the
# switch interval.
# If we have *lots* of callbacks to run, we may not actually
# get through them all before we're requested to poll for IO;
# so in that case, just spin the loop once (UV_RUN_NOWAIT) and
# go again.
self._run_callbacks()
self._prepare_ran_callbacks = False
ran_status = libuv.uv_run(self._ptr, libuv.UV_RUN_ONCE)
# UV_RUN_ONCE will poll for IO, blocking for up to the time needed
# for the next timer to expire. Worst case, that's our _signal_idle
# timer, about 1/3 second. UV_RUN_ONCE guarantees that some forward progress
# is made, either by an IO watcher or a timer.
#
# In contrast, UV_RUN_NOWAIT makes no such guarantee, it only polls for IO once and
# immediately returns; it does not update the loop time or timers after
# polling for IO.
run_mode = (
libuv.UV_RUN_ONCE
if not self._callbacks and not self._queued_callbacks
else libuv.UV_RUN_NOWAIT
)
ran_status = libuv.uv_run(self._ptr, run_mode)
# Note that we run queued callbacks when the prepare watcher runs,
# thus accounting for timers that expired before polling for IO,
# and idle watchers. This next call should get IO callbacks and
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment