Commit 07a67da7 authored by Gwenael Samain's avatar Gwenael Samain

Manual example: first try

parents
from libcpp.deque cimport deque
from posix.unistd cimport sleep
cdef extern from "Python.h":
void Cy_INCREF(cyobject o) nogil
cdef cypclass SyncInterface nolock:
bint isActivable(self):
pass
bint isCompleted(self):
pass
void insert_activity(self):
pass
void remove_activity(self):
pass
cdef cypclass MessageInterface nolock:
SyncInterface _sync_method
bint activate(self):
pass
cdef cypclass QueueInterface nolock:
void push(self, MessageInterface message):
pass
bint activate(self):
pass
cdef cypclass ResultInterface nolock:
void pushResult(self, void* result):
pass
void* getResult(self):
pass
void pushIntResult(self, int result):
pass
int getIntResult(self):
pass
#int __int__(self):
# pass
ctypedef deque[MessageInterface] message_queue_t
cdef cypclass BasicQueue(QueueInterface) nolock:
message_queue_t* _queue
__init__(self):
self._queue = new message_queue_t()
#pass
__dealloc__(self):
with gil:
print("Deallocating queue")
del self._queue
void push(self, MessageInterface message):
# lock queue
self._queue.push_back(message)
if message._sync_method is not NULL:
# prototype must change
message._sync_method.insert_activity()
# incref message
Cy_INCREF(message)
# unlock queue
bint activate(self):
cdef bint one_message_processed
# lock queue
if self._queue.empty():
with gil:
print("Empty queue")
return False
# Note here that according to Cython refcount conventions,
# the front() method should have returned a new ref.
# This is obviously not the case, so if we do nothing
# we will, at the end of this function, loose a ref on the pointed object
# (as we will decref the thing pointed by next_message).
next_message = self._queue.front()
self._queue.pop_front()
one_message_processed = next_message.activate()
if one_message_processed:
with gil:
print("Top message processed, reporting to result class & deallocating")
if next_message._sync_method is not NULL:
next_message._sync_method.remove_activity()
#del next_message
else:
with gil:
print("Top message could not be processed, enqueueing back")
# Could not process yet (because of sync constraints) => re-enqueue
self._queue.push_back(next_message)
# Don't forget to incref to avoid premature deallocation
Cy_INCREF(next_message)
# unlock queue
return one_message_processed
cdef cypclass BasicResult(ResultInterface) nolock:
union result_t:
int int_val
void* ptr
result_t result
bint result_pushed
@staticmethod
ResultInterface construct():
return BasicResult()
__init__(self):
self.result_pushed = False
self.result.ptr = NULL
void pushResult(self, void* result):
# lock self
self.result.ptr = result
self.result_pushed = True
# unlock self
void pushIntResult(self, int result):
# lock self
self.result.int_val = result
self.result_pushed = True
# unlock self
result_t _getRawResult(self):
if not self.result_pushed:
# wait in a cool manner, busy loop is horrible
#sleep(1)
with gil:
print("No value yet !")
# We must be careful here. Due to out-of-order execution,
# the thread doing push_result could have modified self.result_pushed before
# assigning self.result to the correct value.
# To be sure everything is OK, we must have a barrier of some kind.
# Taking the lock can be this kind of barrier (but it's clearly overkill)
# lock self
# unlock self
return self.result
void* getResult(self):
res = self._getRawResult()
return res.ptr
int getIntResult(self):
res = self._getRawResult()
return res.int_val
#int __int__(self):
# res = self._getRawResult()
# return res.int_val
cdef cypclass ActivityCounterSync(SyncInterface) nolock:
int count
ActivityCounterSync previous_sync
__init__(self, ActivityCounterSync prev = <ActivityCounterSync> NULL):
self.count = 0
self.previous_sync = prev
void insert_activity(self):
# lock self
self.count += 1
# unlock self
void remove_activity(self):
# lock self
self.count -= 1
# unlock self
bint isCompleted(self):
cdef bint result
# lock self
result = self.count == 0
# unlock self
return result
bint isActivable(self):
return self.previous_sync is NULL or self.previous_sync.isCompleted()
cdef cypclass ActivableClass:
ResultInterface (*_active_result_class)()
QueueInterface _active_queue_class
cdef cypclass Active nolock
cdef cypclass Passive(ActivableClass) nolock:
"""
_active_self is generated by cython, as well as activate ?
(implementation note: right after the declaration, declare the active class,
and inject activate & _active_self in the passive class)
The rest is completely user-defined.
"""
Active _active_self
int a
__init__(self, int a = 0):
self.a = a
self._active_queue_class = BasicQueue()
self._active_result_class = BasicResult.construct
int multiply(self, int arg) const:
return self.a * arg
Active activate(self):
if self._active_self is NULL:
self._active_self = Active(self)
return self._active_self
cdef cypclass Passive_multiply_Message(MessageInterface) nolock:
"""
This class is generated automatically, for each Passive's method.
"""
Passive _passive_self
ResultInterface _result
#SyncInterface _sync_method
int arg
__init__(self, Passive _passive_self, ResultInterface _active_result_class, int arg, SyncInterface sync_method = <SyncInterface> NULL):
self._passive_self = _passive_self
self.arg = arg
self._result = _active_result_class
self._sync_method = sync_method
bint activate(self):
cdef void* tmp
if self._sync_method is NULL or self._sync_method.isActivable():
# lock passive self
result = self._passive_self.multiply(self.arg)
# unlock passive self
self._result.pushIntResult(result)
return True
else:
#raise Race # wait etc.
return False
cdef cypclass Active nolock:
"""
This class is automatically generated, one Active class for each Passive class.
"""
Passive _passive_self
QueueInterface _active_queue
ResultInterface (*_active_result_class)()
__init__(self, Passive passive_self):
self._passive_self = passive_self
self._active_queue = passive_self._active_queue_class
self._active_result_class = passive_self._active_result_class
ResultInterface multiply(self, int arg, SyncInterface sync_method = <SyncInterface> NULL):
result_object = self._active_result_class()
message = Passive_multiply_Message(self._passive_self, result_object, arg, sync_method)
self._active_queue.push(message)
return result_object
cpdef test():
obj = Passive(3)
print(obj.multiply(3))
active_object = obj.activate()
#print("Active object created")
s = ActivityCounterSync()
after_s = ActivityCounterSync(s)
result_object = active_object.multiply(3, after_s)
res_before = active_object.multiply(4, s)
#print("Method call pushed")
# Now, manually process the queue
# (in a real-world example, this processing would be done by a separate worker)
# First pass: Could not process anything: the first task must be done after the other one
active_object._active_queue.activate()
print("Trying to access to result_object: this is non-sense")
result_object.getIntResult()
print("Trying to access to res_before: this is non-sense")
res_before.getIntResult()
# Second pass: the first task on the queue is doable: res_before is now set
active_object._active_queue.activate()
print("Trying to access to result_object: this is non-sense")
result_object.getIntResult()
print("Trying to access to res_before: this works")
print(res_before.getIntResult())
# Third pass: everything is processed
active_object._active_queue.activate()
print("Trying to access to result_object: this works")
print(result_object.getIntResult())
print("Trying to access to res_before: this works")
print(res_before.getIntResult())
# Fourth pass: just to check we trigger the empty queue message
active_object._active_queue.activate()
print("Results:")
print(res_before.getIntResult())
print(result_object.getIntResult())
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment