Commit 5237c20f authored by Guillaume Hervier

Refactor pool result queue to limit size

parent cebfe1e5
......@@ -35,8 +35,7 @@ def get_signature_vf(rorp):
size = os.path.getsize(rorp.path)
file2 = open(rorp.path, 'rb')
signature_fp = librsync.SigFile(file2, find_blocksize(size))
vf = connection.VirtualFile.new(signature_fp)
return vf
return signature_fp
def find_blocksize(file_len):
"""Return a reasonable block size to use on files of length file_len
......
......@@ -22,9 +22,10 @@ from multiprocessing import cpu_count
import Queue
import itertools
import threading
import sync
Job = namedtuple('Job', ['func', 'iterable', 'outqueue', 'options'])
Task = namedtuple('Task', ['func', 'args', 'index', 'outqueue', 'options'])
Task = namedtuple('Task', ['func', 'args', 'out', 'options'])
Result = namedtuple('Result', ['index', 'value'])
......@@ -35,18 +36,19 @@ def worker(taskqueue):
taskqueue.task_done()
break
if task.func is None:
# It means this task was the last of an iterable job
result = None
if callable(task.out):
args = task.options.get('out_args', ())
out = task.out(*args)
else:
try:
value = task.func(*task.args)
except Exception as e:
value = e
result = Result(task.index, value)
out = task.out
task.outqueue.put(result, block=True)
try:
value = task.func(*task.args)
except Exception as e:
value = e
result = value
out.set(result)
taskqueue.task_done()
......@@ -66,9 +68,7 @@ class Pool(object):
self.start_workers()
# Init task handler thread
self._job_handler_thread = self._start_handler_thread(self._job_handler,
self.jobqueue,
self.taskqueue)
self._job_handler_thread = self._start_handler_thread(self._job_handler)
def start_workers(self):
while len(self.workers) < self.processes:
......@@ -88,13 +88,21 @@ class Pool(object):
def create_job(self, func, iterable, **options):
max_outqueue_size = options.pop('max_outqueue_size', 0)
outqueue = Queue.Queue(maxsize=max_outqueue_size)
outqueue = JobResultQueue(maxsize=max_outqueue_size)
job = Job(func, iterable, outqueue, options)
self.jobqueue.put(job)
self.jobqueue.put(job, block=True)
return job
def create_task(self, func, args, out=None, **options):
    """Build a Task for ``func(*args)`` and put it on the task queue.

    ``out`` is the object that will receive the task's result.  When the
    caller does not supply one, a fresh ``sync.AsyncValue`` is created.
    Any extra keyword arguments are stored on the task as its ``options``.

    Returns the Task that was queued, so callers can reach its ``out``.
    """
    target = sync.AsyncValue() if out is None else out
    new_task = Task(func, args, target, options)
    # block=True: wait for room if the task queue is bounded and full.
    self.taskqueue.put(new_task, block=True)
    return new_task
def imap(self, func, iterable, **options):
iterable = itertools.imap(None, iterable)
......@@ -104,9 +112,9 @@ class Pool(object):
return IMapIterator(job.outqueue)
def apply(self, func, *args, **options):
job = self.create_job(func, [args], **options)
task = self.create_task(func, args, **options)
return AsyncResult(job.outqueue)
return task.out
def stop(self):
self.jobqueue.put(None, block=True)
......@@ -117,20 +125,22 @@ class Pool(object):
for w in self.workers:
w.join(timeout=timeout)
def _job_handler(self, jobqueue, taskqueue):
def _job_handler(self):
while True:
job = jobqueue.get(True)
job = self.jobqueue.get(True)
if job is None:
for w in self.workers:
taskqueue.put(None)
self.taskqueue.put(None)
break
for (index, args) in enumerate(job.iterable):
task = Task(job.func, args, index, job.outqueue, job.options)
taskqueue.put(task, block=True)
taskqueue.put(Task(None, None, None, job.outqueue, job.options), block=True)
out = job.outqueue.slot(index)
task = self.create_task(job.func, args, out, **job.options)
# task = self.create_task(job.func, args, job.outqueue.slot,
# out_args=(index,), **job.options)
job.outqueue.set_length(index + 1)
jobqueue.task_done()
self.jobqueue.task_done()
class IMapIterator(object):
def __init__(self, outqueue):
......@@ -143,34 +153,55 @@ class IMapIterator(object):
def next(self):
while True:
if self.index in self.results:
result = self.results.pop(self.index)
else:
result = self.outqueue.get(True)
if result is None:
raise StopIteration()
if isinstance(result.value, Exception):
raise result.value
if result.index != self.index:
self.results[result.index] = result
continue
slot = self.outqueue.get(self.index)
if slot is None:
raise StopIteration()
self.index += 1
return result.value
result = slot.get()
if isinstance(result, Exception):
raise result
class AsyncResult(object):
    """Handle on a single result that will be delivered through a queue.

    The value is fetched from ``outqueue`` at most once and then cached, so
    repeated calls to :meth:`get` return the same object.
    """

    def __init__(self, outqueue):
        self.value = None
        self.completed = False
        self.outqueue = outqueue

    def wait(self):
        """Fetch the result from the queue unless it was already fetched."""
        if not self.completed:
            # get(True) is called with block=True; the fetched item is cached.
            self.value = self.outqueue.get(True)
            self.completed = True

    def get(self):
        """Return the result, fetching it from the queue on first use."""
        self.wait()
        return self.value
self.index += 1
return result
class JobResultQueue(object):
    """Index-addressed set of result slots for one job, optionally bounded.

    The producer reserves one slot per task index with :meth:`slot`; the
    consumer removes slots by index with :meth:`get`.  Once the producer
    knows how many tasks the job has, it calls :meth:`set_length` so that
    a consumer asking for an index past the end gets ``None`` instead of
    waiting.

    ``maxsize`` limits how many slots may be outstanding (reserved but not
    yet removed by :meth:`get`).  ``None``, ``0`` or a negative value means
    "no limit", mirroring ``Queue.Queue``; callers in this file pass ``0``
    by default, which previously made the very first :meth:`slot` call wait
    forever because an empty dict has length ``0``.
    """

    def __init__(self, maxsize=None):
        self.maxsize = maxsize
        self.slots = {}
        # One condition guards slots and length and signals every change.
        self.updated = threading.Condition(threading.Lock())
        self.length = None

    def _is_full(self):
        """Return True when a size limit is set and has been reached.

        Must be called with ``self.updated`` held.
        """
        limit = self.maxsize
        return limit is not None and limit > 0 and len(self.slots) >= limit

    def slot(self, index):
        """Reserve and return a new result slot for ``index``.

        Waits while the number of outstanding slots is at the limit.
        """
        with self.updated:
            while self._is_full():
                self.updated.wait()
            slot = self.slots[index] = sync.AsyncValue()
            # Wake every waiter: producers and consumers share the condition,
            # so a single notify() could be consumed by the wrong kind.
            self.updated.notify_all()
            return slot

    def get(self, index):
        """Remove and return the slot for ``index``.

        Waits until the slot has been reserved.  Returns ``None`` when the
        job length is known and ``index`` lies at or beyond it.
        """
        with self.updated:
            while index not in self.slots:
                if self.length is not None and index >= self.length:
                    return None
                self.updated.wait()
            slot = self.slots.pop(index)
            # Removing a slot may free room for a producer waiting in slot().
            self.updated.notify_all()
            return slot

    def set_length(self, length):
        """Record the total number of slots the job will ever produce."""
        with self.updated:
            self.length = length
            # Consumers waiting for an out-of-range index must re-check.
            self.updated.notify_all()
......@@ -49,7 +49,6 @@ def Restore(mirror_rp, inc_rpath, target, restore_to_time):
TargetS.patch(target, diff_iter)
pool.stop()
pool.join()
MirrorS.close_rf_cache()
......@@ -293,8 +292,7 @@ class MirrorStruct:
file_fp = cls.rf_cache.get_fp(expanded_index, mir_rorp)
if (target_rorp):
# a file is already there, we can create a diff
signature_vf = target_rorp.conn.Rdiff.get_signature_vf(target_rorp)
target_signature = connection.VirtualFile(target_rorp.conn, signature_vf)
target_signature = target_rorp.conn.Rdiff.get_signature_vf(target_rorp)
file_fobj = opener.lazy_open(file_fp.name, 'rb')
delta_fp = mir_rorp.get_delta(target_signature, file_fobj)
mir_rorp.setfile(delta_fp)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment