Commit 1ad3c2d5 authored by Kirill Smelkov's avatar Kirill Smelkov

sync += RWMutex

Provide sync.RWMutex that can be useful for cases when there are
multiple simultaneous readers and more seldom writer(s).

This implements readers-writer mutex with preference for writers
similarly to Go version.
parent 36ab859c
...@@ -338,7 +338,8 @@ handle concurrency in structured ways: ...@@ -338,7 +338,8 @@ handle concurrency in structured ways:
- |golang.sync|_ (py__, pyx__) provides `sync.WorkGroup` to spawn group of goroutines working - |golang.sync|_ (py__, pyx__) provides `sync.WorkGroup` to spawn group of goroutines working
on a common task. It also provides low-level primitives - for example on a common task. It also provides low-level primitives - for example
`sync.Once`, `sync.WaitGroup` and `sync.Mutex` - that are sometimes useful too. `sync.Once`, `sync.WaitGroup`, `sync.Mutex` and `sync.RWMutex` - that are
sometimes useful too.
.. |golang.sync| replace:: `golang.sync` .. |golang.sync| replace:: `golang.sync`
.. _golang.sync: https://lab.nexedi.com/kirr/pygolang/tree/master/golang/sync.h .. _golang.sync: https://lab.nexedi.com/kirr/pygolang/tree/master/golang/sync.h
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
- `WorkGroup` allows to spawn group of goroutines working on a common task(*). - `WorkGroup` allows to spawn group of goroutines working on a common task(*).
- `Once` allows to execute an action only once. - `Once` allows to execute an action only once.
- `WaitGroup` allows to wait for a collection of tasks to finish. - `WaitGroup` allows to wait for a collection of tasks to finish.
- `Sema`(*) and `Mutex` provide low-level synchronization. - `Sema`(*), `Mutex` and `RWMutex` provide low-level synchronization.
See also https://golang.org/pkg/sync for Go sync package documentation. See also https://golang.org/pkg/sync for Go sync package documentation.
...@@ -44,6 +44,12 @@ cdef extern from "golang/sync.h" namespace "golang::sync" nogil: ...@@ -44,6 +44,12 @@ cdef extern from "golang/sync.h" namespace "golang::sync" nogil:
void lock() void lock()
void unlock() void unlock()
cppclass RWMutex:
void Lock()
void Unlock()
void RLock()
void RUnlock()
cppclass Once: cppclass Once:
void do "do_" (...) # ... = func<void()> void do "do_" (...) # ... = func<void()>
......
...@@ -75,6 +75,40 @@ cdef class PyMutex: ...@@ -75,6 +75,40 @@ cdef class PyMutex:
pymu.unlock() pymu.unlock()
@final
cdef class PyRWMutex:
cdef RWMutex mu
# FIXME cannot catch/pyreraise panic of .mu ctor
# https://github.com/cython/cython/issues/3165
def Lock(PyRWMutex pymu):
with nogil:
rwmutex_lock_pyexc(&pymu.mu)
def Unlock(PyRWMutex pymu):
# NOTE nogil needed for unlock since RWMutex _locks_ internal mu even in unlock
with nogil:
rwmutex_unlock_pyexc(&pymu.mu)
def RLock(PyRWMutex pymu):
with nogil:
rwmutex_rlock_pyexc(&pymu.mu)
def RUnlock(PyRWMutex pymu):
# NOTE nogil needed for runlock (see ^^^)
with nogil:
rwmutex_runlock_pyexc(&pymu.mu)
# with support (write by default)
__enter__ = Lock
def __exit__(PyRWMutex pymu, exc_typ, exc_val, exc_tb):
pymu.Unlock()
# TODO .RLocker() that returns X : X.Lock() -> .RLock() and for unlock correspondingly ?
# TODO then `with mu.RLocker()` would mean "with read lock".
@final @final
cdef class PyOnce: cdef class PyOnce:
"""Once allows to execute an action only once. """Once allows to execute an action only once.
...@@ -244,6 +278,15 @@ cdef nogil: ...@@ -244,6 +278,15 @@ cdef nogil:
void mutexunlock_pyexc(Mutex *mu) except +topyexc: void mutexunlock_pyexc(Mutex *mu) except +topyexc:
mu.unlock() mu.unlock()
void rwmutex_lock_pyexc(RWMutex *mu) except +topyexc:
mu.Lock()
void rwmutex_unlock_pyexc(RWMutex *mu) except +topyexc:
mu.Unlock()
void rwmutex_rlock_pyexc(RWMutex *mu) except +topyexc:
mu.RLock()
void rwmutex_runlock_pyexc(RWMutex *mu) except +topyexc:
mu.RUnlock()
void waitgroup_done_pyexc(WaitGroup *wg) except +topyexc: void waitgroup_done_pyexc(WaitGroup *wg) except +topyexc:
wg.done() wg.done()
void waitgroup_add_pyexc(WaitGroup *wg, int delta) except +topyexc: void waitgroup_add_pyexc(WaitGroup *wg, int delta) except +topyexc:
......
...@@ -26,6 +26,95 @@ ...@@ -26,6 +26,95 @@
namespace golang { namespace golang {
namespace sync { namespace sync {
// RWMutex
RWMutex::RWMutex() {
RWMutex& mu = *this;
mu._wakeupq = makechan<structZ>();
mu._nread_active = 0;
mu._nwrite_waiting = 0;
mu._write_active = false;
}
RWMutex::~RWMutex() {}
// RWMutex implementation is based on
// https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Using_a_condition_variable_and_a_mutex
// but a channel ._wakeupq is used instead of condition variable.
// _wakeup_all simulates broadcast cond notification by waking up all current
// waiters and reallocating ._wakeupq for next round of queued waiters and
// wakeup.
//
// Must be called under ._g locked.
void RWMutex::_wakeup_all() {
RWMutex& mu = *this;
mu._wakeupq.close();
mu._wakeupq = makechan<structZ>();
}
void RWMutex::RLock() {
RWMutex& mu = *this;
mu._g.lock();
while (mu._nwrite_waiting > 0 || mu._write_active) {
chan<structZ> wakeupq = mu._wakeupq;
mu._g.unlock();
wakeupq.recv();
mu._g.lock();
}
mu._nread_active++;
mu._g.unlock();
}
void RWMutex::RUnlock() {
RWMutex& mu = *this;
mu._g.lock();
if (mu._nread_active <= 0) {
mu._g.unlock();
panic("sync: RUnlock of unlocked RWMutex");
}
mu._nread_active--;
if (mu._nread_active == 0)
mu._wakeup_all();
mu._g.unlock();
}
void RWMutex::Lock() {
RWMutex& mu = *this;
mu._g.lock();
mu._nwrite_waiting++;
while (mu._nread_active > 0 || mu._write_active) {
chan<structZ> wakeupq = mu._wakeupq;
mu._g.unlock();
wakeupq.recv();
mu._g.lock();
}
mu._nwrite_waiting--;
mu._write_active = true;
mu._g.unlock();
}
void RWMutex::Unlock() {
RWMutex& mu = *this;
mu._g.lock();
if (!mu._write_active) {
mu._g.unlock();
panic("sync: Unlock of unlocked RWMutex");
}
mu._write_active = false;
mu._wakeup_all();
mu._g.unlock();
}
// Once // Once
Once::Once() { Once::Once() {
Once *once = this; Once *once = this;
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
// - `WorkGroup` allows to spawn group of goroutines working on a common task(*). // - `WorkGroup` allows to spawn group of goroutines working on a common task(*).
// - `Once` allows to execute an action only once. // - `Once` allows to execute an action only once.
// - `WaitGroup` allows to wait for a collection of tasks to finish. // - `WaitGroup` allows to wait for a collection of tasks to finish.
// - `Sema`(*) and `Mutex` provide low-level synchronization. // - `Sema`(*), `Mutex` and `RWMutex` provide low-level synchronization.
// //
// See also https://golang.org/pkg/sync for Go sync package documentation. // See also https://golang.org/pkg/sync for Go sync package documentation.
// //
...@@ -102,6 +102,32 @@ private: ...@@ -102,6 +102,32 @@ private:
Mutex(Mutex&&); // don't move Mutex(Mutex&&); // don't move
}; };
// RWMutex provides readers-writer mutex with preference for writers.
//
// https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock .
class RWMutex {
Mutex _g;
chan<structZ> _wakeupq; // closed & recreated every time to wakeup all waiters
int _nread_active; // number of readers holding the lock
int _nwrite_waiting; // number of writers waiting for the lock
bool _write_active; // whether a writer is holding the lock
public:
LIBGOLANG_API RWMutex();
LIBGOLANG_API ~RWMutex();
LIBGOLANG_API void Lock();
LIBGOLANG_API void Unlock();
LIBGOLANG_API void RLock();
LIBGOLANG_API void RUnlock();
private:
void _wakeup_all();
RWMutex(const RWMutex&); // don't copy
RWMutex(RWMutex&&); // don't move
};
// Once allows to execute an action only once. // Once allows to execute an action only once.
// //
// For example: // For example:
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
- `WorkGroup` allows to spawn group of goroutines working on a common task(*). - `WorkGroup` allows to spawn group of goroutines working on a common task(*).
- `Once` allows to execute an action only once. - `Once` allows to execute an action only once.
- `WaitGroup` allows to wait for a collection of tasks to finish. - `WaitGroup` allows to wait for a collection of tasks to finish.
- `Sema`(*) and `Mutex` provide low-level synchronization. - `Sema`(*), `Mutex` and `RWMutex` provide low-level synchronization.
See also https://golang.org/pkg/sync for Go sync package documentation. See also https://golang.org/pkg/sync for Go sync package documentation.
...@@ -36,6 +36,7 @@ from __future__ import print_function, absolute_import ...@@ -36,6 +36,7 @@ from __future__ import print_function, absolute_import
from golang._sync import \ from golang._sync import \
PySema as Sema, \ PySema as Sema, \
PyMutex as Mutex, \ PyMutex as Mutex, \
PyRWMutex as RWMutex, \
PyOnce as Once, \ PyOnce as Once, \
PyWaitGroup as WaitGroup, \ PyWaitGroup as WaitGroup, \
PyWorkGroup as WorkGroup PyWorkGroup as WorkGroup
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2019 Nexedi SA and Contributors. # Copyright (C) 2019-2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your # it under the terms of the GNU General Public License version 3, or (at your
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
from golang import go, chan from golang import go, chan, select, default
from golang import sync, context, time from golang import sync, context, time
from pytest import raises from pytest import raises
from golang.golang_test import import_pyx_tests, panics from golang.golang_test import import_pyx_tests, panics
...@@ -61,8 +61,84 @@ def _test_mutex(mu, lock, unlock): ...@@ -61,8 +61,84 @@ def _test_mutex(mu, lock, unlock):
done.recv() done.recv()
assert l == ['c', 'd'] assert l == ['c', 'd']
def test_mutex(): _test_mutex(sync.Mutex(), 'lock', 'unlock') def test_mutex(): _test_mutex(sync.Mutex(), 'lock', 'unlock')
def test_sema(): _test_mutex(sync.Sema(), 'acquire', 'release') def test_sema(): _test_mutex(sync.Sema(), 'acquire', 'release')
def test_rwmutex_basic(): _test_mutex(sync.RWMutex(), 'Lock', 'Unlock')
def test_rwmutex():
mu = sync.RWMutex()
# Unlock without lock -> panic
# RUnlock without lock -> panic
with panics("sync: Unlock of unlocked RWMutex"): mu.Unlock()
with panics("sync: RUnlock of unlocked RWMutex"): mu.RUnlock()
# Lock vs Lock; was also tested in test_rwmutex_basic
mu.Lock()
l = []
done = chan()
def _():
mu.Lock()
l.append('b')
mu.Unlock()
done.close()
go(_)
time.sleep(1*dt)
l.append('a')
mu.Unlock()
done.recv()
assert l == ['a', 'b']
# Lock vs RLock
l = [] # accessed as R R R ... R W R R R ... R
Nr1 = 10 # Nreaders queued before W
Nr2 = 15 # Nreaders queued after W
mu.RLock()
locked = chan(Nr1+1+Nr2) # main <- R|W: mu locked
rcont = chan() # main -> R: continue
def R(): # readers
mu.RLock()
locked.send(('R', len(l)))
rcont.recv()
mu.RUnlock()
for i in range(Nr1):
go(R)
# make sure all Nr1 readers entered mu.RLock
for i in range(Nr1):
assert locked.recv() == ('R', 0)
# spawn W
def W(): # 1 writer
mu.Lock()
time.sleep(Nr2*dt) # give R2 readers more chance to call mu.RLock and run first
locked.send('W')
l.append('a')
mu.Unlock()
go(W)
# spawn more readers to verify that Lock has priority over RLock
time.sleep(1*dt) # give W more chance to call mu.Lock first
for i in range(Nr2):
go(R)
# release main rlock, make sure nor W nor more R are yet ready, and let all readers continue
time.sleep((1+1)*dt)
mu.RUnlock()
time.sleep(1*dt)
for i in range(100):
_, _rx = select(
default, # 0
locked.recv, # 1
)
assert _ == 0
rcont.close()
# W must get the lock first and all R2 readers only after it
assert locked.recv() == 'W'
for i in range(Nr2):
assert locked.recv() == ('R', 1)
# verify that sema.acquire can be woken up by sema.release not from the same # verify that sema.acquire can be woken up by sema.release not from the same
# thread which did the original sema.acquire. # thread which did the original sema.acquire.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment