Commit b073f6df authored by Kirill Smelkov's avatar Kirill Smelkov

time: Move/Port timers to C++/Pyx nogil

Provide time.Ticker, time.Timer and friends that can be used directly
from C++ and Pyx/nogil codes. Python-level classes become small wrapper
around pyx/nogil ones.

This is the first patch that moves to Pyx/nogil classes that are
dynamically allocated on heap. refptr<T> is used to automatically manage
lifetime of such objects. At Pyx level exposed API is very similar to
Python-one, while internally it uses refptr<T> and friends.
parent e614d641
...@@ -28,6 +28,9 @@ ...@@ -28,6 +28,9 @@
See also https://golang.org/pkg/time for Go time package documentation. See also https://golang.org/pkg/time for Go time package documentation.
""" """
from golang cimport chan, cbool, refptr
from libcpp cimport nullptr_t
# golang/pyx - the same as std python - represents time as float # golang/pyx - the same as std python - represents time as float
cdef extern from * nogil: cdef extern from * nogil:
# XXX how to declare/share constants without C verbatim? # XXX how to declare/share constants without C verbatim?
...@@ -53,3 +56,36 @@ cdef extern from * nogil: ...@@ -53,3 +56,36 @@ cdef extern from * nogil:
cdef extern from "golang/time.h" namespace "golang::time" nogil: cdef extern from "golang/time.h" namespace "golang::time" nogil:
void sleep(double dt) void sleep(double dt)
double now() double now()
chan[double] tick(double dt)
chan[double] after(double dt)
Timer after_func(double dt, ...) # ... = std::function<void()>
# pyx _Ticker = raw C++ Ticker
cppclass _Ticker "Ticker":
chan[double] c
void stop()
# pyx Ticker = C++ refptr<Ticker>
cppclass Ticker "golang::refptr<golang::time::Ticker>" (refptr[_Ticker]):
# Ticker.X = Ticker->X in C++.
chan[double] c "_ptr()->c"
void stop "_ptr()->stop" ()
Ticker new_ticker(double dt)
# pyx _Timer = raw C++ Timer
cppclass _Timer "Timer":
chan[double] c
cbool stop()
void reset(double dt)
# pyx Timer = C++ refptr<Timer>
cppclass Timer "golang::refptr<golang::time::Timer>" (refptr[_Timer]):
# Timer.X = Timer->X in C++.
chan[double] c "_ptr()->c"
cbool stop "_ptr()->stop" ()
void reset "_ptr()->reset" (double dt)
Timer new_timer(double dt)
...@@ -21,12 +21,12 @@ ...@@ -21,12 +21,12 @@
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
from golang cimport pychan, select, default, panic, topyexc from golang cimport pychan, topyexc
from golang cimport sync from golang cimport sync
from libc.math cimport INFINITY from cpython cimport PyObject
from cython cimport final from cython cimport final
from golang import go as pygo, panic as pypanic import atexit as pyatexit
def pynow(): # -> t def pynow(): # -> t
...@@ -38,22 +38,19 @@ def pysleep(double dt): ...@@ -38,22 +38,19 @@ def pysleep(double dt):
# ---- timers ---- # ---- timers ----
# FIXME timers are implemented very inefficiently - each timer currently consumes a goroutine.
# tick returns channel connected to dt ticker. # tick returns channel connected to dt ticker.
# #
# Note: there is no way to stop created ticker. # Note: there is no way to stop created ticker.
# Note: for dt <= 0, contrary to Ticker, tick returns nil channel instead of panicking. # Note: for dt <= 0, contrary to Ticker, tick returns nil channel instead of panicking.
def pytick(double dt): # -> chan time def pytick(double dt): # -> chan time
if dt <= 0: return pychan.from_chan_double( tick(dt) )
return pychan._nil('C.double')
return PyTicker(dt).c
# after returns channel connected to dt timer. # after returns channel connected to dt timer.
# #
# Note: with after there is no way to stop/garbage-collect created timer until it fires. # Note: with after there is no way to stop/garbage-collect created timer until it fires.
def pyafter(double dt): # -> chan time def pyafter(double dt): # -> chan time
return PyTimer(dt).c return pychan.from_chan_double( after(dt) )
# after_func arranges to call f after dt time. # after_func arranges to call f after dt time.
# #
...@@ -69,61 +66,23 @@ def pyafter_func(double dt, f): # -> PyTimer ...@@ -69,61 +66,23 @@ def pyafter_func(double dt, f): # -> PyTimer
# Ticking can be canceled via .stop() . # Ticking can be canceled via .stop() .
@final @final
cdef class PyTicker: cdef class PyTicker:
cdef readonly pychan c # chan[double] cdef Ticker tx
cdef readonly pychan c # pychan wrapping tx.c
cdef double _dt
cdef sync.Mutex _mu
cdef bint __stop
def __init__(PyTicker pytx, double dt): def __init__(PyTicker pytx, double dt):
if dt <= 0: with nogil:
pypanic("ticker: dt <= 0") pytx.tx = new_ticker_pyexc(dt)
pytx.c = pychan(1, dtype='C.double') # 1-buffer -- same as in Go pytx.c = pychan.from_chan_double( pytx.tx.c )
pytx._dt = dt
pytx.__stop = False def __dealloc__(PyTicker pytx):
nogilready = pychan(dtype='C.structZ') pytx.tx = NULL
pygo(pytx.__tick, pytx, nogilready)
nogilready.recv()
# stop cancels the ticker. # stop cancels the ticker.
# #
# It is guaranteed that ticker channel is empty after stop completes. # It is guaranteed that ticker channel is empty after stop completes.
def stop(PyTicker pytx): def stop(PyTicker pytx):
_Ticker_stop_pyexc(pytx)
cdef void _stop(PyTicker pytx) nogil:
c = pytx.c.chan_double()
pytx._mu.lock()
pytx.__stop = True
# drain what __tick could have been queued already
while c.len() > 0:
c.recv()
pytx._mu.unlock()
cdef void __tick(PyTicker pytx, pychan nogilready) except +topyexc:
with nogil: with nogil:
nogilready.chan_structZ().close() ticker_stop_pyexc(pytx.tx)
pytx.___tick()
cdef void ___tick(PyTicker pytx) nogil:
c = pytx.c.chan_double()
while 1:
# XXX adjust for accumulated error δ?
sleep(pytx._dt)
pytx._mu.lock()
if pytx.__stop:
pytx._mu.unlock()
return
# send from under ._mu so that .stop can be sure there is no
# ongoing send while it drains the channel.
t = now()
select([
default,
c.sends(&t),
])
pytx._mu.unlock()
# Timer arranges for time event to be sent to .c channel after dt time. # Timer arranges for time event to be sent to .c channel after dt time.
...@@ -134,20 +93,19 @@ cdef class PyTicker: ...@@ -134,20 +93,19 @@ cdef class PyTicker:
# instead of event being sent to channel .c . # instead of event being sent to channel .c .
@final @final
cdef class PyTimer: cdef class PyTimer:
cdef readonly pychan c cdef Timer t
cdef readonly pychan c # pychan wrapping t.c
cdef object _f
cdef sync.Mutex _mu
cdef double _dt # +inf - stopped, otherwise - armed
cdef int _ver # current timer was armed by n'th reset
def __init__(PyTimer pyt, double dt, f=None): def __init__(PyTimer pyt, double dt, f=None):
pyt._f = f with nogil:
pyt.c = pychan(1, dtype='C.double') if f is None else \ if f is None:
pychan._nil('C.double') pyt.t = new_timer_pyexc(dt)
pyt._dt = INFINITY else:
pyt._ver = 0 pyt.t = _new_timer_pyfunc_pyexc(dt, <PyObject *>f)
pyt.reset(dt) pyt.c = pychan.from_chan_double( pyt.t.c )
def __dealloc__(PyTimer pyt):
pyt.t = NULL
# stop cancels the timer. # stop cancels the timer.
# #
...@@ -163,77 +121,152 @@ cdef class PyTimer: ...@@ -163,77 +121,152 @@ cdef class PyTimer:
# guaranteed that after stop the function is not running - in such case # guaranteed that after stop the function is not running - in such case
# the caller must explicitly synchronize with that function to complete. # the caller must explicitly synchronize with that function to complete.
def stop(PyTimer pyt): # -> canceled def stop(PyTimer pyt): # -> canceled
return _Timer_stop_pyexc(pyt) with nogil:
cdef bint _stop(PyTimer pyt) nogil: # -> canceled canceled = timer_stop_pyexc(pyt.t)
cdef bint canceled
c = pyt.c.chan_double()
pyt._mu.lock()
if pyt._dt == INFINITY:
canceled = False
else:
pyt._dt = INFINITY
pyt._ver += 1
canceled = True
# drain what __fire could have been queued already
while c.len() > 0:
c.recv()
pyt._mu.unlock()
return canceled return canceled
# reset rearms the timer. # reset rearms the timer.
# #
# the timer must be either already stopped or expired. # the timer must be either already stopped or expired.
def reset(PyTimer pyt, double dt): def reset(PyTimer pyt, double dt):
_Timer_reset_pyexc(pyt, dt)
cdef void _reset(PyTimer pyt, double dt) nogil:
pyt._mu.lock()
if pyt._dt != INFINITY:
pyt._mu.unlock()
panic("Timer.reset: the timer is armed; must be stopped or expired")
pyt._dt = dt
pyt._ver += 1
# FIXME uses gil.
# TODO rework timers so that new timer does not spawn new goroutine.
ok = False
with gil:
nogilready = pychan(dtype='C.structZ')
pygo(pyt.__fire, pyt, dt, pyt._ver, nogilready)
nogilready.recv()
ok = True
pyt._mu.unlock()
if not ok:
panic("timer: reset: failed")
cdef void __fire(PyTimer pyt, double dt, int ver, pychan nogilready) except +topyexc:
with nogil: with nogil:
nogilready.chan_structZ().close() timer_reset_pyexc(pyt.t, dt)
pyt.___fire(dt, ver)
cdef void ___fire(PyTimer pyt, double dt, int ver) nogil:
c = pyt.c.chan_double() # _PyFunc represents python function scheduled to be called via PyTimer(f=...).
sleep(dt) # _PyFunc can be used from nogil code.
pyt._mu.lock() # _PyFunc is safe to use wrt race to python interpreter shutdown.
if pyt._ver != ver: cdef extern from * nogil:
pyt._mu.unlock() """
return # the timer was stopped/resetted - don't fire it #include <golang/sync.h>
pyt._dt = INFINITY using namespace golang;
# send under ._mu so that .stop can be sure that if it sees #include <utility>
# ._dt = INFINITY, there is no ongoing .c send. using std::tuple;
if pyt._f is None: using std::make_tuple;
c.send(now()) using std::tie;
pyt._mu.unlock()
return // pyexited indicates whether Python interpreter exited.
pyt._mu.unlock() static sync::Mutex *pyexitedMu = new sync::Mutex(); // never freed not race at exit
static bool pyexited = false; // on mu dtor vs mu use
# call ._f not from under ._mu not to deadlock e.g. if ._f wants to reset the timer. """
with gil: sync.Mutex *pyexitedMu
ok = _callpyf(pyt._f) bint pyexited
if not ok:
panic("timer: fire: failed") cdef _time_pyatexit():
global pyexited
with nogil:
pyexitedMu.lock()
pyexited = True
pyexitedMu.unlock()
pyatexit.register(_time_pyatexit)
cdef extern from * nogil:
"""
// pygil_ensure is like `with gil` but also takes into account possibility
// of python interpreter shutdown.
static tuple<PyGILState_STATE, bool> pygil_ensure() {
PyGILState_STATE gstate;
// A C++ thread might still be running while python interpreter is stopped.
// Verify it is not the case not to crash in PyGILState_Ensure().
//
// Tell caller not to run any py code if python interpreter is gone and ignore any error.
// Python itself behaves the same way on threading cleanup - see e.g.
// comments in our _golang.pyx::__goviac() about that and also e.g.
// https://www.riverbankcomputing.com/pipermail/pyqt/2004-July/008196.html
pyexitedMu->lock();
if (pyexited) {
pyexitedMu->unlock();
return make_tuple(PyGILState_STATE(0), false);
}
gstate = PyGILState_Ensure();
pyexitedMu->unlock();
return make_tuple(gstate, true);
}
struct _PyFunc {
PyObject *pyf; // function to call; _PyFunc keeps 1 reference to f
// ctor.
// _PyFunc must be constructed while Python interpreter is alive.
_PyFunc(PyObject *pyf) {
PyGILState_STATE gstate = PyGILState_Ensure();
Py_INCREF(pyf);
this->pyf = pyf;
PyGILState_Release(gstate);
}
// all other methods may be called at any time, including when python
// interpreter is gone.
// copy
_PyFunc(const _PyFunc& from) {
PyGILState_STATE gstate;
bool ok;
tie(gstate, ok) = pygil_ensure();
if (!ok) {
pyf = NULL; // won't be used
return;
}
pyf = from.pyf;
Py_INCREF(pyf);
PyGILState_Release(gstate);
}
// dtor
~_PyFunc() {
PyGILState_STATE gstate;
bool ok;
tie(gstate, ok) = pygil_ensure();
PyObject *pyf = this->pyf;
this->pyf = NULL;
if (!ok) {
return;
}
Py_DECREF(pyf);
PyGILState_Release(gstate);
}
// call
void operator() () const {
PyGILState_STATE gstate;
bool ok;
// C++ timer thread might still be running while python interpreter is stopped.
// Verify it is not the case not to crash in PyGILState_Ensure().
//
// Don't call the function if python interpreter is gone - i.e. ignore error here.
// Python itself behaves the same way on threading cleanup - see
// _golang.pyx::__goviac and pygil_ensure for details.
tie(gstate, ok) = pygil_ensure();
if (!ok) {
return;
}
ok = true;
PyObject *ret = PyObject_CallFunction(pyf, NULL);
if (ret == NULL && !pyexited) {
PyErr_PrintEx(0);
ok = false;
}
Py_XDECREF(ret);
PyGILState_Release(gstate);
// XXX exception -> exit program with traceback (same as in go) ?
//if (!ok)
// panic("pycall failed");
}
};
"""
cppclass _PyFunc:
_PyFunc(PyObject *pyf)
# ---- misc ---- # ---- misc ----
...@@ -251,14 +284,16 @@ cdef nogil: ...@@ -251,14 +284,16 @@ cdef nogil:
void sleep_pyexc(double dt) except +topyexc: void sleep_pyexc(double dt) except +topyexc:
sleep(dt) sleep(dt)
void _Ticker_stop_pyexc(PyTicker t) except +topyexc: Ticker new_ticker_pyexc(double dt) except +topyexc:
t._stop() return new_ticker(dt)
bint _Timer_stop_pyexc (PyTimer t) except +topyexc: void ticker_stop_pyexc(Ticker tx) except +topyexc:
return t._stop() tx.stop()
void _Timer_reset_pyexc(PyTimer t, double dt) except +topyexc: Timer new_timer_pyexc(double dt) except +topyexc:
t._reset(dt) return new_timer(dt)
Timer _new_timer_pyfunc_pyexc(double dt, PyObject *pyf) except +topyexc:
return after_func(dt, _PyFunc(pyf))
cdef bint _callpyf(object f):
f() cbool timer_stop_pyexc(Timer t) except +topyexc:
return True return t.stop()
void timer_reset_pyexc(Timer t, double dt) except +topyexc:
t.reset(dt)
...@@ -22,9 +22,181 @@ ...@@ -22,9 +22,181 @@
#include "golang/time.h" #include "golang/time.h"
#include <math.h>
using std::function;
// golang::time:: (except sleep and now) // golang::time:: (except sleep and now)
namespace golang { namespace golang {
namespace time { namespace time {
// ---- timers ----
// FIXME timers are implemented very inefficiently - each timer currently consumes a goroutine.
refptr<Ticker> new_ticker(double dt);
refptr<Timer> new_timer (double dt);
refptr<Timer> _new_timer(double dt, function<void()>);
chan<double> tick(double dt) {
if (dt <= 0)
return NULL;
return new_ticker(dt)->c;
}
chan<double> after(double dt) {
return new_timer(dt)->c;
}
refptr<Timer> after_func(double dt, function<void()> f) {
return _new_timer(dt, f);
}
// Ticker
Ticker::Ticker() {}
Ticker::~Ticker() {}
void Ticker::decref() {
if (__decref())
delete this;
}
refptr<Ticker> new_ticker(double dt) {
if (dt <= 0)
panic("ticker: dt <= 0");
refptr<Ticker> tx = adoptref(new Ticker());
tx->c = makechan<double>(1); // 1-buffer -- same as in Go
tx->_dt = dt;
tx->_stop = false;
go([tx]() {
tx->_tick();
});
return tx;
}
void Ticker::stop() {
Ticker &tx = *this;
tx._mu.lock();
tx._stop = true;
// drain what _tick could have been queued already
while (tx.c.len() > 0)
tx.c.recv();
tx._mu.unlock();
}
void Ticker::_tick() {
Ticker &tx = *this;
while (1) {
// XXX adjust for accumulated error δ?
sleep(tx._dt);
tx._mu.lock();
if (tx._stop) {
tx._mu.unlock();
return;
}
// send from under ._mu so that .stop can be sure there is no
// ongoing send while it drains the channel.
double t = now();
select({
_default,
tx.c.sends(&t),
});
tx._mu.unlock();
}
}
// Timer
Timer::Timer() {}
Timer::~Timer() {}
void Timer::decref() {
if (__decref())
delete this;
}
refptr<Timer> _new_timer(double dt, function<void()> f) {
refptr<Timer> t = adoptref(new Timer());
t->c = (f == NULL ? makechan<double>(1) : NULL);
t->_f = f;
t->_dt = INFINITY;
t->_ver = 0;
t->reset(dt);
return t;
}
refptr<Timer> new_timer(double dt) {
return _new_timer(dt, NULL);
}
bool Timer::stop() {
Timer &t = *this;
bool canceled;
t._mu.lock();
if (t._dt == INFINITY) {
canceled = false;
}
else {
t._dt = INFINITY;
t._ver += 1;
canceled = true;
}
// drain what _fire could have been queued already
while (t.c.len() > 0)
t.c.recv();
t._mu.unlock();
return canceled;
}
void Timer::reset(double dt) {
Timer &t = *this;
t._mu.lock();
if (t._dt != INFINITY) {
t._mu.unlock();
panic("Timer.reset: the timer is armed; must be stopped or expired");
}
t._dt = dt;
t._ver += 1;
// TODO rework timers so that new timer does not spawn new goroutine.
refptr<Timer> tref = newref(&t); // pass t reference to spawned goroutine
go([tref, dt](int ver) {
tref->_fire(dt, ver);
}, t._ver);
t._mu.unlock();
}
void Timer::_fire(double dt, int ver) {
Timer &t = *this;
sleep(dt);
t._mu.lock();
if (t._ver != ver) {
t._mu.unlock();
return; // the timer was stopped/resetted - don't fire it
}
t._dt = INFINITY;
// send under ._mu so that .stop can be sure that if it sees
// ._dt = INFINITY, there is no ongoing .c send.
if (t._f == NULL) {
t.c.send(now());
t._mu.unlock();
return;
}
t._mu.unlock();
// call ._f not from under ._mu not to deadlock e.g. if ._f wants to reset the timer.
t._f();
}
}} // golang::time:: }} // golang::time::
...@@ -24,6 +24,9 @@ ...@@ -24,6 +24,9 @@
// //
// - `now` returns current time. // - `now` returns current time.
// - `sleep` pauses current task. // - `sleep` pauses current task.
// - `Ticker` and `Timer` provide timers integrated with channels.
// - `tick`, `after` and `after_func` are convenience wrappers to use
// tickers and timers easily.
// //
// See also https://golang.org/pkg/time for Go time package documentation. // See also https://golang.org/pkg/time for Go time package documentation.
// //
...@@ -37,6 +40,7 @@ ...@@ -37,6 +40,7 @@
#include <golang/libgolang.h> #include <golang/libgolang.h>
#include <golang/sync.h>
// ---- C-level API ---- // ---- C-level API ----
...@@ -69,6 +73,112 @@ LIBGOLANG_API void sleep(double dt); ...@@ -69,6 +73,112 @@ LIBGOLANG_API void sleep(double dt);
// now returns current time in seconds. // now returns current time in seconds.
LIBGOLANG_API double now(); LIBGOLANG_API double now();
class Ticker;
class Timer;
// tick returns channel connected to dt ticker.
//
// Note: there is no way to stop created ticker.
// Note: for dt <= 0, contrary to Ticker, tick returns nil channel instead of panicking.
LIBGOLANG_API chan<double> tick(double dt);
// after returns channel connected to dt timer.
//
// Note: with after there is no way to stop/garbage-collect created timer until it fires.
LIBGOLANG_API chan<double> after(double dt);
// after_func arranges to call f after dt time.
//
// The function will be called in its own goroutine.
// Returned timer can be used to cancel the call.
LIBGOLANG_API refptr<Timer> after_func(double dt, std::function<void()> f);
// new_ticker creates new Ticker that will be firing at dt intervals.
LIBGOLANG_API refptr<Ticker> new_ticker(double dt);
// Ticker arranges for time events to be sent to .c channel on dt-interval basis.
//
// If the receiver is slow, Ticker does not queue events and skips them.
// Ticking can be canceled via .stop() .
struct Ticker : refobj {
chan<double> c;
private:
double _dt;
sync::Mutex _mu;
bool _stop;
// don't new - create only via new_ticker()
private:
Ticker();
~Ticker();
friend refptr<Ticker> new_ticker(double dt);
public:
LIBGOLANG_API void decref();
public:
// stop cancels the ticker.
//
// It is guaranteed that ticker channel is empty after stop completes.
LIBGOLANG_API void stop();
private:
void _tick();
};
// new_timer creates new Timer that will fire after dt.
LIBGOLANG_API refptr<Timer> new_timer(double dt);
// Timer arranges for time event to be sent to .c channel after dt time.
//
// The timer can be stopped (.stop), or reinitialized to another time (.reset).
struct Timer : refobj {
chan<double> c;
private:
std::function<void()> _f;
sync::Mutex _mu;
double _dt; // +inf - stopped, otherwise - armed
int _ver; // current timer was armed by n'th reset
// don't new - create only via new_timer() & co
private:
Timer();
~Timer();
friend refptr<Timer> _new_timer(double dt, std::function<void()> f);
public:
LIBGOLANG_API void decref();
public:
// stop cancels the timer.
//
// It returns:
//
// False: the timer was already expired or stopped,
// True: the timer was armed and canceled by this stop call.
//
// Note: contrary to Go version, there is no need to drain timer channel
// after stop call - it is guaranteed that after stop the channel is empty.
//
// Note: similarly to Go, if Timer is used with function - it is not
// guaranteed that after stop the function is not running - in such case
// the caller must explicitly synchronize with that function to complete.
LIBGOLANG_API bool stop();
// reset rearms the timer.
//
// the timer must be either already stopped or expired.
LIBGOLANG_API void reset(double dt);
private:
void _fire(double dt, int ver);
};
}} // golang::time:: }} // golang::time::
#endif // __cplusplus #endif // __cplusplus
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment