Commit 19545c50 authored by Xavier Thompson

Improve scheduler interface

By splitting the implementation into `Scheduler` and `Pool` types,
the user-facing `Scheduler` type is now free of reference cycles:

- no more `Scheduler.finish()` method to break reference cycles
- automatic join when no references to the scheduler remain (see the usage sketch below)
parent ee1fe514
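
For illustration, a minimal sketch of the resulting usage (hypothetical: the `run_tasks` function and its comments are illustrative, not part of this commit):

    cdef void run_tasks() nogil:
        sch = Scheduler()   # constructs the underlying Pool of workers
        # ... create mailboxes referencing sch and post activities ...
        sch.join()          # delegates to Pool.join(): block until idle
        # No sch.finish() call anymore: once the last reference to sch
        # is dropped, Scheduler.__dealloc__ calls pool.finish() itself.

The internal `Pool <-> Worker` reference cycle still exists, but the user-facing `Scheduler` is no longer part of it, so its reference count can reach zero naturally.
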
@@ -13,6 +13,7 @@ cdef extern from "<unistd.h>" nogil:
     enum: _SC_NPROCESSORS_ONLN  # Seems to not be included in "posix.unistd".
 
+cdef cypclass Pool
 cdef cypclass Scheduler
 cdef cypclass Worker
@@ -20,14 +21,16 @@ cdef cypclass Worker
 # The C compiler will dismiss it because we pass the function pointer to create a thread which prevents inlining.
 cdef inline void * worker_function(void * arg) nogil:
     worker = <lock Worker> arg
-    sch = <Scheduler> <void*> worker.scheduler
+    pool = <Pool> <void*> worker.pool
     # Wait until all the workers are ready.
-    pthread_barrier_wait(&sch.barrier)
+    pthread_barrier_wait(&pool.barrier)
     while 1:
+        # Reset queue to NULL to discard previous one
+        queue = NULL
         # Wait until a queue becomes available.
-        sem_wait(&sch.num_free_queues)
-        # If the scheduler is finished there is nothing to do anymore.
-        if sch.is_finished.load(memory_order_relaxed):
+        sem_wait(&pool.num_free_queues)
+        # If the pool is finished there is nothing to do anymore.
+        if pool.is_finished.load(memory_order_relaxed):
             return <void*> 0
         # Pop or steal a queue.
         queue = worker.get_queue()
@@ -38,26 +41,26 @@ cdef inline void * worker_function(void * arg) nogil:
                 # Mark the empty queue as not assigned to any worker.
                 queue.has_worker = False
                 # Decrement the number of non-completed queues.
-                if sch.num_pending_queues.fetch_sub(1) == 1:
+                if pool.num_pending_queues.fetch_sub(1) == 1:
                     # Signal that there are no more queues.
-                    sem_post(&sch.is_idle)
+                    sem_post(&pool.is_idle)
                 # Discard the empty queue and continue the main loop.
                 continue
         # The queue is not empty: reinsert it in this worker's queues.
         with wlocked worker:
             worker.queues.push_back(queue)
         # Signal that the queue is available.
-        sem_post(&sch.num_free_queues)
+        sem_post(&pool.num_free_queues)
 
 
 cdef cypclass Worker:
     deque[lock SequentialMailBox] queues
-    lock Scheduler scheduler
+    lock Pool pool
     pthread_t thread
 
-    lock Worker __new__(alloc, lock Scheduler scheduler):
+    lock Worker __new__(alloc, lock Pool pool):
         instance = consume alloc()
-        instance.scheduler = scheduler
+        instance.pool = pool
         locked_instance = <lock Worker> consume instance
         if not pthread_create(&locked_instance.thread, NULL, worker_function, <void *> locked_instance):
             return locked_instance
@@ -79,11 +82,11 @@ cdef cypclass Worker:
         # - return the last queue of the first worker with a non-empty list
         # - continue looping until a queue is found
         cdef int i, index, num_workers, random_offset
-        sch = <Scheduler> <void*> self.scheduler
-        num_workers = <int> sch.workers.size()
+        pool = <Pool> <void*> self.pool
+        num_workers = <int> pool.workers.size()
         index = rand() % num_workers
         while True:
-            victim = sch.workers[index]
+            victim = pool.workers[index]
             with wlocked victim:
                 if not victim.queues.empty():
                     stolen_queue = victim.queues.back()
@@ -98,7 +101,7 @@ cdef cypclass Worker:
         return pthread_join(self.thread, NULL)
 
 
-cdef cypclass Scheduler:
+cdef cypclass Pool:
     vector[lock Worker] workers
     pthread_barrier_t barrier
     sem_t num_free_queues
@@ -107,12 +110,12 @@ cdef cypclass Scheduler:
     atomic[bint] is_finished
     int num_workers
 
-    lock Scheduler __new__(alloc, int num_workers=0):
-        self = <lock Scheduler> consume alloc()
+    lock Pool __new__(alloc, int num_workers=0):
+        self = <lock Pool> consume alloc()
         if num_workers == 0: num_workers = sysconf(_SC_NPROCESSORS_ONLN)
         self.num_workers = num_workers
         sem_init(&self.num_free_queues, 0, 0)
-        # Initially the scheduler is idle but not finished
+        # Initially the pool is idle but not finished
        self.is_finished.store(False, memory_order_relaxed)
         sem_init(&self.is_idle, 0, 1)
         self.num_pending_queues.store(0)
@@ -136,24 +139,24 @@ cdef cypclass Scheduler:
     void post_queue(lock self, lock SequentialMailBox queue):
         cdef int num_workers, random_offset, num_previous_queues
-        sch = <Scheduler> <void*> self
+        pool = <Pool> <void*> self
         # Increment the number of non-completed queues.
-        num_previous_queues = sch.num_pending_queues.fetch_add(1)
+        num_previous_queues = pool.num_pending_queues.fetch_add(1)
         if num_previous_queues == 0:
-            # Signal that the scheduler is not idle.
+            # Signal that the pool is not idle.
             sem_wait(&self.is_idle)
         # Add a queue to a random worker.
-        num_workers = <int> sch.workers.size()
+        num_workers = <int> pool.workers.size()
         random_offset = rand() % num_workers
-        receiver = sch.workers[random_offset]
+        receiver = pool.workers[random_offset]
         with wlocked receiver:
             queue.has_worker = True
             receiver.queues.push_back(queue)
         # Signal that a queue is available.
-        sem_post(&sch.num_free_queues)
+        sem_post(&pool.num_free_queues)
 
     void join(lock self):
-        # Wait until the scheduler is idle.
+        # Wait until the pool is idle.
         is_idle = &self.is_idle
         sem_wait(is_idle)
         sem_post(is_idle)
@@ -171,6 +174,21 @@ cdef cypclass Scheduler:
         self.workers.clear()
 
+
+cdef cypclass Scheduler:
+    lock Pool pool
+
+    lock Scheduler __new__(alloc, int num_workers=0):
+        self = consume alloc()
+        self.pool = Pool(num_workers)
+        return consume self
+
+    void join(lock self):
+        self.pool.join()
+
+    __dealloc__(self):
+        self.pool.finish()
+
 cdef cypclass SequentialMailBox(ActhonQueueInterface):
     deque[ActhonMessageInterface] messages
     lock Scheduler scheduler
@@ -190,9 +208,9 @@ cdef cypclass SequentialMailBox(ActhonQueueInterface):
         if message._sync_method is not NULL:
             message._sync_method.insertActivity()
         # If no worker is already assigned this queue
-        # register it with the scheduler.
+        # register it with the scheduler's pool.
         if not self.has_worker:
-            self.scheduler.post_queue(self)
+            self.scheduler.pool.post_queue(self)
 
     bint activate(self):
         # Try to process the first message in the queue.
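
Side note on the `join` implementation visible above: `Pool.join()` relies on a binary-semaphore idiom instead of a condition variable. A standalone sketch of the pattern (same POSIX calls as in the diff; assumes `is_idle` was initialized with one token, as in `Pool.__new__`):

    # is_idle holds one token exactly when no queues are pending:
    # post_queue() takes the token when the first queue arrives, and
    # the worker that completes the last queue posts it back.
    sem_wait(is_idle)   # block until the pool becomes idle
    sem_post(is_idle)   # restore the token so later join() calls
                        # (and concurrent ones) behave identically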