Commit ae658982 authored by Gwenael Samain's avatar Gwenael Samain

lwan demo

parent ee853e23
from libcpp.deque cimport deque
from lwan_coro cimport scheduler_t, scheduler_init, scheduler_coro_add, scheduler_coro_add_runtime, scheduler_run, async_scheduler_run, scheduler_destroy, async_scheduler_destroy, coro_yield_value, coro_yield, coro_t
from libc.stdio cimport printf
cdef extern from "<unistd.h>" nogil:
int sleep(int sec)
ctypedef deque[ActhonMessageInterface] message_queue_t
cdef cypclass BasicQueue(ActhonQueueInterface) checklock:
message_queue_t* _queue
__init__(self):
self._queue = new message_queue_t()
__dealloc__(self):
with gil:
print("Deallocating queue")
del self._queue
bint is_empty(self) const:
return self._queue.empty()
void push(self, ActhonMessageInterface message):
# lock queue
self._queue.push_back(message)
if message._sync_method is not NULL:
message._sync_method.insertActivity(message)
# incref message
Cy_INCREF(message)
# unlock queue
bint activate(self):
cdef bint one_message_processed
# lock queue
if self._queue.empty():
#with gil:
# print("Empty queue")
return False
# Note here that according to Cython refcount conventions,
# the front() method should have returned a new ref.
# This is obviously not the case, so if we do nothing
# we will, at the end of this function, loose a ref on the pointed object
# (as we will decref the thing pointed by next_message).
next_message = self._queue.front()
self._queue.pop_front()
one_message_processed = next_message.activate()
if one_message_processed:
#with gil:
# print("Top message processed, reporting to result class & deallocating")
if next_message._sync_method is not NULL:
with wlocked next_message._sync_method:
next_message._sync_method.removeActivity(next_message)
#del next_message
else:
#with gil:
# print("Top message could not be processed, enqueueing back")
# Could not process yet (because of sync constraints) => re-enqueue
self._queue.push_back(next_message)
# Don't forget to incref to avoid premature deallocation
Cy_INCREF(next_message)
# unlock queue
return one_message_processed
cdef cypclass BasicResult(ActhonResultInterface) nolock:
union result_t:
int int_val
void* ptr
result_t result
bint result_pushed
@staticmethod
ActhonResultInterface construct():
return BasicResult()
__init__(self):
self.result_pushed = False
self.result.ptr = NULL
void pushVoidStarResult(self, void* result):
# lock self
#with wlocked self:
self.result.ptr = result
self.result_pushed = True
# unlock self
void pushIntResult(self, int result):
# lock self
#with wlocked self:
self.result.int_val = result
self.result_pushed = True
# unlock self
result_t _getRawResult(self):
cdef result_t res
while not self.result_pushed:
# wait in a cool manner, busy loop is horrible
sleep(1)
#with gil:
# print("No value yet !")
# We must be careful here. Due to out-of-order execution,
# the thread doing push_result could have modified self.result_pushed before
# assigning self.result to the correct value.
# To be sure everything is OK, we must have a barrier of some kind.
# Taking the lock can be this kind of barrier (but it's clearly overkill)
# lock self
with rlocked self:
res = self.result
# unlock self
return res
void* getVoidStarResult(self):
res = self._getRawResult()
return res.ptr
int getIntResult(self):
res = self._getRawResult()
return res.int_val
cdef cypclass ActivityCounterSync(ActhonSyncInterface) nolock:
int count
ActivityCounterSync previous_sync
__init__(self, ActivityCounterSync prev = <ActivityCounterSync> NULL):
self.count = 0
self.previous_sync = prev
void insertActivity(self, ActhonMessageInterface msg):
# lock self
#with wlocked self:
self.count += 1
# unlock self
void removeActivity(self, ActhonMessageInterface msg):
# lock self
#with wlocked self:
self.count -= 1
# unlock self
bint isCompleted(self):
cdef bint result
# lock self
#with rlocked self:
result = self.count == 0
# unlock self
return result
bint isActivable(self):
cdef bint res
if self.previous_sync is not NULL:
with rlocked self.previous_sync:
res = self.previous_sync.isCompleted()
else:
res = True
return res
cdef cypclass A nolock activable:
int a
void foo(self):
printf("Foo call with object number %d\n", self.a)
void bar(self):
printf("Bar call with object number %d\n", self.a)
int baz(self):
printf("Baz call with object number %d\n", self.a)
# Compute square
return self.multiply(self.a)
int multiply(self, int arg):
return self.a*arg
int modulo_add(self, int arg, int modulo = 10):
return (self.a + arg) % modulo
cdef cypclass B(A) activable:
int add(self, int arg):
return self.a + arg
cdef ActhonResultInterface take_activated_object(A.Activated actor) nogil:
return actor.multiply(<ActhonSyncInterface> NULL, 4)
cdef void set_A_attrs(A obj):
obj._active_result_class = BasicResult.construct
obj._active_queue_class = BasicQueue()
cdef int process_queue(coro_t *coroutine, void *arg) nogil:
queue = <BasicQueue> arg
cdef bint remaining_messages = True
cdef bint res = False
while remaining_messages:
with wlocked queue:
res = queue.activate()
if not res:
remaining_messages = not queue.is_empty()
printf("Worker, yielding\n")
coro_yield(coroutine, coro_yield_value.MAY_RESUME)
del queue
coro_yield(coroutine, coro_yield_value.FINISHED)
cpdef void main():
cdef scheduler_t scheduler
cdef A.Activated actor
cdef ActivityCounterSync sync = <ActivityCounterSync> NULL
scheduler_init(&scheduler, num_workers=2)
#sync = ActivityCounterSync(sync)
for i in range(4):
prev_sync = sync
sync = ActivityCounterSync(prev_sync)
obj = A()
obj.a = i
set_A_attrs(obj)
scheduler_coro_add(&scheduler, process_queue, <void*>obj._active_queue_class)
# Now populate the queue with some messages
actor = obj.__activate__()
actor.foo(sync)
actor.bar(prev_sync)
# Useful ?
#Cy_INCREF(prev_sync)
# So bar will be fired before foo
res = actor.baz(<ActivityCounterSync> NULL)
scheduler_run(&scheduler)
scheduler_destroy(&scheduler)
print(<int> res)
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment