Commit 3b241983 authored by Kirill Smelkov's avatar Kirill Smelkov

Port/move channels to C/C++/Pyx

- Move channels implementation to be done in C++ inside libgolang. The
  code and logic is based on previous Python-level channels
  implementation, but the new code is just C++ and does not depend on
  Python nor GIL at all, and so works without GIL if libgolang
  runtime works without GIL(*).

  (*) for example "thread" runtime works without GIL, while "gevent" runtime
      acquires GIL on every semaphore acquire.

  New channels implementation is located in δ(libgolang.cpp).

- Provide low-level C channels API to the implementation. The low-level
  C API was inspired by Libtask[1] and Plan9/Libthread[2].

  [1] Libtask: a Coroutine Library for C and Unix. https://swtch.com/libtask.
  [2] http://9p.io/magic/man2html/2/thread.

- Provide high-level C++ channels API that provides type-safety and
  automatic channel lifetime management.

  Overview of C and C++ APIs are in δ(libgolang.h).

- Expose C++ channels API at Pyx level as Cython/nogil API so that Cython
  programs could use channels with ease and without need to care about
  lifetime management and low-level details.

  Overview of Cython/nogil channels API is in δ(README.rst) and
  δ(_golang.pxd).

- Turn Python channels to be tiny wrapper around chan<PyObject>.

Implementation note:

- gevent case needs special care because greenlet, which gevent uses,
  swaps coroutine stack from C stack to heap on coroutine park, and
  replaces that space on C stack with stack of activated coroutine
  copied back from heap. This way if an object on g's stack is accessed
  while g is parked it would be memory of another g's stack.

  The channels implementation explicitly cares about this issue so that
  stack -> * channel send, or * -> stack channel receive work correctly.

  It should be noted that greenlet approach, which it inherits from
  stackless, is not only a bit tricky, but also comes with overhead
  (stack <-> heap copy), and prevents a coroutine to migrate from 1 OS
  thread to another OS thread as that would change addresses of on-stack
  things for that coroutine.

  As the latter property prevents to use multiple CPUs even if the
  program / runtime are prepared to work without GIL, it would be more
  logical to change gevent/greenlet to use separate stack for each
  coroutine. That would remove stack <-> heap copy and the need for
  special care in channels implementation for stack - stack sends.
  Such approach should be possible to implement with e.g. swapcontext or
  similar mechanism, and a proof of concept of such work wrapped into
  greenlet-compatible API exists[3]. It would be good if at some point
  there would be a chance to explore such approach in Pygolang context.

  [3] https://github.com/python-greenlet/greenlet/issues/113#issuecomment-264529838 and below

Just this patch brings in the following speedup at Python level:

 (on i7@2.6GHz)

thread runtime:

    name             old time/op  new time/op  delta
    go               20.0µs ± 1%  15.6µs ± 1%  -21.84%  (p=0.000 n=10+10)
    chan             9.37µs ± 4%  2.89µs ± 6%  -69.12%  (p=0.000 n=10+10)
    select           20.2µs ± 4%   3.4µs ± 5%  -83.20%  (p=0.000 n=8+10)
    def              58.0ns ± 0%  60.0ns ± 0%   +3.45%  (p=0.000 n=8+10)
    func_def         43.8µs ± 1%  43.9µs ± 1%     ~     (p=0.796 n=10+10)
    call             62.4ns ± 1%  63.5ns ± 1%   +1.76%  (p=0.001 n=10+10)
    func_call        1.06µs ± 1%  1.05µs ± 1%   -0.63%  (p=0.002 n=10+10)
    try_finally       136ns ± 0%   137ns ± 0%   +0.74%  (p=0.000 n=9+10)
    defer            2.28µs ± 1%  2.33µs ± 1%   +2.34%  (p=0.000 n=10+10)
    workgroup_empty  48.2µs ± 1%  34.1µs ± 2%  -29.18%  (p=0.000 n=9+10)
    workgroup_raise  58.9µs ± 1%  45.5µs ± 1%  -22.74%  (p=0.000 n=10+10)

gevent runtime:

    name             old time/op  new time/op  delta
    go               24.7µs ± 1%  15.9µs ± 1%  -35.72%  (p=0.000 n=9+9)
    chan             11.6µs ± 1%   7.3µs ± 1%  -36.74%  (p=0.000 n=10+10)
    select           22.5µs ± 1%  10.4µs ± 1%  -53.73%  (p=0.000 n=10+10)
    def              55.0ns ± 0%  55.0ns ± 0%     ~     (all equal)
    func_def         43.6µs ± 1%  43.6µs ± 1%     ~     (p=0.684 n=10+10)
    call             63.0ns ± 0%  64.0ns ± 0%   +1.59%  (p=0.000 n=10+10)
    func_call        1.06µs ± 1%  1.07µs ± 1%   +0.45%  (p=0.045 n=10+9)
    try_finally       135ns ± 0%   137ns ± 0%   +1.48%  (p=0.000 n=10+10)
    defer            2.31µs ± 1%  2.33µs ± 1%   +0.89%  (p=0.000 n=10+10)
    workgroup_empty  70.2µs ± 0%  55.8µs ± 0%  -20.63%  (p=0.000 n=10+10)
    workgroup_raise  90.3µs ± 0%  70.9µs ± 1%  -21.51%  (p=0.000 n=9+10)

The whole Cython/nogil work - starting from 8fa3c15b (Start using Cython
and providing Cython/nogil API) to this patch - brings in the following
speedup at Python level:

 (on i7@2.6GHz)

thread runtime:

    name             old time/op  new time/op  delta
    go               92.9µs ± 1%  15.6µs ± 1%  -83.16%  (p=0.000 n=10+10)
    chan             13.9µs ± 1%   2.9µs ± 6%  -79.14%  (p=0.000 n=10+10)
    select           29.7µs ± 6%   3.4µs ± 5%  -88.55%  (p=0.000 n=10+10)
    def              57.0ns ± 0%  60.0ns ± 0%   +5.26%  (p=0.000 n=10+10)
    func_def         44.0µs ± 1%  43.9µs ± 1%     ~     (p=0.055 n=10+10)
    call             63.5ns ± 1%  63.5ns ± 1%     ~     (p=1.000 n=10+10)
    func_call        1.06µs ± 0%  1.05µs ± 1%   -1.31%  (p=0.000 n=10+10)
    try_finally       139ns ± 0%   137ns ± 0%   -1.44%  (p=0.000 n=10+10)
    defer            2.36µs ± 1%  2.33µs ± 1%   -1.26%  (p=0.000 n=10+10)
    workgroup_empty  98.4µs ± 1%  34.1µs ± 2%  -65.32%  (p=0.000 n=10+10)
    workgroup_raise   135µs ± 1%    46µs ± 1%  -66.35%  (p=0.000 n=10+10)

gevent runtime:

    name             old time/op  new time/op  delta
    go               68.8µs ± 1%  15.9µs ± 1%  -76.91%  (p=0.000 n=10+9)
    chan             14.8µs ± 1%   7.3µs ± 1%  -50.67%  (p=0.000 n=10+10)
    select           32.0µs ± 0%  10.4µs ± 1%  -67.57%  (p=0.000 n=10+10)
    def              58.0ns ± 0%  55.0ns ± 0%   -5.17%  (p=0.000 n=10+10)
    func_def         43.9µs ± 1%  43.6µs ± 1%   -0.53%  (p=0.035 n=10+10)
    call             63.5ns ± 1%  64.0ns ± 0%   +0.79%  (p=0.033 n=10+10)
    func_call        1.08µs ± 1%  1.07µs ± 1%   -1.74%  (p=0.000 n=10+9)
    try_finally       142ns ± 0%   137ns ± 0%   -3.52%  (p=0.000 n=10+10)
    defer            2.32µs ± 1%  2.33µs ± 1%   +0.71%  (p=0.005 n=10+10)
    workgroup_empty  90.3µs ± 0%  55.8µs ± 0%  -38.26%  (p=0.000 n=10+10)
    workgroup_raise   108µs ± 1%    71µs ± 1%  -34.64%  (p=0.000 n=10+10)

This patch is the final patch in series to reach the goal of providing
channels that could be used in Cython/nogil code.

Cython/nogil channels work is dedicated to the memory of Вера Павловна Супрун[4].

[4] https://navytux.spb.ru/memory/%D0%A2%D1%91%D1%82%D1%8F%20%D0%92%D0%B5%D1%80%D0%B0.pdf#page=3
parent 9efb6575
...@@ -170,7 +170,7 @@ located in `src/` under `$GOPATH`. ...@@ -170,7 +170,7 @@ located in `src/` under `$GOPATH`.
Cython/nogil API Cython/nogil API
---------------- ----------------
Cython package `golang` provides *nogil* API with goroutines and Cython package `golang` provides *nogil* API with goroutines, channels and
other features that mirror corresponding Python package. Cython API is not only other features that mirror corresponding Python package. Cython API is not only
faster compared to Python version, but also, due to *nogil* property, allows to faster compared to Python version, but also, due to *nogil* property, allows to
build concurrent systems without limitations imposed by Python's GIL. All that build concurrent systems without limitations imposed by Python's GIL. All that
...@@ -178,14 +178,59 @@ while still programming in Python-like language. Brief description of ...@@ -178,14 +178,59 @@ while still programming in Python-like language. Brief description of
Cython/nogil API follows: Cython/nogil API follows:
`go` spawns new task - a coroutine, or thread, depending on activated runtime. `go` spawns new task - a coroutine, or thread, depending on activated runtime.
For example:: `chan[T]` represents a channel with Go semantic and elements of type `T`.
Use `makechan[T]` to create new channel, and `chan[T].recv`, `chan[T].send`,
`chan[T].close` for communication. `nil` stands for the nil channel. `select`
can be used to multiplex on several channels. For example::
cdef nogil: cdef nogil:
void worker(): struct Point:
pass int x
int y
void worker(chan[int] chi, chan[Point] chp):
chi.send(1)
cdef Point p
p.x = 3
p.y = 4
chp.send(p)
void myfunc(): void myfunc():
go(worker) cdef chan[int] chi = makechan[int]() # synchronous channel of integers
cdef chan[Point] chp = makechan[Point](3) # channel with buffer of size 3 and Point elements
go(worker, chi, chp)
i = chi.recv() # will give 1
p = chp.recv() # will give Point(3,4)
chp = nil # rebind chp to nil channel
cdef cbool ok
cdef int j = 33
_ = select([
chi.recvs(&i) # 0
chi.recvs(&i, &ok), # 1
chi.sends(&j), # 2
chp.recvs(&p), # 3
default, # 4
])
if _ == 0:
# i is what was received from chi
...
if _ == 1:
# (i, ok) is what was received from chi
...
if _ == 2:
# we know j was sent to chi
...
if _ == 3:
# this case will be never selected because
# send/recv on nil channel block forever.
...
if _ == 4:
# default case
...
`panic` stops normal execution of current goroutine by throwing a C-level `panic` stops normal execution of current goroutine by throwing a C-level
exception. On Python/C boundaries C-level exceptions have to be converted to exception. On Python/C boundaries C-level exceptions have to be converted to
......
...@@ -23,6 +23,7 @@ Cython/nogil API ...@@ -23,6 +23,7 @@ Cython/nogil API
---------------- ----------------
- `go` spawns lightweight thread. - `go` spawns lightweight thread.
- `chan[T]`, `makechan[T]` and `select` provide C-level channels with Go semantic.
- `panic` stops normal execution of current goroutine by throwing a C-level exception. - `panic` stops normal execution of current goroutine by throwing a C-level exception.
Everything in Cython/nogil API do not depend on Python runtime and in Everything in Cython/nogil API do not depend on Python runtime and in
...@@ -38,10 +39,16 @@ Golang.py runtime ...@@ -38,10 +39,16 @@ Golang.py runtime
In addition to Cython/nogil API, golang.pyx provides runtime for golang.py: In addition to Cython/nogil API, golang.pyx provides runtime for golang.py:
- Python-level channels are represented by pychan + pyselect.
- Python-level panic is represented by pypanic. - Python-level panic is represented by pypanic.
""" """
from libcpp cimport nullptr_t, nullptr as nil
from libcpp.utility cimport pair
cdef extern from *:
ctypedef bint cbool "bool"
# nogil pyx-level golang API. # nogil pyx-level golang API.
# #
# NOTE even though many functions may panic (= throw C++ exception) nothing is # NOTE even though many functions may panic (= throw C++ exception) nothing is
...@@ -58,14 +65,63 @@ cdef extern from "golang/libgolang.h" namespace "golang" nogil: ...@@ -58,14 +65,63 @@ cdef extern from "golang/libgolang.h" namespace "golang" nogil:
void go(...) # typechecking is done by C void go(...) # typechecking is done by C
struct _chan
cppclass chan[T]:
chan();
# send/recv/close
void send(const T&)
T recv()
pair[T, cbool] recv_()
void close()
# send/recv in select
_selcase sends(const T *ptx)
_selcase recvs()
_selcase recvs(T* prx)
_selcase recvs(T* prx, cbool *pok)
# length/capacity
unsigned len()
unsigned cap()
# compare wrt nil; =nil
cbool operator==(nullptr_t)
cbool operator!=(nullptr_t)
void operator=(nullptr_t)
# for tests
_chan *_rawchan()
chan[T] makechan[T]()
chan[T] makechan[T](unsigned size)
struct structZ:
pass
enum _chanop:
_CHANSEND
_CHANRECV
_DEFAULT
struct _selcase:
_chanop op
void *data
cbool *rxok
const _selcase default "golang::_default"
int select(_selcase casev[])
# ---- python bits ---- # ---- python bits ----
cdef void topyexc() except * cdef void topyexc() except *
cpdef pypanic(arg) cpdef pypanic(arg)
# pychan is chan<object>
from cpython cimport PyObject
ctypedef PyObject *pPyObject # https://github.com/cython/cython/issues/534
from cython cimport final from cython cimport final
@final @final
cdef class pychan: cdef class pychan:
cdef dict __dict__ cdef chan[pPyObject] ch
...@@ -33,10 +33,15 @@ from __future__ import print_function, absolute_import ...@@ -33,10 +33,15 @@ from __future__ import print_function, absolute_import
_init_libgolang() _init_libgolang()
from cpython cimport Py_INCREF, Py_DECREF, PY_MAJOR_VERSION from cpython cimport Py_INCREF, Py_DECREF, PY_MAJOR_VERSION
cdef extern from "Python.h":
ctypedef struct PyTupleObject:
PyObject **ob_item
void Py_FatalError(const char *msg)
from libcpp.vector cimport vector
from cython cimport final from cython cimport final
import sys import sys
import threading, collections, random
# ---- panic ---- # ---- panic ----
...@@ -136,311 +141,98 @@ cdef void __goviac(void *arg) nogil: ...@@ -136,311 +141,98 @@ cdef void __goviac(void *arg) nogil:
# ---- channels ---- # ---- channels ----
# _RecvWaiting represents a receiver waiting on a chan. # pychan is chan<object>.
class _RecvWaiting(object): @final
# .group _WaitGroup group of waiters this receiver is part of cdef class pychan:
# .chan chan channel receiver is waiting on def __cinit__(pych, size=0):
# pych.ch = makechan_pyobj_pyexc(size)
# on wakeup: sender|closer -> receiver:
# .rx_ rx_ for recv def __dealloc__(pych):
def __init__(self, group, ch): # on del: drain buffered channel to decref sent objects.
self.group = group # verify that the channel is not connected anywhere outside us.
self.chan = ch # (if it was present also somewhere else - draining would be incorrect)
group.register(self) if pych.ch == nil:
return
# wakeup notifies waiting receiver that recv_ completed. cdef int refcnt = _chanrefcnt(pych.ch._rawchan())
def wakeup(self, rx, ok): if refcnt != 1:
self.rx_ = (rx, ok) # cannot raise py-level exception in __dealloc__
self.group.wakeup() Py_FatalError("pychan.__dealloc__: chan.refcnt=%d ; must be =1" % refcnt)
# _SendWaiting represents a sender waiting on a chan.
class _SendWaiting(object):
# .group _WaitGroup group of waiters this sender is part of
# .chan chan channel sender is waiting on
# .obj object that was passed to send
#
# on wakeup: receiver|closer -> sender:
# .ok bool whether send succeeded (it will not on close)
def __init__(self, group, ch, obj):
self.group = group
self.chan = ch
self.obj = obj
group.register(self)
# wakeup notifies waiting sender that send completed.
def wakeup(self, ok):
self.ok = ok
self.group.wakeup()
# _WaitGroup is a group of waiting senders and receivers.
#
# Only 1 waiter from the group can succeed waiting.
class _WaitGroup(object):
# ._waitv [] of _{Send|Recv}Waiting
# ._sema semaphore used for wakeup
#
# ._mu lock NOTE ∀ chan order is always: chan._mu > ._mu
#
# on wakeup: sender|receiver -> group:
# .which _{Send|Recv}Waiting instance which succeeded waiting.
def __init__(self):
self._waitv = []
self._sema = threading.Lock() # in python it is valid to release lock from another thread.
self._sema.acquire()
self._mu = threading.Lock()
self.which = None
def register(self, wait):
self._waitv.append(wait)
# try_to_win tries to win waiter after it was dequeued from a channel's {_send|_recv}q.
#
# -> ok: true if won, false - if not.
def try_to_win(self, waiter):
with self._mu:
if self.which is not None:
return False
else:
self.which = waiter
return True
# wait waits for winning case of group to complete.
def wait(self):
self._sema.acquire()
# wakeup wakes up the group.
#
# prior to wakeup try_to_win must have been called.
# in practice this means that waiters queued to chan.{_send|_recv}q must
# be dequeued with _dequeWaiter.
def wakeup(self):
assert self.which is not None
self._sema.release()
# dequeAll removes all registered waiters from their wait queues.
def dequeAll(self):
for w in self._waitv:
ch = w.chan
if isinstance(w, _SendWaiting):
queue = ch._sendq
else:
assert isinstance(w, _RecvWaiting)
queue = ch._recvq
with ch._mu:
try:
queue.remove(w)
except ValueError:
pass
# _dequeWaiter dequeues a send or recv waiter from a channel's _recvq or _sendq. cdef chan[pPyObject] ch = pych.ch
# pych.ch = nil # does _chanxdecref(ch)
# the channel owning {_recv|_send}q must be locked.
def _dequeWaiter(queue):
while len(queue) > 0:
w = queue.popleft()
# if this waiter can win its group - return it.
# if not - someone else from its group already has won, and so we anyway have
# to remove the waiter from the queue.
if w.group.try_to_win(w):
return w
return None cdef PyObject *_rx
while ch.len() != 0:
# NOTE *not* chanrecv_pyexc(ch):
# - recv must not block and must not panic as we verified that we
# are the only holder of the channel and that ch buffer is not empty.
# - even if recv panics, we cannot convert that panic to python
# exception in __dealloc__. So if it really panics - let the
# panic make it and crash the process similarly to Py_FatalError above.
_rx = ch.recv()
Py_DECREF(<object>_rx)
# ch is decref'ed automatically at return
# pychan is Python channel with Go semantic.
@final
cdef class pychan:
# ._cap channel capacity
# ._mu lock
# ._dataq deque *: data buffer
# ._recvq deque _RecvWaiting: blocked receivers
# ._sendq deque _SendWaiting: blocked senders
# ._closed bool
def __init__(ch, size=0):
ch._cap = size
ch._mu = threading.Lock()
ch._dataq = collections.deque()
ch._recvq = collections.deque()
ch._sendq = collections.deque()
ch._closed = False
# send sends object to a receiver. # send sends object to a receiver.
def send(ch, obj): def send(pych, obj):
if ch is pynilchan: # increment obj reference count - until received the channel is holding pointer to the object.
_blockforever() Py_INCREF(obj)
ch._mu.acquire()
if 1:
ok = ch._trysend(obj)
if ok:
return
g = _WaitGroup()
me = _SendWaiting(g, ch, obj)
ch._sendq.append(me)
ch._mu.release()
g.wait() try:
assert g.which is me with nogil:
if not me.ok: chansend_pyexc(pych.ch, <PyObject *>obj)
pypanic("send on closed channel") except: # not only _PanicError as send can also throw e.g. bad_alloc
# the object was not sent - e.g. it was "send on a closed channel"
Py_DECREF(obj)
raise
# recv_ is "comma-ok" version of recv. # recv_ is "comma-ok" version of recv.
# #
# ok is true - if receive was delivered by a successful send. # ok is true - if receive was delivered by a successful send.
# ok is false - if receive is due to channel being closed and empty. # ok is false - if receive is due to channel being closed and empty.
def recv_(ch): # -> (rx, ok) def recv_(pych): # -> (rx, ok)
if ch is pynilchan: cdef PyObject *_rx = NULL
_blockforever() cdef bint ok
ch._mu.acquire() with nogil:
if 1: _rx, ok = chanrecv__pyexc(pych.ch)
rx_, ok = ch._tryrecv()
if ok:
return rx_
g = _WaitGroup()
me = _RecvWaiting(g, ch)
ch._recvq.append(me)
ch._mu.release() if not ok:
return (None, ok)
g.wait() # we received the object and the channel dropped pointer to it.
assert g.which is me rx = <object>_rx
return me.rx_ Py_DECREF(rx)
return (rx, ok)
# recv receives from the channel. # recv receives from the channel.
def recv(ch): # -> rx def recv(pych): # -> rx
rx, _ = ch.recv_() rx, _ = pych.recv_() # TODO call recv_ via C
return rx return rx
# _trysend(obj) -> ok # close closes sending side of the channel.
# def close(pych):
# must be called with ._mu held. with nogil:
# if ok or panic - returns with ._mu released. chanclose_pyexc(pych.ch)
# if !ok - returns with ._mu still being held.
def _trysend(ch, obj):
if ch._closed:
ch._mu.release()
pypanic("send on closed channel")
# synchronous channel
if ch._cap == 0:
recv = _dequeWaiter(ch._recvq)
if recv is None:
return False
ch._mu.release()
recv.wakeup(obj, True)
return True
# buffered channel
else:
if len(ch._dataq) >= ch._cap:
return False
ch._dataq.append(obj)
recv = _dequeWaiter(ch._recvq)
if recv is not None:
rx = ch._dataq.popleft()
ch._mu.release()
recv.wakeup(rx, True)
else:
ch._mu.release()
return True
# _tryrecv() -> rx_=(rx, ok), ok
#
# must be called with ._mu held.
# if ok or panic - returns with ._mu released.
# if !ok - returns with ._mu still being held.
def _tryrecv(ch):
# buffered
if len(ch._dataq) > 0:
rx = ch._dataq.popleft()
# wakeup a blocked writer, if there is any
send = _dequeWaiter(ch._sendq)
if send is not None:
ch._dataq.append(send.obj)
ch._mu.release()
send.wakeup(True)
else:
ch._mu.release()
return (rx, True), True
# closed
if ch._closed:
ch._mu.release()
return (None, False), True
# sync | empty: there is waiting writer
send = _dequeWaiter(ch._sendq)
if send is None:
return (None, False), False
ch._mu.release()
rx = send.obj
send.wakeup(True)
return (rx, True), True
def __len__(pych):
return chanlen_pyexc(pych.ch)
# close closes sending side of the channel. def __repr__(pych):
def close(ch): if pych.ch == nil:
if ch is pynilchan:
pypanic("close of nil channel")
recvv = []
sendv = []
with ch._mu:
if ch._closed:
pypanic("close of closed channel")
ch._closed = True
# schedule: wake-up all readers
while 1:
recv = _dequeWaiter(ch._recvq)
if recv is None:
break
recvv.append(recv)
# schedule: wake-up all writers (they will panic)
while 1:
send = _dequeWaiter(ch._sendq)
if send is None:
break
sendv.append(send)
# perform scheduled wakeups outside of ._mu
for recv in recvv:
recv.wakeup(None, False)
for send in sendv:
send.wakeup(False)
def __len__(ch):
return len(ch._dataq)
def __repr__(ch):
if ch is pynilchan:
return "nilchan" return "nilchan"
else: else:
return super(pychan, ch).__repr__() return super(pychan, pych).__repr__()
# pynilchan is the nil py channel. # pynilchan is the nil py channel.
# #
# On nil channel: send/recv block forever; close panics. # On nil channel: send/recv block forever; close panics.
pynilchan = pychan(None) # TODO -> <chan*>(NULL) after move to Cython cdef pychan _pynilchan = pychan()
_pynilchan.ch = chan[pPyObject]() # = NULL
pynilchan = _pynilchan
# pydefault represents default case for pyselect. # pydefault represents default case for pyselect.
...@@ -474,164 +266,90 @@ pydefault = object() ...@@ -474,164 +266,90 @@ pydefault = object()
# # default case # # default case
# ... # ...
def pyselect(*pycasev): def pyselect(*pycasev):
# select promise: if multiple cases are ready - one will be selected randomly cdef int i, n = len(pycasev), selected
npycasev = list(enumerate(pycasev)) cdef vector[_selcase] casev = vector[_selcase](n)
random.shuffle(npycasev) cdef pychan pych
cdef PyObject *_rx = NULL # all select recvs are setup to receive into _rx
# first pass: poll all cases and bail out in the end if default was provided cdef cbool rxok = False # (its ok as only one receive will be actually executed)
recvv = [] # [](n, ch, commaok)
sendv = [] # [](n, ch, tx) # prepare casev for chanselect
ndefault = None for i in range(n):
for (n, pycase) in npycasev: pycase = pycasev[i]
# default: remember we have it # default
if pycase is pydefault: if pycase is pydefault:
if ndefault is not None: casev[i] = default
pypanic("pyselect: multiple default")
ndefault = n
# send # send
elif type(pycase) is tuple: elif type(pycase) is tuple:
if len(pycase) != 2: if len(pycase) != 2:
pypanic("pyselect: invalid [%d]() case" % len(pycase)) pypanic("pyselect: invalid [%d]() case" % len(pycase))
_tcase = <PyTupleObject *>pycase
pysend, tx = pycase pysend = <object>(_tcase.ob_item[0])
if pysend.__self__.__class__ is not pychan: if pysend.__self__.__class__ is not pychan:
pypanic("pyselect: send on non-chan: %r" % (pysend.__self__.__class__,)) pypanic("pyselect: send on non-chan: %r" % (pysend.__self__.__class__,))
ch = pysend.__self__ pych = pysend.__self__
if pysend.__name__ != "send": # XXX better check PyCFunction directly if pysend.__name__ != "send": # XXX better check PyCFunction directly
pypanic("pyselect: send expected: %r" % (pysend,)) pypanic("pyselect: send expected: %r" % (pysend,))
if ch is not pynilchan: # nil chan is never ready # wire ptx through pycase[1]
ch._mu.acquire() p_tx = &(_tcase.ob_item[1])
if 1: tx = <object>(p_tx[0])
ok = ch._trysend(tx)
if ok:
return n, None
ch._mu.release()
sendv.append((n, ch, tx)) # incref tx as if corresponding channel is holding pointer to the object while it is being sent.
# we'll decref the object if it won't be sent.
# see pychan.send for details.
Py_INCREF(tx)
casev[i] = pych.ch.sends(p_tx)
# recv # recv
else: else:
pyrecv = pycase pyrecv = pycase
if pyrecv.__self__.__class__ is not pychan: if pyrecv.__self__.__class__ is not pychan:
pypanic("pyselect: recv on non-chan: %r" % (pyrecv.__self__.__class__,)) pypanic("pyselect: recv on non-chan: %r" % (pyrecv.__self__.__class__,))
ch = pyrecv.__self__ pych = pyrecv.__self__
if pyrecv.__name__ == "recv": # XXX better check PyCFunction directly if pyrecv.__name__ == "recv": # XXX better check PyCFunction directly
commaok = False casev[i] = pych.ch.recvs(&_rx)
elif pyrecv.__name__ == "recv_": # XXX better check PyCFunction directly elif pyrecv.__name__ == "recv_": # XXX better check PyCFunction directly
commaok = True casev[i] = pych.ch.recvs(&_rx, &rxok)
else: else:
pypanic("pyselect: recv expected: %r" % (pyrecv,)) pypanic("pyselect: recv expected: %r" % (pyrecv,))
if ch is not pynilchan: # nil chan is never ready selected = -1
ch._mu.acquire()
if 1:
rx_, ok = ch._tryrecv()
if ok:
if not commaok:
rx, ok = rx_
rx_ = rx
return n, rx_
ch._mu.release()
recvv.append((n, ch, commaok))
# execute default if we have it
if ndefault is not None:
return ndefault, None
# select{} or with nil-channels only -> block forever
if len(recvv) + len(sendv) == 0:
_blockforever()
# second pass: subscribe and wait on all rx/tx cases
g = _WaitGroup()
# selected returns what was selected in g.
# the return signature is the one of select.
def selected():
g.wait()
sel = g.which
if isinstance(sel, _SendWaiting):
if not sel.ok:
pypanic("send on closed channel")
return sel.sel_n, None
if isinstance(sel, _RecvWaiting):
rx_ = sel.rx_
if not sel.sel_commaok:
rx, ok = rx_
rx_ = rx
return sel.sel_n, rx_
raise AssertionError("select: unreachable")
try: try:
for n, ch, tx in sendv: with nogil:
ch._mu.acquire() selected = _chanselect_pyexc(&casev[0], casev.size())
with g._mu:
# a case that we previously queued already won
if g.which is not None:
ch._mu.release()
return selected()
ok = ch._trysend(tx)
if ok:
# don't let already queued cases win
g.which = "tx prepoll won" # !None
return n, None
w = _SendWaiting(g, ch, tx)
w.sel_n = n
ch._sendq.append(w)
ch._mu.release()
for n, ch, commaok in recvv:
ch._mu.acquire()
with g._mu:
# a case that we previously queued already won
if g.which is not None:
ch._mu.release()
return selected()
rx_, ok = ch._tryrecv()
if ok:
# don't let already queued cases win
g.which = "rx prepoll won" # !None
if not commaok:
rx, ok = rx_
rx_ = rx
return n, rx_
w = _RecvWaiting(g, ch)
w.sel_n = n
w.sel_commaok = commaok
ch._recvq.append(w)
ch._mu.release()
return selected()
finally: finally:
# unsubscribe not-succeeded waiters # decref not sent tx (see ^^^ send prepare)
g.dequeAll() for i in range(n):
if casev[i].op == _CHANSEND and (i != selected):
p_tx = <PyObject **>casev[i].data
# _blockforever blocks current goroutine forever. _tx = p_tx[0]
_tblockforever = None tx = <object>_tx
def _blockforever(): Py_DECREF(tx)
if _tblockforever is not None:
_tblockforever() # return what was selected
# take a lock twice. It will forever block on the second lock attempt. cdef _chanop op = casev[selected].op
# Under gevent, similarly to Go, this raises "LoopExit: This operation if op == _DEFAULT:
# would block forever", if there are no other greenlets scheduled to be run. return selected, None
dead = threading.Lock() if op == _CHANSEND:
dead.acquire() return selected, None
dead.acquire()
if op != _CHANRECV:
raise AssertionError("pyselect: chanselect returned with bad op")
# we received NULL or the object; if it is object, corresponding channel
# dropped pointer to it (see pychan.recv_ for details).
cdef object rx = None
if _rx != NULL:
rx = <object>_rx
Py_DECREF(rx)
if casev[selected].rxok != NULL:
return selected, (rx, rxok)
else:
return selected, rx
# ---- init libgolang runtime --- # ---- init libgolang runtime ---
...@@ -668,9 +386,30 @@ cdef void _init_libgolang() except*: ...@@ -668,9 +386,30 @@ cdef void _init_libgolang() except*:
# ---- misc ---- # ---- misc ----
cdef extern from "golang/libgolang.h" namespace "golang" nogil: cdef extern from "golang/libgolang.h" namespace "golang" nogil:
int _chanrefcnt(_chan *ch)
int _chanselect(_selcase *casev, int casec)
void _taskgo(void (*f)(void *), void *arg) void _taskgo(void (*f)(void *), void *arg)
cdef nogil: cdef nogil:
chan[pPyObject] makechan_pyobj_pyexc(unsigned size) except +topyexc:
return makechan[pPyObject](size)
void chansend_pyexc(chan[pPyObject] ch, PyObject *_tx) except +topyexc:
ch.send(_tx)
(PyObject*, bint) chanrecv__pyexc(chan[pPyObject] ch) except +topyexc:
_ = ch.recv_()
return (_.first, _.second) # TODO teach Cython to coerce pair[X,Y] -> (X,Y)
void chanclose_pyexc(chan[pPyObject] ch) except +topyexc:
ch.close()
unsigned chanlen_pyexc(chan[pPyObject] ch) except +topyexc:
return ch.len()
int _chanselect_pyexc(const _selcase *casev, int casec) except +topyexc:
return _chanselect(casev, casec)
void _taskgo_pyexc(void (*f)(void *) nogil, void *arg) except +topyexc: void _taskgo_pyexc(void (*f)(void *) nogil, void *arg) except +topyexc:
_taskgo(f, arg) _taskgo(f, arg)
...@@ -26,21 +26,30 @@ ...@@ -26,21 +26,30 @@
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
from golang cimport go, pychan, panic, pypanic, topyexc from golang cimport go, chan, _chan, makechan, pychan, nil, select, \
from golang import nilchan default, structZ, panic, pypanic, topyexc, cbool
from golang import _golang
from golang import time cdef extern from "golang/libgolang.h" namespace "golang" nogil:
int _tchanrecvqlen(_chan *ch)
int _tchansendqlen(_chan *ch)
void (*_tblockforever)()
# pylen_{recv,send}q returns len(pych._{recv,send}q) # pylen_{recv,send}q returns len(_chan._{recv,send}q)
def pylen_recvq(pychan pych not None): # -> int def pylen_recvq(pychan pych not None): # -> int
if pych is nilchan: if pych.ch == nil:
raise AssertionError('len(.recvq) on nil channel') raise AssertionError('len(.recvq) on nil channel')
return len(pych._recvq) return _tchanrecvqlen(pych.ch._rawchan())
def pylen_sendq(pychan pych not None): # -> int def pylen_sendq(pychan pych not None): # -> int
if pych is nilchan: if pych.ch == nil:
raise AssertionError('len(.sendq) on nil channel') raise AssertionError('len(.sendq) on nil channel')
return len(pych._sendq) return _tchansendqlen(pych.ch._rawchan())
# runtime/libgolang_test.cpp
cdef extern from *:
"""
extern void waitBlocked(golang::_chan *ch, bool rx, bool tx);
"""
void waitBlocked(_chan *, bint rx, bint tx) nogil except +topyexc
# pywaitBlocked waits till a receive or send pychan operation blocks waiting on the channel. # pywaitBlocked waits till a receive or send pychan operation blocks waiting on the channel.
# #
...@@ -49,7 +58,8 @@ def pywaitBlocked(pychanop): ...@@ -49,7 +58,8 @@ def pywaitBlocked(pychanop):
if pychanop.__self__.__class__ is not pychan: if pychanop.__self__.__class__ is not pychan:
pypanic("wait blocked: %r is method of a non-chan: %r" % (pychanop, pychanop.__self__.__class__)) pypanic("wait blocked: %r is method of a non-chan: %r" % (pychanop, pychanop.__self__.__class__))
cdef pychan pych = pychanop.__self__ cdef pychan pych = pychanop.__self__
recv = send = False cdef bint recv = False
cdef bint send = False
if pychanop.__name__ == "recv": # XXX better check PyCFunction directly if pychanop.__name__ == "recv": # XXX better check PyCFunction directly
recv = True recv = True
elif pychanop.__name__ == "send": # XXX better check PyCFunction directly elif pychanop.__name__ == "send": # XXX better check PyCFunction directly
...@@ -57,41 +67,84 @@ def pywaitBlocked(pychanop): ...@@ -57,41 +67,84 @@ def pywaitBlocked(pychanop):
else: else:
pypanic("wait blocked: unexpected chan method: %r" % (pychanop,)) pypanic("wait blocked: unexpected chan method: %r" % (pychanop,))
t0 = time.now() with nogil:
while 1: waitBlocked(pych.ch._rawchan(), recv, send)
with pych._mu:
if recv and pylen_recvq(pych) > 0:
return
if send and pylen_sendq(pych) > 0:
return
now = time.now()
if now-t0 > 10: # waited > 10 seconds - likely deadlock
pypanic("deadlock")
time.sleep(0) # yield to another thread / coroutine
# `with pypanicWhenBlocked` hooks into _golang._blockforever to raise panic with # `with pypanicWhenBlocked` hooks into libgolang _blockforever to raise panic with
# "t: blocks forever" instead of blocking. # "t: blocks forever" instead of blocking.
cdef class pypanicWhenBlocked: cdef class pypanicWhenBlocked:
def __enter__(t): def __enter__(t):
assert _golang._tblockforever is None global _tblockforever
_golang._tblockforever = _panicblocked _tblockforever = _panicblocked
return t return t
def __exit__(t, typ, val, tb): def __exit__(t, typ, val, tb):
_golang._tblockforever = None _tblockforever = NULL
def _panicblocked(): cdef void _panicblocked() nogil:
pypanic("t: blocks forever") panic("t: blocks forever")
# small test to verify pyx(nogil) channels.
ctypedef struct Point:
int x
int y
# TODO kill this and teach Cython to coerce pair[X,Y] -> (X,Y)
cdef (int, cbool) recv_(chan[int] ch) nogil:
_ = ch.recv_()
return (_.first, _.second)
cdef void _test_chan_nogil() nogil except +topyexc:
cdef chan[structZ] done = makechan[structZ]()
cdef chan[int] chi = makechan[int](1)
cdef chan[Point] chp = makechan[Point]()
chp = nil # reset to nil
cdef int i, j
cdef Point p
cdef cbool jok
i=+1; chi.send(i)
j=-1; j = chi.recv()
if not (j == i):
panic("send -> recv != I")
i = 2
_=select([
done.recvs(), # 0
chi.sends(&i), # 1
chp.recvs(&p), # 2
chi.recvs(&j, &jok), # 3
default, # 4
])
if _ != 1:
panic("select: selected !1")
j, jok = recv_(chi)
if not (j == 2 and jok == True):
panic("recv_ != (2, true)")
chi.close()
j, jok = recv_(chi)
if not (j == 0 and jok == False):
panic("recv_ from closed != (0, false)")
def test_chan_nogil():
with nogil:
_test_chan_nogil()
# small test to verify pyx(nogil) go. # small test to verify pyx(nogil) go.
cdef void _test_go_nogil() nogil except +topyexc: cdef void _test_go_nogil() nogil except +topyexc:
go(_work, 111) cdef chan[structZ] done = makechan[structZ]()
# TODO wait till _work is done go(_work, 111, done)
cdef void _work(int i) nogil: done.recv()
cdef void _work(int i, chan[structZ] done) nogil:
if i != 111: if i != 111:
panic("_work: i != 111") panic("_work: i != 111")
done.close()
def test_go_nogil(): def test_go_nogil():
with nogil: with nogil:
...@@ -101,9 +154,14 @@ def test_go_nogil(): ...@@ -101,9 +154,14 @@ def test_go_nogil():
# runtime/libgolang_test_c.c # runtime/libgolang_test_c.c
cdef extern from * nogil: cdef extern from * nogil:
""" """
extern "C" void _test_chan_c();
extern "C" void _test_go_c(); extern "C" void _test_go_c();
""" """
void _test_chan_c() except +topyexc
void _test_go_c() except +topyexc void _test_go_c() except +topyexc
def test_chan_c():
with nogil:
_test_chan_c()
def test_go_c(): def test_go_c():
with nogil: with nogil:
_test_go_c() _test_go_c()
...@@ -111,9 +169,24 @@ def test_go_c(): ...@@ -111,9 +169,24 @@ def test_go_c():
# runtime/libgolang_test.cpp # runtime/libgolang_test.cpp
cdef extern from * nogil: cdef extern from * nogil:
""" """
extern void _test_chan_cpp_refcount();
extern void _test_chan_cpp();
extern void _test_chan_vs_stackdeadwhileparked();
extern void _test_go_cpp(); extern void _test_go_cpp();
""" """
void _test_chan_cpp_refcount() except +topyexc
void _test_chan_cpp() except +topyexc
void _test_chan_vs_stackdeadwhileparked() except +topyexc
void _test_go_cpp() except +topyexc void _test_go_cpp() except +topyexc
def test_chan_cpp_refcount():
with nogil:
_test_chan_cpp_refcount()
def test_chan_cpp():
with nogil:
_test_chan_cpp()
def test_chan_vs_stackdeadwhileparked():
with nogil:
_test_chan_vs_stackdeadwhileparked()
def test_go_cpp(): def test_go_cpp():
with nogil: with nogil:
_test_go_cpp() _test_go_cpp()
...@@ -159,6 +159,11 @@ def test_chan(): ...@@ -159,6 +159,11 @@ def test_chan():
assert w2() is not None assert w2() is not None
ch = None ch = None
gc.collect() gc.collect()
# pypy needs another GC run: pychan does Py_DECREF on buffered objects, but
# on pypy cpyext objects are not deallocated from Py_DECREF even if
# ob_refcnt goes to zero - the deallocation is delayed until GC run.
# see also: http://doc.pypy.org/en/latest/discussion/rawrefcount.html
gc.collect()
assert w1() is None assert w1() is None
assert w2() is None assert w2() is None
......
...@@ -22,9 +22,10 @@ ...@@ -22,9 +22,10 @@
// Library Libgolang provides Go-like features for C and C++. // Library Libgolang provides Go-like features for C and C++.
// //
// Library Libgolang provides goroutines and other // Library Libgolang provides goroutines, channels with Go semantic and other
// accompanying features. The library consists of high-level type-safe C++ API, // accompanying features. The library consists of high-level type-safe C++ API,
// and low-level unsafe C API. // and low-level unsafe C API. The low-level C API was inspired by Libtask[1]
// and Plan9/Libthread[2].
// //
// The primary motivation for Libgolang is to serve as runtime for golang.pyx - // The primary motivation for Libgolang is to serve as runtime for golang.pyx -
// - Cython part of Pygolang project. However Libgolang is independent of // - Cython part of Pygolang project. However Libgolang is independent of
...@@ -35,12 +36,29 @@ ...@@ -35,12 +36,29 @@
// C++-level API // C++-level API
// //
// - `go` spawns new task. // - `go` spawns new task.
// - `chan<T>`, and `select` provide channels with Go semantic and automatic
// lifetime management.
// - `sleep` pauses current task. // - `sleep` pauses current task.
// - `panic` throws exception that represent C-level panic. // - `panic` throws exception that represent C-level panic.
// //
// For example: // For example:
// //
// go(worker, 1); // spawn worker(int) // chan<int> ch = makechan<int>(); // create new channel
// go(worker, ch, 1); // spawn worker(chan<int>, int)
// ch.send(1)
// j = ch.recv()
//
// _ = select({
// _default, // 0
// ch.sends(&i), // 1
// ch.recvs(&j), // 2
// });
// if (_ == 0)
// // default case selected
// if (_ == 1)
// // case 1 selected: i sent to ch
// if (_ == 2)
// // case 2 selected: j received from ch
// //
// if (<bug condition>) // if (<bug condition>)
// panic("bug"); // panic("bug");
...@@ -49,6 +67,10 @@ ...@@ -49,6 +67,10 @@
// C-level API // C-level API
// //
// - `_taskgo` spawns new task. // - `_taskgo` spawns new task.
// - `_makechan` creates raw channel with Go semantic.
// - `_chanxincref` and `_chanxdecref` manage channel lifetime.
// - `_chansend` and `_chanrecv` send/receive over raw channel.
// - `_chanselect`, `_selsend`, `_selrecv`, ... provide raw select functionality.
// - `tasknanosleep` pauses current task. // - `tasknanosleep` pauses current task.
// //
// //
...@@ -65,6 +87,10 @@ ...@@ -65,6 +87,10 @@
// //
// Once again, Libgolang itself is independent from Python, and other runtimes // Once again, Libgolang itself is independent from Python, and other runtimes
// are possible. // are possible.
//
//
// [1] Libtask: a Coroutine Library for C and Unix. https://swtch.com/libtask.
// [2] http://9p.io/magic/man2html/2/thread.
#include <stdbool.h> #include <stdbool.h>
#include <stddef.h> #include <stddef.h>
...@@ -104,10 +130,88 @@ LIBGOLANG_API void _taskgo(void (*f)(void *arg), void *arg); ...@@ -104,10 +130,88 @@ LIBGOLANG_API void _taskgo(void (*f)(void *arg), void *arg);
LIBGOLANG_API void _tasknanosleep(uint64_t dt); LIBGOLANG_API void _tasknanosleep(uint64_t dt);
LIBGOLANG_API uint64_t _nanotime(void); LIBGOLANG_API uint64_t _nanotime(void);
typedef struct _chan _chan;
LIBGOLANG_API _chan *_makechan(unsigned elemsize, unsigned size);
LIBGOLANG_API void _chanxincref(_chan *ch);
LIBGOLANG_API void _chanxdecref(_chan *ch);
LIBGOLANG_API int _chanrefcnt(_chan *ch);
LIBGOLANG_API void _chansend(_chan *ch, const void *ptx);
LIBGOLANG_API void _chanrecv(_chan *ch, void *prx);
LIBGOLANG_API bool _chanrecv_(_chan *ch, void *prx);
LIBGOLANG_API void _chanclose(_chan *ch);
LIBGOLANG_API unsigned _chanlen(_chan *ch);
LIBGOLANG_API unsigned _chancap(_chan *ch);
enum _chanop {
_CHANSEND = 0,
_CHANRECV = 1,
_DEFAULT = 2,
};
// _selcase represents one select case.
typedef struct _selcase {
_chan *ch; // channel
enum _chanop op; // chansend/chanrecv/default
void *data; // chansend: ptx; chanrecv: prx
bool *rxok; // chanrecv: where to save ok if !NULL; otherwise not used
} _selcase;
LIBGOLANG_API int _chanselect(const _selcase *casev, int casec);
// _selsend creates `_chansend(ch, ptx)` case for _chanselect.
static inline
_selcase _selsend(_chan *ch, const void *ptx) {
_selcase _ = {
.ch = ch,
.op = _CHANSEND,
.data = (void *)ptx,
.rxok = NULL,
};
return _;
}
// _selrecv creates `_chanrecv(ch, prx)` case for _chanselect.
static inline
_selcase _selrecv(_chan *ch, void *prx) {
_selcase _ = {
.ch = ch,
.op = _CHANRECV,
.data = prx,
.rxok = NULL,
};
return _;
}
// _selrecv_ creates `*pok = _chanrecv_(ch, prx)` case for _chanselect.
static inline
_selcase _selrecv_(_chan *ch, void *prx, bool *pok) {
_selcase _ = {
.ch = ch,
.op = _CHANRECV,
.data = prx,
.rxok = pok,
};
return _;
}
// _default represents default case for _chanselect.
extern LIBGOLANG_API const _selcase _default;
// libgolang runtime - the runtime must be initialized before any other libgolang use. // libgolang runtime - the runtime must be initialized before any other libgolang use.
typedef struct _libgolang_sema _libgolang_sema; typedef struct _libgolang_sema _libgolang_sema;
typedef enum _libgolang_runtime_flags {
// STACK_DEAD_WHILE_PARKED indicates that it is not safe to access
// goroutine's stack memory while the goroutine is parked.
//
// for example gevent/greenlet/stackless use it because they copy g's stack
// to heap on park and back on unpark. This way if objects on g's stack
// were accessed while g was parked it would be memory of another g's stack.
STACK_DEAD_WHILE_PARKED = 1,
} _libgolang_runtime_flags;
typedef struct _libgolang_runtime_ops { typedef struct _libgolang_runtime_ops {
_libgolang_runtime_flags flags;
// go should spawn a task (coroutine/thread/...). // go should spawn a task (coroutine/thread/...).
void (*go)(void (*f)(void *), void *arg); void (*go)(void (*f)(void *), void *arg);
...@@ -136,6 +240,11 @@ typedef struct _libgolang_runtime_ops { ...@@ -136,6 +240,11 @@ typedef struct _libgolang_runtime_ops {
LIBGOLANG_API void _libgolang_init(const _libgolang_runtime_ops *runtime_ops); LIBGOLANG_API void _libgolang_init(const _libgolang_runtime_ops *runtime_ops);
// for testing
LIBGOLANG_API int _tchanrecvqlen(_chan *ch);
LIBGOLANG_API int _tchansendqlen(_chan *ch);
LIBGOLANG_API extern void (*_tblockforever)(void);
#ifdef __cplusplus #ifdef __cplusplus
}} }}
#endif #endif
...@@ -145,8 +254,12 @@ LIBGOLANG_API void _libgolang_init(const _libgolang_runtime_ops *runtime_ops); ...@@ -145,8 +254,12 @@ LIBGOLANG_API void _libgolang_init(const _libgolang_runtime_ops *runtime_ops);
#ifdef __cplusplus #ifdef __cplusplus
#include <exception>
#include <functional> #include <functional>
#include <initializer_list>
#include <memory> #include <memory>
#include <type_traits>
#include <utility>
namespace golang { namespace golang {
...@@ -162,6 +275,122 @@ static inline void go(F /*std::function<void(Argv...)>*/ f, Argv... argv) { ...@@ -162,6 +275,122 @@ static inline void go(F /*std::function<void(Argv...)>*/ f, Argv... argv) {
}, frun); }, frun);
} }
template<typename T> class chan;
template<typename T> static chan<T> makechan(unsigned size=0);
// chan<T> provides type-safe wrapper over _chan.
//
// chan<T> is automatically reference-counted and is safe to use from multiple
// goroutines simultaneously.
template<typename T>
class chan {
_chan *_ch;
public:
inline chan() { _ch = NULL; } // nil channel if not explicitly initialized
friend chan<T> makechan<T>(unsigned size);
inline ~chan() { _chanxdecref(_ch); _ch = NULL; }
// = nil
inline chan(nullptr_t) { _ch = NULL; }
inline chan& operator=(nullptr_t) { _chanxdecref(_ch); _ch = NULL; return *this; }
// copy
inline chan(const chan& from) { _ch = from._ch; _chanxincref(_ch); }
inline chan& operator=(const chan& from) {
if (this != &from) {
_chanxdecref(_ch); _ch = from._ch; _chanxincref(_ch);
}
return *this;
}
// move
inline chan(chan&& from) { _ch = from._ch; from._ch = NULL; }
inline chan& operator=(chan&& from) {
if (this != &from) {
_chanxdecref(_ch); _ch = from._ch; from._ch = NULL;
}
return *this;
}
// _chan does plain memcpy to copy elements.
// TODO allow all types (e.g. element=chan )
static_assert(std::is_trivially_copyable<T>::value, "TODO chan<T>: T copy is not trivial");
// send/recv/close
inline void send(const T &ptx) { _chansend(_ch, &ptx); }
inline T recv() { T rx; _chanrecv(_ch, &rx); return rx; }
inline std::pair<T,bool> recv_() { T rx; bool ok = _chanrecv_(_ch, &rx);
return std::make_pair(rx, ok); }
inline void close() { _chanclose(_ch); }
// send/recv in select
// ch.sends creates `ch.send(*ptx)` case for select.
[[nodiscard]] inline _selcase sends(const T *ptx) { return _selsend(_ch, ptx); }
// ch.recvs creates `*prx = ch.recv()` case for select.
//
// if pok is provided the case is extended to `[*prx, *pok] = ch.recv_()`
// if both prx and pok are omitted the case is reduced to `ch.recv()`.
[[nodiscard]] inline _selcase recvs(T *prx=NULL, bool *pok=NULL) {
return _selrecv_(_ch, prx, pok);
}
// length/capacity
inline unsigned len() { return _chanlen(_ch); }
inline unsigned cap() { return _chancap(_ch); }
// compare wrt nil
inline bool operator==(nullptr_t) { return (_ch == NULL); }
inline bool operator!=(nullptr_t) { return (_ch != NULL); }
// compare wrt chan
inline bool operator==(const chan<T>& ch2) { return (_ch == ch2._ch); }
inline bool operator!=(const chan<T>& ch2) { return (_ch != ch2._ch); }
// for testing
inline _chan *_rawchan() { return _ch; }
};
// makechan<T> makes new chan<T> with capacity=size.
template<typename T> static inline
chan<T> makechan(unsigned size) {
chan<T> ch;
unsigned elemsize = std::is_empty<T>::value
? 0 // eg struct{} for which sizeof() gives 1 - *not* 0
: sizeof(T);
ch._ch = _makechan(elemsize, size);
if (ch._ch == NULL)
throw std::bad_alloc();
return ch;
}
// structZ is struct{}.
//
// it's a workaround for e.g. makechan<struct{}> giving
// "error: types may not be defined in template arguments".
struct structZ{};
// select, together with chan<T>.sends and chan<T>.recvs, provide type-safe
// wrappers over _chanselect and _selsend/_selrecv/_selrecv_.
//
// Usage example:
//
// _ = select({
// ch1.recvs(&v), // 0
// ch2.recvs(&v, &ok), // 1
// ch2.sends(&v), // 2
// _default, // 3
// })
static inline // select({case1, case2, case3})
int select(const std::initializer_list<const _selcase> &casev) {
return _chanselect(casev.begin(), casev.size());
}
template<size_t N> static inline // select(casev_array)
int select(const _selcase (&casev)[N]) {
return _chanselect(&casev[0], N);
}
namespace time { namespace time {
......
...@@ -23,18 +23,26 @@ ...@@ -23,18 +23,26 @@
# Small program that uses a bit of golang.pyx nogil features, mainly to verify # Small program that uses a bit of golang.pyx nogil features, mainly to verify
# that external project can build against golang in pyx mode. # that external project can build against golang in pyx mode.
from golang cimport go, topyexc from golang cimport go, chan, makechan, topyexc
from libc.stdio cimport printf from libc.stdio cimport printf
cdef nogil: cdef nogil:
void worker(int i, int j): void worker(chan[int] ch, int i, int j):
pass ch.send(i*j)
void _main() except +topyexc: void _main() except +topyexc:
cdef chan[int] ch = makechan[int]()
cdef int i cdef int i
for i in range(3): for i in range(3):
go(worker, i, 4) go(worker, ch, i, 4)
for i in range(3):
ch.recv()
ch.close()
#_, ok = ch.recv_() # TODO teach Cython to coerce pair[X,Y] -> (X,Y)
ch.recv_()
printf("test.pyx: OK\n") printf("test.pyx: OK\n")
......
...@@ -23,8 +23,12 @@ from libc.stdint cimport uint64_t ...@@ -23,8 +23,12 @@ from libc.stdint cimport uint64_t
cdef extern from "golang/libgolang.h" nogil: cdef extern from "golang/libgolang.h" nogil:
struct _libgolang_sema struct _libgolang_sema
enum _libgolang_runtime_flags:
STACK_DEAD_WHILE_PARKED
struct _libgolang_runtime_ops: struct _libgolang_runtime_ops:
_libgolang_runtime_flags flags
void (*go)(void (*f)(void *) nogil, void *arg); void (*go)(void (*f)(void *) nogil, void *arg);
_libgolang_sema* (*sema_alloc) () _libgolang_sema* (*sema_alloc) ()
......
...@@ -42,7 +42,7 @@ from cpython cimport Py_INCREF, Py_DECREF ...@@ -42,7 +42,7 @@ from cpython cimport Py_INCREF, Py_DECREF
from cython cimport final from cython cimport final
from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \ from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \
panic STACK_DEAD_WHILE_PARKED, panic
from golang.runtime cimport _runtime_thread from golang.runtime cimport _runtime_thread
...@@ -125,6 +125,10 @@ cdef nogil: ...@@ -125,6 +125,10 @@ cdef nogil:
# XXX const # XXX const
_libgolang_runtime_ops gevent_ops = _libgolang_runtime_ops( _libgolang_runtime_ops gevent_ops = _libgolang_runtime_ops(
# when greenlet is switched to another, its stack is copied to
# heap, and stack of switched-to greenlet is copied back to C stack.
flags = STACK_DEAD_WHILE_PARKED,
go = go, go = go,
sema_alloc = sema_alloc, sema_alloc = sema_alloc,
sema_free = sema_free, sema_free = sema_free,
......
...@@ -80,7 +80,7 @@ cdef extern from "pythread.h" nogil: ...@@ -80,7 +80,7 @@ cdef extern from "pythread.h" nogil:
void PyThread_free_lock(PyThread_type_lock) void PyThread_free_lock(PyThread_type_lock)
from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \ from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \
panic _libgolang_runtime_flags, panic
from libc.stdint cimport uint64_t, UINT64_MAX from libc.stdint cimport uint64_t, UINT64_MAX
IF POSIX: IF POSIX:
...@@ -168,6 +168,7 @@ cdef nogil: ...@@ -168,6 +168,7 @@ cdef nogil:
# XXX const # XXX const
_libgolang_runtime_ops thread_ops = _libgolang_runtime_ops( _libgolang_runtime_ops thread_ops = _libgolang_runtime_ops(
flags = <_libgolang_runtime_flags>0,
go = go, go = go,
sema_alloc = sema_alloc, sema_alloc = sema_alloc,
sema_free = sema_free, sema_free = sema_free,
......
...@@ -20,18 +20,26 @@ ...@@ -20,18 +20,26 @@
// Library Libgolang provides Go-like features for C and C++. // Library Libgolang provides Go-like features for C and C++.
// See libgolang.h for library overview. // See libgolang.h for library overview.
// Pygolang C part: provides runtime implementation of panic, etc. // Pygolang C part: provides runtime implementation of panic, channels, etc.
// //
// C++ (not C) is used: // C++ (not C) is used:
// - to implement C-level panic (via C++ exceptions). // - to implement C-level panic (via C++ exceptions).
// - to provide chan<T> template that can be used as chan[T] in Cython.
// - because Cython (currently ?) does not allow to add methods to `cdef struct`.
#include "golang/libgolang.h" #include "golang/libgolang.h"
#include <algorithm>
#include <atomic>
#include <exception> #include <exception>
#include <functional>
#include <limits> #include <limits>
#include <memory>
#include <mutex> // lock_guard #include <mutex> // lock_guard
#include <random>
#include <string> #include <string>
#include <stdlib.h>
#include <string.h> #include <string.h>
// linux/list.h needs ARRAY_SIZE XXX -> better use c.h or ccan/array_size.h ? // linux/list.h needs ARRAY_SIZE XXX -> better use c.h or ccan/array_size.h ?
...@@ -40,9 +48,14 @@ ...@@ -40,9 +48,14 @@
#endif #endif
#include <linux/list.h> #include <linux/list.h>
using std::atomic;
using std::bad_alloc;
using std::exception; using std::exception;
using std::max;
using std::numeric_limits; using std::numeric_limits;
using std::string; using std::string;
using std::unique_ptr;
using std::vector;
namespace golang { namespace golang {
...@@ -168,6 +181,949 @@ private: ...@@ -168,6 +181,949 @@ private:
// with_lock mimics `with mu` from python. // with_lock mimics `with mu` from python.
#define with_lock(mu) std::lock_guard<Mutex> _with_lock_ ## __COUNTER__ (mu) #define with_lock(mu) std::lock_guard<Mutex> _with_lock_ ## __COUNTER__ (mu)
// defer(f) mimics defer from golang.
// XXX f is called at end of current scope, not function.
#define defer(f) _deferred _defer_ ## __COUNTER__ (f)
struct _deferred {
typedef std::function<void(void)> F;
F f;
_deferred(F f) : f(f) {}
~_deferred() { f(); }
private:
_deferred(const _deferred&); // don't copy
};
// ---- channels -----
struct _WaitGroup;
struct _RecvSendWaiting;
// _chan is a raw channel with Go semantic.
//
// Over raw channel the data is sent/received via elemsize'ed memcpy of void*
// and the caller must make sure to pass correct arguments.
//
// See chan<T> for type-safe wrapper.
//
// _chan is not related to Python runtime and works without GIL if libgolang
// runtime works without GIL(*).
//
// (*) for example "thread" runtime works without GIL, while "gevent" runtime
// acquires GIL on every semaphore acquire.
struct _chan {
atomic<int> _refcnt; // reference counter for _chan object
unsigned _cap; // channel capacity (in elements)
unsigned _elemsize; // size of element
Mutex _mu;
list_head _recvq; // blocked receivers (_ -> _RecvSendWaiting.in_rxtxq)
list_head _sendq; // blocked senders (_ -> _RecvSendWaiting.in_rxtxq)
bool _closed;
// data queue (circular buffer) goes past _chan memory and occupies [_cap*_elemsize] bytes.
unsigned _dataq_n; // total number of entries in dataq
unsigned _dataq_r; // index for next read (in elements; can be used only if _dataq_n > 0)
unsigned _dataq_w; // index for next write (in elements; can be used only if _dataq_n < _cap)
void incref();
void decref();
int refcnt();
void send(const void *ptx);
bool recv_(void *prx);
void recv(void *prx);
bool _trysend(const void *tx);
bool _tryrecv(void *prx, bool *pok);
void close();
unsigned len();
unsigned cap();
void _dataq_append(const void *ptx);
void _dataq_popleft(void *prx);
private:
_chan(const _chan&); // don't copy
template<bool onstack> void _send2 (const void *);
void __send2 (const void *, _WaitGroup*, _RecvSendWaiting*);
template<bool onstack> bool _recv2_(void *);
bool __recv2_(void *, _WaitGroup*, _RecvSendWaiting*);
};
// _RecvSendWaiting represents a receiver/sender waiting on a chan.
struct _RecvSendWaiting {
_WaitGroup *group; // group of waiters this receiver/sender is part of
_chan *chan; // channel receiver/sender is waiting on
list_head in_rxtxq; // in recv or send queue of the channel (_chan._recvq|_sendq -> _)
// recv: on wakeup: sender|closer -> receiver; NULL means "don't copy received value"
// send: ptr-to data to send
void *pdata;
// on wakeup: whether recv/send succeeded (send fails on close)
bool ok;
// this wait is used in under select as case #sel_n
int sel_n;
_RecvSendWaiting();
void init(_WaitGroup *group, _chan *ch);
void wakeup(bool ok);
private:
_RecvSendWaiting(const _RecvSendWaiting&); // don't copy
};
// _WaitGroup is a group of waiting senders and receivers.
//
// Only 1 waiter from the group can succeed waiting.
struct _WaitGroup {
Sema _sema; // used for wakeup
Mutex _mu; // lock NOTE ∀ chan order is always: chan._mu > ._mu
// on wakeup: sender|receiver -> group:
// .which _{Send|Recv}Waiting instance which succeeded waiting.
const _RecvSendWaiting *which;
_WaitGroup();
bool try_to_win(_RecvSendWaiting *waiter);
void wait();
void wakeup();
private:
_WaitGroup(const _WaitGroup&); // don't copy
};
// Default _RecvSendWaiting ctor creates zero-value _RecvSendWaiting.
// zero value _RecvSendWaiting is invalid and must be initialized via .init before use.
_RecvSendWaiting::_RecvSendWaiting() {
_RecvSendWaiting *w = this;
bzero((void *)w, sizeof(*w));
}
// init initializes waiter to be part of group waiting on ch.
void _RecvSendWaiting::init(_WaitGroup *group, _chan *ch) {
_RecvSendWaiting *w = this;
if (w->group != NULL)
bug("_RecvSendWaiting: double init");
w->group = group;
w->chan = ch;
INIT_LIST_HEAD(&w->in_rxtxq);
w->pdata = NULL;
w->ok = false;
w->sel_n = -1;
}
// wakeup notifies waiting receiver/sender that corresponding operation completed.
void _RecvSendWaiting::wakeup(bool ok) {
_RecvSendWaiting *w = this;
w->ok = ok;
w->group->wakeup();
}
_WaitGroup::_WaitGroup() {
_WaitGroup *group = this;
group->_sema.acquire();
group->which = NULL;
}
// try_to_win tries to win waiter after it was dequeued from a channel's {_send|_recv}q.
//
// -> won: true if won, false - if not.
bool _WaitGroup::try_to_win(_RecvSendWaiting *waiter) { // -> won
_WaitGroup *group = this;
bool won;
group->_mu.lock();
if (group->which != NULL) {
won = false;
}
else {
group->which = waiter;
won = true;
}
group->_mu.unlock();
return won;
}
// wait waits for winning case of group to complete.
void _WaitGroup::wait() {
_WaitGroup *group = this;
group->_sema.acquire();
}
// wakeup wakes up the group.
//
// prior to wakeup try_to_win must have been called.
// in practice this means that waiters queued to chan.{_send|_recv}q must
// be dequeued with _dequeWaiter.
void _WaitGroup::wakeup() {
_WaitGroup *group = this;
if (group->which == NULL)
bug("wakeup: group.which=nil");
group->_sema.release();
}
// _dequeWaiter dequeues a send or recv waiter from a channel's _recvq or _sendq.
//
// the channel owning {_recv|_send}q must be locked.
_RecvSendWaiting *_dequeWaiter(list_head *queue) {
while (!list_empty(queue)) {
_RecvSendWaiting *w = list_entry(queue->next, _RecvSendWaiting, in_rxtxq);
list_del_init(&w->in_rxtxq); // _init is important as we can try to remove the
// waiter the second time in select.
// if this waiter can win its group - return it.
// if not - someone else from its group already has won, and so we anyway have
// to remove the waiter from the queue.
if (w->group->try_to_win(w)) {
return w;
}
}
return NULL;
}
// _makechan creates new _chan(elemsize, size).
//
// returned channel has refcnt=1.
_chan *_makechan(unsigned elemsize, unsigned size) {
_chan *ch;
ch = (_chan *)malloc(sizeof(_chan) + size*elemsize);
if (ch == NULL)
return NULL;
memset((void *)ch, 0, sizeof(*ch));
new (&ch->_mu) Sema();
ch->_refcnt = 1;
ch->_cap = size;
ch->_elemsize = elemsize;
ch->_closed = false;
INIT_LIST_HEAD(&ch->_recvq);
INIT_LIST_HEAD(&ch->_sendq);
return ch;
}
// _chanxincref increments reference counter of the channel.
//
// it is noop if ch=nil.
void _chanxincref(_chan *ch) {
if (ch == NULL)
return;
ch->incref();
}
void _chan::incref() {
_chan *ch = this;
int refcnt_was = ch->_refcnt.fetch_add(+1);
if (refcnt_was < 1)
panic("chan: incref: refcnt was < 1");
}
// _chanxdecref decrements reference counter of the channel.
//
// if refcnt goes to zero, the channel is deallocated.
// it is noop if ch=nil.
void _chanxdecref(_chan *ch) {
if (ch == NULL)
return;
ch->decref();
}
void _chan::decref() {
_chan *ch = this;
int refcnt_was = ch->_refcnt.fetch_add(-1);
if (refcnt_was < 1)
panic("chan: decref: refcnt was < 1");
if (refcnt_was != 1)
return;
// refcnt=0 -> free the channel
ch->_mu.~Mutex();
memset((void *)ch, 0, sizeof(*ch) + ch->_cap*ch->_elemsize);
free(ch);
}
// _chanrefcnt returns current reference counter of the channel.
//
// NOTE if returned refcnt is > 1, the caller, due to concurrent execution of
// other goroutines, cannot generally assume that the reference counter won't change.
int _chanrefcnt(_chan *ch) {
return ch->refcnt();
}
int _chan::refcnt() {
_chan *ch = this;
return ch->_refcnt;
}
void _blockforever();
// send sends data to a receiver.
//
// sizeof(*ptx) must be ch._elemsize.
void _chansend(_chan *ch, const void *ptx) {
if (ch == NULL) // NOTE: cannot do this check in _chan::send
_blockforever(); // (C++ assumes `this` is never NULL and optimizes it out)
ch->send(ptx);
}
template<> void _chan::_send2</*onstack=*/true> (const void *ptx);
template<> void _chan::_send2</*onstack=*/false>(const void *ptx);
void _chan::send(const void *ptx) {
_chan *ch = this;
ch->_mu.lock();
bool done = ch->_trysend(ptx);
if (done)
return;
(_runtime->flags & STACK_DEAD_WHILE_PARKED) \
? ch->_send2</*onstack=*/false>(ptx)
: ch->_send2</*onstack=*/true >(ptx);
}
template<> void _chan::_send2</*onstack=*/true> (const void *ptx) {
_WaitGroup g;
_RecvSendWaiting me;
__send2(ptx, &g, &me);
}
template<> void _chan::_send2</*onstack=*/false>(const void *ptx) { _chan *ch = this;
unique_ptr<_WaitGroup> g (new _WaitGroup);
unique_ptr<_RecvSendWaiting> me (new _RecvSendWaiting);
// ptx stack -> heap (if ptx is on stack) TODO avoid copy if ptx is !onstack
void *ptx_onheap = malloc(ch->_elemsize);
if (ptx_onheap == NULL) {
ch->_mu.unlock();
throw bad_alloc();
}
memcpy(ptx_onheap, ptx, ch->_elemsize);
defer([&]() {
free(ptx_onheap);
});
__send2(ptx_onheap, g.get(), me.get());
}
void _chan::__send2(const void *ptx, _WaitGroup *g, _RecvSendWaiting *me) { _chan *ch = this;
me->init(g, ch);
me->pdata = (void *)ptx; // we add it to _sendq; the memory will be only read
me->ok = false;
list_add_tail(&me->in_rxtxq, &ch->_sendq);
ch->_mu.unlock();
g->wait();
if (g->which != me)
bug("chansend: g.which != me");
if (!me->ok)
panic("send on closed channel");
}
// recv_ is "comma-ok" version of recv.
//
// ok is true - if receive was delivered by a successful send.
// ok is false - if receive is due to channel being closed and empty.
//
// sizeof(*prx) must be ch._elemsize | prx=NULL.
bool _chanrecv_(_chan *ch, void *prx) {
if (ch == NULL)
_blockforever();
return ch->recv_(prx);
}
template<> bool _chan::_recv2_</*onstack=*/true> (void *prx);
template<> bool _chan::_recv2_</*onstack=*/false>(void *prx);
bool _chan::recv_(void *prx) { // -> ok
_chan *ch = this;
ch->_mu.lock();
bool ok, done = ch->_tryrecv(prx, &ok);
if (done)
return ok;
return (_runtime->flags & STACK_DEAD_WHILE_PARKED) \
? ch->_recv2_</*onstack=*/false>(prx)
: ch->_recv2_</*onstack=*/true> (prx);
}
template<> bool _chan::_recv2_</*onstack=*/true> (void *prx) {
_WaitGroup g;
_RecvSendWaiting me;
return __recv2_(prx, &g, &me);
}
template<> bool _chan::_recv2_</*onstack=*/false>(void *prx) { _chan *ch = this;
unique_ptr<_WaitGroup> g (new _WaitGroup);
unique_ptr<_RecvSendWaiting> me (new _RecvSendWaiting);
if (prx == NULL)
return __recv2_(prx, g.get(), me.get());
// prx stack -> onheap + copy back (if prx is on stack) TODO avoid copy if prx is !onstack
void *prx_onheap = malloc(ch->_elemsize);
if (prx_onheap == NULL) {
ch->_mu.unlock();
throw bad_alloc();
}
defer([&]() {
free(prx_onheap);
});
bool ok = __recv2_(prx_onheap, g.get(), me.get());
memcpy(prx, prx_onheap, ch->_elemsize);
return ok;
}
bool _chan::__recv2_(void *prx, _WaitGroup *g, _RecvSendWaiting *me) { _chan *ch = this;
me->init(g, ch);
me->pdata = prx;
me->ok = false;
list_add_tail(&me->in_rxtxq, &ch->_recvq);
ch->_mu.unlock();
g->wait();
if (g->which != me)
bug("chanrecv: g.which != me");
return me->ok;
}
// recv receives from the channel.
//
// if prx != NULL received value is put into *prx.
void _chanrecv(_chan *ch, void *prx) {
if (ch == NULL)
_blockforever();
ch->recv(prx);
}
void _chan::recv(void *prx) {
_chan *ch = this;
(void)ch->recv_(prx);
return;
}
// _trysend(ch, *ptx) -> done
//
// must be called with ._mu held.
// if done or panic - returns with ._mu released.
// if !done - returns with ._mu still being held.
bool _chan::_trysend(const void *ptx) { // -> done
_chan *ch = this;
if (ch->_closed) {
ch->_mu.unlock();
panic("send on closed channel");
}
// synchronous channel
if (ch->_cap == 0) {
_RecvSendWaiting *recv = _dequeWaiter(&ch->_recvq);
if (recv == NULL)
return false;
ch->_mu.unlock();
if (recv->pdata != NULL)
memcpy(recv->pdata, ptx, ch->_elemsize);
recv->wakeup(/*ok=*/true);
return true;
}
// buffered channel
else {
if (ch->_dataq_n >= ch->_cap)
return false;
ch->_dataq_append(ptx);
_RecvSendWaiting *recv = _dequeWaiter(&ch->_recvq);
if (recv != NULL) {
ch->_dataq_popleft(recv->pdata);
ch->_mu.unlock();
recv->wakeup(/*ok=*/true);
} else {
ch->_mu.unlock();
}
return true;
}
}
// _tryrecv() -> (*prx, *pok), done
//
// must be called with ._mu held.
// if done or panic - returns with ._mu released.
// if !done - returns with ._mu still being held.
//
// if !done - (*prx, *pok) are left unmodified.
// if prx=NULL received value is not copied into *prx.
bool _chan::_tryrecv(void *prx, bool *pok) { // -> done
_chan *ch = this;
// buffered
if (ch->_dataq_n > 0) {
ch->_dataq_popleft(prx);
*pok = true;
// wakeup a blocked writer, if there is any
_RecvSendWaiting *send = _dequeWaiter(&ch->_sendq);
if (send != NULL) {
ch->_dataq_append(send->pdata);
ch->_mu.unlock();
send->wakeup(/*ok=*/true);
} else {
ch->_mu.unlock();
}
return true;
}
// closed
if (ch->_closed) {
ch->_mu.unlock();
if (prx != NULL)
memset(prx, 0, ch->_elemsize);
*pok = false;
return true;
}
// sync | empty: there is waiting writer
_RecvSendWaiting *send = _dequeWaiter(&ch->_sendq);
if (send == NULL)
return false;
ch->_mu.unlock();
if (prx != NULL)
memcpy(prx, send->pdata, ch->_elemsize);
*pok = true;
send->wakeup(/*ok=*/true);
return true;
}
// close closes sending side of the channel.
void _chanclose(_chan *ch) {
if (ch == NULL)
panic("close of nil channel");
ch->close();
}
void _chan::close() {
_chan *ch = this;
ch->_mu.lock();
if (ch->_closed) {
ch->_mu.unlock();
panic("close of closed channel");
}
ch->_closed = true;
// wake-up all readers
while (1) {
_RecvSendWaiting *recv = _dequeWaiter(&ch->_recvq);
if (recv == NULL)
break;
ch->_mu.unlock();
if (recv->pdata != NULL)
memset(recv->pdata, 0, ch->_elemsize);
recv->wakeup(/*ok=*/false);
ch->_mu.lock();
}
// wake-up all writers (they will panic)
while (1) {
_RecvSendWaiting *send = _dequeWaiter(&ch->_sendq);
if (send == NULL)
break;
ch->_mu.unlock();
send->wakeup(/*ok=*/false);
ch->_mu.lock();
}
ch->_mu.unlock();
}
// len returns current number of buffered elements.
unsigned _chanlen(_chan *ch) {
if (ch == NULL)
return 0; // len(nil) = 0
return ch->len();
}
unsigned _chan::len() {
_chan *ch = this;
ch->_mu.lock(); // only to make valgrind happy
unsigned len = ch->_dataq_n;
ch->_mu.unlock();
return len;
}
// cap returns channel capacity.
unsigned _chancap(_chan *ch) {
if (ch == NULL)
return 0; // cap(nil) = 0
return ch->cap();
}
unsigned _chan::cap() {
_chan *ch = this;
return ch->_cap;
}
// _dataq_append appends next element to ch._dataq.
// called with ch._mu locked.
void _chan::_dataq_append(const void *ptx) {
_chan *ch = this;
if (ch->_dataq_n >= ch->_cap)
bug("chan: dataq.append on full dataq");
if (ch->_dataq_w >= ch->_cap)
bug("chan: dataq.append: w >= cap");
memcpy(&((char *)(ch+1))[ch->_dataq_w * ch->_elemsize], ptx, ch->_elemsize);
ch->_dataq_w++; ch->_dataq_w %= ch->_cap;
ch->_dataq_n++;
}
// _dataq_popleft pops oldest element from ch._dataq into *prx.
// called with ch._mu locked.
// if prx=NULL the element is popped, but not copied anywhere.
void _chan::_dataq_popleft(void *prx) {
_chan *ch = this;
if (ch->_dataq_n == 0)
bug("chan: dataq.popleft on empty dataq");
if (ch->_dataq_r >= ch->_cap)
bug("chan: dataq.popleft: r >= cap");
if (prx != NULL)
memcpy(prx, &((char *)(ch+1))[ch->_dataq_r * ch->_elemsize], ch->_elemsize);
ch->_dataq_r++; ch->_dataq_r %= ch->_cap;
ch->_dataq_n--;
}
// ---- select ----
// _default represents default case for _select.
const _selcase _default = {
.ch = NULL,
.op = _DEFAULT,
.data = NULL,
.rxok = NULL,
};
static const _RecvSendWaiting _sel_txrx_prepoll_won;
template<bool onstack> static int _chanselect2(const _selcase *, int, const vector<int>&);
template<> int _chanselect2</*onstack=*/true> (const _selcase *, int, const vector<int>&);
template<> int _chanselect2</*onstack=*/false>(const _selcase *, int, const vector<int>&);
static int __chanselect2(const _selcase *, int, const vector<int>&, _WaitGroup*);
// _chanselect executes one ready send or receive channel case.
//
// if no case is ready and default case was provided, select chooses default.
// if no case is ready and default was not provided, select blocks until one case becomes ready.
//
// returns: selected case number.
//
// For example:
//
// _selcase sel[4];
// sel[0] = _selsend(chi, &i);
// sel[1] = _selrecv(chp, &p);
// sel[2] = _selrecv_(chi, &j, &jok);
// sel[3] = _default;
// _ = _chanselect(sel, 4);
//
// See `select` for user-friendly wrapper.
// NOTE casev is not modified and can be used for next _chanselect calls.
int _chanselect(const _selcase *casev, int casec) {
if (casec < 0)
panic("select: casec < 0");
// select promise: if multiple cases are ready - one will be selected randomly
vector<int> nv(casec); // n -> n(case) TODO -> caller stack-allocate nv
for (int i=0; i <casec; i++)
nv[i] = i;
std::random_shuffle(nv.begin(), nv.end());
// first pass: poll all cases and bail out in the end if default was provided
int ndefault = -1;
bool havenonnil = false; // whether we have at least one !nil channel
for (auto n : nv) {
const _selcase *cas = &casev[n];
_chan *ch = cas->ch;
// default: remember we have it
if (cas->op == _DEFAULT) {
if (ndefault != -1)
panic("select: multiple default");
ndefault = n;
}
// send
else if (cas->op == _CHANSEND) {
if (ch != NULL) { // nil chan is never ready
ch->_mu.lock();
if (1) {
bool done = ch->_trysend(cas->data);
if (done)
return n;
}
ch->_mu.unlock();
havenonnil = true;
}
}
// recv
else if (cas->op == _CHANRECV) {
if (ch != NULL) { // nil chan is never ready
ch->_mu.lock();
if (1) {
bool ok, done = ch->_tryrecv(cas->data, &ok);
if (done) {
if (cas->rxok != NULL)
*cas->rxok = ok;
return n;
}
}
ch->_mu.unlock();
havenonnil = true;
}
}
// bad
else {
panic("select: invalid op");
}
}
// execute default if we have it
if (ndefault != -1)
return ndefault;
// select{} or with nil-channels only -> block forever
if (!havenonnil)
_blockforever();
// second pass: subscribe and wait on all rx/tx cases
return (_runtime->flags & STACK_DEAD_WHILE_PARKED) \
? _chanselect2</*onstack=*/false>(casev, casec, nv)
: _chanselect2</*onstack=*/true> (casev, casec, nv);
}
template<> int _chanselect2</*onstack=*/true> (const _selcase *casev, int casec, const vector<int>& nv) {
_WaitGroup g;
return __chanselect2(casev, casec, nv, &g);
}
template<> int _chanselect2</*onstack=*/false>(const _selcase *casev, int casec, const vector<int>& nv) {
unique_ptr<_WaitGroup> g (new _WaitGroup);
int i;
unsigned rxmax=0, txtotal=0;
// reallocate chan .tx / .rx to heap; adjust casev
// XXX avoid doing this if all .tx and .rx are on heap?
unique_ptr<_selcase[]> casev_onheap (new _selcase[casec]);
for (i = 0; i < casec; i++) {
const _selcase *cas = &casev[i];
casev_onheap[i] = *cas;
if (cas->ch == NULL) // nil chan
continue;
if (cas->op == _CHANSEND) {
txtotal += cas->ch->_elemsize;
}
else if (cas->op == _CHANRECV) {
rxmax = max(rxmax, cas->ch->_elemsize);
}
else {
bug("select: invalid op ; _chanselect2: !onstack: A");
}
}
// tx are appended sequentially; all rx go to &rxtxdata[0]
char *rxtxdata = (char *)malloc(max(rxmax, txtotal));
if (rxtxdata == NULL)
throw bad_alloc();
defer([&]() {
free(rxtxdata);
});
char *ptx = rxtxdata;
for (i = 0; i <casec; i++) {
_selcase *cas = &casev_onheap[i];
if (cas->ch == NULL) // nil chan
continue;
if (cas->op == _CHANSEND) {
memcpy(ptx, cas->data, cas->ch->_elemsize);
cas->data = ptx;
ptx += cas->ch->_elemsize;
}
else if (cas->op == _CHANRECV) {
cas->data = rxtxdata;
} else {
bug("select: invalid op ; _chanselect2: !onstack: B");
}
}
// select ...
int selected = __chanselect2(casev_onheap.get(), casec, nv, g.get());
// copy data back to original rx location.
_selcase *cas = &casev_onheap[selected];
if (cas->op == _CHANRECV) {
const _selcase *cas0 = &casev[selected];
if (cas0->data != NULL)
memcpy(cas0->data, cas->data, cas->ch->_elemsize);
}
return selected;
}
static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv, _WaitGroup* g) {
// storage for waiters we create XXX stack-allocate (if !STACK_DEAD_WHILE_PARKED)
// XXX or let caller stack-allocate? but then we force it to know sizeof(_RecvSendWaiting)
_RecvSendWaiting *waitv = (_RecvSendWaiting *)calloc(sizeof(_RecvSendWaiting), casec);
int waitc = 0;
if (waitv == NULL)
throw bad_alloc();
// on exit: remove all registered waiters from their wait queues.
defer([&]() {
for (int i = 0; i < waitc; i++) {
_RecvSendWaiting *w = &waitv[i];
w->chan->_mu.lock();
list_del_init(&w->in_rxtxq); // thanks to _init used in _dequeWaiter
w->chan->_mu.unlock(); // it is ok to del twice even if w was already removed
}
free(waitv);
waitv = NULL;
});
for (auto n : nv) {
const _selcase *cas = &casev[n];
_chan *ch = cas->ch;
if (ch == NULL) // nil chan is never ready
continue;
ch->_mu.lock();
with_lock(g->_mu); // with, because _trysend may panic
// a case that we previously queued already won while we were
// queuing other cases.
if (g->which != NULL) {
ch->_mu.unlock();
goto ready;
}
// send
if (cas->op == _CHANSEND) {
bool done = ch->_trysend(cas->data);
if (done) {
g->which = &_sel_txrx_prepoll_won; // !NULL not to let already queued cases win
return n;
}
if (waitc >= casec)
bug("select: waitv overflow");
_RecvSendWaiting *w = &waitv[waitc++];
w->init(g, ch);
w->pdata = cas->data;
w->ok = false;
w->sel_n = n;
list_add_tail(&w->in_rxtxq, &ch->_sendq);
}
// recv
else if (cas->op == _CHANRECV) {
bool ok, done = ch->_tryrecv(cas->data, &ok);
if (done) {
g->which = &_sel_txrx_prepoll_won; // !NULL not to let already queued cases win
if (cas->rxok != NULL)
*cas->rxok = ok;
return n;
}
if (waitc >= casec)
bug("select: waitv overflow");
_RecvSendWaiting *w = &waitv[waitc++];
w->init(g, ch);
w->pdata = cas->data;
w->ok = false;
w->sel_n = n;
list_add_tail(&w->in_rxtxq, &ch->_recvq);
}
// bad
else {
bug("select: invalid op during phase 2");
}
ch->_mu.unlock();
}
// wait for a case to become ready
g->wait();
ready:
if (g->which == &_sel_txrx_prepoll_won)
bug("select: woke up with g.which=_sel_txrx_prepoll_won");
const _RecvSendWaiting *sel = g->which;
int selected = sel->sel_n;
const _selcase *cas = &casev[selected];
if (cas->op == _CHANSEND) {
if (!sel->ok)
panic("send on closed channel");
return selected;
}
else if (cas->op == _CHANRECV) {
if (cas->rxok != NULL)
*cas->rxok = sel->ok;
return selected;
}
bug("select: selected case has invalid op");
}
// _blockforever blocks current goroutine forever.
void (*_tblockforever)() = NULL;
void _blockforever() {
if (_tblockforever != NULL)
_tblockforever();
// take a lock twice. It will forever block on the second lock attempt.
// Under gevent, similarly to Go, this raises "LoopExit: This operation
// would block forever", if there are no other greenlets scheduled to be run.
Sema dead;
dead.acquire();
dead.acquire();
bug("_blockforever: woken up");
}
// ---- for tests ----
// _tchanlenrecvqlen returns len(_ch._recvq)
int _tchanrecvqlen(_chan *_ch) {
int l = 0;
list_head *h;
_ch->_mu.lock();
list_for_each(h, &_ch->_recvq)
l++;
_ch->_mu.unlock();
return l;
}
// _tchanlensendqlen returns len(_ch._sendq)
int _tchansendqlen(_chan *_ch) {
int l = 0;
list_head *h;
_ch->_mu.lock();
list_for_each(h, &_ch->_sendq)
l++;
_ch->_mu.unlock();
return l;
}
} // golang:: } // golang::
......
...@@ -20,15 +20,265 @@ ...@@ -20,15 +20,265 @@
// Test that exercises C++-level libgolang.h API and functionality. // Test that exercises C++-level libgolang.h API and functionality.
#include "golang/libgolang.h" #include "golang/libgolang.h"
#include <stdio.h>
#include <tuple>
#include <utility>
using namespace golang; using namespace golang;
using std::function;
using std::move;
using std::tie;
#define __STR(X) #X
#define STR(X) __STR(X)
#define ASSERT(COND) do { \
if (!(COND)) \
panic(__FILE__ ":" STR(__LINE__) " assert `" #COND "` failed"); \
} while(0)
// verify chan<T> automatic reference counting.
void _test_chan_cpp_refcount() {
chan<int> ch;
ASSERT(ch == NULL);
ASSERT(!(ch != NULL));
ASSERT(ch._rawchan() == NULL);
ch = makechan<int>();
ASSERT(!(ch == NULL));
ASSERT(ch != NULL);
ASSERT(ch._rawchan() != NULL);
_chan *_ch = ch._rawchan();
ASSERT(_chanrefcnt(_ch) == 1);
// copy ctor
{
chan<int> ch2(ch);
ASSERT(ch2._rawchan() == _ch);
ASSERT(_chanrefcnt(_ch) == 2);
ASSERT(ch2 == ch);
ASSERT(!(ch2 != ch));
// ch2 goes out of scope, decref'ing via ~chan
}
ASSERT(_chanrefcnt(_ch) == 1);
// copy =
{
chan<int> ch2;
ASSERT(ch2 == NULL);
ASSERT(ch2._rawchan() == NULL);
ch2 = ch;
ASSERT(ch2._rawchan() == _ch);
ASSERT(_chanrefcnt(_ch) == 2);
ASSERT(ch2 == ch);
ASSERT(!(ch2 != ch));
ch2 = NULL;
ASSERT(ch2 == NULL);
ASSERT(ch2._rawchan() == NULL);
ASSERT(_chanrefcnt(_ch) == 1);
ASSERT(!(ch2 == ch));
ASSERT(ch2 != ch);
}
ASSERT(_chanrefcnt(_ch) == 1);
// move ctor
chan<int> ch2(move(ch));
ASSERT(ch == NULL);
ASSERT(ch._rawchan() == NULL);
ASSERT(ch2 != NULL);
ASSERT(ch2._rawchan() == _ch);
ASSERT(_chanrefcnt(_ch) == 1);
// move =
ch = move(ch2);
ASSERT(ch != NULL);
ASSERT(ch._rawchan() == _ch);
ASSERT(ch2 == NULL);
ASSERT(ch2._rawchan() == NULL);
ASSERT(_chanrefcnt(_ch) == 1);
// ch goes out of scope and destroys raw channel
}
// verify chan<T> IO.
struct Point {
int x, y;
};
void _test_chan_cpp() {
chan<structZ> done = makechan<structZ>();
chan<int> chi = makechan<int>(1);
chan<Point> chp = makechan<Point>(); chp = NULL;
int i, j, _;
Point p;
bool jok;
i=+1; chi.send(i);
j=-1; j = chi.recv();
if (j != i)
panic("send -> recv != I");
i = 2;
_ = select({
done.recvs(), // 0
chi.sends(&i), // 1
chp.recvs(&p), // 2
chi.recvs(&j, &jok), // 3
_default, // 4
});
if (_ != 1)
panic("select: selected !1");
tie(j, jok) = chi.recv_();
if (!(j == 2 && jok == true))
panic("recv_ != (2, true)");
chi.close();
tie(j, jok) = chi.recv_();
if (!(j == 0 && jok == false))
panic("recv_ from closed != (0, false)");
// XXX chan<chan> is currently TODO
//chan<chan<int>> zzz;
}
// waitBlocked waits until either a receive (if rx) or send (if tx) operation
// blocks waiting on the channel.
void waitBlocked(_chan *ch, bool rx, bool tx) {
if (ch == NULL)
panic("wait blocked: called on nil channel");
double t0 = time::now();
while (1) {
if (rx && (_tchanrecvqlen(ch) != 0))
return;
if (tx && (_tchansendqlen(ch) != 0))
return;
double now = time::now();
if (now-t0 > 10) // waited > 10 seconds - likely deadlock
panic("deadlock");
time::sleep(0); // yield to another thread / coroutine
}
}
template<typename T> void waitBlocked_RX(chan<T> ch) {
waitBlocked(ch._rawchan(), /*rx=*/true, /*tx=*/0);
}
template<typename T> void waitBlocked_TX(chan<T> ch) {
waitBlocked(ch._rawchan(), /*rx=*/0, /*tx=*/true);
}
// usestack_and_call pushes C-stack down and calls f from that.
// C-stack pushdown is used to make sure that when f will block and switched
// to another g, greenlet will save f's C-stack frame onto heap.
//
// --- ~~~
// stack of another g
// --- ~~~
//
// .
// .
// .
//
// f -> heap
static void usestack_and_call(function<void()> f, int nframes=128) {
if (nframes == 0) {
f();
return;
}
return usestack_and_call(f, nframes-1);
}
// verify that send/recv/select correctly route their onstack arguments through onheap proxies.
void _test_chan_vs_stackdeadwhileparked() {
// problem: under greenlet g's stack lives on system stack and is swapped as needed
// onto heap and back on g switch. This way if e.g. recv() is called with
// prx pointing to stack, and the stack is later copied to heap and replaced
// with stack of another g, the sender, if writing to original prx directly,
// will write to stack of different g, and original recv g, after wakeup,
// will see unchanged memory - with stack content that was saved to heap.
//
// to avoid this, send/recv/select create onheap proxies for onstack
// arguments and use those proxies as actual argument for send/receive.
// recv
auto ch = makechan<int>();
go([&]() {
waitBlocked_RX(ch);
usestack_and_call([&]() {
ch.send(111);
});
});
usestack_and_call([&]() {
int rx = ch.recv();
if (rx != 111)
panic("recv(111) != 111");
});
// send
auto done = makechan<structZ>();
go([&]() {
waitBlocked_TX(ch);
usestack_and_call([&]() {
int rx = ch.recv();
if (rx != 222)
panic("recv(222) != 222");
});
done.close();
});
usestack_and_call([&]() {
ch.send(222);
});
done.recv();
// select(recv)
go([&]() {
waitBlocked_RX(ch);
usestack_and_call([&]() {
ch.send(333);
});
});
usestack_and_call([&]() {
int rx = 0;
int _ = select({ch.recvs(&rx)});
if (_ != 0)
panic("select(recv, 333): selected !0");
if (rx != 333)
panic("select(recv, 333): recv != 333");
});
// select(send)
done = makechan<structZ>();
go([&]() {
waitBlocked_TX(ch);
usestack_and_call([&]() {
int rx = ch.recv();
if (rx != 444)
panic("recv(444) != 444");
});
done.close();
});
usestack_and_call([&]() {
int tx = 444;
int _ = select({ch.sends(&tx)});
if (_ != 0)
panic("select(send, 444): selected !0");
});
done.recv();
}
// small test to verify C++ go. // small test to verify C++ go.
static void _work(int i); static void _work(int i, chan<structZ> done);
void _test_go_cpp() { void _test_go_cpp() {
go(_work, 111); // not λ to test that go correctly passes arguments auto done = makechan<structZ>();
// TODO wait till _work is done go(_work, 111, done); // not λ to test that go correctly passes arguments
done.recv();
} }
static void _work(int i) { static void _work(int i, chan<structZ> done) {
if (i != 111) if (i != 111)
panic("_work: i != 111"); panic("_work: i != 111");
done.close();
} }
...@@ -26,20 +26,69 @@ ...@@ -26,20 +26,69 @@
#include "golang/libgolang.h" #include "golang/libgolang.h"
#include <stdlib.h> #include <stdlib.h>
typedef struct Point {
int x, y;
} Point;
void _test_chan_c(void) {
_chan *done = _makechan(0, 0);
_chan *chi = _makechan(sizeof(int), 1);
_chan *chp = NULL;
int i, j, _;
Point p;
bool jok;
i=+1; _chansend(chi, &i);
j=-1; _chanrecv(chi, &j);
if (j != i)
panic("send -> recv != I");
i = 2;
_selcase sel[5];
sel[0] = _selrecv(done, NULL);
sel[1] = _selsend(chi, &i);
sel[2] = _selrecv(chp, &p);
sel[3] = _selrecv_(chi, &j, &jok);
sel[4] = _default;
_ = _chanselect(sel, 5);
if (_ != 1)
panic("select: selected !1");
jok = _chanrecv_(chi, &j);
if (!(j == 2 && jok == true))
panic("recv_ != (2, true)");
_chanclose(chi);
jok = _chanrecv_(chi, &j);
if (!(j == 0 && jok == false))
panic("recv_ from closed != (0, false)");
_chanxdecref(done);
_chanxdecref(chi);
_chanxdecref(chp);
}
// small test to verify C _taskgo. // small test to verify C _taskgo.
struct _work_arg{int i;}; struct _work_arg{int i; _chan *done;};
static void _work(void *); static void _work(void *);
void _test_go_c(void) { void _test_go_c(void) {
_chan *done = _makechan(0,0);
if (done == NULL)
panic("_makechan -> failed");
struct _work_arg *_ = malloc(sizeof(*_)); struct _work_arg *_ = malloc(sizeof(*_));
if (_ == NULL) if (_ == NULL)
panic("malloc _work_arg -> failed"); panic("malloc _work_arg -> failed");
_->i = 111; _->i = 111;
_->done = done;
_taskgo(_work, _); _taskgo(_work, _);
// TODO wait till _work is done _chanrecv(done, NULL);
_chanxdecref(done);
} }
static void _work(void *__) { static void _work(void *__) {
struct _work_arg *_ = (struct _work_arg *)__; struct _work_arg *_ = (struct _work_arg *)__;
if (_->i != 111) if (_->i != 111)
panic("_work: i != 111"); panic("_work: i != 111");
_chanclose(_->done);
free(_); free(_);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment