Commit c13bfd11 authored by Kevin Modzelewski's avatar Kevin Modzelewski

Merge pull request #1089 from undingen/minor_compat6

fix misc minor compatibility issues
parents e3205e38 dce102f3
......@@ -7,7 +7,8 @@ import re
import os.path
import tempfile
import subprocess
import py_compile
# Pyston change: we can't import this currently
# import py_compile
import contextlib
import shutil
try:
......
# expected: fail
# Adapted from test_file.py by Daniel Stutzbach
from __future__ import unicode_literals
......@@ -29,6 +28,8 @@ class AutoFileTests(unittest.TestCase):
self.f.close()
os.remove(TESTFN)
# Pyston change: disable this test becasue of GC
@unittest.skip("only works with refcounting")
def testWeakRefs(self):
# verify weak references
p = proxy(self.f)
......
......@@ -347,19 +347,25 @@ def _is_gui_available():
def is_resource_enabled(resource):
"""Test whether a resource is enabled. Known resources are set by
regrtest.py."""
# Pyston change: we assume that resources are not available in general
return use_resources is not None and resource in use_resources
def requires(resource, msg=None):
"""Raise ResourceDenied if the specified resource is not available."""
"""Raise ResourceDenied if the specified resource is not available.
If the caller's module is __main__ then automatically return True. The
possibility of False being returned occurs when regrtest.py is executing."""
if resource == 'gui' and not _is_gui_available():
raise ResourceDenied(_is_gui_available.reason)
# Pyston change: we don't check if the caller's module is __main__ using sys._getframe() magic.
# see if the caller's module is __main__ - if so, treat as if
# the resource was set
if sys._getframe(1).f_globals.get("__name__") == "__main__":
return
if not is_resource_enabled(resource):
if msg is None:
msg = "Use of the `%s' resource not enabled" % resource
raise ResourceDenied(msg)
# Don't use "localhost", since resolving it uses the DNS under recent
# Windows versions (see issue #18792).
HOST = "127.0.0.1"
......@@ -501,11 +507,6 @@ try:
except NameError:
have_unicode = False
requires_unicode = unittest.skipUnless(have_unicode, 'no unicode support')
def u(s):
return unicode(s, 'unicode-escape')
is_jython = sys.platform.startswith('java')
# FS_NONASCII: non-ASCII Unicode character encodable by
......@@ -749,49 +750,42 @@ class WarningsRecorder(object):
def _filterwarnings(filters, quiet=False):
# Pyston change:
# this bare yield seems to work for now, but we might need to yield up a WarningsRecorder in some cases?
yield
# TODO: Frame introspection in Pyston?
# old code follows:
# """Catch the warnings, then check if all the expected
# warnings have been raised and re-raise unexpected warnings.
# If 'quiet' is True, only re-raise the unexpected warnings.
# """
"""Catch the warnings, then check if all the expected
warnings have been raised and re-raise unexpected warnings.
If 'quiet' is True, only re-raise the unexpected warnings.
"""
# Clear the warning registry of the calling module
# in order to re-raise the warnings.
# frame = sys._getframe(2)
# registry = frame.f_globals.get('__warningregistry__')
# if registry:
# registry.clear()
# with warnings.catch_warnings(record=True) as w:
# # Set filter "always" to record all warnings. Because
# # test_warnings swap the module, we need to look up in
# # the sys.modules dictionary.
# sys.modules['warnings'].simplefilter("always")
# yield WarningsRecorder(w)
# # Filter the recorded warnings
# reraise = [warning.message for warning in w]
# missing = []
# for msg, cat in filters:
# seen = False
# for exc in reraise[:]:
# message = str(exc)
# # Filter out the matching messages
# if (re.match(msg, message, re.I) and
# issubclass(exc.__class__, cat)):
# seen = True
# reraise.remove(exc)
# if not seen and not quiet:
# # This filter caught nothing
# missing.append((msg, cat.__name__))
# if reraise:
# raise AssertionError("unhandled warning %r" % reraise[0])
# if missing:
# raise AssertionError("filter (%r, %s) did not catch any warning" %
# missing[0])
frame = sys._getframe(2)
registry = frame.f_globals.get('__warningregistry__')
if registry:
registry.clear()
with warnings.catch_warnings(record=True) as w:
# Set filter "always" to record all warnings. Because
# test_warnings swap the module, we need to look up in
# the sys.modules dictionary.
sys.modules['warnings'].simplefilter("always")
yield WarningsRecorder(w)
# Filter the recorded warnings
reraise = [warning.message for warning in w]
missing = []
for msg, cat in filters:
seen = False
for exc in reraise[:]:
message = str(exc)
# Filter out the matching messages
if (re.match(msg, message, re.I) and
issubclass(exc.__class__, cat)):
seen = True
reraise.remove(exc)
if not seen and not quiet:
# This filter caught nothing
missing.append((msg, cat.__name__))
if reraise:
raise AssertionError("unhandled warning %r" % reraise[0])
if missing:
raise AssertionError("filter (%r, %s) did not catch any warning" %
missing[0])
@contextlib.contextmanager
......
......@@ -773,55 +773,21 @@ class Pathname_Tests(unittest.TestCase):
class Utility_Tests(unittest.TestCase):
"""Testcase to test the various utility functions in the urllib."""
# In Python 3 this test class is moved to test_urlparse.
def test_splittype(self):
splittype = urllib.splittype
self.assertEqual(splittype('type:opaquestring'), ('type', 'opaquestring'))
self.assertEqual(splittype('opaquestring'), (None, 'opaquestring'))
self.assertEqual(splittype(':opaquestring'), (None, ':opaquestring'))
self.assertEqual(splittype('type:'), ('type', ''))
self.assertEqual(splittype('type:opaque:string'), ('type', 'opaque:string'))
def test_splithost(self):
splithost = urllib.splithost
self.assertEqual(splithost('//www.example.org:80/foo/bar/baz.html'),
('www.example.org:80', '/foo/bar/baz.html'))
self.assertEqual(splithost('//www.example.org:80'),
('www.example.org:80', ''))
self.assertEqual(splithost('/foo/bar/baz.html'),
(None, '/foo/bar/baz.html'))
def test_splituser(self):
splituser = urllib.splituser
self.assertEqual(splituser('User:Pass@www.python.org:080'),
('User:Pass', 'www.python.org:080'))
self.assertEqual(splituser('@www.python.org:080'),
('', 'www.python.org:080'))
self.assertEqual(splituser('www.python.org:080'),
(None, 'www.python.org:080'))
self.assertEqual(splituser('User:Pass@'),
('User:Pass', ''))
self.assertEqual(splituser('User@example.com:Pass@www.python.org:080'),
('User@example.com:Pass', 'www.python.org:080'))
def test_splitpasswd(self):
# Some of the password examples are not sensible, but it is added to
# confirming to RFC2617 and addressing issue4675.
splitpasswd = urllib.splitpasswd
self.assertEqual(splitpasswd('user:ab'), ('user', 'ab'))
self.assertEqual(splitpasswd('user:a\nb'), ('user', 'a\nb'))
self.assertEqual(splitpasswd('user:a\tb'), ('user', 'a\tb'))
self.assertEqual(splitpasswd('user:a\rb'), ('user', 'a\rb'))
self.assertEqual(splitpasswd('user:a\fb'), ('user', 'a\fb'))
self.assertEqual(splitpasswd('user:a\vb'), ('user', 'a\vb'))
self.assertEqual(splitpasswd('user:a:b'), ('user', 'a:b'))
self.assertEqual(splitpasswd('user:a b'), ('user', 'a b'))
self.assertEqual(splitpasswd('user 2:ab'), ('user 2', 'ab'))
self.assertEqual(splitpasswd('user+1:a+b'), ('user+1', 'a+b'))
self.assertEqual(splitpasswd('user:'), ('user', ''))
self.assertEqual(splitpasswd('user'), ('user', None))
self.assertEqual(splitpasswd(':ab'), ('', 'ab'))
"""Some of the password examples are not sensible, but it is added to
confirming to RFC2617 and addressing issue4675.
"""
self.assertEqual(('user', 'ab'),urllib.splitpasswd('user:ab'))
self.assertEqual(('user', 'a\nb'),urllib.splitpasswd('user:a\nb'))
self.assertEqual(('user', 'a\tb'),urllib.splitpasswd('user:a\tb'))
self.assertEqual(('user', 'a\rb'),urllib.splitpasswd('user:a\rb'))
self.assertEqual(('user', 'a\fb'),urllib.splitpasswd('user:a\fb'))
self.assertEqual(('user', 'a\vb'),urllib.splitpasswd('user:a\vb'))
self.assertEqual(('user', 'a:b'),urllib.splitpasswd('user:a:b'))
self.assertEqual(('user', 'a b'),urllib.splitpasswd('user:a b'))
self.assertEqual(('user 2', 'ab'),urllib.splitpasswd('user 2:ab'))
self.assertEqual(('user+1', 'a+b'),urllib.splitpasswd('user+1:a+b'))
def test_splitport(self):
splitport = urllib.splitport
......@@ -830,9 +796,6 @@ class Utility_Tests(unittest.TestCase):
self.assertEqual(splitport('parrot:'), ('parrot', None))
self.assertEqual(splitport('127.0.0.1'), ('127.0.0.1', None))
self.assertEqual(splitport('parrot:cheese'), ('parrot:cheese', None))
self.assertEqual(splitport('[::1]:88'), ('[::1]', '88'))
self.assertEqual(splitport('[::1]'), ('[::1]', None))
self.assertEqual(splitport(':88'), ('', '88'))
def test_splitnport(self):
splitnport = urllib.splitnport
......@@ -846,59 +809,6 @@ class Utility_Tests(unittest.TestCase):
self.assertEqual(splitnport('parrot:cheese'), ('parrot', None))
self.assertEqual(splitnport('parrot:cheese', 55), ('parrot', None))
def test_splitquery(self):
# Normal cases are exercised by other tests; ensure that we also
# catch cases with no port specified (testcase ensuring coverage)
splitquery = urllib.splitquery
self.assertEqual(splitquery('http://python.org/fake?foo=bar'),
('http://python.org/fake', 'foo=bar'))
self.assertEqual(splitquery('http://python.org/fake?foo=bar?'),
('http://python.org/fake?foo=bar', ''))
self.assertEqual(splitquery('http://python.org/fake'),
('http://python.org/fake', None))
self.assertEqual(splitquery('?foo=bar'), ('', 'foo=bar'))
def test_splittag(self):
splittag = urllib.splittag
self.assertEqual(splittag('http://example.com?foo=bar#baz'),
('http://example.com?foo=bar', 'baz'))
self.assertEqual(splittag('http://example.com?foo=bar#'),
('http://example.com?foo=bar', ''))
self.assertEqual(splittag('#baz'), ('', 'baz'))
self.assertEqual(splittag('http://example.com?foo=bar'),
('http://example.com?foo=bar', None))
self.assertEqual(splittag('http://example.com?foo=bar#baz#boo'),
('http://example.com?foo=bar#baz', 'boo'))
def test_splitattr(self):
splitattr = urllib.splitattr
self.assertEqual(splitattr('/path;attr1=value1;attr2=value2'),
('/path', ['attr1=value1', 'attr2=value2']))
self.assertEqual(splitattr('/path;'), ('/path', ['']))
self.assertEqual(splitattr(';attr1=value1;attr2=value2'),
('', ['attr1=value1', 'attr2=value2']))
self.assertEqual(splitattr('/path'), ('/path', []))
def test_splitvalue(self):
# Normal cases are exercised by other tests; test pathological cases
# with no key/value pairs. (testcase ensuring coverage)
splitvalue = urllib.splitvalue
self.assertEqual(splitvalue('foo=bar'), ('foo', 'bar'))
self.assertEqual(splitvalue('foo='), ('foo', ''))
self.assertEqual(splitvalue('=bar'), ('', 'bar'))
self.assertEqual(splitvalue('foobar'), ('foobar', None))
self.assertEqual(splitvalue('foo=bar=baz'), ('foo', 'bar=baz'))
def test_toBytes(self):
result = urllib.toBytes(u'http://www.python.org')
self.assertEqual(result, 'http://www.python.org')
self.assertRaises(UnicodeError, urllib.toBytes,
test_support.u(r'http://www.python.org/medi\u00e6val'))
def test_unwrap(self):
url = urllib.unwrap('<URL:type://host/path>')
self.assertEqual(url, 'type://host/path')
class URLopener_Tests(unittest.TestCase):
"""Testcase to test the open method of URLopener class."""
......
......@@ -479,9 +479,21 @@ extern "C" PyObject* PyObject_SelfIter(PyObject* obj) noexcept {
}
extern "C" int PyObject_GenericSetAttr(PyObject* obj, PyObject* name, PyObject* value) noexcept {
RELEASE_ASSERT(PyString_Check(name), "");
if (!PyString_Check(name)) {
if (PyUnicode_Check(name)) {
name = PyUnicode_AsEncodedString(name, NULL, NULL);
if (name == NULL)
return -1;
} else {
PyErr_Format(PyExc_TypeError, "attribute name must be string, not '%.200s'", Py_TYPE(name)->tp_name);
return -1;
}
}
BoxedString* str = static_cast<BoxedString*>(name);
internStringMortalInplace(str);
assert(PyString_Check(name));
try {
if (value == NULL)
delattrGeneric(obj, str, NULL);
......
......@@ -30,6 +30,7 @@
#include "core/thread_utils.h"
#include "core/util.h"
#include "runtime/objmodel.h" // _printStacktrace
#include "runtime/types.h"
namespace pyston {
namespace threading {
......@@ -737,5 +738,23 @@ extern "C" void PyThread_delete_key_value(int key) noexcept {
Py_FatalError("unimplemented");
}
extern "C" PyObject *_PyThread_CurrentFrames(void) noexcept {
try {
LOCK_REGION(&threading_lock);
BoxedDict* result = new BoxedDict;
for (auto& pair : current_threads) {
FrameInfo* frame_info = (FrameInfo*)pair.second->public_thread_state->frame_info;
Box* frame = getFrame(frame_info);
assert(frame);
result->d[boxInt(pair.first)] = frame;
}
return result;
} catch (ExcInfo) {
RELEASE_ASSERT(0, "not implemented");
}
}
} // namespace threading
} // namespace pyston
......@@ -121,6 +121,13 @@ Box* sysGetFrame(Box* val) {
return frame;
}
Box* sysCurrentFrames() {
Box* rtn = _PyThread_CurrentFrames();
if (!rtn)
throwCAPIException();
return rtn;
}
Box* sysGetDefaultEncoding() {
return boxString(PyUnicode_GetDefaultEncoding());
}
......@@ -674,6 +681,8 @@ void setupSys() {
sys_module->giveAttr(
"_getframe",
new BoxedFunction(FunctionMetadata::create((void*)sysGetFrame, UNKNOWN, 1, false, false), { NULL }));
sys_module->giveAttr("_current_frames",
new BoxedFunction(FunctionMetadata::create((void*)sysCurrentFrames, UNKNOWN, 0)));
sys_module->giveAttr("getdefaultencoding", new BoxedBuiltinFunctionOrMethod(
FunctionMetadata::create((void*)sysGetDefaultEncoding, STR, 0),
"getdefaultencoding", getdefaultencoding_doc));
......
......@@ -1359,9 +1359,9 @@ void setupInt() {
static PyNumberMethods int_as_number;
int_cls->tp_as_number = &int_as_number;
for (int i = 0; i < NUM_INTERNED_INTS; i++) {
interned_ints[i] = new BoxedInt(i);
gc::registerPermanentRoot(interned_ints[i]);
for (int i = MIN_INTERNED_INT; i <= MAX_INTERNED_INT; i++) {
interned_ints[-MIN_INTERNED_INT + i] = new BoxedInt(i);
gc::registerPermanentRoot(interned_ints[-MIN_INTERNED_INT + i]);
}
int_cls->giveAttr("__getnewargs__", new BoxedFunction(FunctionMetadata::create((void*)int_getnewargs, UNKNOWN, 1,
......
......@@ -1708,8 +1708,9 @@ extern "C" Box* getclsattr(Box* obj, BoxedString* attr) {
else {
gotten = getclsattrInternal<NOT_REWRITABLE>(obj, attr, NULL);
}
RELEASE_ASSERT(gotten, "%s:%s", getTypeName(obj), attr->data());
if (!gotten)
raiseExcHelper(AttributeError, "%s", attr->data());
return gotten;
}
......@@ -5589,7 +5590,9 @@ void Box::delattr(BoxedString* attr, DelattrRewriteArgs* rewrite_args) {
}
if (cls->instancesHaveDictAttrs()) {
Py_FatalError("unimplemented");
BoxedDict* d = getDict();
d->d.erase(attr);
return;
}
abort();
......
......@@ -157,7 +157,7 @@ Box* superRepr(Box* _s) {
// Ported from the CPython version:
BoxedClass* supercheck(BoxedClass* type, Box* obj) {
template <ExceptionStyle S> BoxedClass* superCheck(BoxedClass* type, Box* obj) noexcept(S == CAPI) {
if (PyType_Check(obj) && isSubclass(static_cast<BoxedClass*>(obj), type))
return static_cast<BoxedClass*>(obj);
......@@ -172,7 +172,33 @@ BoxedClass* supercheck(BoxedClass* type, Box* obj) {
return static_cast<BoxedClass*>(class_attr);
}
raiseExcHelper(TypeError, "super(type, obj): obj must be an instance or subtype of type");
if (S == CXX)
raiseExcHelper(TypeError, "super(type, obj): obj must be an instance or subtype of type");
else
PyErr_SetString(TypeError, "super(type, obj): obj must be an instance or subtype of type");
return NULL;
}
template <ExceptionStyle S>
static PyObject* superGet(PyObject* _self, PyObject* obj, PyObject* type) noexcept(S == CAPI) {
BoxedSuper* self = static_cast<BoxedSuper*>(_self);
if (obj == NULL || obj == None || self->obj != NULL) {
/* Not binding to an object, or already bound */
return self;
}
if (self->cls != super_cls) {
/* If self is an instance of a (strict) subclass of super,
call its type */
return runtimeCallInternal<S, NOT_REWRITABLE>(self->cls, NULL, ArgPassSpec(2), self->type, obj, NULL, NULL,
NULL);
} else {
/* Inline the common case */
BoxedClass* obj_type = superCheck<S>(self->type, obj);
if (obj_type == NULL)
return NULL;
return new BoxedSuper(self->type, obj, obj_type);
}
}
Box* superInit(Box* _self, Box* _type, Box* obj) {
......@@ -187,7 +213,7 @@ Box* superInit(Box* _self, Box* _type, Box* obj) {
if (obj == None)
obj = NULL;
if (obj != NULL)
obj_type = supercheck(type, obj);
obj_type = superCheck<CXX>(type, obj);
self->type = type;
self->obj = obj;
......@@ -206,6 +232,7 @@ void setupSuper() {
super_cls->giveAttr(
"__init__", new BoxedFunction(FunctionMetadata::create((void*)superInit, UNKNOWN, 3, false, false), { NULL }));
super_cls->giveAttr("__get__", new BoxedFunction(FunctionMetadata::create((void*)superGet<CXX>, UNKNOWN, 3)));
super_cls->giveAttr("__thisclass__",
new BoxedMemberDescriptor(BoxedMemberDescriptor::OBJECT, offsetof(BoxedSuper, type)));
......@@ -216,5 +243,6 @@ void setupSuper() {
super_cls->freeze();
super_cls->tp_getattro = super_getattro;
super_cls->tp_descr_get = superGet<CAPI>;
}
}
......@@ -1163,11 +1163,15 @@ inline BoxedString* boxString(llvm::StringRef s) {
return new (s.size()) BoxedString(s);
}
#define NUM_INTERNED_INTS 100
#define MIN_INTERNED_INT -5 // inclusive
#define MAX_INTERNED_INT 256 // inclusive
static_assert(MIN_INTERNED_INT < 0 && MAX_INTERNED_INT > 0, "");
#define NUM_INTERNED_INTS ((-MIN_INTERNED_INT) + MAX_INTERNED_INT + 1)
extern BoxedInt* interned_ints[NUM_INTERNED_INTS];
extern "C" inline Box* boxInt(int64_t n) {
if (0 <= n && n < NUM_INTERNED_INTS) {
return interned_ints[n];
if (n >= MIN_INTERNED_INT && n <= MAX_INTERNED_INT) {
return interned_ints[(-MIN_INTERNED_INT) + n];
}
return new BoxedInt(n);
}
......
......@@ -95,7 +95,6 @@ test_exceptions we are missing recursion-depth checking
test_extcall f(**kw) crashes if kw isn't a dict
test_file2k we abort when you try to open() a directory
test_file_eintr not sure
test_fileio [unknown]
test_fork1 [unknown]
test_frozen [unknown]
test_ftplib [unknown]
......
......@@ -38,3 +38,6 @@ class Test(object):
t = Test()
t.a = 1
object.__setattr__(t, u"ustr", "42")
print t.ustr
# this test is a modfied version of a testcase inside test_descr
class C:
__metaclass__ = type
def __init__(self):
self.__state = 0
def getstate(self):
return self.__state
def setstate(self, state):
self.__state = state
a = C()
assert a.getstate() == 0
a.setstate(10)
assert a.getstate() == 10
class D:
class __metaclass__(type):
def myself(cls): return cls
assert D.myself() == D
d = D()
assert d.__class__ == D
class M1(type):
def __new__(cls, name, bases, dict):
dict['__spam__'] = 1
return type.__new__(cls, name, bases, dict)
class C:
__metaclass__ = M1
assert C.__spam__ == 1
c = C()
assert c.__spam__ == 1
class _instance(object):
pass
class M2(object):
@staticmethod
def __new__(cls, name, bases, dict):
self = object.__new__(cls)
self.name = name
self.bases = bases
self.dict = dict
return self
def __call__(self):
it = _instance()
# Early binding of methods
for key in self.dict:
if key.startswith("__"):
continue
setattr(it, key, self.dict[key].__get__(it, self))
return it
class C:
__metaclass__ = M2
def spam(self):
return 42
assert C.name == 'C'
assert C.bases == ()
assert 'spam' in C.dict
c = C()
assert c.spam() == 42
# More metaclass examples
class autosuper(type):
# Automatically add __super to the class
# This trick only works for dynamic classes
def __new__(metaclass, name, bases, dict):
cls = super(autosuper, metaclass).__new__(metaclass,
name, bases, dict)
# Name mangling for __super removes leading underscores
while name[:1] == "_":
name = name[1:]
if name:
name = "_%s__super" % name
else:
name = "__super"
setattr(cls, name, super(cls))
return cls
class A:
__metaclass__ = autosuper
def meth(self):
return "A"
class B(A):
def meth(self):
return "B" + self.__super.meth()
class C(A):
def meth(self):
return "C" + self.__super.meth()
class D(C, B):
def meth(self):
return "D" + self.__super.meth()
assert D().meth() == "DCBA"
class E(B, C):
def meth(self):
return "E" + self.__super.meth()
assert E().meth() == "EBCA"
class autoproperty(type):
# Automatically create property attributes when methods
# named _get_x and/or _set_x are found
def __new__(metaclass, name, bases, dict):
hits = {}
for key, val in dict.iteritems():
if key.startswith("_get_"):
key = key[5:]
get, set = hits.get(key, (None, None))
get = val
hits[key] = get, set
elif key.startswith("_set_"):
key = key[5:]
get, set = hits.get(key, (None, None))
set = val
hits[key] = get, set
for key, (get, set) in hits.iteritems():
dict[key] = property(get, set)
return super(autoproperty, metaclass).__new__(metaclass,
name, bases, dict)
class A:
__metaclass__ = autoproperty
def _get_x(self):
return -self.__x
def _set_x(self, x):
self.__x = -x
a = A()
assert not hasattr(a, "x")
a.x = 12
assert a.x == 12
assert a._A__x == -12
class multimetaclass(autoproperty, autosuper):
# Merge of multiple cooperating metaclasses
pass
class A:
__metaclass__ = multimetaclass
def _get_x(self):
return "A"
class B(A):
def _get_x(self):
return "B" + self.__super._get_x()
class C(A):
def _get_x(self):
return "C" + self.__super._get_x()
class D(C, B):
def _get_x(self):
return "D" + self.__super._get_x()
assert D().x == "DCBA"
# Make sure type(x) doesn't call x.__class__.__init__
class T(type):
counter = 0
def __init__(self, *args):
T.counter += 1
class C:
__metaclass__ = T
assert T.counter == 1
a = C()
assert type(a) == C
assert T.counter == 1
class C(object): pass
c = C()
try: c()
except TypeError: pass
else: self.fail("calling object w/o call method should raise "
"TypeError")
# Testing code to find most derived baseclass
class A(type):
def __new__(*args, **kwargs):
return type.__new__(*args, **kwargs)
class B(object):
pass
class C(object):
__metaclass__ = A
# The most derived metaclass of D is A rather than type.
class D(B, C):
pass
print "finished"
# this is copied out of cpythons test_sys.py and adopted to use assert stmts
import sys
import thread
import threading, thread
import traceback
# Spawn a thread that blocks at a known place. Then the main
# thread does sys._current_frames(), and verifies that the frames
# returned make sense.
entered_g = threading.Event()
leave_g = threading.Event()
thread_info = [] # the thread's id
def f123():
g456()
def g456():
thread_info.append(thread.get_ident())
entered_g.set()
leave_g.wait()
t = threading.Thread(target=f123)
t.start()
entered_g.wait()
# At this point, t has finished its entered_g.set(), although it's
# impossible to guess whether it's still on that line or has moved on
# to its leave_g.wait().
assert len(thread_info) == 1
thread_id = thread_info[0]
d = sys._current_frames()
main_id = thread.get_ident()
assert main_id in d
assert thread_id in d
# Verify that the captured main-thread frame is _this_ frame.
frame = d.pop(main_id)
assert frame is sys._getframe()
# Verify that the captured thread frame is blocked in g456, called
# from f123. This is a litte tricky, since various bits of
# threading.py are also in the thread's call stack.
frame = d.pop(thread_id)
stack = traceback.extract_stack(frame)
for i, (filename, lineno, funcname, sourceline) in enumerate(stack):
if funcname == "f123":
break
else:
self.fail("didn't find f123() on thread's call stack")
assert sourceline == "g456()"
# And the next record must be for g456().
filename, lineno, funcname, sourceline = stack[i+1]
assert funcname == "g456"
assert sourceline in ["leave_g.wait()", "entered_g.set()"]
# Reap the spawned thread.
leave_g.set()
t.join()
print "finished"
......@@ -67,3 +67,8 @@ def f2(b, C):
print f2(False, NewC), f2(False, OldC)
print f2(True, NewC), f2(True, OldC)
try:
with None:
print "inside"
except AttributeError as e:
assert "__exit__" in str(e)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment