Commit e3ef3c8e authored by Denis Bilenko's avatar Denis Bilenko

fix #423: pool: LoopExit in imap/imap_unordered

It was possible for IMap/IMapUnordered greenlets to exit without putting the final StopIteration. So, whoever was waiting on the results would have to wait forever (or until LoopExit if there's nothing else running in the program).

Original patch and test by @thinxer (close #424).

Fix #311: same issue.
parent 232bcec4
......@@ -4,6 +4,12 @@ Changelog
.. currentmodule:: gevent
Release 1.0.1
-------------
- Fix #423: Pool's imap/imap_unordered could hang forever. Based on patch and test by Jianfei Wang.
Release 1.0 (Nov 26, 2013)
--------------------------
......
......@@ -208,6 +208,7 @@ class IMapUnordered(Greenlet):
self.iterable = iterable
self.queue = Queue()
self.count = 0
self.finished = False
self.rawlink(self._on_finish)
def __iter__(self):
......@@ -226,13 +227,9 @@ class IMapUnordered(Greenlet):
def _run(self):
try:
func = self.func
empty = True
for item in self.iterable:
self.count += 1
self.spawn(func, item).rawlink(self._on_result)
empty = False
if empty:
self.queue.put(Failure(StopIteration))
finally:
self.__dict__.pop('spawn', None)
self.__dict__.pop('func', None)
......@@ -244,12 +241,20 @@ class IMapUnordered(Greenlet):
self.queue.put(greenlet.value)
else:
self.queue.put(Failure(greenlet.exception))
if self.ready() and self.count <= 0:
if self.ready() and self.count <= 0 and not self.finished:
self.queue.put(Failure(StopIteration))
self.finished = True
def _on_finish(self, _self):
if self.finished:
return
if not self.successful():
self.queue.put(Failure(self.exception))
self.finished = True
return
if self.count <= 0:
self.queue.put(Failure(StopIteration))
self.finished = True
class IMap(Greenlet):
......@@ -266,6 +271,7 @@ class IMap(Greenlet):
self.waiting = [] # QQQ maybe deque will work faster there?
self.index = 0
self.maxindex = -1
self.finished = False
self.rawlink(self._on_finish)
def __iter__(self):
......@@ -291,7 +297,6 @@ class IMap(Greenlet):
def _run(self):
try:
empty = True
func = self.func
for item in self.iterable:
self.count += 1
......@@ -299,10 +304,6 @@ class IMap(Greenlet):
g.rawlink(self._on_result)
self.maxindex += 1
g.index = self.maxindex
empty = False
if empty:
self.maxindex += 1
self.queue.put((self.maxindex, Failure(StopIteration)))
finally:
self.__dict__.pop('spawn', None)
self.__dict__.pop('func', None)
......@@ -314,14 +315,23 @@ class IMap(Greenlet):
self.queue.put((greenlet.index, greenlet.value))
else:
self.queue.put((greenlet.index, Failure(greenlet.exception)))
if self.ready() and self.count <= 0:
if self.ready() and self.count <= 0 and not self.finished:
self.maxindex += 1
self.queue.put((self.maxindex, Failure(StopIteration)))
self.finished = True
def _on_finish(self, _self):
if self.finished:
return
if not self.successful():
self.maxindex += 1
self.queue.put((self.maxindex, Failure(self.exception)))
self.finished = True
return
if self.count <= 0:
self.maxindex += 1
self.queue.put((self.maxindex, Failure(StopIteration)))
self.finished = True
class Failure(object):
......
......@@ -2,6 +2,7 @@ from time import time
import gevent
from gevent import pool
from gevent.event import Event
from gevent.queue import Queue
import greentest
import random
from greentest import ExpectedException
......@@ -225,6 +226,12 @@ def sqr_random_sleep(x):
return x * x
def final_sleep():
for i in range(3):
yield i
gevent.sleep(0.2)
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.082, 0.035, 0.14
......@@ -330,6 +337,30 @@ class TestPool(greentest.TestCase):
expected = ['1', '2', '10']
self.assertEqual(result, expected)
# https://github.com/surfly/gevent/issues/423
def test_imap_no_stop(self):
q = Queue()
q.put(123)
gevent.spawn_later(0.1, q.put, StopIteration)
result = list(self.pool.imap(lambda _: _, q))
self.assertEqual(result, [123])
def test_imap_unordered_no_stop(self):
q = Queue()
q.put(1234)
gevent.spawn_later(0.1, q.put, StopIteration)
result = list(self.pool.imap_unordered(lambda _: _, q))
self.assertEqual(result, [1234])
# same issue, but different test: https://github.com/surfly/gevent/issues/311
def test_imap_final_sleep(self):
result = list(self.pool.imap(sqr, final_sleep()))
self.assertEqual(result, [0, 1, 4])
def test_imap_unordered_final_sleep(self):
result = list(self.pool.imap_unordered(sqr, final_sleep()))
self.assertEqual(result, [0, 1, 4])
class TestPool2(TestPool):
size = 2
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment