Commit a9e7456b authored by Denis Bilenko's avatar Denis Bilenko

ThreadPool: make it possible to set 'size' property

parent 011f719f
...@@ -49,10 +49,27 @@ class ThreadPool(object): ...@@ -49,10 +49,27 @@ class ThreadPool(object):
def __len__(self): def __len__(self):
return self.task_queue.unfinished_tasks return self.task_queue.unfinished_tasks
@property def _get_size(self):
def size(self):
return self._size return self._size
def _set_size(self, size):
if size < 0:
raise ValueError('Size of the pool cannot be negative: %r' % (size, ))
if size > self._maxsize:
raise ValueError('Size of the pool cannot be bigger than maxsize: %r > %r' % (size, self._maxsize))
if self.manager:
self.manager.kill()
while self._size < size:
self._add_thread()
delay = 0.0001
while self._size > size:
while self._size - size > self.task_queue.unfinished_tasks:
self.task_queue.put(None)
sleep(delay)
delay = min(delay * 2, .05)
size = property(_get_size, _set_size)
def _init(self, maxsize): def _init(self, maxsize):
self._size = 0 self._size = 0
self._semaphore = Semaphore(1) self._semaphore = Semaphore(1)
...@@ -82,37 +99,35 @@ class ThreadPool(object): ...@@ -82,37 +99,35 @@ class ThreadPool(object):
def kill(self): def kill(self):
if self.manager: if self.manager:
self.manager.kill() self.manager.kill()
self._manage(0) self.size = 0
def _adjust(self, maxsize): def _adjust_step(self):
if maxsize is None: # if there is a possibility & necessity for adding a thread, do it
maxsize = self._maxsize while self._size < self._maxsize and self.task_queue.unfinished_tasks > self._size:
while self.task_queue.unfinished_tasks > self._size and self._size < maxsize:
self._add_thread() self._add_thread()
while self._size - maxsize > self.task_queue.unfinished_tasks: # while the number of threads is more than maxsize, kill one
# we do not check what's already in task_queue - it could be all Nones
while self._size - self._maxsize > self.task_queue.unfinished_tasks:
self.task_queue.put(None) self.task_queue.put(None)
if self._size: if self._size:
self.fork_watcher.start(self._on_fork) self.fork_watcher.start(self._on_fork)
else: else:
self.fork_watcher.stop() self.fork_watcher.stop()
def _manage(self, maxsize=None): def _adjust_wait(self):
if maxsize is None:
maxsize = self._maxsize
delay = 0.0001 delay = 0.0001
while True: while True:
self._adjust(maxsize) self._adjust_step()
if self._size <= maxsize: if self._size <= self._maxsize:
return return
sleep(delay) sleep(delay)
delay = min(delay * 2, .05) delay = min(delay * 2, .05)
def adjust(self): def adjust(self):
if self.manager: self._adjust_step()
return if not self.manager and self._size > self._maxsize:
if self._adjust(self.maxsize): # might need to feed more Nones into the pool
return self.manager = Greenlet.spawn(self._adjust_wait)
self.manager = Greenlet.spawn(self._manage)
def _add_thread(self): def _add_thread(self):
with self._lock: with self._lock:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment