Commit ee1fe514 authored by Xavier Thompson's avatar Xavier Thompson

Use atomic variable for is_finished flag

parent c72cc5a2
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
from libcpp.deque cimport deque from libcpp.deque cimport deque
from libcpp.vector cimport vector from libcpp.vector cimport vector
from libcpp.atomic cimport atomic from libcpp.atomic cimport atomic, memory_order_relaxed
from libc.stdio cimport printf from libc.stdio cimport printf
from libc.stdlib cimport rand from libc.stdlib cimport rand
from posix.unistd cimport sysconf from posix.unistd cimport sysconf
...@@ -27,7 +27,7 @@ cdef inline void * worker_function(void * arg) nogil: ...@@ -27,7 +27,7 @@ cdef inline void * worker_function(void * arg) nogil:
# Wait until a queue becomes available. # Wait until a queue becomes available.
sem_wait(&sch.num_free_queues) sem_wait(&sch.num_free_queues)
# If the scheduler is finished there is nothing to do anymore. # If the scheduler is finished there is nothing to do anymore.
if sch.is_finished: if sch.is_finished.load(memory_order_relaxed):
return <void*> 0 return <void*> 0
# Pop or steal a queue. # Pop or steal a queue.
queue = worker.get_queue() queue = worker.get_queue()
...@@ -104,7 +104,7 @@ cdef cypclass Scheduler: ...@@ -104,7 +104,7 @@ cdef cypclass Scheduler:
sem_t num_free_queues sem_t num_free_queues
atomic[int] num_pending_queues atomic[int] num_pending_queues
sem_t is_idle sem_t is_idle
volatile bint is_finished atomic[bint] is_finished
int num_workers int num_workers
lock Scheduler __new__(alloc, int num_workers=0): lock Scheduler __new__(alloc, int num_workers=0):
...@@ -113,7 +113,7 @@ cdef cypclass Scheduler: ...@@ -113,7 +113,7 @@ cdef cypclass Scheduler:
self.num_workers = num_workers self.num_workers = num_workers
sem_init(&self.num_free_queues, 0, 0) sem_init(&self.num_free_queues, 0, 0)
# Initially the scheduler is idle but not finished # Initially the scheduler is idle but not finished
self.is_finished = False self.is_finished.store(False, memory_order_relaxed)
sem_init(&self.is_idle, 0, 1) sem_init(&self.is_idle, 0, 1)
self.num_pending_queues.store(0) self.num_pending_queues.store(0)
if pthread_barrier_init(&self.barrier, NULL, num_workers + 1): if pthread_barrier_init(&self.barrier, NULL, num_workers + 1):
...@@ -162,7 +162,7 @@ cdef cypclass Scheduler: ...@@ -162,7 +162,7 @@ cdef cypclass Scheduler:
# Wait until all current tasks are joined # Wait until all current tasks are joined
self.join() self.join()
# Signal the worker threads that there is no more work. # Signal the worker threads that there is no more work.
self.is_finished = True self.is_finished.store(True, memory_order_relaxed)
# Pretend that there are new queues to wake up the workers. # Pretend that there are new queues to wake up the workers.
num_free_queues = &self.num_free_queues num_free_queues = &self.num_free_queues
for worker in self.workers: for worker in self.workers:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment