# -*- coding: utf-8 -*- """ Test suite for PEP 380 implementation adapted from original tests written by Greg Ewing see <http://www.cosc.canterbury.ac.nz/greg.ewing/python/yield-from/YieldFrom-Python3.1.2-rev5.zip> """ import sys def _lines(trace): for line in trace: print(line) def test_delegation_of_initial_next_to_subgenerator(): """ >>> _lines(test_delegation_of_initial_next_to_subgenerator()) Starting g1 Starting g2 Yielded 42 Finishing g2 Finishing g1 """ trace = [] def g1(): trace.append("Starting g1") yield from g2() trace.append("Finishing g1") def g2(): trace.append("Starting g2") yield 42 trace.append("Finishing g2") for x in g1(): trace.append("Yielded %s" % (x,)) return trace def test_raising_exception_in_initial_next_call(): """ >>> _lines(test_raising_exception_in_initial_next_call()) Starting g1 Starting g2 Finishing g2 Finishing g1 """ trace = [] def g1(): try: trace.append("Starting g1") yield from g2() finally: trace.append("Finishing g1") def g2(): try: trace.append("Starting g2") raise ValueError("spanish inquisition occurred") finally: trace.append("Finishing g2") try: for x in g1(): trace.append("Yielded %s" % (x,)) except ValueError as e: pass else: trace.append("subgenerator failed to raise ValueError") return trace def test_delegation_of_next_call_to_subgenerator(): """ >>> _lines(test_delegation_of_next_call_to_subgenerator()) Starting g1 Yielded g1 ham Starting g2 Yielded g2 spam Yielded g2 more spam Finishing g2 Yielded g1 eggs Finishing g1 """ trace = [] def g1(): trace.append("Starting g1") yield "g1 ham" yield from g2() yield "g1 eggs" trace.append("Finishing g1") def g2(): trace.append("Starting g2") yield "g2 spam" yield "g2 more spam" trace.append("Finishing g2") for x in g1(): trace.append("Yielded %s" % (x,)) return trace def test_raising_exception_in_delegated_next_call(): """ >>> _lines(test_raising_exception_in_delegated_next_call()) Starting g1 Yielded g1 ham Starting g2 Yielded g2 spam Finishing g2 Finishing g1 """ trace = [] def g1(): try: trace.append("Starting g1") yield "g1 ham" yield from g2() yield "g1 eggs" finally: trace.append("Finishing g1") def g2(): try: trace.append("Starting g2") yield "g2 spam" raise ValueError("hovercraft is full of eels") yield "g2 more spam" finally: trace.append("Finishing g2") try: for x in g1(): trace.append("Yielded %s" % (x,)) except ValueError: pass else: trace.append("subgenerator failed to raise ValueError") return trace def test_delegation_of_send(): """ >>> _lines(test_delegation_of_send()) Starting g1 g1 received 1 Starting g2 Yielded g2 spam g2 received 2 Yielded g2 more spam g2 received 3 Finishing g2 Yielded g1 eggs g1 received 4 Finishing g1 """ trace = [] def g1(): trace.append("Starting g1") x = yield "g1 ham" trace.append("g1 received %s" % (x,)) yield from g2() x = yield "g1 eggs" trace.append("g1 received %s" % (x,)) trace.append("Finishing g1") def g2(): trace.append("Starting g2") x = yield "g2 spam" trace.append("g2 received %s" % (x,)) x = yield "g2 more spam" trace.append("g2 received %s" % (x,)) trace.append("Finishing g2") g = g1() y = next(g) x = 1 try: while 1: y = g.send(x) trace.append("Yielded %s" % (y,)) x += 1 except StopIteration: pass return trace def test_handling_exception_while_delegating_send(): """ >>> _lines(test_handling_exception_while_delegating_send()) Starting g1 g1 received 1 Starting g2 Yielded g2 spam g2 received 2 """ trace = [] def g1(): trace.append("Starting g1") x = yield "g1 ham" trace.append("g1 received %s" % (x,)) yield from g2() x = yield "g1 eggs" trace.append("g1 received %s" % (x,)) trace.append("Finishing g1") def g2(): trace.append("Starting g2") x = yield "g2 spam" trace.append("g2 received %s" % (x,)) raise ValueError("hovercraft is full of eels") x = yield "g2 more spam" trace.append("g2 received %s" % (x,)) trace.append("Finishing g2") def run(): g = g1() y = next(g) x = 1 try: while 1: y = g.send(x) trace.append("Yielded %s" % (y,)) x += 1 except StopIteration: trace.append("StopIteration") try: run() except ValueError: pass # ok else: trace.append("no ValueError") return trace def test_delegating_close(): """ >>> _lines(test_delegating_close()) Starting g1 Yielded g1 ham Starting g2 Yielded g2 spam Finishing g2 Finishing g1 """ trace = [] def g1(): try: trace.append("Starting g1") yield "g1 ham" yield from g2() yield "g1 eggs" finally: trace.append("Finishing g1") def g2(): try: trace.append("Starting g2") yield "g2 spam" yield "g2 more spam" finally: trace.append("Finishing g2") g = g1() for i in range(2): x = next(g) trace.append("Yielded %s" % (x,)) g.close() return trace def test_handing_exception_while_delegating_close(): """ >>> _lines(test_handing_exception_while_delegating_close()) Starting g1 Yielded g1 ham Starting g2 Yielded g2 spam Finishing g2 Finishing g1 nybbles have exploded with delight """ trace = [] def g1(): try: trace.append("Starting g1") yield "g1 ham" yield from g2() yield "g1 eggs" finally: trace.append("Finishing g1") def g2(): try: trace.append("Starting g2") yield "g2 spam" yield "g2 more spam" finally: trace.append("Finishing g2") raise ValueError("nybbles have exploded with delight") try: g = g1() for i in range(2): x = next(g) trace.append("Yielded %s" % (x,)) g.close() except ValueError as e: trace.append(e.args[0]) # FIXME: __context__ is currently not set #if sys.version_info[0] >= 3: # assert isinstance(e.__context__, GeneratorExit), 'exception context is %r' % e.__context__ else: trace.append("subgenerator failed to raise ValueError") return trace def test_delegating_throw(): """ >>> _lines(test_delegating_throw()) Starting g1 Yielded g1 ham Starting g2 Yielded g2 spam Finishing g2 Finishing g1 """ trace = [] def g1(): try: trace.append("Starting g1") yield "g1 ham" yield from g2() yield "g1 eggs" finally: trace.append("Finishing g1") def g2(): try: trace.append("Starting g2") yield "g2 spam" yield "g2 more spam" finally: trace.append("Finishing g2") try: g = g1() for i in range(2): x = next(g) trace.append("Yielded %s" % (x,)) e = ValueError("tomato ejected") g.throw(e) except ValueError: pass else: trace.append("subgenerator failed to raise ValueError") return trace def __test_value_attribute_of_StopIteration_exception(): """ StopIteration: value = None StopIteration: spam value = spam StopIteration: spam value = eggs """ trace = [] def pex(e): trace.append("%s: %s" % (e.__class__.__name__, e)) trace.append("value = %s" % (e.value,)) e = StopIteration() pex(e) e = StopIteration("spam") pex(e) e.value = "eggs" pex(e) return trace def test_exception_value_crash(): """ >>> test_exception_value_crash() ['g2'] """ # There used to be a refcount error in CPython when the return value # stored in the StopIteration has a refcount of 1. def g1(): yield from g2() def g2(): yield "g2" return [42] return list(g1()) def test_generator_return_value(): """ >>> _lines(test_generator_return_value()) Starting g1 Yielded g1 ham Starting g2 Yielded g2 spam Yielded g2 more spam Finishing g2 g2 returned None Starting g2 Yielded g2 spam Yielded g2 more spam Finishing g2 g2 returned 42 Yielded g1 eggs Finishing g1 """ trace = [] def g1(): trace.append("Starting g1") yield "g1 ham" ret = yield from g2() trace.append("g2 returned %s" % (ret,)) ret = yield from g2(42) trace.append("g2 returned %s" % (ret,)) yield "g1 eggs" trace.append("Finishing g1") def g2(v = None): trace.append("Starting g2") yield "g2 spam" yield "g2 more spam" trace.append("Finishing g2") if v: return v for x in g1(): trace.append("Yielded %s" % (x,)) return trace def test_delegation_of_next_to_non_generator(): """ >>> _lines(test_delegation_of_next_to_non_generator()) Yielded 0 Yielded 1 Yielded 2 """ trace = [] def g(): yield from range(3) for x in g(): trace.append("Yielded %s" % (x,)) return trace def test_conversion_of_sendNone_to_next(): """ >>> _lines(test_conversion_of_sendNone_to_next()) Yielded: 0 Yielded: 1 Yielded: 2 """ trace = [] def g(): yield from range(3) gi = g() for x in range(3): y = gi.send(None) trace.append("Yielded: %s" % (y,)) return trace def test_delegation_of_close_to_non_generator(): """ >>> _lines(test_delegation_of_close_to_non_generator()) starting g finishing g """ trace = [] def g(): try: trace.append("starting g") yield from range(3) trace.append("g should not be here") finally: trace.append("finishing g") gi = g() next(gi) gi.close() return trace def test_delegating_throw_to_non_generator(): """ >>> _lines(test_delegating_throw_to_non_generator()) Starting g Yielded 0 Yielded 1 Yielded 2 Yielded 3 Yielded 4 Finishing g """ trace = [] def g(): try: trace.append("Starting g") yield from range(10) finally: trace.append("Finishing g") try: gi = g() for i in range(5): x = next(gi) trace.append("Yielded %s" % (x,)) e = ValueError("tomato ejected") gi.throw(e) except ValueError: pass else: trace.append("subgenerator failed to raise ValueError") return trace def test_attempting_to_send_to_non_generator(): """ >>> _lines(test_attempting_to_send_to_non_generator()) starting g finishing g """ trace = [] def g(): try: trace.append("starting g") yield from range(3) trace.append("g should not be here") finally: trace.append("finishing g") try: gi = g() next(gi) for x in range(3): y = gi.send(42) trace.append("Should not have yielded: %s" % y) except AttributeError: pass else: trace.append("was able to send into non-generator") return trace def test_broken_getattr_handling(): """ >>> test_broken_getattr_handling() [] """ class Broken: def __iter__(self): return self def __next__(self): return 1 next = __next__ def __getattr__(self, attr): 1/0 def g(): yield from Broken() not_raised = [] try: gi = g() assert next(gi) == 1 gi.send(1) except ZeroDivisionError: pass else: not_raised.append(1) try: gi = g() assert next(gi) == 1 gi.throw(AttributeError) except ZeroDivisionError: pass else: not_raised.append(2) """ # this currently only calls PyErr_WriteUnraisable() and doesn't raise ... try: gi = g() assert next(gi) == 1 gi.close() except ZeroDivisionError: pass else: not_raised.append(3) """ gi = g() assert next(gi) == 1 gi.close() return not_raised def test_exception_in_initial_next_call(): """ >>> _lines(test_exception_in_initial_next_call()) g1 about to yield from g2 """ trace = [] def g1(): trace.append("g1 about to yield from g2") yield from g2() trace.append("g1 should not be here") def g2(): yield 1/0 def run(): gi = g1() next(gi) try: run() except ZeroDivisionError: pass else: trace.append("ZeroDivisionError not raised") return trace def test_attempted_yield_from_loop(): """ >>> _lines(test_attempted_yield_from_loop()) g1: starting Yielded: y1 g1: about to yield from g2 g2: starting Yielded: y2 g2: about to yield from g1 """ trace = [] def g1(): trace.append("g1: starting") yield "y1" trace.append("g1: about to yield from g2") yield from g2() trace.append("g1 should not be here") def g2(): trace.append("g2: starting") yield "y2" trace.append("g2: about to yield from g1") yield from gi trace.append("g2 should not be here") try: gi = g1() for y in gi: trace.append("Yielded: %s" % (y,)) except ValueError: pass # "generator already executing" else: trace.append("subgenerator didn't raise ValueError") return trace def test_attempted_reentry(): """ >>> _lines(test_attempted_reentry()) g1: starting Yielded: y1 g1: about to yield from g2 g2: starting Yielded: y2 g2: about to yield from g1 g2: caught ValueError Yielded: y3 g1: after delegating to g2 Yielded: y4 """ trace = [] def g1(): trace.append("g1: starting") yield "y1" trace.append("g1: about to yield from g2") yield from g2() trace.append("g1: after delegating to g2") yield "y4" def g2(): trace.append("g2: starting") yield "y2" trace.append("g2: about to yield from g1") try: yield from gi except ValueError: trace.append("g2: caught ValueError") else: trace.append("g1 did not raise ValueError on reentry") yield "y3" gi = g1() for y in gi: trace.append("Yielded: %s" % (y,)) return trace def test_returning_value_from_delegated_throw(): """ >>> _lines(test_returning_value_from_delegated_throw()) Starting g1 Yielded g1 ham Starting g2 Yielded g2 spam Caught LunchError in g2 Yielded g2 yet more spam Yielded g1 eggs Finishing g1 """ trace = [] def g1(): try: trace.append("Starting g1") yield "g1 ham" yield from g2() yield "g1 eggs" finally: trace.append("Finishing g1") def g2(): try: trace.append("Starting g2") yield "g2 spam" yield "g2 more spam" except LunchError: trace.append("Caught LunchError in g2") yield "g2 lunch saved" yield "g2 yet more spam" class LunchError(Exception): pass g = g1() for i in range(2): x = next(g) trace.append("Yielded %s" % (x,)) e = LunchError("tomato ejected") g.throw(e) for x in g: trace.append("Yielded %s" % (x,)) return trace def test_next_and_return_with_value(): """ >>> _lines(test_next_and_return_with_value()) g starting f resuming g g returning None f caught StopIteration g starting f resuming g g returning 42 f caught StopIteration """ trace = [] def f(r): gi = g(r) next(gi) try: trace.append("f resuming g") next(gi) trace.append("f SHOULD NOT BE HERE") except StopIteration: trace.append("f caught StopIteration") def g(r): trace.append("g starting") yield trace.append("g returning %s" % (r,)) return r f(None) f(42) return trace def test_send_and_return_with_value(): """ >>> _lines(test_send_and_return_with_value()) g starting f sending spam to g g received spam g returning None f caught StopIteration g starting f sending spam to g g received spam g returning 42 f caught StopIteration """ trace = [] def f(r): gi = g(r) next(gi) try: trace.append("f sending spam to g") gi.send("spam") trace.append("f SHOULD NOT BE HERE") except StopIteration: trace.append("f caught StopIteration") def g(r): trace.append("g starting") x = yield trace.append("g received %s" % (x,)) trace.append("g returning %s" % (r,)) return r f(None) f(42) return trace def test_catching_exception_from_subgen_and_returning(): """ Test catching an exception thrown into a subgenerator and returning a value >>> _lines(test_catching_exception_from_subgen_and_returning()) 1 inner caught ValueError inner returned 2 to outer 2 """ trace = [] def inner(): try: yield 1 except ValueError: trace.append("inner caught ValueError") return 2 def outer(): v = yield from inner() trace.append("inner returned %r to outer" % v) yield v g = outer() trace.append(next(g)) trace.append(g.throw(ValueError)) return trace def test_throwing_GeneratorExit_into_subgen_that_returns(): """ Test throwing GeneratorExit into a subgenerator that catches it and returns normally. >>> _lines(test_throwing_GeneratorExit_into_subgen_that_returns()) Enter g Enter f """ trace = [] def f(): try: trace.append("Enter f") yield trace.append("Exit f") except GeneratorExit: return def g(): trace.append("Enter g") yield from f() trace.append("Exit g") try: gi = g() next(gi) gi.throw(GeneratorExit) except GeneratorExit: pass else: trace.append("subgenerator failed to raise GeneratorExit") return trace def test_throwing_GeneratorExit_into_subgenerator_that_yields(): """ Test throwing GeneratorExit into a subgenerator that catches it and yields. >>> _lines(test_throwing_GeneratorExit_into_subgenerator_that_yields()) Enter g Enter f """ trace = [] def f(): try: trace.append("Enter f") yield trace.append("Exit f") except GeneratorExit: yield def g(): trace.append("Enter g") yield from f() trace.append("Exit g") try: gi = g() next(gi) gi.throw(GeneratorExit) except RuntimeError: pass # "generator ignored GeneratorExit" else: trace.append("subgenerator failed to raise GeneratorExit") return trace def test_throwing_GeneratorExit_into_subgen_that_raises(): """ Test throwing GeneratorExit into a subgenerator that catches it and raises a different exception. >>> _lines(test_throwing_GeneratorExit_into_subgen_that_raises()) Enter g Enter f """ trace = [] def f(): try: trace.append("Enter f") yield trace.append("Exit f") except GeneratorExit: raise ValueError("Vorpal bunny encountered") def g(): trace.append("Enter g") yield from f() trace.append("Exit g") try: gi = g() next(gi) gi.throw(GeneratorExit) except ValueError: pass # "Vorpal bunny encountered" else: trace.append("subgenerator failed to raise ValueError") return trace def test_yield_from_empty(): """ >>> test_yield_from_empty() """ def g(): yield from () try: next(g()) except StopIteration: pass else: return "FAILED" # test re-entry guards def _reentering_gen(): def one(): yield 0 yield from two() yield 3 def two(): yield 1 try: yield from g1 except ValueError: pass yield 2 g1 = one() return g1 def test_delegating_generators_claim_to_be_running_next(): """ >>> test_delegating_generators_claim_to_be_running_next() [0, 1, 2, 3] """ return list(_reentering_gen()) def test_delegating_generators_claim_to_be_running_send(): """ >>> test_delegating_generators_claim_to_be_running_send() [0, 1, 2, 3] """ g1 = _reentering_gen() res = [next(g1)] try: while True: res.append(g1.send(42)) except StopIteration: pass return res def test_delegating_generators_claim_to_be_running_throw(): """ >>> test_delegating_generators_claim_to_be_running_throw() [0, 1, 2, 3] """ class MyErr(Exception): pass def one(): try: yield 0 except MyErr: pass yield from two() try: yield 3 except MyErr: pass def two(): try: yield 1 except MyErr: pass try: yield from g1 except ValueError: pass try: yield 2 except MyErr: pass g1 = one() res = [next(g1)] try: while True: res.append(g1.throw(MyErr)) except StopIteration: pass return res def test_delegating_generators_claim_to_be_running_close(): """ >>> test_delegating_generators_claim_to_be_running_close() 42 """ class MyIt(object): def __iter__(self): return self def __next__(self): return 42 next = __next__ def close(self): assert g1.gi_running try: next(g1) except ValueError: pass # guard worked else: assert False, "re-entry guard failed to bark" def one(): yield from MyIt() g1 = one() ret = next(g1) g1.close() return ret