diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index 1ce2e9fc0f..8a91403bfe 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -1,9 +1,12 @@ import builtins import codecs +# import _datetime # TODO: RUSTPYTHON import gc +import io import locale import operator import os +import random import struct import subprocess import sys @@ -14,14 +17,21 @@ from test.support.script_helper import assert_python_ok, assert_python_failure from test.support import threading_helper from test.support import import_helper +from test.support import force_not_colorized +try: + from test.support import interpreters +except ImportError: + interpreters = None import textwrap import unittest import warnings -# count the number of test runs, used to create unique -# strings to intern in test_intern() -INTERN_NUMRUNS = 0 +def requires_subinterpreters(meth): + """Decorator to skip a test if subinterpreters are not supported.""" + return unittest.skipIf(interpreters is None, + 'subinterpreters required')(meth) + DICT_KEY_STRUCT_FORMAT = 'n2BI2n' @@ -71,6 +81,18 @@ def baddisplayhook(obj): code = compile("42", "", "single") self.assertRaises(ValueError, eval, code) + def test_gh130163(self): + class X: + def __repr__(self): + sys.stdout = io.StringIO() + support.gc_collect() + return 'foo' + + with support.swap_attr(sys, 'stdout', None): + sys.stdout = io.StringIO() # the only reference + sys.displayhook(X()) # should not crash + + class ActiveExceptionTests(unittest.TestCase): def test_exc_info_no_exception(self): self.assertEqual(sys.exc_info(), (None, None, None)) @@ -137,6 +159,7 @@ def f(): class ExceptHookTest(unittest.TestCase): + @force_not_colorized def test_original_excepthook(self): try: raise ValueError(42) @@ -148,8 +171,8 @@ def test_original_excepthook(self): self.assertRaises(TypeError, sys.__excepthook__) - # TODO: RUSTPYTHON, SyntaxError formatting in arbitrary tracebacks - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; SyntaxError formatting in arbitrary tracebacks + @force_not_colorized def test_excepthook_bytes_filename(self): # bpo-37467: sys.excepthook() must not crash if a filename # is a bytes string @@ -169,7 +192,8 @@ def test_excepthook_bytes_filename(self): def test_excepthook(self): with test.support.captured_output("stderr") as stderr: - sys.excepthook(1, '1', 1) + with test.support.catch_unraisable_exception(): + sys.excepthook(1, '1', 1) self.assertTrue("TypeError: print_exception(): Exception expected for " \ "value, str found" in stderr.getvalue()) @@ -196,6 +220,20 @@ def test_exit(self): self.assertEqual(out, b'') self.assertEqual(err, b'') + # gh-125842: Windows uses 32-bit unsigned integers for exit codes + # so a -1 exit code is sometimes interpreted as 0xffff_ffff. + rc, out, err = assert_python_failure('-c', 'import sys; sys.exit(0xffff_ffff)') + self.assertIn(rc, (-1, 0xff, 0xffff_ffff)) + self.assertEqual(out, b'') + self.assertEqual(err, b'') + + # Overflow results in a -1 exit code, which may be converted to 0xff + # or 0xffff_ffff. + rc, out, err = assert_python_failure('-c', 'import sys; sys.exit(2**128)') + self.assertIn(rc, (-1, 0xff, 0xffff_ffff)) + self.assertEqual(out, b'') + self.assertEqual(err, b'') + # call with integer argument with self.assertRaises(SystemExit) as cm: sys.exit(42) @@ -238,17 +276,36 @@ def check_exit_message(code, expected, **env_vars): # test that the exit message is written with backslashreplace error # handler to stderr - # TODO: RUSTPYTHON; allow surrogates in strings - # check_exit_message( - # r'import sys; sys.exit("surrogates:\uDCFF")', - # b"surrogates:\\udcff") + check_exit_message( + r'import sys; sys.exit("surrogates:\uDCFF")', + b"surrogates:\\udcff") # test that the unicode message is encoded to the stderr encoding # instead of the default encoding (utf8) - # TODO: RUSTPYTHON; handle PYTHONIOENCODING - # check_exit_message( - # r'import sys; sys.exit("h\xe9")', - # b"h\xe9", PYTHONIOENCODING='latin-1') + check_exit_message( + r'import sys; sys.exit("h\xe9")', + b"h\xe9", PYTHONIOENCODING='latin-1') + + @support.requires_subprocess() + def test_exit_codes_under_repl(self): + # GH-129900: SystemExit, or things that raised it, didn't + # get their return code propagated by the REPL + import tempfile + + exit_ways = [ + "exit", + "__import__('sys').exit", + "raise SystemExit" + ] + + for exitfunc in exit_ways: + for return_code in (0, 123): + with self.subTest(exitfunc=exitfunc, return_code=return_code): + with tempfile.TemporaryFile("w+") as stdin: + stdin.write(f"{exitfunc}({return_code})\n") + stdin.seek(0) + proc = subprocess.run([sys.executable], stdin=stdin) + self.assertEqual(proc.returncode, return_code) def test_getdefaultencoding(self): self.assertRaises(TypeError, sys.getdefaultencoding, 42) @@ -273,21 +330,30 @@ def test_switchinterval(self): finally: sys.setswitchinterval(orig) - def test_recursionlimit(self): + def test_getrecursionlimit(self): + limit = sys.getrecursionlimit() + self.assertIsInstance(limit, int) + self.assertGreater(limit, 1) + self.assertRaises(TypeError, sys.getrecursionlimit, 42) - oldlimit = sys.getrecursionlimit() - self.assertRaises(TypeError, sys.setrecursionlimit) - self.assertRaises(ValueError, sys.setrecursionlimit, -42) - sys.setrecursionlimit(10000) - self.assertEqual(sys.getrecursionlimit(), 10000) - sys.setrecursionlimit(oldlimit) - - @unittest.skipIf(getattr(sys, "_rustpython_debugbuild", False), "TODO: RUSTPYTHON, stack overflow on debug build") + + def test_setrecursionlimit(self): + old_limit = sys.getrecursionlimit() + try: + sys.setrecursionlimit(10_005) + self.assertEqual(sys.getrecursionlimit(), 10_005) + + self.assertRaises(TypeError, sys.setrecursionlimit) + self.assertRaises(ValueError, sys.setrecursionlimit, -42) + finally: + sys.setrecursionlimit(old_limit) + + @unittest.skipIf(getattr(sys, '_rustpython_debugbuild', False), 'TODO: RUSTPYTHON; stack overflow on debug build') def test_recursionlimit_recovery(self): if hasattr(sys, 'gettrace') and sys.gettrace(): self.skipTest('fatal error if run with a trace function') - oldlimit = sys.getrecursionlimit() + old_limit = sys.getrecursionlimit() def f(): f() try: @@ -306,38 +372,63 @@ def f(): with self.assertRaises(RecursionError): f() finally: - sys.setrecursionlimit(oldlimit) + sys.setrecursionlimit(old_limit) @test.support.cpython_only - def test_setrecursionlimit_recursion_depth(self): + def test_setrecursionlimit_to_depth(self): # Issue #25274: Setting a low recursion limit must be blocked if the # current recursion depth is already higher than limit. - from _testinternalcapi import get_recursion_depth - - def set_recursion_limit_at_depth(depth, limit): - recursion_depth = get_recursion_depth() - if recursion_depth >= depth: + old_limit = sys.getrecursionlimit() + try: + depth = support.get_recursion_depth() + with self.subTest(limit=sys.getrecursionlimit(), depth=depth): + # depth + 1 is OK + sys.setrecursionlimit(depth + 1) + + # reset the limit to be able to call self.assertRaises() + # context manager + sys.setrecursionlimit(old_limit) with self.assertRaises(RecursionError) as cm: - sys.setrecursionlimit(limit) - self.assertRegex(str(cm.exception), - "cannot set the recursion limit to [0-9]+ " - "at the recursion depth [0-9]+: " - "the limit is too low") - else: - set_recursion_limit_at_depth(depth, limit) + sys.setrecursionlimit(depth) + self.assertRegex(str(cm.exception), + "cannot set the recursion limit to [0-9]+ " + "at the recursion depth [0-9]+: " + "the limit is too low") + finally: + sys.setrecursionlimit(old_limit) - oldlimit = sys.getrecursionlimit() - try: - sys.setrecursionlimit(1000) + @unittest.skipUnless(support.Py_GIL_DISABLED, "only meaningful if the GIL is disabled") + @threading_helper.requires_working_threading() + def test_racing_recursion_limit(self): + from threading import Thread + def something_recursive(): + def count(n): + if n > 0: + return count(n - 1) + 1 + return 0 - for limit in (10, 25, 50, 75, 100, 150, 200): - set_recursion_limit_at_depth(limit, limit) - finally: - sys.setrecursionlimit(oldlimit) + count(50) + + def set_recursion_limit(): + for limit in range(100, 200): + sys.setrecursionlimit(limit) + + threads = [] + for _ in range(5): + threads.append(Thread(target=set_recursion_limit)) + + for _ in range(5): + threads.append(Thread(target=something_recursive)) + + with threading_helper.catch_threading_exception() as cm: + with threading_helper.start_threads(threads): + pass + + if cm.exc_value: + raise cm.exc_value - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_getwindowsversion(self): # Raise SkipTest if sys doesn't have getwindowsversion attribute test.support.get_attribute(sys, "getwindowsversion") @@ -368,8 +459,7 @@ def test_getwindowsversion(self): # still has 5 elements maj, min, buildno, plat, csd = sys.getwindowsversion() - # TODO: RUSTPYTHON, AttributeError: module 'sys' has no attribute 'call_tracing' - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; AttributeError: module 'sys' has no attribute 'call_tracing' def test_call_tracing(self): self.assertRaises(TypeError, sys.call_tracing, type, 2) @@ -386,15 +476,21 @@ def test_dlopenflags(self): @test.support.refcount_test def test_refcount(self): - # n here must be a global in order for this test to pass while - # tracing with a python function. Tracing calls PyFrame_FastToLocals - # which will add a copy of any locals to the frame object, causing - # the reference count to increase by 2 instead of 1. + # n here originally had to be a global in order for this test to pass + # while tracing with a python function. Tracing used to call + # PyFrame_FastToLocals, which would add a copy of any locals to the + # frame object, causing the ref count to increase by 2 instead of 1. + # While that no longer happens (due to PEP 667), this test case retains + # its original global-based implementation + # PEP 683's immortal objects also made this point moot, since the + # refcount for None doesn't change anyway. Maybe this test should be + # using a different constant value? (e.g. an integer) global n self.assertRaises(TypeError, sys.getrefcount) c = sys.getrefcount(None) n = None - self.assertEqual(sys.getrefcount(None), c+1) + # Singleton refcnts don't change + self.assertEqual(sys.getrefcount(None), c) del n self.assertEqual(sys.getrefcount(None), c) if hasattr(sys, "gettotalrefcount"): @@ -408,6 +504,26 @@ def test_getframe(self): is sys._getframe().f_code ) + def test_getframemodulename(self): + # Default depth gets ourselves + self.assertEqual(__name__, sys._getframemodulename()) + self.assertEqual("unittest.case", sys._getframemodulename(1)) + i = 0 + f = sys._getframe(i) + while f: + self.assertEqual( + f.f_globals['__name__'], + sys._getframemodulename(i) or '__main__' + ) + i += 1 + f2 = f.f_back + try: + f = sys._getframe(i) + except ValueError: + break + self.assertIs(f, f2) + self.assertIsNone(sys._getframemodulename(i)) + # sys._current_frames() is a CPython-only gimmick. # XXX RUSTPYTHON: above comment is from original cpython test; not sure why the cpython_only decorator wasn't added @test.support.cpython_only @@ -436,49 +552,49 @@ def g456(): t.start() entered_g.wait() - # At this point, t has finished its entered_g.set(), although it's - # impossible to guess whether it's still on that line or has moved on - # to its leave_g.wait(). - self.assertEqual(len(thread_info), 1) - thread_id = thread_info[0] - - d = sys._current_frames() - for tid in d: - self.assertIsInstance(tid, int) - self.assertGreater(tid, 0) - - main_id = threading.get_ident() - self.assertIn(main_id, d) - self.assertIn(thread_id, d) - - # Verify that the captured main-thread frame is _this_ frame. - frame = d.pop(main_id) - self.assertTrue(frame is sys._getframe()) - - # Verify that the captured thread frame is blocked in g456, called - # from f123. This is a little tricky, since various bits of - # threading.py are also in the thread's call stack. - frame = d.pop(thread_id) - stack = traceback.extract_stack(frame) - for i, (filename, lineno, funcname, sourceline) in enumerate(stack): - if funcname == "f123": - break - else: - self.fail("didn't find f123() on thread's call stack") - - self.assertEqual(sourceline, "g456()") + try: + # At this point, t has finished its entered_g.set(), although it's + # impossible to guess whether it's still on that line or has moved on + # to its leave_g.wait(). + self.assertEqual(len(thread_info), 1) + thread_id = thread_info[0] + + d = sys._current_frames() + for tid in d: + self.assertIsInstance(tid, int) + self.assertGreater(tid, 0) + + main_id = threading.get_ident() + self.assertIn(main_id, d) + self.assertIn(thread_id, d) + + # Verify that the captured main-thread frame is _this_ frame. + frame = d.pop(main_id) + self.assertTrue(frame is sys._getframe()) + + # Verify that the captured thread frame is blocked in g456, called + # from f123. This is a little tricky, since various bits of + # threading.py are also in the thread's call stack. + frame = d.pop(thread_id) + stack = traceback.extract_stack(frame) + for i, (filename, lineno, funcname, sourceline) in enumerate(stack): + if funcname == "f123": + break + else: + self.fail("didn't find f123() on thread's call stack") - # And the next record must be for g456(). - filename, lineno, funcname, sourceline = stack[i+1] - self.assertEqual(funcname, "g456") - self.assertIn(sourceline, ["leave_g.wait()", "entered_g.set()"]) + self.assertEqual(sourceline, "g456()") - # Reap the spawned thread. - leave_g.set() - t.join() + # And the next record must be for g456(). + filename, lineno, funcname, sourceline = stack[i+1] + self.assertEqual(funcname, "g456") + self.assertIn(sourceline, ["leave_g.wait()", "entered_g.set()"]) + finally: + # Reap the spawned thread. + leave_g.set() + t.join() - # TODO: RUSTPYTHON, AttributeError: module 'sys' has no attribute '_current_exceptions' - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; AttributeError: module 'sys' has no attribute '_current_exceptions' @threading_helper.reap_threads @threading_helper.requires_working_threading() def test_current_exceptions(self): @@ -488,7 +604,7 @@ def test_current_exceptions(self): # Spawn a thread that blocks at a known place. Then the main # thread does sys._current_frames(), and verifies that the frames # returned make sense. - entered_g = threading.Event() + g_raised = threading.Event() leave_g = threading.Event() thread_info = [] # the thread's id @@ -497,55 +613,54 @@ def f123(): def g456(): thread_info.append(threading.get_ident()) - entered_g.set() while True: try: raise ValueError("oops") except ValueError: + g_raised.set() if leave_g.wait(timeout=support.LONG_TIMEOUT): break t = threading.Thread(target=f123) t.start() - entered_g.wait() + g_raised.wait(timeout=support.LONG_TIMEOUT) - # At this point, t has finished its entered_g.set(), although it's - # impossible to guess whether it's still on that line or has moved on - # to its leave_g.wait(). - self.assertEqual(len(thread_info), 1) - thread_id = thread_info[0] - - d = sys._current_exceptions() - for tid in d: - self.assertIsInstance(tid, int) - self.assertGreater(tid, 0) - - main_id = threading.get_ident() - self.assertIn(main_id, d) - self.assertIn(thread_id, d) - self.assertEqual((None, None, None), d.pop(main_id)) - - # Verify that the captured thread frame is blocked in g456, called - # from f123. This is a little tricky, since various bits of - # threading.py are also in the thread's call stack. - exc_type, exc_value, exc_tb = d.pop(thread_id) - stack = traceback.extract_stack(exc_tb.tb_frame) - for i, (filename, lineno, funcname, sourceline) in enumerate(stack): - if funcname == "f123": - break - else: - self.fail("didn't find f123() on thread's call stack") - - self.assertEqual(sourceline, "g456()") + try: + self.assertEqual(len(thread_info), 1) + thread_id = thread_info[0] + + d = sys._current_exceptions() + for tid in d: + self.assertIsInstance(tid, int) + self.assertGreater(tid, 0) + + main_id = threading.get_ident() + self.assertIn(main_id, d) + self.assertIn(thread_id, d) + self.assertEqual(None, d.pop(main_id)) + + # Verify that the captured thread frame is blocked in g456, called + # from f123. This is a little tricky, since various bits of + # threading.py are also in the thread's call stack. + exc_value = d.pop(thread_id) + stack = traceback.extract_stack(exc_value.__traceback__.tb_frame) + for i, (filename, lineno, funcname, sourceline) in enumerate(stack): + if funcname == "f123": + break + else: + self.fail("didn't find f123() on thread's call stack") - # And the next record must be for g456(). - filename, lineno, funcname, sourceline = stack[i+1] - self.assertEqual(funcname, "g456") - self.assertTrue(sourceline.startswith("if leave_g.wait(")) + self.assertEqual(sourceline, "g456()") - # Reap the spawned thread. - leave_g.set() - t.join() + # And the next record must be for g456(). + filename, lineno, funcname, sourceline = stack[i+1] + self.assertEqual(funcname, "g456") + self.assertTrue((sourceline.startswith("if leave_g.wait(") or + sourceline.startswith("g_raised.set()"))) + finally: + # Reap the spawned thread. + leave_g.set() + t.join() def test_attributes(self): self.assertIsInstance(sys.api_version, int) @@ -647,7 +762,7 @@ def test_thread_info(self): self.assertEqual(len(info), 3) self.assertIn(info.name, ('nt', 'pthread', 'pthread-stubs', 'solaris', None)) self.assertIn(info.lock, ('semaphore', 'mutex+cond', None)) - if sys.platform.startswith(("linux", "freebsd")): + if sys.platform.startswith(("linux", "android", "freebsd")): self.assertEqual(info.name, "pthread") elif sys.platform == "win32": self.assertEqual(info.name, "nt") @@ -670,13 +785,23 @@ def test_43581(self): self.assertEqual(sys.__stdout__.encoding, sys.__stderr__.encoding) def test_intern(self): - global INTERN_NUMRUNS - INTERN_NUMRUNS += 1 + has_is_interned = (test.support.check_impl_detail(cpython=True) + or hasattr(sys, '_is_interned')) self.assertRaises(TypeError, sys.intern) - s = "never interned before" + str(INTERN_NUMRUNS) + self.assertRaises(TypeError, sys.intern, b'abc') + if has_is_interned: + self.assertRaises(TypeError, sys._is_interned) + self.assertRaises(TypeError, sys._is_interned, b'abc') + s = "never interned before" + str(random.randrange(0, 10**9)) self.assertTrue(sys.intern(s) is s) + if has_is_interned: + self.assertIs(sys._is_interned(s), True) s2 = s.swapcase().swapcase() + if has_is_interned: + self.assertIs(sys._is_interned(s2), False) self.assertTrue(sys.intern(s2) is s) + if has_is_interned: + self.assertIs(sys._is_interned(s2), False) # Subclasses of string can't be interned, because they # provide too much opportunity for insane things to happen. @@ -688,6 +813,73 @@ def __hash__(self): return 123 self.assertRaises(TypeError, sys.intern, S("abc")) + if has_is_interned: + self.assertIs(sys._is_interned(S("abc")), False) + + @support.cpython_only + @requires_subinterpreters + def test_subinterp_intern_dynamically_allocated(self): + # Implementation detail: Dynamically allocated strings + # are distinct between interpreters + s = "never interned before" + str(random.randrange(0, 10**9)) + t = sys.intern(s) + self.assertIs(t, s) + + interp = interpreters.create() + interp.exec(textwrap.dedent(f''' + import sys + + # set `s`, avoid parser interning & constant folding + s = str({s.encode()!r}, 'utf-8') + + t = sys.intern(s) + + assert id(t) != {id(s)}, (id(t), {id(s)}) + assert id(t) != {id(t)}, (id(t), {id(t)}) + ''')) + + @support.cpython_only + @requires_subinterpreters + def test_subinterp_intern_statically_allocated(self): + # Implementation detail: Statically allocated strings are shared + # between interpreters. + # See Tools/build/generate_global_objects.py for the list + # of strings that are always statically allocated. + for s in ('__init__', 'CANCELLED', '', 'utf-8', + '{{', '', '\n', '_', 'x', '\0', '\N{CEDILLA}', '\xff', + ): + with self.subTest(s=s): + t = sys.intern(s) + + interp = interpreters.create() + interp.exec(textwrap.dedent(f''' + import sys + + # set `s`, avoid parser interning & constant folding + s = str({s.encode()!r}, 'utf-8') + + t = sys.intern(s) + assert id(t) == {id(t)}, (id(t), {id(t)}) + ''')) + + @support.cpython_only + @requires_subinterpreters + def test_subinterp_intern_singleton(self): + # Implementation detail: singletons are used for 0- and 1-character + # latin1 strings. + for s in '', '\n', '_', 'x', '\0', '\N{CEDILLA}', '\xff': + with self.subTest(s=s): + interp = interpreters.create() + interp.exec(textwrap.dedent(f''' + import sys + + # set `s`, avoid parser interning & constant folding + s = str({s.encode()!r}, 'utf-8') + + assert id(s) == {id(s)} + t = sys.intern(s) + ''')) + self.assertTrue(sys._is_interned(s)) def test_sys_flags(self): self.assertTrue(sys.flags) @@ -709,12 +901,7 @@ def test_sys_flags(self): def assert_raise_on_new_sys_type(self, sys_attr): # Users are intentionally prevented from creating new instances of # sys.flags, sys.version_info, and sys.getwindowsversion. - arg = sys_attr - attr_type = type(sys_attr) - with self.assertRaises(TypeError): - attr_type(arg) - with self.assertRaises(TypeError): - attr_type.__new__(attr_type, arg) + support.check_disallow_instantiation(self, type(sys_attr), sys_attr) def test_sys_flags_no_instantiation(self): self.assert_raise_on_new_sys_type(sys.flags) @@ -722,8 +909,7 @@ def test_sys_flags_no_instantiation(self): def test_sys_version_info_no_instantiation(self): self.assert_raise_on_new_sys_type(sys.version_info) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_sys_getwindowsversion_no_instantiation(self): # Skip if not being run on Windows. test.support.get_attribute(sys, "getwindowsversion") @@ -733,8 +919,8 @@ def test_sys_getwindowsversion_no_instantiation(self): def test_clear_type_cache(self): sys._clear_type_cache() - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON + @force_not_colorized @support.requires_subprocess() def test_ioencoding(self): env = dict(os.environ) @@ -898,14 +1084,12 @@ def check_locale_surrogateescape(self, locale): 'stdout: surrogateescape\n' 'stderr: backslashreplace\n') - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_subprocess() def test_c_locale_surrogateescape(self): self.check_locale_surrogateescape('C') - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_subprocess() def test_posix_locale_surrogateescape(self): self.check_locale_surrogateescape('POSIX') @@ -958,12 +1142,12 @@ def test_debugmallocstats(self): "sys.getallocatedblocks unavailable on this build") def test_getallocatedblocks(self): try: - import _testcapi + import _testinternalcapi except ImportError: with_pymalloc = support.with_pymalloc() else: try: - alloc_name = _testcapi.pymem_getallocatorsname() + alloc_name = _testinternalcapi.pymem_getallocatorsname() except RuntimeError as exc: # "cannot get allocators name" (ex: tracemalloc is used) with_pymalloc = True @@ -995,8 +1179,14 @@ def test_getallocatedblocks(self): c = sys.getallocatedblocks() self.assertIn(c, range(b - 50, b + 50)) - # TODO: RUSTPYTHON, AtExit.__del__ is not invoked because module destruction is missing. - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: False is not true + def test_is_gil_enabled(self): + if support.Py_GIL_DISABLED: + self.assertIs(type(sys._is_gil_enabled()), bool) + else: + self.assertTrue(sys._is_gil_enabled()) + + @unittest.expectedFailure # TODO: RUSTPYTHON; AtExit.__del__ is not invoked because module destruction is missing. def test_is_finalizing(self): self.assertIs(sys.is_finalizing(), False) # Don't use the atexit module because _Py_Finalizing is only set @@ -1018,8 +1208,7 @@ def __del__(self): rc, stdout, stderr = assert_python_ok('-c', code) self.assertEqual(stdout.rstrip(), b'True') - # TODO: RUSTPYTHON, IndexError: list index out of range - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; IndexError: list index out of range def test_issue20602(self): # sys.flags and sys.float_info were wiped during shutdown. code = """if 1: @@ -1052,15 +1241,15 @@ def __del__(self): self.assertEqual(stdout.rstrip(), b"") self.assertEqual(stderr.rstrip(), b"") - @unittest.skipUnless(hasattr(sys, 'getandroidapilevel'), - 'need sys.getandroidapilevel()') + @unittest.expectedFailure # TODO: RUSTPYTHON; AttributeError: module 'sys' has no attribute 'getandroidapilevel' + @unittest.skipUnless(sys.platform == "android", "Android only") def test_getandroidapilevel(self): level = sys.getandroidapilevel() self.assertIsInstance(level, int) self.assertGreater(level, 0) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON + @force_not_colorized @support.requires_subprocess() def test_sys_tracebacklimit(self): code = """if 1: @@ -1081,14 +1270,20 @@ def check(tracebacklimit, expected): traceback = [ b'Traceback (most recent call last):', b' File "", line 8, in ', + b' f2()', + b' ~~^^', b' File "", line 6, in f2', + b' f1()', + b' ~~^^', b' File "", line 4, in f1', + b' 1 / 0', + b' ~~^~~', b'ZeroDivisionError: division by zero' ] check(10, traceback) check(3, traceback) - check(2, traceback[:1] + traceback[2:]) - check(1, traceback[:1] + traceback[3:]) + check(2, traceback[:1] + traceback[4:]) + check(1, traceback[:1] + traceback[7:]) check(0, [traceback[-1]]) check(-1, [traceback[-1]]) check(1<<1000, traceback) @@ -1124,15 +1319,13 @@ def test_orig_argv(self): self.assertEqual(proc.stdout.rstrip().splitlines(), expected, proc) - # TODO: RUSTPYTHON, AttributeError: module 'sys' has no attribute 'stdlib_module_names' - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; AttributeError: module 'sys' has no attribute 'stdlib_module_names' def test_module_names(self): self.assertIsInstance(sys.stdlib_module_names, frozenset) for name in sys.stdlib_module_names: self.assertIsInstance(name, str) - # TODO: RUSTPYTHON, AttributeError: module 'sys' has no attribute '_stdlib_dir' - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; AttributeError: module 'sys' has no attribute '_stdlib_dir' def test_stdlib_dir(self): os = import_helper.import_fresh_module('os') marker = getattr(os, '__file__', None) @@ -1142,41 +1335,76 @@ def test_stdlib_dir(self): self.assertEqual(os.path.normpath(sys._stdlib_dir), os.path.normpath(expected)) + @unittest.skipUnless(hasattr(sys, 'getobjects'), 'need sys.getobjects()') + def test_getobjects(self): + # sys.getobjects(0) + all_objects = sys.getobjects(0) + self.assertIsInstance(all_objects, list) + self.assertGreater(len(all_objects), 0) + + # sys.getobjects(0, MyType) + class MyType: + pass + size = 100 + my_objects = [MyType() for _ in range(size)] + get_objects = sys.getobjects(0, MyType) + self.assertEqual(len(get_objects), size) + for obj in get_objects: + self.assertIsInstance(obj, MyType) + + # sys.getobjects(3, MyType) + get_objects = sys.getobjects(3, MyType) + self.assertEqual(len(get_objects), 3) + + @unittest.skipUnless(hasattr(sys, '_stats_on'), 'need Py_STATS build') + def test_pystats(self): + # Call the functions, just check that they don't crash + # Cannot save/restore state. + sys._stats_on() + sys._stats_off() + sys._stats_clear() + sys._stats_dump() + + @test.support.cpython_only + @unittest.skipUnless(hasattr(sys, 'abiflags'), 'need sys.abiflags') + def test_disable_gil_abi(self): + self.assertEqual('t' in sys.abiflags, support.Py_GIL_DISABLED) + @test.support.cpython_only class UnraisableHookTest(unittest.TestCase): - def write_unraisable_exc(self, exc, err_msg, obj): - import _testcapi - import types - err_msg2 = f"Exception ignored {err_msg}" - try: - _testcapi.write_unraisable_exc(exc, err_msg, obj) - return types.SimpleNamespace(exc_type=type(exc), - exc_value=exc, - exc_traceback=exc.__traceback__, - err_msg=err_msg2, - object=obj) - finally: - # Explicitly break any reference cycle - exc = None - def test_original_unraisablehook(self): - for err_msg in (None, "original hook"): - with self.subTest(err_msg=err_msg): - obj = "an object" - - with test.support.captured_output("stderr") as stderr: - with test.support.swap_attr(sys, 'unraisablehook', - sys.__unraisablehook__): - self.write_unraisable_exc(ValueError(42), err_msg, obj) - - err = stderr.getvalue() - if err_msg is not None: - self.assertIn(f'Exception ignored {err_msg}: {obj!r}\n', err) - else: - self.assertIn(f'Exception ignored in: {obj!r}\n', err) - self.assertIn('Traceback (most recent call last):\n', err) - self.assertIn('ValueError: 42\n', err) + _testcapi = import_helper.import_module('_testcapi') + from _testcapi import err_writeunraisable, err_formatunraisable + obj = hex + + with support.swap_attr(sys, 'unraisablehook', + sys.__unraisablehook__): + with support.captured_stderr() as stderr: + err_writeunraisable(ValueError(42), obj) + lines = stderr.getvalue().splitlines() + self.assertEqual(lines[0], f'Exception ignored in: {obj!r}') + self.assertEqual(lines[1], 'Traceback (most recent call last):') + self.assertEqual(lines[-1], 'ValueError: 42') + + with support.captured_stderr() as stderr: + err_writeunraisable(ValueError(42), None) + lines = stderr.getvalue().splitlines() + self.assertEqual(lines[0], 'Traceback (most recent call last):') + self.assertEqual(lines[-1], 'ValueError: 42') + + with support.captured_stderr() as stderr: + err_formatunraisable(ValueError(42), 'Error in %R', obj) + lines = stderr.getvalue().splitlines() + self.assertEqual(lines[0], f'Error in {obj!r}:') + self.assertEqual(lines[1], 'Traceback (most recent call last):') + self.assertEqual(lines[-1], 'ValueError: 42') + + with support.captured_stderr() as stderr: + err_formatunraisable(ValueError(42), None) + lines = stderr.getvalue().splitlines() + self.assertEqual(lines[0], 'Traceback (most recent call last):') + self.assertEqual(lines[-1], 'ValueError: 42') def test_original_unraisablehook_err(self): # bpo-22836: PyErr_WriteUnraisable() should give sensible reports @@ -1223,6 +1451,8 @@ def test_original_unraisablehook_exception_qualname(self): # Check that the exception is printed with its qualified name # rather than just classname, and the module names appears # unless it is one of the hard-coded exclusions. + _testcapi = import_helper.import_module('_testcapi') + from _testcapi import err_writeunraisable class A: class B: class X(Exception): @@ -1234,9 +1464,7 @@ class X(Exception): with test.support.captured_stderr() as stderr, test.support.swap_attr( sys, 'unraisablehook', sys.__unraisablehook__ ): - expected = self.write_unraisable_exc( - A.B.X(), "msg", "obj" - ) + err_writeunraisable(A.B.X(), "obj") report = stderr.getvalue() self.assertIn(A.B.X.__qualname__, report) if moduleName in ['builtins', '__main__']: @@ -1252,34 +1480,45 @@ def test_original_unraisablehook_wrong_type(self): sys.unraisablehook(exc) def test_custom_unraisablehook(self): + _testcapi = import_helper.import_module('_testcapi') + from _testcapi import err_writeunraisable, err_formatunraisable hook_args = None def hook_func(args): nonlocal hook_args hook_args = args - obj = object() + obj = hex try: with test.support.swap_attr(sys, 'unraisablehook', hook_func): - expected = self.write_unraisable_exc(ValueError(42), - "custom hook", obj) - for attr in "exc_type exc_value exc_traceback err_msg object".split(): - self.assertEqual(getattr(hook_args, attr), - getattr(expected, attr), - (hook_args, expected)) + exc = ValueError(42) + err_writeunraisable(exc, obj) + self.assertIs(hook_args.exc_type, type(exc)) + self.assertIs(hook_args.exc_value, exc) + self.assertIs(hook_args.exc_traceback, exc.__traceback__) + self.assertIsNone(hook_args.err_msg) + self.assertEqual(hook_args.object, obj) + + err_formatunraisable(exc, "custom hook %R", obj) + self.assertIs(hook_args.exc_type, type(exc)) + self.assertIs(hook_args.exc_value, exc) + self.assertIs(hook_args.exc_traceback, exc.__traceback__) + self.assertEqual(hook_args.err_msg, f'custom hook {obj!r}') + self.assertIsNone(hook_args.object) finally: # expected and hook_args contain an exception: break reference cycle expected = None hook_args = None def test_custom_unraisablehook_fail(self): + _testcapi = import_helper.import_module('_testcapi') + from _testcapi import err_writeunraisable def hook_func(*args): raise Exception("hook_func failed") with test.support.captured_output("stderr") as stderr: with test.support.swap_attr(sys, 'unraisablehook', hook_func): - self.write_unraisable_exc(ValueError(42), - "custom hook fail", None) + err_writeunraisable(ValueError(42), "custom hook fail") err = stderr.getvalue() self.assertIn(f'Exception ignored in sys.unraisablehook: ' @@ -1295,8 +1534,9 @@ class SizeofTest(unittest.TestCase): def setUp(self): self.P = struct.calcsize('P') self.longdigit = sys.int_info.sizeof_digit - import _testinternalcapi + _testinternalcapi = import_helper.import_module("_testinternalcapi") self.gc_headsize = _testinternalcapi.SIZEOF_PYGC_HEAD + self.managed_pre_header_size = _testinternalcapi.SIZEOF_MANAGED_PRE_HEADER check_sizeof = test.support.check_sizeof @@ -1332,7 +1572,7 @@ class OverflowSizeof(int): def __sizeof__(self): return int(self) self.assertEqual(sys.getsizeof(OverflowSizeof(sys.maxsize)), - sys.maxsize + self.gc_headsize) + sys.maxsize + self.gc_headsize + self.managed_pre_header_size) with self.assertRaises(OverflowError): sys.getsizeof(OverflowSizeof(sys.maxsize + 1)) with self.assertRaises(ValueError): @@ -1454,10 +1694,10 @@ class C(object): pass def func(): return sys._getframe() x = func() - check(x, size('3Pi3c7P2ic??2P')) + check(x, size('3Pi2c2P7P2ic??2P')) # function def func(): pass - check(func, size('14Pi')) + check(func, size('15Pi')) class c(): @staticmethod def foo(): @@ -1471,7 +1711,7 @@ def bar(cls): check(bar, size('PP')) # generator def get_gen(): yield 1 - check(get_gen(), size('P2P4P4c7P2ic??P')) + check(get_gen(), size('PP4P4c7P2ic??2P')) # iterator check(iter('abc'), size('lP')) # callable-iterator @@ -1499,7 +1739,10 @@ def get_gen(): yield 1 check(int(PyLong_BASE**2-1), vsize('') + 2*self.longdigit) check(int(PyLong_BASE**2), vsize('') + 3*self.longdigit) # module - check(unittest, size('PnPPP')) + if support.Py_GIL_DISABLED: + check(unittest, size('PPPPPP')) + else: + check(unittest, size('PPPPP')) # None check(None, size('')) # NotImplementedType @@ -1514,9 +1757,10 @@ def delx(self): del self.__x x = property(getx, setx, delx, "") check(x, size('5Pi')) # PyCapsule - # XXX + check(_datetime.datetime_CAPI, size('6P')) # rangeiterator - check(iter(range(1)), size('4l')) + check(iter(range(1)), size('3l')) + check(iter(range(2**65)), size('3P')) # reverse check(reversed(''), size('nP')) # range @@ -1553,8 +1797,8 @@ def delx(self): del self.__x check((1,2,3), vsize('') + 3*self.P) # type # static type: PyTypeObject - fmt = 'P2nPI13Pl4Pn9Pn12PIP' - s = vsize('2P' + fmt) + fmt = 'P2nPI13Pl4Pn9Pn12PIPc' + s = vsize(fmt) check(int, s) # class s = vsize(fmt + # PyTypeObject @@ -1564,7 +1808,7 @@ def delx(self): del self.__x '10P' # PySequenceMethods '2P' # PyBufferProcs '6P' - '1P' # Specializer cache + '1PIP' # Specializer cache ) class newstyleclass(object): pass # Separate block for PyDictKeysObject with 8 keys and 5 entries @@ -1586,8 +1830,8 @@ class newstyleclass(object): pass '\u0100'*40, '\uffff'*100, '\U00010000'*30, '\U0010ffff'*100] # also update field definitions in test_unicode.test_raiseMemError - asciifields = "nnbP" - compactfields = asciifields + "nPn" + asciifields = "nnb" + compactfields = asciifields + "nP" unicodefields = compactfields + "P" for s in samples: maxchar = ord(max(s)) @@ -1611,11 +1855,15 @@ class newstyleclass(object): pass # TODO: add check that forces layout of unicodefields # weakref import weakref - check(weakref.ref(int), size('2Pn3P')) + if support.Py_GIL_DISABLED: + expected = size('2Pn4P') + else: + expected = size('2Pn3P') + check(weakref.ref(int), expected) # weakproxy # XXX # weakcallableproxy - check(weakref.proxy(int), size('2Pn3P')) + check(weakref.proxy(int), expected) def check_slots(self, obj, base, extra): expected = sys.getsizeof(base) + struct.calcsize(extra) @@ -1657,15 +1905,16 @@ def test_pythontypes(self): check(_ast.AST(), size('P')) try: raise TypeError - except TypeError: - tb = sys.exc_info()[2] + except TypeError as e: + tb = e.__traceback__ # traceback if tb is not None: check(tb, size('2P2i')) # symtable entry # XXX # sys.flags - check(sys.flags, vsize('') + self.P * len(sys.flags)) + # FIXME: The +1 will not be necessary once gh-122575 is fixed + check(sys.flags, vsize('') + self.P * (1 + len(sys.flags))) def test_asyncgen_hooks(self): old = sys.get_asyncgen_hooks() @@ -1673,6 +1922,21 @@ def test_asyncgen_hooks(self): self.assertIsNone(old.finalizer) firstiter = lambda *a: None + finalizer = lambda *a: None + + with self.assertRaises(TypeError): + sys.set_asyncgen_hooks(firstiter=firstiter, finalizer="invalid") + cur = sys.get_asyncgen_hooks() + self.assertIsNone(cur.firstiter) + self.assertIsNone(cur.finalizer) + + # gh-118473 + with self.assertRaises(TypeError): + sys.set_asyncgen_hooks(firstiter="invalid", finalizer=finalizer) + cur = sys.get_asyncgen_hooks() + self.assertIsNone(cur.firstiter) + self.assertIsNone(cur.finalizer) + sys.set_asyncgen_hooks(firstiter=firstiter) hooks = sys.get_asyncgen_hooks() self.assertIs(hooks.firstiter, firstiter) @@ -1680,7 +1944,6 @@ def test_asyncgen_hooks(self): self.assertIs(hooks.finalizer, None) self.assertIs(hooks[1], None) - finalizer = lambda *a: None sys.set_asyncgen_hooks(finalizer=finalizer) hooks = sys.get_asyncgen_hooks() self.assertIs(hooks.firstiter, firstiter) diff --git a/crates/vm/src/stdlib/sys.rs b/crates/vm/src/stdlib/sys.rs index 45b1d56605..57507ba484 100644 --- a/crates/vm/src/stdlib/sys.rs +++ b/crates/vm/src/stdlib/sys.rs @@ -331,6 +331,11 @@ mod sys { // TODO: sys.audit implementation } + #[pyfunction] + const fn _is_gil_enabled() -> bool { + false // We don't implement GIL + } + #[pyfunction] fn exit(code: OptionalArg, vm: &VirtualMachine) -> PyResult { let code = code.unwrap_or_none(vm);