# -*- coding: utf-8 -*- # Copyright (C) 2018-2024 Nexedi SA and Contributors. # Kirill Smelkov <kirr@nexedi.com> # # This program is free software: you can Use, Study, Modify and Redistribute # it under the terms of the GNU General Public License version 3, or (at your # option) any later version, as published by the Free Software Foundation. # # You can also Link and Combine this program with other software covered by # the terms of any of the Free Software licenses or any of the Open Source # Initiative approved licenses and Convey the resulting work. Corresponding # source of such a combination shall include the source code for all other # software used. # # This program is distributed WITHOUT ANY WARRANTY; without even the implied # warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. # # See COPYING file for full licensing terms. # See https://www.nexedi.com/licensing for rationale and options. from __future__ import print_function, absolute_import from golang import go, chan, select, default, nilchan, _PanicError, func, panic, \ defer, recover, u from golang import sync from pytest import raises, mark, fail, skip from _pytest._code import Traceback from os.path import dirname import os, sys, inspect, importlib, traceback, doctest from subprocess import Popen, PIPE import six from six.moves import range as xrange import gc, weakref import re from golang import _golang_test from golang._golang_test import pywaitBlocked as waitBlocked, pylen_recvq as len_recvq, \ pylen_sendq as len_sendq, pypanicWhenBlocked as panicWhenBlocked # directories dir_golang = dirname(__file__) # .../pygolang/golang dir_pygolang = dirname(dir_golang) # .../pygolang dir_testprog = dir_golang + "/testprog" # .../pygolang/golang/testprog # pyx/c/c++ tests/benchmarks -> {test,bench}_pyx_* in caller's globals. def import_pyx_tests(modpath): mod = importlib.import_module(modpath) callf = inspect.currentframe().f_back # caller's frame callg = callf.f_globals # caller's globals tbre = re.compile("(test|bench)_(.+)") for f in dir(mod): m = tbre.match(f) if m is not None: kind, name = m.group(1), m.group(2) gf = kind + "_pyx_" + name # test_chan_nogil -> test_pyx_chan_nogil # define a python function with gf name (if we use f directly pytest # will say "cannot collect 'test_pyx_chan_nogil' because it is not a function") if kind == "test": def _(func=getattr(mod, f)): func() elif kind == "bench": def _(b, func=getattr(mod, f)): func(b) else: panic("unreachable") _.__name__ = gf callg[gf] = _ import_pyx_tests("golang._golang_test") # leaked goroutine behaviour check: done in separate process because we need # to test process termination exit there. def test_go_leaked(): pyrun([dir_testprog + "/golang_test_goleaked.py"], lsan=False) # there are on-purpose leaks in this test # benchmark go+join a thread/coroutine. # pyx/nogil mirror is in _golang_test.pyx def bench_go(b): done = chan() def _(): done.send(1) for i in xrange(b.N): go(_) done.recv() def test_chan(): # sync: pre-close vs send/recv ch = chan() ch.close() assert ch.recv() == None assert ch.recv_() == (None, False) assert ch.recv_() == (None, False) with panics("send on closed channel"): ch.send(0) with panics("close of closed channel"): ch.close() # sync: send vs recv ch = chan() def _(): ch.send(1) assert ch.recv() == 2 ch.close() go(_) assert ch.recv() == 1 ch.send(2) assert ch.recv_() == (None, False) assert ch.recv_() == (None, False) # sync: close vs send ch = chan() def _(): waitBlocked(ch.send) ch.close() go(_) with panics("send on closed channel"): ch.send(0) # close vs recv ch = chan() def _(): waitBlocked(ch.recv) ch.close() go(_) assert ch.recv_() == (None, False) # sync: close vs multiple recv ch = chan() done = chan() mu = sync.Mutex() s = set() def _(): assert ch.recv_() == (None, False) with mu: x = len(s) s.add(x) done.send(x) for i in range(3): go(_) ch.close() for i in range(3): done.recv() assert s == {0,1,2} # buffered ch = chan(3) done = chan() for _ in range(2): for i in range(3): assert len(ch) == i ch.send(i) assert len(ch) == i+1 for i in range(3): assert ch.recv_() == (i, True) assert len(ch) == 0 for i in range(3): ch.send(i) assert len(ch) == 3 def _(): waitBlocked(ch.send) assert ch.recv_() == (0, True) done.send('a') for i in range(1,4): assert ch.recv_() == (i, True) assert ch.recv_() == (None, False) done.send('b') go(_) ch.send(3) # will block without receiver assert done.recv() == 'a' ch.close() assert done.recv() == 'b' # buffered: releases objects in buffer on chan gc ch = chan(3) class Obj(object): pass obj1 = Obj(); w1 = weakref.ref(obj1); assert w1() is obj1 obj2 = Obj(); w2 = weakref.ref(obj2); assert w2() is obj2 ch.send(obj1) ch.send(obj2) del obj1 del obj2 gc.collect() assert w1() is not None assert w2() is not None ch = None gc.collect() # pypy needs another GC run: pychan does Py_DECREF on buffered objects, but # on pypy cpyext objects are not deallocated from Py_DECREF even if # ob_refcnt goes to zero - the deallocation is delayed until GC run. # see also: http://doc.pypy.org/en/latest/discussion/rawrefcount.html gc.collect() assert w1() is None assert w2() is None # test for buffered chan bug when ch._mu was released too early in _trysend. def test_chan_buf_send_vs_tryrecv_race(): # there was a bug when for buffered channel _trysend(ch) was releasing # ch._mu before further popping element from ch._dataq. If there was # another _tryrecv running concurrently to _trysend, that _tryrecv could # pop the element and _trysend would in turn try to pop on empty ch._dataq # leading to oops. The test tries to reproduce the following scenario: # # T1(recv) T2(send) T3(_tryrecv) # # recv(blocked) # # ch.mu.lock # ch.dataq.append(x) # ch.mu.unlock() # ch.mu.lock # ch.dataq.popleft() # # # oopses since T3 already # # popped the value # ch.dataq.popleft() ch = chan(1) # buffered done = chan() N = 1000 # T1: recv(blocked) def _(): for i in range(N): assert ch.recv() == i done.send(1) go(_) tryrecv_ctl = chan() # send <-> _tryrecv sync # T2: send after recv is blocked -> _trysend succeeds def _(): for i in range(N): waitBlocked(ch.recv) # ch.recv() ^^^ entered ch._recvq tryrecv_ctl.send('start') # signal _tryrecv to start ch.send(i) assert tryrecv_ctl.recv() == 'done' # wait _tryrecv to finish done.send(1) go(_) # T3: _tryrecv running in parallel to _trysend def _(): for i in range(N): assert tryrecv_ctl.recv() == 'start' _, _rx = select( ch.recv, # 0 default, # 1 ) assert (_, _rx) == (1, None) tryrecv_ctl.send('done') done.send(1) go(_) for i in range(3): done.recv() # test for buffered chan bug when ch._mu was released too early in _tryrecv. def test_chan_buf_recv_vs_tryrecv_race(): # (see test_chan_buf_send_vs_tryrecv_race for similar problem description) # # T1(send) T2(recv) T3(_trysend) # # send(blocked) # # ch.mu.lock # ch.dataq.popleft() # send = _dequeWaiter(ch._sendq) # ch.mu.unlock() # # ch.mu.lock # len(ch.dataq) == 0 -> ok to append # # # erroneously succeeds sending while # # it must not # ch.dataq.append(x) # # ch.dataq.append(send.obj) ch = chan(1) # buffered done = chan() N = 1000 # T1: send(blocked) def _(): for i in range(1 + N): ch.send(i) done.send(1) go(_) trysend_ctl = chan() # recv <-> _trysend sync # T2: recv after send is blocked -> _tryrecv succeeds def _(): for i in range(N): waitBlocked(ch.send) # ch.send() ^^^ entered ch._sendq assert len(ch) == 1 # and 1 element was already buffered trysend_ctl.send('start') # signal _trysend to start assert ch.recv() == i assert trysend_ctl.recv() == 'done' # wait _trysend to finish done.send(1) go(_) # T3: _trysend running in parallel to _tryrecv def _(): for i in range(N): assert trysend_ctl.recv() == 'start' _, _rx = select( (ch.send, 'i%d' % i), # 0 default, # 1 ) assert (_, _rx) == (1, None), ('i%d' % i) trysend_ctl.send('done') done.send(1) go(_) for i in range(3): done.recv() # send/recv on the same channel in both directions. # this triggers https://bugs.python.org/issue38106 on MacOS. def test_chan_sendrecv_2way(): N = 1000 ch = chan() def _(): for i in range(N): assert ch.recv() == ('hello %d' % i) ch.send('world %d' % i) go(_) for i in range(N): ch.send('hello %d' % i) assert ch.recv() == ('world %d' % i) # benchmark sync chan send/recv. # pyx/nogil mirror is in _golang_test.pyx def bench_chan(b): ch = chan() done = chan() def _(): while 1: _, ok = ch.recv_() if not ok: done.close() return go(_) for i in xrange(b.N): ch.send(1) ch.close() done.recv() def test_select(): N = 1000 # times to do repeated select/chan or select/select interactions # sync: close vs select(send) ch = chan() def _(): waitBlocked(ch.send) ch.close() go(_) with panics("send on closed channel"): select((ch.send, 0)) # sync: close vs select(recv) ch = chan() def _(): waitBlocked(ch.recv) ch.close() go(_) assert select(ch.recv) == (0, None) # non-blocking try send: not ok ch = chan() for i in range(N): _, _rx = select( (ch.send, 0), default, ) assert (_, _rx) == (1, None) # non-blocking try recv: not ok for i in range(N): _, _rx = select( ch.recv, default, ) assert (_, _rx) == (1, None) _, _rx = select( ch.recv_, default, ) assert (_, _rx) == (1, None) # non-blocking try send: ok ch = chan() done = chan() def _(): i = 0 while 1: x = ch.recv() if x == 'stop': break assert x == i i += 1 done.close() go(_) for i in range(N): waitBlocked(ch.recv) _, _rx = select( (ch.send, i), default, ) assert (_, _rx) == (0, None) ch.send('stop') done.recv() # non-blocking try recv: ok ch = chan() done = chan() def _(): for i in range(N): ch.send(i) done.close() go(_) for i in range(N): waitBlocked(ch.send) if i % 2: _, _rx = select( ch.recv, default, ) assert (_, _rx) == (0, i) else: _, _rx = select( ch.recv_, default, ) assert (_, _rx) == (0, (i, True)) done.recv() # blocking 2·send ch1 = chan() ch2 = chan() done = chan() def _(): while 1: waitBlocked(ch1.send) x = ch1.recv() if x == 'stop': break assert x == 'a' done.close() go(_) for i in range(N): _, _rx = select( (ch1.send, 'a'), (ch2.send, 'b'), ) assert (_, _rx) == (0, None) ch1.send('stop') done.recv() assert len_sendq(ch1) == len_recvq(ch1) == 0 assert len_sendq(ch2) == len_recvq(ch2) == 0 # blocking 2·recv ch1 = chan() ch2 = chan() done = chan() def _(): for i in range(N): waitBlocked(ch1.recv) ch1.send('a') done.close() go(_) for i in range(N): _, _rx = select( ch1.recv, ch2.recv, ) assert (_, _rx) == (0, 'a') done.recv() assert len_sendq(ch1) == len_recvq(ch1) == 0 assert len_sendq(ch2) == len_recvq(ch2) == 0 # blocking send/recv ch1 = chan() ch2 = chan() done = chan() def _(): while 1: waitBlocked(ch1.send) x = ch1.recv() if x == 'stop': break assert x == 'a' done.close() go(_) for i in range(N): _, _rx = select( (ch1.send, 'a'), ch2.recv, ) assert (_, _rx) == (0, None) ch1.send('stop') done.recv() assert len_sendq(ch1) == len_recvq(ch1) == 0 assert len_sendq(ch2) == len_recvq(ch2) == 0 # blocking recv/send ch1 = chan() ch2 = chan() done = chan() def _(): for i in range(N): waitBlocked(ch1.recv) ch1.send('a') done.close() go(_) for i in range(N): _, _rx = select( ch1.recv, (ch2.send, 'b'), ) assert (_, _rx) == (0, 'a') done.recv() assert len_sendq(ch1) == len_recvq(ch1) == 0 assert len_sendq(ch2) == len_recvq(ch2) == 0 # blocking send + nil channel z = nilchan for i in range(N): ch = chan() done = chan() def _(): waitBlocked(ch.send) assert ch.recv() == 'c' done.close() go(_) _, _rx = select( z.recv, (z.send, 0), (ch.send, 'c'), ) assert (_, _rx) == (2, None) done.recv() assert len_sendq(ch) == len_recvq(ch) == 0 # blocking recv + nil channel for i in range(N): ch = chan() done = chan() def _(): waitBlocked(ch.recv) ch.send('d') done.close() go(_) _, _rx = select( z.recv, (z.send, 0), ch.recv, ) assert (_, _rx) == (2, 'd') done.recv() assert len_sendq(ch) == len_recvq(ch) == 0 # buffered ping-pong ch = chan(1) for i in range(N): _, _rx = select( (ch.send, i), ch.recv, ) assert _ == (i % 2) assert _rx == (i - 1 if i % 2 else None) # select vs select # channels are recreated on every iteration. for i in range(N): ch1 = chan() ch2 = chan() done = chan() def _(): _, _rx = select( (ch1.send, 'a'), (ch2.send, 'xxx2'), ) assert (_, _rx) == (0, None) _, _rx = select( (ch1.send, 'yyy2'), ch2.recv, ) assert (_, _rx) == (1, 'b') done.close() go(_) _, _rx = select( ch1.recv, (ch2.send, 'xxx1'), ) assert (_, _rx) == (0, 'a') _, _rx = select( (ch1.send, 'yyy1'), (ch2.send, 'b'), ) assert (_, _rx) == (1, None) done.recv() assert len_sendq(ch1) == len_recvq(ch1) == 0 assert len_sendq(ch2) == len_recvq(ch2) == 0 # select vs select # channels are shared for all iterations. # (this tries to trigger parasitic effects from already performed select) ch1 = chan() ch2 = chan() done = chan() def _(): for i in range(N): _, _rx = select( (ch1.send, 'a%d' % i), (ch2.send, 'xxx2'), ) assert (_, _rx) == (0, None) _, _rx = select( (ch1.send, 'yyy2'), ch2.recv, ) assert (_, _rx) == (1, 'b%d' % i) done.close() go(_) for i in range(N): _, _rx = select( ch1.recv, (ch2.send, 'xxx1'), ) assert (_, _rx) == (0, 'a%d' % i) _, _rx = select( (ch1.send, 'yyy1'), (ch2.send, 'b%d' % i), ) assert (_, _rx) == (1, None) done.recv() assert len_sendq(ch1) == len_recvq(ch1) == 0 assert len_sendq(ch2) == len_recvq(ch2) == 0 # verify that select does not leak references to passed objects. @mark.skipif(not hasattr(sys, 'getrefcount'), # skipped e.g. on PyPy reason="needs sys.getrefcount") def test_select_refleak(): ch1 = chan() ch2 = chan() obj1 = object() obj2 = object() tx1 = (ch1.send, obj1) tx2 = (ch2.send, obj2) # normal exit gc.collect() nref1 = sys.getrefcount(obj1) nref2 = sys.getrefcount(obj2) _, _rx = select( tx1, # 0 tx2, # 1 default, # 2 ) assert (_, _rx) == (2, None) gc.collect() assert sys.getrefcount(obj1) == nref1 gc.collect() assert sys.getrefcount(obj1) == nref2 # abnormal exit with raises(AttributeError) as exc: select( tx1, # 0 tx2, # 1 'zzz', # 2 causes pyselect to panic ) assert exc.value.args == ("'str' object has no attribute '__self__'",) gc.collect() assert sys.getrefcount(obj1) == nref1 gc.collect() assert sys.getrefcount(obj1) == nref2 # benchmark sync chan send vs recv on select side. # pyx/nogil mirror is in _golang_test.pyx def bench_select(b): ch1 = chan() ch2 = chan() done = chan() def _(): while 1: _, _rx = select( ch1.recv_, # 0 ch2.recv_, # 1 ) if _ == 0: _, ok = _rx if not ok: done.close() return go(_) _ = (ch1, ch2) for i in xrange(b.N): ch = _[i%2] ch.send(1) ch1.close() done.recv() def test_blockforever(): with panicWhenBlocked(): _test_blockforever() def _test_blockforever(): z = nilchan assert len(z) == 0 assert repr(z) == "nilchan" with panics("t: blocks forever"): z.send(0) with panics("t: blocks forever"): z.recv() with panics("close of nil channel"): z.close() # to fully cover nilchan ops # select{} & nil-channel only with panics("t: blocks forever"): select() with panics("t: blocks forever"): select((z.send, 0)) with panics("t: blocks forever"): select(z.recv) with panics("t: blocks forever"): select((z.send, 1), z.recv) # verify chan(dtype=X) functionality. def test_chan_dtype_invalid(): with raises(TypeError) as exc: chan(dtype="BadType") assert exc.value.args == ("pychan: invalid dtype: 'BadType'",) chantypev = [ # dtype obj zero-obj ('object', 'abc', None), ('C.structZ', None, None), ('C.bool', True, False), ('C.int', 4, 0), ('C.double', 3.14, 0.0), ] @mark.parametrize('dtype,obj,zobj', chantypev) def test_chan_dtype(dtype, obj, zobj): # py -> py (pysend/pyrecv; buffered) ch = chan(1, dtype=dtype) ch.send(obj) obj2, ok = ch.recv_() assert ok == True assert type(obj2) is type(obj) assert obj2 == obj # send with different type - rejected for (dtype2, obj2, _) in chantypev: if dtype2 == dtype or dtype == "object": continue # X -> X; object accepts *, if (dtype2, dtype) == ('C.int', 'C.double'): # int -> double ok continue with raises(TypeError) as exc: ch.send(obj2) # XXX we can implement vvv, but it will potentially hide cause error # XXX (or use raise from?) #assert exc.value.args == ("type mismatch: expect %s; got %r" % (dtype, obj2),) with raises(TypeError) as exc: select((ch.send, obj2)) # py -> py (pyclose/pyrecv) ch.close() obj2, ok = ch.recv_() assert ok == False assert type(obj2) is type(zobj) assert obj2 == zobj # below tests are for py <-> c interaction if dtype == "object": return ctype = dtype[2:] # C.int -> int ch = chan(dtype=dtype) # recreate after close; mode=synchronous # recv/send/close via C def crecv(ch): return getattr(_golang_test, "pychan_%s_recv" % ctype)(ch) def csend(ch, obj): getattr(_golang_test, "pychan_%s_send" % ctype)(ch, obj) def cclose(ch): getattr(_golang_test, "pychan_%s_close" % ctype)(ch) # py -> c (pysend/crecv) rx = chan() def _(): _ = crecv(ch) rx.send(_) go(_) ch.send(obj) obj2 = rx.recv() assert type(obj2) is type(obj) assert obj2 == obj # py -> c (pyselect/crecv) rx = chan() def _(): _ = crecv(ch) rx.send(_) go(_) _, _rx = select( (ch.send, obj), # 0 ) assert (_, _rx) == (0, None) obj2 = rx.recv() assert type(obj2) is type(obj) assert obj2 == obj # py -> c (pyclose/crecv) rx = chan() def _(): _ = crecv(ch) rx.send(_) go(_) ch.close() obj2 = rx.recv() assert type(obj2) is type(zobj) assert obj2 == zobj ch = chan(dtype=dtype) # recreate after close # py <- c (pyrecv/csend) def _(): csend(ch, obj) go(_) obj2 = ch.recv() assert type(obj2) is type(obj) assert obj2 == obj # py <- c (pyselect/csend) def _(): csend(ch, obj) go(_) _, _rx = select( ch.recv, # 0 ) assert _ == 0 obj2 = _rx assert type(obj2) is type(obj) assert obj2 == obj # py <- c (pyrecv/cclose) def _(): cclose(ch) go(_) obj2 = ch.recv() assert type(obj2) is type(zobj) assert obj2 == zobj @mark.parametrize('dtype', [_[0] for _ in chantypev]) def test_chan_dtype_misc(dtype): nilch = chan.nil(dtype) # nil repr if dtype == "object": assert repr(nilch) == "nilchan" else: assert repr(nilch) == ("chan.nil(%r)" % dtype) # optimization: nil[X]() -> always same object nilch_ = chan.nil(dtype) assert nilch is nilch_ if dtype == "object": assert nilch is nilchan assert hash(nilch) == hash(nilchan) assert (nilch == nilch) # nil[X] == nil[X] assert not (nilch != nilch) assert (nilch == nilchan) # nil[X] == nil[*] assert not (nilch != nilchan) assert (nilchan == nilch) # nil[*] == nil[X] assert not (nilchan != nilch) # channels can be compared, different channels differ assert nilch != None # just in case ch1 = chan(dtype=dtype) ch2 = chan(dtype=dtype) ch3 = chan() assert ch1 != ch2; assert not (ch1 == ch2); assert ch1 == ch1; assert not (ch1 != ch1) assert ch1 != ch3; assert not (ch1 == ch3); assert ch2 == ch2; assert not (ch2 != ch2) assert ch2 != ch3; assert not (ch2 == ch3); assert ch3 == ch3; assert not (ch3 != ch3) assert hash(nilch) != hash(ch1) assert hash(nilch) != hash(ch2) assert hash(nilch) != hash(ch3) assert nilch != ch1; assert not (nilch == ch1) assert nilch != ch2; assert not (nilch == ch2) assert nilch != ch3; assert not (nilch == ch3) # .nil on chan instance XXX doesn't work (yet ?) """ ch = chan() # non-nil chan object instance with raises(AttributeError): ch.nil """ # nil[X] vs nil[Y] for (dtype2, _, _) in chantypev: nilch2 = chan.nil(dtype2) # nil[*] stands for untyped nil - it is equal to nil[X] for ∀ X if dtype == "object" or dtype2 == "object": if dtype != dtype2: assert nilch is not nilch2 assert hash(nilch) == hash(nilch2) assert (nilch == nilch2) == True assert (nilch2 == nilch) == True assert (nilch != nilch2) == False assert (nilch2 != nilch) == False continue # nil[X] == nil[X] if dtype == dtype2: assert hash(nilch) == hash(nilch2) assert (nilch == nilch2) == True assert (nilch2 == nilch) == True assert (nilch != nilch2) == False assert (nilch2 != nilch) == False continue # nil[X] != nil[Y] assert nilch is not nilch2 assert (nilch == nilch2) == False assert (nilch2 == nilch) == False assert (nilch != nilch2) == True assert (nilch2 != nilch) == True def test_func(): # test how @func(cls) works # this also implicitly tests just @func, since @func(cls) uses that. class MyClass: def __init__(self, v): self.v = v zzz = zzz_orig = 'z' # `@func(MyClass) def zzz` must not override zzz @func(MyClass) def zzz(self, v, x=2, **kkkkwww): assert self.v == v return v + 1 assert zzz is zzz_orig assert zzz == 'z' mstatic = mstatic_orig = 'mstatic' @func(MyClass) @staticmethod def mstatic(v): assert v == 5 return v + 1 assert mstatic is mstatic_orig assert mstatic == 'mstatic' mcls = mcls_orig = 'mcls' @func(MyClass) @classmethod def mcls(cls, v): assert cls is MyClass assert v == 7 return v + 1 assert mcls is mcls_orig assert mcls == 'mcls' # undefined var after `@func(cls) def var` should be not set assert 'var' not in locals() @func(MyClass) def var(self, v): assert v == 8 return v + 1 gc.collect() # pypy needs this to trigger _DelAttrAfterMeth GC assert 'var' not in locals() obj = MyClass(4) assert obj.zzz(4) == 4 + 1 assert obj.mstatic(5) == 5 + 1 assert obj.mcls(7) == 7 + 1 assert obj.var(8) == 8 + 1 # this tests that @func (used by @func(cls)) preserves decorated function signature assert fmtargspec(MyClass.zzz) == '(self, v, x=2, **kkkkwww)' assert MyClass.zzz.__module__ == __name__ assert MyClass.zzz.__name__ == 'zzz' assert MyClass.mstatic.__module__ == __name__ assert MyClass.mstatic.__name__ == 'mstatic' assert MyClass.mcls.__module__ == __name__ assert MyClass.mcls.__name__ == 'mcls' assert MyClass.var.__module__ == __name__ assert MyClass.var.__name__ == 'var' # test that func·func = func (double _func calls will be done internally for # getter when handling @func(@MyClass.vproperty.setter) def f(): pass g = func(f) h = func(g) assert h is g # @func overhead at def time. def bench_def(b): for i in xrange(b.N): def _(): pass def bench_func_def(b): for i in xrange(b.N): @func def _(): pass # @func overhead at call time. def bench_call(b): def _(): pass for i in xrange(b.N): _() def bench_func_call(b): @func def _(): pass for i in xrange(b.N): _() def test_deferrecover(): # regular defer calls v = [] @func def _(): defer(lambda: v.append(1)) defer(lambda: v.append(2)) defer(lambda: v.append(3)) _() assert v == [3, 2, 1] # defers called even if exception is raised v = [] @func def _(): defer(lambda: v.append(1)) defer(lambda: v.append(2)) def _(): v.append('ran ok') defer(_) 1/0 with raises(ZeroDivisionError): _() assert v == ['ran ok', 2, 1] # defer without @func is caught and properly reported v = [] def nofunc(): defer(lambda: v.append('xx')) with panics("function nofunc uses defer, but not @func"): nofunc() # panic in deferred call - all defers are called v = [] @func def _(): defer(lambda: v.append(1)) defer(lambda: v.append(2)) defer(lambda: panic(3)) defer(lambda: v.append(4)) with panics(3): _() assert v == [4, 2, 1] # defer + recover v = [] @func def _(): defer(lambda: v.append(1)) def _(): r = recover() assert r == "aaa" v.append('recovered ok') defer(_) defer(lambda: v.append(3)) panic("aaa") _() assert v == [3, 'recovered ok', 1] # recover + panic in defer v = [] @func def _(): defer(lambda: v.append(1)) defer(lambda: panic(2)) def _(): r = recover() assert r == "bbb" v.append('recovered 1') defer(_) defer(lambda: v.append(3)) panic("bbb") with panics(2): _() assert v == [3, 'recovered 1', 1] # recover + panic in defer + recover v = [] @func def _(): defer(lambda: v.append(1)) def _(): r = recover() assert r == "ddd" v.append('recovered 2') defer(_) defer(lambda: panic("ddd")) def _(): r = recover() assert r == "ccc" v.append('recovered 1') defer(_) defer(lambda: v.append(3)) panic("ccc") _() assert v == [3, 'recovered 1', 'recovered 2', 1] # ---- recover() -> None ---- # no exception / not under defer assert recover() is None # no exception/panic @func def _(): def _(): assert recover() is None defer(_) _() # not directly called by deferred func v = [] @func def _(): def f(): assert recover() is None v.append('not recovered') defer(lambda: f()) panic("zzz") with panics("zzz"): _() assert v == ['not recovered'] # ---- defer in @func(x) ---- # defer in @func(cls) v = [] class MyClass: pass @func(MyClass) def zzz(self): defer(lambda: v.append(1)) defer(lambda: v.append(2)) defer(lambda: v.append(3)) obj = MyClass() obj.zzz() assert v == [3, 2, 1] # defer in std method v = [] class MyClass: @func def method(self): defer(lambda: v.append(1)) defer(lambda: v.append(2)) defer(lambda: v.append(4)) obj = MyClass() obj.method() assert v == [4, 2, 1] # defer in std @staticmethod v = [] class MyClass: @func @staticmethod def mstatic(): defer(lambda: v.append(1)) defer(lambda: v.append(2)) defer(lambda: v.append(5)) MyClass.mstatic() assert v == [5, 2, 1] # defer in std @classmethod v = [] class MyClass: @func @classmethod def mcls(cls): assert cls is MyClass defer(lambda: v.append(1)) defer(lambda: v.append(2)) defer(lambda: v.append(7)) MyClass.mcls() assert v == [7, 2, 1] # verify that defer correctly establishes exception chain (even on py2). def test_defer_excchain(): # just @func/raise embeds traceback and adds ø chain @func def _(): raise RuntimeError("err") with raises(RuntimeError) as exci: _() e = exci.value assert type(e) is RuntimeError assert e.args == ("err",) assert e.__cause__ is None assert e.__context__ is None if six.PY3: # .__traceback__ for top-level exception is not set on py2 assert e.__traceback__ is not None tb = Traceback(e.__traceback__) assert tb[-1].name == "_" # exceptions in deferred calls are chained def d1(): raise RuntimeError("d1: aaa") @func def d2(): # NOTE regular raise inside @func 1/0 # which initially sets .__context__ to None @func def d3(): # d33->d32->d31 subchain that has to be correctly glued with neighbours as: # "d4: bbb" -> d33->d32->d31 -> 1/0 def d31(): raise RuntimeError("d31") def d32(): raise RuntimeError("d32") def d33(): raise RuntimeError("d33") defer(d33) defer(d32) defer(d31) def d4(): raise RuntimeError("d4: bbb") @func def _(): defer(d4) defer(d3) defer(d2) defer(d1) raise RuntimeError("err") with raises(RuntimeError) as exci: _() e4 = exci.value assert type(e4) is RuntimeError assert e4.args == ("d4: bbb",) assert e4.__cause__ is None assert e4.__context__ is not None if six.PY3: # .__traceback__ of top-level exception assert e4.__traceback__ is not None tb4 = Traceback(e4.__traceback__) assert tb4[-1].name == "d4" e33 = e4.__context__ assert type(e33) is RuntimeError assert e33.args == ("d33",) assert e33.__cause__ is None assert e33.__context__ is not None assert e33.__traceback__ is not None tb33 = Traceback(e33.__traceback__) assert tb33[-1].name == "d33" e32 = e33.__context__ assert type(e32) is RuntimeError assert e32.args == ("d32",) assert e32.__cause__ is None assert e32.__context__ is not None assert e32.__traceback__ is not None tb32 = Traceback(e32.__traceback__) assert tb32[-1].name == "d32" e31 = e32.__context__ assert type(e31) is RuntimeError assert e31.args == ("d31",) assert e31.__cause__ is None assert e31.__context__ is not None assert e31.__traceback__ is not None tb31 = Traceback(e31.__traceback__) assert tb31[-1].name == "d31" e2 = e31.__context__ assert type(e2) is ZeroDivisionError #assert e2.args == ("division by zero",) # text is different in between py23 assert e2.__cause__ is None assert e2.__context__ is not None assert e2.__traceback__ is not None tb2 = Traceback(e2.__traceback__) assert tb2[-1].name == "d2" e1 = e2.__context__ assert type(e1) is RuntimeError assert e1.args == ("d1: aaa",) assert e1.__cause__ is None assert e1.__context__ is not None assert e1.__traceback__ is not None tb1 = Traceback(e1.__traceback__) assert tb1[-1].name == "d1" e = e1.__context__ assert type(e) is RuntimeError assert e.args == ("err",) assert e.__cause__ is None assert e.__context__ is None assert e.__traceback__ is not None tb = Traceback(e.__traceback__) assert tb[-1].name == "_" # verify that recover breaks exception chain. @mark.xfail('PyPy' in sys.version and sys.version_info >= (3,) and sys.pypy_version_info < (7,3), reason="https://foss.heptapod.net/pypy/pypy/-/issues/3096") def test_defer_excchain_vs_recover(): @func def _(): def p1(): raise RuntimeError(1) defer(p1) def p2(): raise RuntimeError(2) defer(p2) def _(): r = recover() assert r == "aaa" defer(_) defer(lambda: panic("aaa")) with raises(RuntimeError) as exci: _() e1 = exci.value assert type(e1) is RuntimeError assert e1.args == (1,) assert e1.__cause__ is None assert e1.__context__ is not None if six.PY3: # .__traceback__ of top-level exception assert e1.__traceback__ is not None tb1 = Traceback(e1.__traceback__) assert tb1[-1].name == "p1" e2 = e1.__context__ assert type(e2) is RuntimeError assert e2.args == (2,) assert e2.__cause__ is None assert e2.__context__ is None # not chained to panic assert e2.__traceback__ is not None tb2 = Traceback(e2.__traceback__) assert tb2[-1].name == "p2" # verify that recover returns exception with .__traceback__ and excchain context set (even on py2). def test_recover_traceback_and_excchain(): # raise -> recover @func def f1(): def r1(): e = recover() assert e is not None assert type(e) is RuntimeError assert e.args == ("qqq",) assert e.__cause__ is None assert e.__context__ is None assert e.__suppress_context__ == False assert e.__traceback__ is not None tb = Traceback(e.__traceback__) assert tb[-1].name == "p1" assert tb[-2].name == "p2" assert tb[-3].name == "p3" assert tb[-4].name == "f1" # [-5] == _func._ defer(r1) def p1(): raise RuntimeError("qqq") def p2(): p1() def p3(): p2() p3() f1() # raise -> defer(raise2) -> recover @func def f2(): def r2(): e2 = recover() assert e2 is not None assert type(e2) is RuntimeError assert e2.args == ("epp2",) assert e2.__cause__ is None assert e2.__context__ is not None assert e2.__suppress_context__ == False assert e2.__traceback__ is not None t2 = Traceback(e2.__traceback__) assert t2[-1].name == "pp2" # [-2] == _GoFrame.__exit__ e1 = e2.__context__ assert type(e1) is RuntimeError assert e1.args == ("epp1",) assert e1.__cause__ is None assert e1.__context__ is None assert e1.__suppress_context__ == False assert e1.__traceback__ is not None t1 = Traceback(e1.__traceback__) assert t1[-1].name == "pp1" assert t1[-2].name == "f2" # [-3] == _func._ defer(r2) def pp2(): raise RuntimeError("epp2") defer(pp2) def pp1(): raise RuntimeError("epp1") pp1() f2() # raise -> recover -> wrap+reraise @func def f3(): def r3(): e1 = recover() assert e1 is not None e2 = RuntimeError("bad2") e2.__context__ = e1 raise e2 defer(r3) def bad1(): raise RuntimeError("bad1") bad1() with raises(RuntimeError) as exci: f3() e2 = exci.value assert type(e2) is RuntimeError assert e2.args == ("bad2",) assert e2.__cause__ is None assert e2.__context__ is not None if six.PY3: # .__traceback__ for top-level exception is not set on py2 assert e2.__traceback__ is not None t2 = Traceback(e2.__traceback__) assert t2[-1].name == "r3" e1 = e2.__context__ assert type(e1) is RuntimeError assert e1.args == ("bad1",) assert e1.__cause__ is None assert e1.__context__ is None assert e1.__traceback__ is not None t1 = Traceback(e1.__traceback__) assert t1[-1].name == "bad1" assert t1[-2].name == "f3" # verify that traceback.{print_exception,format_exception} work on chained # exception correctly. def test_defer_excchain_traceback(): # tbstr returns traceback that would be printed for exception e. def tbstr(e): fout_print = six.StringIO() traceback.print_exception(type(e), e, e.__traceback__, file=fout_print) lout_format = traceback.format_exception(type(e), e, e.__traceback__) out_print = fout_print.getvalue() out_format = "".join(lout_format) assert out_print == out_format return out_print # raise without @func/defer - must be printed correctly # (we patch traceback.print_exception & co on py2) def alpha(): def beta(): raise RuntimeError("gamma") beta() with raises(RuntimeError) as exci: alpha() e = exci.value if not hasattr(e, '__traceback__'): # py2 e.__traceback__ = exci.tb assertDoc("""\ Traceback (most recent call last): File "PYGOLANG/golang/golang_test.py", line ..., in test_defer_excchain_traceback alpha() File "PYGOLANG/golang/golang_test.py", line ..., in alpha beta() File "PYGOLANG/golang/golang_test.py", line ..., in beta raise RuntimeError("gamma") RuntimeError: gamma """, tbstr(e)) # raise in @func/chained defer @func def caller(): def q1(): raise RuntimeError("aaa") defer(q1) def q2(): raise RuntimeError("bbb") defer(q2) raise RuntimeError("ccc") with raises(RuntimeError) as exci: caller() e = exci.value if not hasattr(e, '__traceback__'): # py2 e.__traceback__ = exci.tb assertDoc("""\ Traceback (most recent call last): File "PYGOLANG/golang/__init__.py", line ..., in _goframe return f(*argv, **kw) ^^^^^^^^^^^^^^ +PY311 File "PYGOLANG/golang/golang_test.py", line ..., in caller raise RuntimeError("ccc") RuntimeError: ccc During handling of the above exception, another exception occurred: Traceback (most recent call last): File "PYGOLANG/golang/__init__.py", line ..., in __exit__ d() File "PYGOLANG/golang/golang_test.py", line ..., in q2 raise RuntimeError("bbb") RuntimeError: bbb During handling of the above exception, another exception occurred: Traceback (most recent call last): File "PYGOLANG/golang/golang_test.py", line ..., in test_defer_excchain_traceback caller() ... File "PYGOLANG/golang/__init__.py", line ..., in _goframe return f(*argv, **kw) -PY310 with __goframe__: +PY310 File "PYGOLANG/golang/__init__.py", line ..., in __exit__ d() -PY310 with __goframe__: +PY310 File "PYGOLANG/golang/__init__.py", line ..., in __exit__ d() File "PYGOLANG/golang/golang_test.py", line ..., in q1 raise RuntimeError("aaa") RuntimeError: aaa """, tbstr(e)) e.__suppress_context__ = True assertDoc("""\ Traceback (most recent call last): File "PYGOLANG/golang/golang_test.py", line ..., in test_defer_excchain_traceback caller() ... File "PYGOLANG/golang/__init__.py", line ..., in _goframe return f(*argv, **kw) -PY310 with __goframe__: +PY310 File "PYGOLANG/golang/__init__.py", line ..., in __exit__ d() -PY310 with __goframe__: +PY310 File "PYGOLANG/golang/__init__.py", line ..., in __exit__ d() File "PYGOLANG/golang/golang_test.py", line ..., in q1 raise RuntimeError("aaa") RuntimeError: aaa """, tbstr(e)) e.__cause__ = e.__context__ assertDoc("""\ Traceback (most recent call last): File "PYGOLANG/golang/__init__.py", line ..., in _goframe return f(*argv, **kw) ^^^^^^^^^^^^^^ +PY311 File "PYGOLANG/golang/golang_test.py", line ..., in caller raise RuntimeError("ccc") RuntimeError: ccc During handling of the above exception, another exception occurred: Traceback (most recent call last): File "PYGOLANG/golang/__init__.py", line ..., in __exit__ d() File "PYGOLANG/golang/golang_test.py", line ..., in q2 raise RuntimeError("bbb") RuntimeError: bbb The above exception was the direct cause of the following exception: Traceback (most recent call last): File "PYGOLANG/golang/golang_test.py", line ..., in test_defer_excchain_traceback caller() ... File "PYGOLANG/golang/__init__.py", line ..., in _goframe return f(*argv, **kw) -PY310 with __goframe__: +PY310 File "PYGOLANG/golang/__init__.py", line ..., in __exit__ d() -PY310 with __goframe__: +PY310 File "PYGOLANG/golang/__init__.py", line ..., in __exit__ d() File "PYGOLANG/golang/golang_test.py", line ..., in q1 raise RuntimeError("aaa") RuntimeError: aaa """, tbstr(e)) # verify that dump of unhandled chained exception traceback works correctly (even on py2). def test_defer_excchain_dump(): # run golang_test_defer_excchain.py and verify its output via doctest. tbok = readfile(dir_testprog + "/golang_test_defer_excchain.txt") retcode, stdout, stderr = _pyrun(["golang_test_defer_excchain.py"], cwd=dir_testprog, stdout=PIPE, stderr=PIPE) assert retcode != 0, (stdout, stderr) assert stdout == b"" assertDoc(tbok, stderr) # ----//---- (ipython) def test_defer_excchain_dump_ipython(): # ipython 8 changed traceback output significantly # we do not need to test it because we acticate ipython-related patch only # on py2 for which latest ipython version is 5. import IPython if six.PY3 and IPython.version_info >= (8,0): skip("ipython is patched only on py2; ipython8 changed traceback format") tbok = readfile(dir_testprog + "/golang_test_defer_excchain.txt-ipython") retcode, stdout, stderr = _pyrun(["-m", "IPython", "--quick", "--colors=NoColor", "-m", "golang_test_defer_excchain"], envadj={"COLUMNS": "80"}, # force ipython5 avoid thinking termwidth=0 cwd=dir_testprog, stdout=PIPE, stderr=PIPE) assert retcode == 0, (stdout, stderr) # ipython5 uses .pyc for filenames instead of .py stdout = re.sub(br'\.pyc\b', b'.py', stdout) # normalize .pyc -> .py assertDoc(tbok, stdout) assert b"Unknown failure executing module: <golang_test_defer_excchain>" in stderr # ----//---- (pytest) def test_defer_excchain_dump_pytest(): # pytest 7.4 also changed traceback output format # similarly to ipython we do not need to test it becase we activate # pytest-related patch only on py2 for which latest pytest version is 4.6.11 . import pytest if six.PY3 and getattr(pytest, 'version_tuple', (0,)) >= (7,4): skip("pytest is patched only on py2; pytest7.4 changed traceback format") tbok = readfile(dir_testprog + "/golang_test_defer_excchain.txt-pytest") retcode, stdout, stderr = _pyrun([ # don't let pytest emit internal deprecation warnings to stderr "-W", "ignore::DeprecationWarning", "-m", "pytest", "-o", "python_functions=main", "--tb=short", "golang_test_defer_excchain.py"], cwd=dir_testprog, stdout=PIPE, stderr=PIPE) assert retcode != 0, (stdout, stderr) assert stderr == b"" assertDoc(tbok, stdout) # defer overhead. def bench_try_finally(b): def fin(): pass def _(): try: pass finally: fin() for i in xrange(b.N): _() def bench_defer(b): def fin(): pass @func def _(): defer(fin) for i in xrange(b.N): _() # test_error lives in errors_test.py # strings tests live in golang_str_test.py # ---- misc ---- # _pyrun runs `sys.executable argv... <stdin`. # it returns exit code, stdout and stderr. def _pyrun(argv, stdin=None, stdout=None, stderr=None, **kw): # -> retcode, stdout, stderr pyexe = kw.pop('pyexe', sys.executable) argv = [pyexe] + argv # adjust $PYTHONPATH to point to pygolang. This makes sure that external # script will succeed on `import golang` when running in-tree. kw = kw.copy() pathv = [dir_pygolang] env = kw.pop('env', os.environ.copy()) envadj = kw.pop('envadj', {}) env.update(envadj) envpath = env.get('PYTHONPATH') if envpath is not None: pathv.extend(envpath.split(os.pathsep)) env['PYTHONPATH'] = os.pathsep.join(pathv) # set $PYTHONIOENCODING to encoding of stdin/stdout/stderr # we need to do it because on Windows `python x.py | ...` runs with stdio # encoding set to cp125X even if just `python x.py` runs with stdio # encoding=UTF-8. if 'PYTHONIOENCODING' not in env: enc = set([_.encoding for _ in (sys.stdin, sys.stdout, sys.stderr)]) if None in enc: # without -s pytest uses _pytest.capture.DontReadFromInput enc.remove(None) # with None .encoding assert len(enc) == 1 env['PYTHONIOENCODING'] = enc.pop() # disable LeakSanitizer if requested, e.g. when test is known to leak something on purpose lsan = kw.pop('lsan', True) if not lsan: env['ASAN_OPTIONS'] = env.get('ASAN_OPTIONS', '') + ',detect_leaks=0' p = Popen(argv, stdin=(PIPE if stdin else None), stdout=stdout, stderr=stderr, env=env, **kw) stdout, stderr = p.communicate(stdin) # on windows print emits \r\n instead of just \n # normalize that to \n in *out if os.name == 'nt': if stdout is not None: stdout = stdout.replace(b'\r\n', b'\n') if stderr is not None: stderr = stderr.replace(b'\r\n', b'\n') return p.returncode, stdout, stderr # pyrun runs `sys.executable argv... <stdin`. # it raises exception if ran command fails. def pyrun(argv, stdin=None, stdout=None, stderr=None, **kw): retcode, stdout, stderr = _pyrun(argv, stdin=stdin, stdout=stdout, stderr=stderr, **kw) if retcode: raise RuntimeError(' '.join(argv) + '\n' + (stderr and str(stderr) or '(failed)')) return stdout # pyout runs `sys.executable argv... <stdin` and returns its output. # it raises exception if ran command fails. def pyout(argv, stdin=None, stdout=PIPE, stderr=None, **kw): return pyrun(argv, stdin=stdin, stdout=stdout, stderr=stderr, **kw) # panics is similar to pytest.raises and asserts that wrapped code panics with arg. class panics: def __init__(self, arg): self.arg = arg def __enter__(self): self.raises = raises(_PanicError) self.exc_info = self.raises.__enter__() def __exit__(self, exc_type, exc_val, exc_tb): ok = self.raises.__exit__(exc_type, exc_val, exc_tb) if not ok: return ok # _PanicError raised - let's check panic argument assert self.exc_info.value.args == (self.arg,) return ok def test_panics(): # no panic -> "did not raise" with raises(raises.Exception, match="DID NOT RAISE"): with panics(""): pass # raise different type -> exception propagates with raises(RuntimeError, match="hello world"): with panics(""): raise RuntimeError("hello world") # panic with different argument with raises(AssertionError, match=r"assert \('bbb',\) == \('aaa',\)"): with panics("aaa"): panic("bbb") # panic with expected argument with panics(123): panic(123) # assertDoc asserts that want == got via doctest. # # in want: # - PYGOLANG means real pygolang prefix # - empty lines are changed to <BLANKLINE> def assertDoc(want, got): want = u(want) got = u(got) # normalize got to PYGOLANG udir_pygolang = abbrev_home(dir_pygolang) # /home/x/.../pygolang -> ~/.../pygolang got = got.replace(dir_pygolang, "PYGOLANG") # /home/x/.../pygolang -> PYGOLANG got = got.replace(udir_pygolang, "PYGOLANG") # ~/.../pygolang -> PYGOLANG # got: normalize PYGOLANG\a\b\c -> PYGOLANG/a/b/c # a\b\c\d.py -> a/b/c/d.py def _(m): return m.group(0).replace(os.path.sep, '/') got = re.sub(r"(?<=PYGOLANG)[^\s]+(?=\s)", _, got) got = re.sub(r"([\w\\\.]+)(?=\.py)", _, got) # want: process conditionals # PY39(...) -> ... if py ≥ 3.9 else ø (inline) # `... +PY39` -> ... if py ≥ 3.9 else ø (whole line) # `... -PY39` -> ... if py < 3.9 else ø (whole line) have = {} # 'PYxy' -> y/n for minor in (9,10,11): have['PY3%d' % minor] = (sys.version_info >= (3, minor)) for x, havex in have.items(): want = re.sub(r"%s\((.*)\)" % x, r"\1" if havex else "", want) r = re.compile(r'^(?P<main>.*?) +(?P<y>(\+|-))%s$' % x) v = [] for l in want.splitlines(): m = r.match(l) if m is not None: l = m.group('main') y = {'+':True, '-':False}[m.group('y')] if (y and not havex) or (havex and not y): continue v.append(l) want = '\n'.join(v)+'\n' # want: ^$ -> <BLANKLINE> while "\n\n" in want: want = want.replace("\n\n", "\n<BLANKLINE>\n") X = doctest.OutputChecker() if not X.check_output(want, got, doctest.ELLIPSIS): # output_difference wants Example object with .want attr class Ex: pass _ = Ex() _.want = want fail("not equal:\n" + X.output_difference(_, got, doctest.ELLIPSIS | doctest.REPORT_UDIFF)) # fmtargspec returns formatted arguments for function f. # # For example: # def f(x, y=3): # ... # fmtargspec(f) -> '(x, y=3)' def fmtargspec(f): # -> str # inspect.formatargspec is deprecated since py3.5 and was removed in py3.11 # -> use inspect.signature instead. if six.PY3: return str(inspect.signature(f)) else: return inspect.formatargspec(*inspect.getargspec(f)) def test_fmtargspec(): def f(x, y=3, z=4, *argv, **kw): pass assert fmtargspec(f) == '(x, y=3, z=4, *argv, **kw)' # readfile returns content of file @path. def readfile(path): # -> bytes # on windows in text mode files are opened with encoding=locale.getdefaultlocale() # which is CP125X instead of UTF-8. -> manually decode as 'UTF-8' with open(path, "rb") as f: return f.read() # abbrev_home returns path with user home prefix abbreviated with ~. def abbrev_home(path): home = os.path.expanduser('~') if path == home: return '~' if path.startswith(home+'/'): return '~'+path[len(home):] return path