Commit e2cd323e authored by Denis Bilenko's avatar Denis Bilenko

Fix #216: propagate errors raised by Pool.map/imap

parent 12d3ca96
...@@ -244,6 +244,8 @@ class IMapUnordered(Greenlet): ...@@ -244,6 +244,8 @@ class IMapUnordered(Greenlet):
self.count -= 1 self.count -= 1
if greenlet.successful(): if greenlet.successful():
self.queue.put(greenlet.value) self.queue.put(greenlet.value)
else:
self.queue.put(Failure(greenlet.exception))
if self.ready() and self.count <= 0: if self.ready() and self.count <= 0:
self.queue.put(Failure(StopIteration)) self.queue.put(Failure(StopIteration))
...@@ -283,7 +285,6 @@ class IMap(Greenlet): ...@@ -283,7 +285,6 @@ class IMap(Greenlet):
self.index += 1 self.index += 1
if isinstance(value, Failure): if isinstance(value, Failure):
raise value.exc raise value.exc
if value is not _SKIP:
return value return value
if PY3: if PY3:
...@@ -314,7 +315,7 @@ class IMap(Greenlet): ...@@ -314,7 +315,7 @@ class IMap(Greenlet):
if greenlet.successful(): if greenlet.successful():
self.queue.put((greenlet.index, greenlet.value)) self.queue.put((greenlet.index, greenlet.value))
else: else:
self.queue.put((greenlet.index, _SKIP)) self.queue.put((greenlet.index, Failure(greenlet.exception)))
if self.ready() and self.count <= 0: if self.ready() and self.count <= 0:
self.maxindex += 1 self.maxindex += 1
self.queue.put((self.maxindex, Failure(StopIteration))) self.queue.put((self.maxindex, Failure(StopIteration)))
...@@ -395,6 +396,3 @@ class pass_value(object): ...@@ -395,6 +396,3 @@ class pass_value(object):
def __getattr__(self, item): def __getattr__(self, item):
assert item != 'callback' assert item != 'callback'
return getattr(self.callback, item) return getattr(self.callback, item)
_SKIP = object()
...@@ -416,5 +416,33 @@ class TestErrorInIterator(greentest.TestCase): ...@@ -416,5 +416,33 @@ class TestErrorInIterator(greentest.TestCase):
gevent.sleep(0.001) gevent.sleep(0.001)
def divide_by(x):
return 1.0 / x
class TestErrorInHandler(greentest.TestCase):
error_fatal = False
def test_map(self):
p = pool.Pool(3)
self.assertRaises(ZeroDivisionError, p.map, divide_by, [1, 0, 2])
def test_imap(self):
p = pool.Pool(1)
it = p.imap(divide_by, [1, 0, 2])
self.assertEqual(it.next(), 1.0)
self.assertRaises(ZeroDivisionError, it.next)
self.assertEqual(it.next(), 0.5)
self.assertRaises(StopIteration, it.next)
def test_imap_unordered(self):
p = pool.Pool(1)
it = p.imap_unordered(divide_by, [1, 0, 2])
self.assertEqual(it.next(), 1.0)
self.assertRaises(ZeroDivisionError, it.next)
self.assertEqual(it.next(), 0.5)
self.assertRaises(StopIteration, it.next)
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment