Commit 8ae3dd3e authored by Guillaume Hervier's avatar Guillaume Hervier

Propagate task error from pool threads pool workers

parent 5828fe79
...@@ -23,13 +23,10 @@ import Queue ...@@ -23,13 +23,10 @@ import Queue
import itertools import itertools
import threading import threading
Job = namedtuple('Job', ['func', 'iterable', 'outqueue']) Job = namedtuple('Job', ['func', 'iterable', 'outqueue', 'options'])
Task = namedtuple('Task', ['func', 'args', 'index', 'outqueue']) Task = namedtuple('Task', ['func', 'args', 'index', 'outqueue', 'options'])
Result = namedtuple('Result', ['index', 'value']) Result = namedtuple('Result', ['index', 'value'])
RUNNING = 0
STOPPED = 1
def worker(taskqueue): def worker(taskqueue):
while True: while True:
...@@ -42,7 +39,10 @@ def worker(taskqueue): ...@@ -42,7 +39,10 @@ def worker(taskqueue):
# It means this task was the last of an iterable job # It means this task was the last of an iterable job
result = None result = None
else: else:
value = task.func(*task.args) try:
value = task.func(*task.args)
except Exception as e:
value = e
result = Result(task.index, value) result = Result(task.index, value)
task.outqueue.put(result, block=True) task.outqueue.put(result, block=True)
...@@ -57,8 +57,6 @@ class Pool(object): ...@@ -57,8 +57,6 @@ class Pool(object):
processes = cpu_count() processes = cpu_count()
self.processes = processes self.processes = processes
self.state = STOPPED
# Init queues # Init queues
self.taskqueue = Queue.Queue(maxsize=max_taskqueue_size) self.taskqueue = Queue.Queue(maxsize=max_taskqueue_size)
self.jobqueue = Queue.Queue(maxsize=max_jobqueue_size) self.jobqueue = Queue.Queue(maxsize=max_jobqueue_size)
...@@ -88,25 +86,25 @@ class Pool(object): ...@@ -88,25 +86,25 @@ class Pool(object):
return thread return thread
def create_job(self, func, iterable, max_outqueue_size=0): def create_job(self, func, iterable, **options):
max_outqueue_size = options.pop('max_outqueue_size', 0)
outqueue = Queue.Queue(maxsize=max_outqueue_size) outqueue = Queue.Queue(maxsize=max_outqueue_size)
job = Job(func, iterable, outqueue) job = Job(func, iterable, outqueue, options)
self.jobqueue.put(job) self.jobqueue.put(job)
return job return job
def imap(self, func, iterable, max_outqueue_size=0): def imap(self, func, iterable, **options):
iterable = itertools.imap(None, iterable) iterable = itertools.imap(None, iterable)
job = self.create_job(func, iterable, job = self.create_job(func, iterable, **options)
max_outqueue_size=max_outqueue_size)
return IMapIterator(job.outqueue) return IMapIterator(job.outqueue)
def apply(self, func, *args): def apply(self, func, *args, **options):
job = self.create_job(func, [args]) job = self.create_job(func, [args], **options)
return AsyncResult(job.outqueue) return AsyncResult(job.outqueue)
...@@ -128,9 +126,9 @@ class Pool(object): ...@@ -128,9 +126,9 @@ class Pool(object):
break break
for (index, args) in enumerate(job.iterable): for (index, args) in enumerate(job.iterable):
task = Task(job.func, args, index, job.outqueue) task = Task(job.func, args, index, job.outqueue, job.options)
taskqueue.put(task, block=True) taskqueue.put(task, block=True)
taskqueue.put(Task(None, None, None, job.outqueue), block=True) taskqueue.put(Task(None, None, None, job.outqueue, job.options), block=True)
jobqueue.task_done() jobqueue.task_done()
...@@ -151,6 +149,8 @@ class IMapIterator(object): ...@@ -151,6 +149,8 @@ class IMapIterator(object):
result = self.outqueue.get(True) result = self.outqueue.get(True)
if result is None: if result is None:
raise StopIteration() raise StopIteration()
if isinstance(result.value, Exception):
raise result.value
if result.index != self.index: if result.index != self.index:
self.results[result.index] = result self.results[result.index] = result
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment