Commit a503aa40 authored by Denis Bilenko's avatar Denis Bilenko

test__pool.py: adapt stdlib tests from python26/test/test_multiprocessing.py

parent ec9bccd0
from time import time
import gevent
from gevent import pool
from gevent.event import Event
from greentest import TestCase, main
class TestCoroutinePool(TestCase):
klass = pool.Pool
......@@ -141,6 +143,109 @@ class PoolBasicTests(TestCase):
result = p.apply(lambda a: ('foo', a), (1, ))
self.assertEqual(result, ('foo', 1))
#
# tests from standard library test/test_multiprocessing.py
class TimingWrapper(object):
def __init__(self, func):
self.func = func
self.elapsed = None
def __call__(self, *args, **kwds):
t = time()
try:
return self.func(*args, **kwds)
finally:
self.elapsed = time() - t
def sqr(x, wait=0.0):
gevent.sleep(wait)
return x*x
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.082, 0.035, 0.14
class TestPool(TestCase):
size = 1
def setUp(self):
self.pool = pool.Pool(self.size)
def tearDown(self):
self.pool.join()
def test_apply(self):
papply = self.pool.apply
self.assertEqual(papply(sqr, (5,)), sqr(5))
self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
def test_map(self):
pmap = self.pool.map
self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
self.assertEqual(pmap(sqr, range(100)), map(sqr, range(100)))
def test_async(self):
res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
get = TimingWrapper(res.get)
self.assertEqual(get(), 49)
self.assertAlmostEqual(get.elapsed, TIMEOUT1, 1)
def test_async_callback(self):
result = []
res = self.pool.apply_async(sqr, (7, TIMEOUT1,), callback=lambda x: result.append(x))
get = TimingWrapper(res.get)
self.assertEqual(get(), 49)
self.assertAlmostEqual(get.elapsed, TIMEOUT1, 1)
assert result == [49], result
def test_async_timeout(self):
res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
get = TimingWrapper(res.get)
self.assertRaises(gevent.Timeout, get, timeout=TIMEOUT2)
self.assertAlmostEqual(get.elapsed, TIMEOUT2, 1)
def test_imap(self):
it = self.pool.imap(sqr, range(10))
self.assertEqual(list(it), map(sqr, range(10)))
it = self.pool.imap(sqr, range(10))
for i in range(10):
self.assertEqual(it.next(), i*i)
self.assertRaises(StopIteration, it.next)
it = self.pool.imap(sqr, range(1000))
for i in range(1000):
self.assertEqual(it.next(), i*i)
self.assertRaises(StopIteration, it.next)
def test_imap_unordered(self):
it = self.pool.imap_unordered(sqr, range(1000))
self.assertEqual(sorted(it), map(sqr, range(1000)))
it = self.pool.imap_unordered(sqr, range(1000))
self.assertEqual(sorted(it), map(sqr, range(1000)))
def test_terminate(self):
result = self.pool.map_async(
gevent.sleep, [0.1 for i in range(1000)]
)
kill = TimingWrapper(self.pool.kill)
kill(block=True)
assert kill.elapsed < 0.2, kill.elapsed
class TestPool2(TestPool):
size = 2
class TestPool3(TestPool):
size = 3
class TestPool10(TestPool):
size = 10
class TestPoolUnlimit(TestPool):
size = None
if __name__=='__main__':
main()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment