Commit 168c4570 authored by Gwenael Samain's avatar Gwenael Samain

External acthon lib (as a .pxi because cython is confused with c++ methods accross shared libs)

parent ae658982
from libcpp.deque cimport deque
from lwan_coro cimport scheduler_t, scheduler_init, scheduler_coro_add, scheduler_coro_add_runtime, scheduler_run, async_scheduler_run, scheduler_destroy, async_scheduler_destroy, coro_yield_value, coro_yield, coro_t
from libc.stdio cimport printf
cdef extern from "<unistd.h>" nogil:
int sleep(int sec)
ctypedef deque[ActhonMessageInterface] message_queue_t
cdef cypclass BasicQueue(ActhonQueueInterface) checklock:
message_queue_t* _queue
__init__(self):
self._queue = new message_queue_t()
__dealloc__(self):
with gil:
print("Deallocating queue")
del self._queue
bint is_empty(self) const:
return self._queue.empty()
void push(self, ActhonMessageInterface message):
# lock queue
self._queue.push_back(message)
if message._sync_method is not NULL:
message._sync_method.insertActivity(message)
# incref message
Cy_INCREF(message)
# unlock queue
bint activate(self):
cdef bint one_message_processed
# lock queue
if self._queue.empty():
#with gil:
# print("Empty queue")
return False
# Note here that according to Cython refcount conventions,
# the front() method should have returned a new ref.
# This is obviously not the case, so if we do nothing
# we will, at the end of this function, loose a ref on the pointed object
# (as we will decref the thing pointed by next_message).
next_message = self._queue.front()
self._queue.pop_front()
one_message_processed = next_message.activate()
if one_message_processed:
#with gil:
# print("Top message processed, reporting to result class & deallocating")
if next_message._sync_method is not NULL:
with wlocked next_message._sync_method:
next_message._sync_method.removeActivity(next_message)
#del next_message
else:
#with gil:
# print("Top message could not be processed, enqueueing back")
# Could not process yet (because of sync constraints) => re-enqueue
self._queue.push_back(next_message)
# Don't forget to incref to avoid premature deallocation
Cy_INCREF(next_message)
# unlock queue
return one_message_processed
cdef cypclass BasicResult(ActhonResultInterface) nolock:
union result_t:
int int_val
void* ptr
result_t result
bint result_pushed
@staticmethod
ActhonResultInterface construct():
return BasicResult()
__init__(self):
self.result_pushed = False
self.result.ptr = NULL
void pushVoidStarResult(self, void* result):
# lock self
#with wlocked self:
self.result.ptr = result
self.result_pushed = True
# unlock self
void pushIntResult(self, int result):
# lock self
#with wlocked self:
self.result.int_val = result
self.result_pushed = True
# unlock self
result_t _getRawResult(self):
cdef result_t res
while not self.result_pushed:
# wait in a cool manner, busy loop is horrible
sleep(1)
#with gil:
# print("No value yet !")
# We must be careful here. Due to out-of-order execution,
# the thread doing push_result could have modified self.result_pushed before
# assigning self.result to the correct value.
# To be sure everything is OK, we must have a barrier of some kind.
# Taking the lock can be this kind of barrier (but it's clearly overkill)
# lock self
with rlocked self:
res = self.result
# unlock self
return res
void* getVoidStarResult(self):
res = self._getRawResult()
return res.ptr
int getIntResult(self):
res = self._getRawResult()
return res.int_val
cdef cypclass ActivityCounterSync(ActhonSyncInterface) nolock:
int count
ActivityCounterSync previous_sync
__init__(self, ActivityCounterSync prev = <ActivityCounterSync> NULL):
self.count = 0
self.previous_sync = prev
void insertActivity(self, ActhonMessageInterface msg):
# lock self
#with wlocked self:
self.count += 1
# unlock self
void removeActivity(self, ActhonMessageInterface msg):
# lock self
#with wlocked self:
self.count -= 1
# unlock self
bint isCompleted(self):
cdef bint result
# lock self
#with rlocked self:
result = self.count == 0
# unlock self
return result
bint isActivable(self):
cdef bint res
if self.previous_sync is not NULL:
with rlocked self.previous_sync:
res = self.previous_sync.isCompleted()
else:
res = True
return res
include "minimal_lib.pxi"
cdef cypclass A nolock activable:
int a
......@@ -185,10 +27,10 @@ cdef cypclass B(A) activable:
return self.a + arg
cdef ActhonResultInterface take_activated_object(A.Activated actor) nogil:
return actor.multiply(<ActhonSyncInterface> NULL, 4)
return actor.multiply(NULL, 4)
cdef void set_A_attrs(A obj):
obj._active_result_class = BasicResult.construct
obj._active_result_class = WaitResult.construct
obj._active_queue_class = BasicQueue()
......@@ -209,7 +51,7 @@ cdef int process_queue(coro_t *coroutine, void *arg) nogil:
cpdef void main():
cdef scheduler_t scheduler
cdef A.Activated actor
cdef ActivityCounterSync sync = <ActivityCounterSync> NULL
cdef ActivityCounterSync sync = NULL
scheduler_init(&scheduler, num_workers=2)
......@@ -230,7 +72,8 @@ cpdef void main():
#Cy_INCREF(prev_sync)
# So bar will be fired before foo
res = actor.baz(<ActivityCounterSync> NULL)
res = actor.baz(NULL)
scheduler_run(&scheduler)
scheduler_destroy(&scheduler)
print(<int> res)
\ No newline at end of file
with rlocked res:
print(<int> res)
\ No newline at end of file
from libcpp.deque cimport deque
ctypedef deque[ActhonMessageInterface] message_queue_t
cdef extern from "<semaphore.h>" nogil:
ctypedef int sem_t
int sem_init(sem_t *sem, int pshared, unsigned int value)
int sem_wait(sem_t *sem)
int sem_post(sem_t *sem)
int sem_destroy(sem_t* sem)
cdef cypclass BasicQueue(ActhonQueueInterface) checklock:
message_queue_t* _queue
__init__(self):
self._queue = new message_queue_t()
__dealloc__(self):
del self._queue
bint is_empty(self) const:
return self._queue.empty()
void push(self, ActhonMessageInterface message):
self._queue.push_back(message)
if message._sync_method is not NULL:
message._sync_method.insertActivity(message)
Cy_INCREF(message)
bint activate(self):
cdef bint one_message_processed
if self._queue.empty():
return False
# Note here that according to Cython refcount conventions,
# the front() method should have returned a new ref.
# This is obviously not the case, so if we do nothing
# we will, at the end of this function, loose a ref on the pointed object
# (as we will decref the thing pointed by next_message).
next_message = self._queue.front()
self._queue.pop_front()
one_message_processed = next_message.activate()
if one_message_processed:
if next_message._sync_method is not NULL:
with wlocked next_message._sync_method:
next_message._sync_method.removeActivity(next_message)
else:
self._queue.push_back(next_message)
# Don't forget to incref to avoid premature deallocation
Cy_INCREF(next_message)
return one_message_processed
cdef cypclass NoneResult(ActhonResultInterface) checklock:
void pushVoidStarResult(self, void* result):
pass
void pushIntResult(self, int result):
pass
void* getVoidStarResult(self) const:
return NULL
int getIntResult(self) const:
return 0
cdef cypclass WaitResult(ActhonResultInterface) checklock:
union result_t:
int int_val
void* ptr
result_t result
sem_t semaphore
__init__(self):
self.result.ptr = NULL
sem_init(&self.semaphore, 0, 0)
__dealloc__(self):
sem_destroy(&self.semaphore)
@staticmethod
ActhonResultInterface construct():
return WaitResult()
void pushVoidStarResult(self, void* result):
self.result.ptr = result
sem_post(&self.semaphore)
void pushIntResult(self, int result):
self.result.int_val = result
sem_post(&self.semaphore)
result_t _getRawResult(self) const:
# We must ensure a result exists, but we can let others access it immediately
# The cast here is a way of const-casting (we're modifying the semaphore in a const method)
sem_wait(<sem_t*> &self.semaphore)
sem_post(<sem_t*> &self.semaphore)
return self.result
void* getVoidStarResult(self) const:
res = self._getRawResult()
return res.ptr
int getIntResult(self) const:
res = self._getRawResult()
return res.int_val
cdef cypclass SimpleResult(ActhonResultInterface) nolock:
"""
This is a wrapper around BasicResult that is handling locking the underlying object for each function call.
Consequently this is a way to emulate autolock mode without the deadlock issues this mode has with acthon.
If you have many calls to this object, things can be slower (you're locking / unlocking for each call)
Moreover, the cython-generated part uses this to push the result of an asynchronous execution
of a function. This cython-generated will lock and unlock: it has no way to guess
the result interface you're providing doesn't need locking. So, for the result pushing,
you'll have write locking and unlocking on the BasicResult object AND on the whole SimpleResult object.
So this is simpler to use, but definitely slower than BasicResult.
"""
WaitResult member
__init__(self):
self.member = WaitResult()
@staticmethod
ActhonResultInterface construct():
return SimpleResult()
void pushVoidStarResult(self, void* result):
with wlocked self.member:
self.member.pushVoidStarResult(result)
void pushIntResult(self, int result):
with wlocked self.member:
self.member.pushIntResult(result)
void* getVoidStarResult(self) const:
cdef void* res
with rlocked self.member:
res = self.member.getVoidStarResult()
return res
int getIntResult(self) const:
cdef int res
with rlocked self.member:
res = self.member.getIntResult()
return res
cdef cypclass ActivityCounterSync(ActhonSyncInterface) checklock:
int count
ActivityCounterSync previous_sync
__init__(self, ActivityCounterSync prev = <ActivityCounterSync> NULL):
self.count = 0
self.previous_sync = prev
void insertActivity(self, ActhonMessageInterface msg):
self.count += 1
void removeActivity(self, ActhonMessageInterface msg):
self.count -= 1
bint isCompleted(self) const:
return self.count == 0
bint isActivable(self) const:
cdef bint res = True
if self.previous_sync is not NULL:
with rlocked self.previous_sync:
res = self.previous_sync.isCompleted()
return res
cdef cypclass BasicActivableClass autolock activable:
__init__(self):
self._active_queue_class = BasicQueue()
self._active_result_class = SimpleResult.construct
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment