# -*- coding: utf-8 -*-

"""
Test suite for PEP 380 implementation

adapted from original tests written by Greg Ewing
see <http://www.cosc.canterbury.ac.nz/greg.ewing/python/yield-from/YieldFrom-Python3.1.2-rev5.zip>
"""

import sys

def _lines(trace):
    """Print every entry of ``trace`` on its own line, in order."""
    for entry in trace:
        print(entry)

def test_delegation_of_initial_next_to_subgenerator():
    """
    Iterate a delegating generator and record the order in which the
    delegator and its subgenerator start, yield and finish.

    >>> _lines(test_delegation_of_initial_next_to_subgenerator())
    Starting g1
    Starting g2
    Yielded 42
    Finishing g2
    Finishing g1
    """
    events = []
    record = events.append

    def g2():
        record("Starting g2")
        yield 42
        record("Finishing g2")

    def g1():
        record("Starting g1")
        yield from g2()
        record("Finishing g1")

    for value in g1():
        record("Yielded %s" % (value,))
    return events

def test_raising_exception_in_initial_next_call():
    """
    The subgenerator raises ValueError before its first yield; the
    ``finally`` clauses of both generators must still run and the
    exception must reach the caller.

    >>> _lines(test_raising_exception_in_initial_next_call())
    Starting g1
    Starting g2
    Finishing g2
    Finishing g1
    """
    trace = []
    def g1():
        try:
            trace.append("Starting g1")
            yield from g2()
        finally:
            trace.append("Finishing g1")
    def g2():
        try:
            trace.append("Starting g2")
            raise ValueError("spanish inquisition occurred")
        finally:
            trace.append("Finishing g2")
    try:
        for x in g1():
            trace.append("Yielded %s" % (x,))
    except ValueError:
        # Expected: the exception object itself is not inspected.
        pass
    else:
        trace.append("subgenerator failed to raise ValueError")
    return trace

def test_delegation_of_next_call_to_subgenerator():
    """
    Iterate a generator that yields itself, delegates to a
    subgenerator, then yields again; record the interleaving.

    >>> _lines(test_delegation_of_next_call_to_subgenerator())
    Starting g1
    Yielded g1 ham
    Starting g2
    Yielded g2 spam
    Yielded g2 more spam
    Finishing g2
    Yielded g1 eggs
    Finishing g1
    """
    events = []
    record = events.append

    def g2():
        record("Starting g2")
        yield "g2 spam"
        yield "g2 more spam"
        record("Finishing g2")

    def g1():
        record("Starting g1")
        yield "g1 ham"
        yield from g2()
        yield "g1 eggs"
        record("Finishing g1")

    for item in g1():
        record("Yielded %s" % (item,))
    return events

def test_raising_exception_in_delegated_next_call():
    """
    The subgenerator raises ValueError after its first yield; both
    ``finally`` clauses run and the caller sees the ValueError.

    >>> _lines(test_raising_exception_in_delegated_next_call())
    Starting g1
    Yielded g1 ham
    Starting g2
    Yielded g2 spam
    Finishing g2
    Finishing g1
    """
    events = []
    record = events.append

    def g2():
        try:
            record("Starting g2")
            yield "g2 spam"
            raise ValueError("hovercraft is full of eels")
            yield "g2 more spam"
        finally:
            record("Finishing g2")

    def g1():
        try:
            record("Starting g1")
            yield "g1 ham"
            yield from g2()
            yield "g1 eggs"
        finally:
            record("Finishing g1")

    try:
        for item in g1():
            record("Yielded %s" % (item,))
    except ValueError:
        pass
    else:
        record("subgenerator failed to raise ValueError")
    return events

def test_delegation_of_send():
    """
    Send increasing integers into a delegating generator and record
    which generator receives each one.

    >>> _lines(test_delegation_of_send())
    Starting g1
    g1 received 1
    Starting g2
    Yielded g2 spam
    g2 received 2
    Yielded g2 more spam
    g2 received 3
    Finishing g2
    Yielded g1 eggs
    g1 received 4
    Finishing g1
    """
    events = []
    record = events.append

    def g2():
        record("Starting g2")
        got = yield "g2 spam"
        record("g2 received %s" % (got,))
        got = yield "g2 more spam"
        record("g2 received %s" % (got,))
        record("Finishing g2")

    def g1():
        record("Starting g1")
        got = yield "g1 ham"
        record("g1 received %s" % (got,))
        yield from g2()
        got = yield "g1 eggs"
        record("g1 received %s" % (got,))
        record("Finishing g1")

    gen = g1()
    next(gen)  # prime the generator; the first yielded value is not traced
    counter = 1
    try:
        while True:
            reply = gen.send(counter)
            record("Yielded %s" % (reply,))
            counter += 1
    except StopIteration:
        pass
    return events

def test_handling_exception_while_delegating_send():
    """
    The subgenerator raises ValueError after receiving a sent value;
    the exception must escape the driving loop.

    >>> _lines(test_handling_exception_while_delegating_send())
    Starting g1
    g1 received 1
    Starting g2
    Yielded g2 spam
    g2 received 2
    """
    events = []
    record = events.append

    def g2():
        record("Starting g2")
        got = yield "g2 spam"
        record("g2 received %s" % (got,))
        raise ValueError("hovercraft is full of eels")
        got = yield "g2 more spam"
        record("g2 received %s" % (got,))
        record("Finishing g2")

    def g1():
        record("Starting g1")
        got = yield "g1 ham"
        record("g1 received %s" % (got,))
        yield from g2()
        got = yield "g1 eggs"
        record("g1 received %s" % (got,))
        record("Finishing g1")

    def run():
        gen = g1()
        next(gen)  # prime; the first yielded value is not traced
        counter = 1
        try:
            while True:
                reply = gen.send(counter)
                record("Yielded %s" % (reply,))
                counter += 1
        except StopIteration:
            record("StopIteration")

    try:
        run()
    except ValueError:
        pass  # ok
    else:
        record("no ValueError")
    return events

def test_delegating_close():
    """
    Close a delegating generator while it is suspended inside its
    subgenerator; both ``finally`` clauses must run, inner first.

    >>> _lines(test_delegating_close())
    Starting g1
    Yielded g1 ham
    Starting g2
    Yielded g2 spam
    Finishing g2
    Finishing g1
    """
    events = []
    record = events.append

    def g2():
        try:
            record("Starting g2")
            yield "g2 spam"
            yield "g2 more spam"
        finally:
            record("Finishing g2")

    def g1():
        try:
            record("Starting g1")
            yield "g1 ham"
            yield from g2()
            yield "g1 eggs"
        finally:
            record("Finishing g1")

    gen = g1()
    for _ in range(2):
        item = next(gen)
        record("Yielded %s" % (item,))
    gen.close()
    return events

def test_handing_exception_while_delegating_close():
    """
    The subgenerator raises ValueError from its ``finally`` clause
    while being closed; close() on the delegator must propagate it.

    >>> _lines(test_handing_exception_while_delegating_close())
    Starting g1
    Yielded g1 ham
    Starting g2
    Yielded g2 spam
    Finishing g2
    Finishing g1
    nybbles have exploded with delight
    """
    events = []
    record = events.append

    def g2():
        try:
            record("Starting g2")
            yield "g2 spam"
            yield "g2 more spam"
        finally:
            record("Finishing g2")
            raise ValueError("nybbles have exploded with delight")

    def g1():
        try:
            record("Starting g1")
            yield "g1 ham"
            yield from g2()
            yield "g1 eggs"
        finally:
            record("Finishing g1")

    try:
        gen = g1()
        for _ in range(2):
            item = next(gen)
            record("Yielded %s" % (item,))
        gen.close()
    except ValueError as exc:
        record(exc.args[0])
        # FIXME: __context__ is currently not set
        #if sys.version_info[0] >= 3:
        #    assert isinstance(e.__context__, GeneratorExit), 'exception context is %r' % e.__context__
    else:
        record("subgenerator failed to raise ValueError")
    return events

def test_delegating_throw():
    """
    Throw ValueError into a delegating generator suspended inside its
    subgenerator; both ``finally`` clauses run and the error escapes.

    >>> _lines(test_delegating_throw())
    Starting g1
    Yielded g1 ham
    Starting g2
    Yielded g2 spam
    Finishing g2
    Finishing g1
    """
    events = []
    record = events.append

    def g2():
        try:
            record("Starting g2")
            yield "g2 spam"
            yield "g2 more spam"
        finally:
            record("Finishing g2")

    def g1():
        try:
            record("Starting g1")
            yield "g1 ham"
            yield from g2()
            yield "g1 eggs"
        finally:
            record("Finishing g1")

    try:
        gen = g1()
        for _ in range(2):
            item = next(gen)
            record("Yielded %s" % (item,))
        gen.throw(ValueError("tomato ejected"))
    except ValueError:
        pass
    else:
        record("subgenerator failed to raise ValueError")
    return events

def __test_value_attribute_of_StopIteration_exception():
    """
    Record the string form and ``value`` attribute of StopIteration
    instances (no doctest prompt here, so this is not run as a doctest).

    StopIteration:
    value = None
    StopIteration: spam
    value = spam
    StopIteration: spam
    value = eggs
    """
    report = []

    def describe(exc):
        report.append("%s: %s" % (exc.__class__.__name__, exc))
        report.append("value = %s" % (exc.value,))

    describe(StopIteration())

    with_arg = StopIteration("spam")
    describe(with_arg)

    # Overwriting .value afterwards is recorded as well.
    with_arg.value = "eggs"
    describe(with_arg)
    return report

def test_exception_value_crash():
    """
    Delegate to a subgenerator that returns a freshly created list.

    >>> test_exception_value_crash()
    ['g2']
    """
    # There used to be a refcount error in CPython when the return value
    # stored in the StopIteration has a refcount of 1.
    def g2():
        yield "g2"
        return [42]

    def g1():
        yield from g2()

    collected = list(g1())
    return collected

def test_generator_return_value():
    """
    The value of a ``yield from`` expression is the subgenerator's
    return value (None when it returns nothing).

    >>> _lines(test_generator_return_value())
    Starting g1
    Yielded g1 ham
    Starting g2
    Yielded g2 spam
    Yielded g2 more spam
    Finishing g2
    g2 returned None
    Starting g2
    Yielded g2 spam
    Yielded g2 more spam
    Finishing g2
    g2 returned 42
    Yielded g1 eggs
    Finishing g1
    """
    events = []
    record = events.append

    def g2(v=None):
        record("Starting g2")
        yield "g2 spam"
        yield "g2 more spam"
        record("Finishing g2")
        if v:
            return v

    def g1():
        record("Starting g1")
        yield "g1 ham"
        result = yield from g2()
        record("g2 returned %s" % (result,))
        result = yield from g2(42)
        record("g2 returned %s" % (result,))
        yield "g1 eggs"
        record("Finishing g1")

    for item in g1():
        record("Yielded %s" % (item,))
    return events

def test_delegation_of_next_to_non_generator():
    """
    Delegate to a plain iterable (``range``) instead of a generator.

    >>> _lines(test_delegation_of_next_to_non_generator())
    Yielded 0
    Yielded 1
    Yielded 2
    """
    events = []

    def g():
        yield from range(3)

    for number in g():
        events.append("Yielded %s" % (number,))
    return events

def test_conversion_of_sendNone_to_next():
    """
    Drive a generator delegating to ``range`` with send(None) only.

    >>> _lines(test_conversion_of_sendNone_to_next())
    Yielded: 0
    Yielded: 1
    Yielded: 2
    """
    events = []

    def g():
        yield from range(3)

    gen = g()
    for _ in range(3):
        reply = gen.send(None)
        events.append("Yielded: %s" % (reply,))
    return events

def test_delegation_of_close_to_non_generator():
    """
    Close a generator that is delegating to ``range``; its ``finally``
    clause must run and the code after the delegation must not.

    >>> _lines(test_delegation_of_close_to_non_generator())
    starting g
    finishing g
    """
    events = []
    record = events.append

    def g():
        try:
            record("starting g")
            yield from range(3)
            record("g should not be here")
        finally:
            record("finishing g")

    gen = g()
    next(gen)
    gen.close()
    return events

def test_delegating_throw_to_non_generator():
    """
    Throw ValueError into a generator that is delegating to ``range``
    after five values have been consumed.

    >>> _lines(test_delegating_throw_to_non_generator())
    Starting g
    Yielded 0
    Yielded 1
    Yielded 2
    Yielded 3
    Yielded 4
    Finishing g
    """
    events = []
    record = events.append

    def g():
        try:
            record("Starting g")
            yield from range(10)
        finally:
            record("Finishing g")

    try:
        gen = g()
        for _ in range(5):
            number = next(gen)
            record("Yielded %s" % (number,))
        gen.throw(ValueError("tomato ejected"))
    except ValueError:
        pass
    else:
        record("subgenerator failed to raise ValueError")
    return events

def test_attempting_to_send_to_non_generator():
    """
    Sending a non-None value while delegating to ``range`` is expected
    to raise AttributeError; the ``finally`` clause must still run.

    >>> _lines(test_attempting_to_send_to_non_generator())
    starting g
    finishing g
    """
    events = []
    record = events.append

    def g():
        try:
            record("starting g")
            yield from range(3)
            record("g should not be here")
        finally:
            record("finishing g")

    try:
        gen = g()
        next(gen)
        for _ in range(3):
            y = gen.send(42)
            record("Should not have yielded: %s" % y)
    except AttributeError:
        pass
    else:
        record("was able to send into non-generator")
    return events

def test_broken_getattr_handling():
    """
    Delegate to an iterator whose ``__getattr__`` raises
    ZeroDivisionError and check that send() and throw() surface it.
    Returns the markers of the checks that did NOT raise.

    >>> test_broken_getattr_handling()
    []
    """
    class Broken:
        def __iter__(self):
            return self
        def __next__(self):
            return 1
        next = __next__
        def __getattr__(self, attr):
            1/0

    def g():
        yield from Broken()

    def primed():
        # Fresh delegating generator, advanced to its first yield.
        gen = g()
        assert next(gen) == 1
        return gen

    checks = (
        (1, lambda gen: gen.send(1)),
        (2, lambda gen: gen.throw(AttributeError)),
    )

    not_raised = []
    for marker, action in checks:
        try:
            action(primed())
        except ZeroDivisionError:
            pass
        else:
            not_raised.append(marker)

    # Disabled check, kept for reference:
    # this currently only calls PyErr_WriteUnraisable() and doesn't raise ...
    # try:
    #     gi = g()
    #     assert next(gi) == 1
    #     gi.close()
    # except ZeroDivisionError:
    #     pass
    # else:
    #     not_raised.append(3)
    primed().close()

    return not_raised

def test_exception_in_initial_next_call():
    """
    The subgenerator raises ZeroDivisionError while computing its first
    yielded value; the delegator must not continue past the delegation.

    >>> _lines(test_exception_in_initial_next_call())
    g1 about to yield from g2
    """
    events = []
    record = events.append

    def g2():
        yield 1/0

    def g1():
        record("g1 about to yield from g2")
        yield from g2()
        record("g1 should not be here")

    def run():
        next(g1())

    try:
        run()
    except ZeroDivisionError:
        pass
    else:
        record("ZeroDivisionError not raised")
    return events

def test_attempted_yield_from_loop():
    """
    The subgenerator tries to delegate back to the generator that is
    currently delegating to it; a ValueError is expected at the caller.

    >>> _lines(test_attempted_yield_from_loop())
    g1: starting
    Yielded: y1
    g1: about to yield from g2
    g2: starting
    Yielded: y2
    g2: about to yield from g1
    """
    events = []
    record = events.append

    def g2():
        record("g2: starting")
        yield "y2"
        record("g2: about to yield from g1")
        yield from gi
        record("g2 should not be here")

    def g1():
        record("g1: starting")
        yield "y1"
        record("g1: about to yield from g2")
        yield from g2()
        record("g1 should not be here")

    try:
        gi = g1()
        for item in gi:
            record("Yielded: %s" % (item,))
    except ValueError:
        pass  # "generator already executing"
    else:
        record("subgenerator didn't raise ValueError")
    return events

def test_attempted_reentry():
    """
    The subgenerator tries to delegate back to its running delegator,
    catches the resulting ValueError and carries on.

    >>> _lines(test_attempted_reentry())
    g1: starting
    Yielded: y1
    g1: about to yield from g2
    g2: starting
    Yielded: y2
    g2: about to yield from g1
    g2: caught ValueError
    Yielded: y3
    g1: after delegating to g2
    Yielded: y4
    """
    events = []
    record = events.append

    def g2():
        record("g2: starting")
        yield "y2"
        record("g2: about to yield from g1")
        try:
            yield from gi
        except ValueError:
            record("g2: caught ValueError")
        else:
            record("g1 did not raise ValueError on reentry")
        yield "y3"

    def g1():
        record("g1: starting")
        yield "y1"
        record("g1: about to yield from g2")
        yield from g2()
        record("g1: after delegating to g2")
        yield "y4"

    gi = g1()
    for item in gi:
        record("Yielded: %s" % (item,))
    return events

def test_returning_value_from_delegated_throw():
    """
    Throw an exception that the subgenerator catches and answers with
    further yields; the value returned by throw() itself is discarded.

    >>> _lines(test_returning_value_from_delegated_throw())
    Starting g1
    Yielded g1 ham
    Starting g2
    Yielded g2 spam
    Caught LunchError in g2
    Yielded g2 yet more spam
    Yielded g1 eggs
    Finishing g1
    """
    events = []
    record = events.append

    class LunchError(Exception):
        pass

    def g2():
        try:
            record("Starting g2")
            yield "g2 spam"
            yield "g2 more spam"
        except LunchError:
            record("Caught LunchError in g2")
            yield "g2 lunch saved"
            yield "g2 yet more spam"

    def g1():
        try:
            record("Starting g1")
            yield "g1 ham"
            yield from g2()
            yield "g1 eggs"
        finally:
            record("Finishing g1")

    gen = g1()
    for _ in range(2):
        item = next(gen)
        record("Yielded %s" % (item,))
    # The value yielded in response to the throw is not traced.
    gen.throw(LunchError("tomato ejected"))
    for item in gen:
        record("Yielded %s" % (item,))
    return events

def test_next_and_return_with_value():
    """
    Resume a generator with next() so that it returns a value; the
    caller must see StopIteration for both None and 42.

    >>> _lines(test_next_and_return_with_value())
    g starting
    f resuming g
    g returning None
    f caught StopIteration
    g starting
    f resuming g
    g returning 42
    f caught StopIteration
    """
    events = []
    record = events.append

    def g(r):
        record("g starting")
        yield
        record("g returning %s" % (r,))
        return r

    def f(r):
        gen = g(r)
        next(gen)
        try:
            record("f resuming g")
            next(gen)
            record("f SHOULD NOT BE HERE")
        except StopIteration:
            record("f caught StopIteration")

    for result in (None, 42):
        f(result)
    return events

def test_send_and_return_with_value():
    """
    Resume a generator with send() so that it returns a value; the
    caller must see StopIteration for both None and 42.

    >>> _lines(test_send_and_return_with_value())
    g starting
    f sending spam to g
    g received spam
    g returning None
    f caught StopIteration
    g starting
    f sending spam to g
    g received spam
    g returning 42
    f caught StopIteration
    """
    events = []
    record = events.append

    def g(r):
        record("g starting")
        got = yield
        record("g received %s" % (got,))
        record("g returning %s" % (r,))
        return r

    def f(r):
        gen = g(r)
        next(gen)
        try:
            record("f sending spam to g")
            gen.send("spam")
            record("f SHOULD NOT BE HERE")
        except StopIteration:
            record("f caught StopIteration")

    for result in (None, 42):
        f(result)
    return events

def test_catching_exception_from_subgen_and_returning():
    """
    Test catching an exception thrown into a
    subgenerator and returning a value

    >>> _lines(test_catching_exception_from_subgen_and_returning())
    1
    inner caught ValueError
    inner returned 2 to outer
    2
    """
    events = []
    record = events.append

    def inner():
        try:
            yield 1
        except ValueError:
            record("inner caught ValueError")
        return 2

    def outer():
        v = yield from inner()
        record("inner returned %r to outer" % v)
        yield v

    gen = outer()
    first = next(gen)
    record(first)
    # The throw is swallowed by inner(); outer() then yields inner's result.
    after_throw = gen.throw(ValueError)
    record(after_throw)
    return events

def test_throwing_GeneratorExit_into_subgen_that_returns():
    """
    Test throwing GeneratorExit into a subgenerator that
    catches it and returns normally.

    >>> _lines(test_throwing_GeneratorExit_into_subgen_that_returns())
    Enter g
    Enter f
    """
    events = []
    record = events.append

    def f():
        try:
            record("Enter f")
            yield
            record("Exit f")
        except GeneratorExit:
            return

    def g():
        record("Enter g")
        yield from f()
        record("Exit g")

    try:
        gen = g()
        next(gen)
        gen.throw(GeneratorExit)
    except GeneratorExit:
        pass
    else:
        record("subgenerator failed to raise GeneratorExit")
    return events

def test_throwing_GeneratorExit_into_subgenerator_that_yields():
    """
    Test throwing GeneratorExit into a subgenerator that
    catches it and yields.

    The caller is expected to see a RuntimeError.

    >>> _lines(test_throwing_GeneratorExit_into_subgenerator_that_yields())
    Enter g
    Enter f
    """
    trace = []
    def f():
        try:
            trace.append("Enter f")
            yield
            trace.append("Exit f")
        except GeneratorExit:
            # Yielding here instead of exiting is the misbehaviour under test.
            yield
    def g():
        trace.append("Enter g")
        yield from f()
        trace.append("Exit g")
    try:
        gi = g()
        next(gi)
        gi.throw(GeneratorExit)
    except RuntimeError:
        pass # "generator ignored GeneratorExit"
    else:
        # The failure message names the exception this test actually expects.
        trace.append("subgenerator failed to raise RuntimeError")
    return trace

def test_throwing_GeneratorExit_into_subgen_that_raises():
    """
    Test throwing GeneratorExit into a subgenerator that
    catches it and raises a different exception.

    >>> _lines(test_throwing_GeneratorExit_into_subgen_that_raises())
    Enter g
    Enter f
    """
    events = []
    record = events.append

    def f():
        try:
            record("Enter f")
            yield
            record("Exit f")
        except GeneratorExit:
            raise ValueError("Vorpal bunny encountered")

    def g():
        record("Enter g")
        yield from f()
        record("Exit g")

    try:
        gen = g()
        next(gen)
        gen.throw(GeneratorExit)
    except ValueError:
        pass  # "Vorpal bunny encountered"
    else:
        record("subgenerator failed to raise ValueError")
    return events

def test_yield_from_empty():
    """
    Delegating to an empty tuple must finish the generator at once.
    Returns None on success and "FAILED" otherwise.

    >>> test_yield_from_empty()
    """
    def g():
        yield from ()

    try:
        next(g())
    except StopIteration:
        return None
    return "FAILED"

# test re-entry guards

def _reentering_gen():
    """
    Build a generator whose subgenerator tries to delegate back to it
    and swallows the resulting ValueError.
    """
    def two():
        yield 1
        try:
            # ``outermost`` is the generator this one is delegated from.
            yield from outermost
        except ValueError:
            pass
        yield 2

    def one():
        yield 0
        yield from two()
        yield 3

    outermost = one()
    return outermost

def test_delegating_generators_claim_to_be_running_next():
    """
    Exhaust the re-entering generator through plain iteration.

    >>> test_delegating_generators_claim_to_be_running_next()
    [0, 1, 2, 3]
    """
    gen = _reentering_gen()
    return [value for value in gen]

def test_delegating_generators_claim_to_be_running_send():
    """
    Exhaust the re-entering generator by sending 42 after the first next().

    >>> test_delegating_generators_claim_to_be_running_send()
    [0, 1, 2, 3]
    """
    gen = _reentering_gen()
    collected = [next(gen)]
    while True:
        try:
            value = gen.send(42)
        except StopIteration:
            break
        collected.append(value)
    return collected

def test_delegating_generators_claim_to_be_running_throw():
    """
    Drive the re-entering generators with throw(); every yield catches
    the thrown MyErr, and the re-entry attempt raises ValueError.

    >>> test_delegating_generators_claim_to_be_running_throw()
    [0, 1, 2, 3]
    """
    class MyErr(Exception):
        pass

    def two():
        try:
            yield 1
        except MyErr:
            pass
        try:
            yield from outermost
        except ValueError:
            pass
        try:
            yield 2
        except MyErr:
            pass

    def one():
        try:
            yield 0
        except MyErr:
            pass
        yield from two()
        try:
            yield 3
        except MyErr:
            pass

    outermost = one()
    collected = [next(outermost)]
    while True:
        try:
            value = outermost.throw(MyErr)
        except StopIteration:
            break
        collected.append(value)
    return collected

def test_delegating_generators_claim_to_be_running_close():
    """
    While close() is being delegated to the sub-iterator, the delegating
    generator must report itself as running and reject re-entry.

    >>> test_delegating_generators_claim_to_be_running_close()
    42
    """
    class MyIt(object):
        def __iter__(self):
            return self
        def __next__(self):
            return 42
        next = __next__
        def close(self):
            assert delegator.gi_running
            try:
                next(delegator)
            except ValueError:
                return  # guard worked
            assert False, "re-entry guard failed to bark"

    def one():
        yield from MyIt()

    delegator = one()
    first = next(delegator)
    delegator.close()
    return first