Commit 12805442 authored by Denis Bilenko's avatar Denis Bilenko

add tests for Pool.discard() and Pool.add(). Original patch by Danil Eremeev.

parent 3537561d
......@@ -143,6 +143,41 @@ class PoolBasicTests(greentest.TestCase):
gevent.sleep(0.01)
self.assertEqual(sorted(r), [1, 2, 3, 4])
def test_discard(self):
p = self.klass(size=1)
first = p.spawn(gevent.sleep, 1000)
p.discard(first)
first.kill()
assert not first, first
self.assertEqual(len(p), 0)
self.assertEqual(p._semaphore.counter, 1)
def test_add_method(self):
p = self.klass(size=1)
first = gevent.spawn(gevent.sleep, 1000)
try:
second = gevent.spawn(gevent.sleep, 1000)
try:
self.assertEqual(p.free_count(), 1)
self.assertEqual(len(p), 0)
p.add(first)
timeout = gevent.Timeout(0.1)
timeout.start()
try:
p.add(second)
except gevent.Timeout:
pass
else:
raise AssertionError('Expected timeout')
finally:
timeout.cancel()
self.assertEqual(p.free_count(), 0)
self.assertEqual(len(p), 1)
finally:
second.kill()
finally:
first.kill()
def test_apply(self):
p = self.klass()
result = p.apply(lambda a: ('foo', a), (1, ))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment