Commit 8c2ac5e9 authored by Kirill Smelkov's avatar Kirill Smelkov

time: Switch internals to pyx/nogil

- use .c.chan_double() which gives chan[double] pyx/nogil way to access
  Ticker.c and Timer.c. Use the channels via pyx/nogil API from inside.
- use pyx/nogil sleep and now;

This gets time.pyx codebase closer to be used from pyx/nogil mode.

NOTE: unless something like pyx/nogil memory management emerges[1] we
are relying on Python to manage memory of Ticker and Timer classes.
If we just spawn e.g. Ticker.__tick via pyx/nogil go, the thread that is
spawned won't be holding a reference to Ticker object, and once the
ticker goes out of scope in original thread (while its channel .c might
be still in scope), __tick will segfault accessing freed Ticker object.

To workaround it we use the following pattern:

    nogilready = chan(dtype='C.structZ')
    pygo(mymeth)
    nogilready.recv()

    def mymeth(MyObject self, pychan nogilready)
        with nogil:
            nogilready.chan_structZ().close()
            self._mymeth()
    cdef void _mymeth(MyObject self) nogil:
        ...

where python reference to MyObject will be held in spawned thread during
its lifetime, while the service provided by mymeth will be done under
nogil.

[1] https://www.nexedi.com/blog/NXD-Document.Blog.Cypclass
parent 7c929b25
...@@ -21,12 +21,12 @@ ...@@ -21,12 +21,12 @@
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
from golang cimport pychan from golang cimport pychan, select, default, panic, topyexc
from golang cimport sync from golang cimport sync
from libc.math cimport INFINITY from libc.math cimport INFINITY
from cython cimport final from cython cimport final
from golang import go as pygo, select as pyselect, default as pydefault, panic as pypanic from golang import go as pygo, panic as pypanic
def pynow(): # -> t def pynow(): # -> t
...@@ -73,44 +73,56 @@ cdef class Ticker: ...@@ -73,44 +73,56 @@ cdef class Ticker:
cdef double _dt cdef double _dt
cdef sync.Mutex _mu cdef sync.Mutex _mu
cdef bint _stop cdef bint __stop
def __init__(Ticker self, double dt): def __init__(Ticker self, double dt):
if dt <= 0: if dt <= 0:
pypanic("ticker: dt <= 0") pypanic("ticker: dt <= 0")
self.c = pychan(1, dtype='C.double') # 1-buffer -- same as in Go self.c = pychan(1, dtype='C.double') # 1-buffer -- same as in Go
self._dt = dt self._dt = dt
self._stop = False self.__stop = False
pygo(self._tick) nogilready = pychan(dtype='C.structZ')
pygo(self.__tick, self, nogilready)
nogilready.recv()
# stop cancels the ticker. # stop cancels the ticker.
# #
# It is guaranteed that ticker channel is empty after stop completes. # It is guaranteed that ticker channel is empty after stop completes.
def stop(Ticker self): def stop(Ticker self):
_Ticker_stop_pyexc(self)
cdef void _stop(Ticker self) nogil:
c = self.c.chan_double()
self._mu.lock() self._mu.lock()
self._stop = True self.__stop = True
# drain what _tick could have been queued already # drain what __tick could have been queued already
while len(self.c) > 0: while c.len() > 0:
self.c.recv() c.recv()
self._mu.unlock() self._mu.unlock()
def _tick(Ticker self): cdef void __tick(Ticker self, pychan nogilready) except +topyexc:
with nogil:
nogilready.chan_structZ().close()
self.___tick()
cdef void ___tick(Ticker self) nogil:
c = self.c.chan_double()
while 1: while 1:
# XXX adjust for accumulated error δ? # XXX adjust for accumulated error δ?
pysleep(self._dt) sleep(self._dt)
self._mu.lock() self._mu.lock()
if self._stop: if self.__stop:
self._mu.unlock() self._mu.unlock()
return return
# send from under ._mu so that .stop can be sure there is no # send from under ._mu so that .stop can be sure there is no
# ongoing send while it drains the channel. # ongoing send while it drains the channel.
pyselect( t = now()
pydefault, select([
(self.c.send, pynow()), default,
) c.sends(&t),
])
self._mu.unlock() self._mu.unlock()
...@@ -151,6 +163,11 @@ cdef class Timer: ...@@ -151,6 +163,11 @@ cdef class Timer:
# guaranteed that after stop the function is not running - in such case # guaranteed that after stop the function is not running - in such case
# the caller must explicitly synchronize with that function to complete. # the caller must explicitly synchronize with that function to complete.
def stop(Timer self): # -> canceled def stop(Timer self): # -> canceled
return _Timer_stop_pyexc(self)
cdef bint _stop(Timer self) nogil: # -> canceled
cdef bint canceled
c = self.c.chan_double()
self._mu.lock() self._mu.lock()
if self._dt == INFINITY: if self._dt == INFINITY:
...@@ -160,9 +177,9 @@ cdef class Timer: ...@@ -160,9 +177,9 @@ cdef class Timer:
self._ver += 1 self._ver += 1
canceled = True canceled = True
# drain what _fire could have been queued already # drain what __fire could have been queued already
while len(self.c) > 0: while c.len() > 0:
self.c.recv() c.recv()
self._mu.unlock() self._mu.unlock()
return canceled return canceled
...@@ -171,18 +188,33 @@ cdef class Timer: ...@@ -171,18 +188,33 @@ cdef class Timer:
# #
# the timer must be either already stopped or expired. # the timer must be either already stopped or expired.
def reset(Timer self, double dt): def reset(Timer self, double dt):
_Timer_reset_pyexc(self, dt)
cdef void _reset(Timer self, double dt) nogil:
self._mu.lock() self._mu.lock()
if self._dt != INFINITY: if self._dt != INFINITY:
self._mu.unlock() self._mu.unlock()
pypanic("Timer.reset: the timer is armed; must be stopped or expired") panic("Timer.reset: the timer is armed; must be stopped or expired")
self._dt = dt self._dt = dt
self._ver += 1 self._ver += 1
pygo(self._fire, dt, self._ver) # FIXME uses gil.
# TODO rework timers so that new timer does not spawn new goroutine.
ok = False
with gil:
nogilready = pychan(dtype='C.structZ')
pygo(self.__fire, self, dt, self._ver, nogilready)
nogilready.recv()
ok = True
self._mu.unlock() self._mu.unlock()
if not ok:
panic("timer: reset: failed")
cdef void __fire(Timer self, double dt, int ver, pychan nogilready) except +topyexc:
def _fire(Timer self, double dt, int ver): with nogil:
pysleep(dt) nogilready.chan_structZ().close()
self.___fire(dt, ver)
cdef void ___fire(Timer self, double dt, int ver) nogil:
c = self.c.chan_double()
sleep(dt)
self._mu.lock() self._mu.lock()
if self._ver != ver: if self._ver != ver:
self._mu.unlock() self._mu.unlock()
...@@ -192,13 +224,16 @@ cdef class Timer: ...@@ -192,13 +224,16 @@ cdef class Timer:
# send under ._mu so that .stop can be sure that if it sees # send under ._mu so that .stop can be sure that if it sees
# ._dt = INFINITY, there is no ongoing .c send. # ._dt = INFINITY, there is no ongoing .c send.
if self._f is None: if self._f is None:
self.c.send(pynow()) c.send(now())
self._mu.unlock() self._mu.unlock()
return return
self._mu.unlock() self._mu.unlock()
# call ._f not from under ._mu not to deadlock e.g. if ._f wants to reset the timer. # call ._f not from under ._mu not to deadlock e.g. if ._f wants to reset the timer.
self._f() with gil:
ok = _callpyf(self._f)
if not ok:
panic("timer: fire: failed")
# ---- misc ---- # ---- misc ----
...@@ -209,9 +244,19 @@ pymillisecond = millisecond ...@@ -209,9 +244,19 @@ pymillisecond = millisecond
pyminute = minute pyminute = minute
pyhour = hour pyhour = hour
from golang cimport topyexc
cdef double now_pyexc() nogil except +topyexc: cdef double now_pyexc() nogil except +topyexc:
return now() return now()
cdef void sleep_pyexc(double dt) nogil except +topyexc: cdef void sleep_pyexc(double dt) nogil except +topyexc:
sleep(dt) sleep(dt)
cdef void _Ticker_stop_pyexc(Ticker t) nogil except +topyexc:
t._stop()
cdef bint _Timer_stop_pyexc (Timer t) nogil except +topyexc:
return t._stop()
cdef void _Timer_reset_pyexc(Timer t, double dt) nogil except +topyexc:
t._reset(dt)
cdef bint _callpyf(object f):
f()
return True
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment