Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 85 additions & 43 deletions Lib/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,16 @@
else:
_mswindows = True

# wasm32-emscripten and wasm32-wasi do not support processes
_can_fork_exec = sys.platform not in {"emscripten", "wasi"}
# some platforms do not support subprocesses
_can_fork_exec = sys.platform not in {"emscripten", "wasi", "ios", "tvos", "watchos"}

if _mswindows:
import _winapi
from _winapi import (CREATE_NEW_CONSOLE, CREATE_NEW_PROCESS_GROUP,
STD_INPUT_HANDLE, STD_OUTPUT_HANDLE,
STD_ERROR_HANDLE, SW_HIDE,
STARTF_USESTDHANDLES, STARTF_USESHOWWINDOW,
STARTF_FORCEONFEEDBACK, STARTF_FORCEOFFFEEDBACK,
ABOVE_NORMAL_PRIORITY_CLASS, BELOW_NORMAL_PRIORITY_CLASS,
HIGH_PRIORITY_CLASS, IDLE_PRIORITY_CLASS,
NORMAL_PRIORITY_CLASS, REALTIME_PRIORITY_CLASS,
Expand All @@ -93,6 +94,7 @@
"STD_INPUT_HANDLE", "STD_OUTPUT_HANDLE",
"STD_ERROR_HANDLE", "SW_HIDE",
"STARTF_USESTDHANDLES", "STARTF_USESHOWWINDOW",
"STARTF_FORCEONFEEDBACK", "STARTF_FORCEOFFFEEDBACK",
"STARTUPINFO",
"ABOVE_NORMAL_PRIORITY_CLASS", "BELOW_NORMAL_PRIORITY_CLASS",
"HIGH_PRIORITY_CLASS", "IDLE_PRIORITY_CLASS",
Expand All @@ -103,18 +105,22 @@
if _can_fork_exec:
from _posixsubprocess import fork_exec as _fork_exec
# used in methods that are called by __del__
_waitpid = os.waitpid
_waitstatus_to_exitcode = os.waitstatus_to_exitcode
_WIFSTOPPED = os.WIFSTOPPED
_WSTOPSIG = os.WSTOPSIG
_WNOHANG = os.WNOHANG
class _del_safe:
waitpid = os.waitpid
waitstatus_to_exitcode = os.waitstatus_to_exitcode
WIFSTOPPED = os.WIFSTOPPED
WSTOPSIG = os.WSTOPSIG
WNOHANG = os.WNOHANG
ECHILD = errno.ECHILD
else:
_fork_exec = None
_waitpid = None
_waitstatus_to_exitcode = None
_WIFSTOPPED = None
_WSTOPSIG = None
_WNOHANG = None
class _del_safe:
waitpid = None
waitstatus_to_exitcode = None
WIFSTOPPED = None
WSTOPSIG = None
WNOHANG = None
ECHILD = errno.ECHILD

import select
import selectors

Expand Down Expand Up @@ -346,7 +352,7 @@ def _args_from_interpreter_flags():
if dev_mode:
args.extend(('-X', 'dev'))
for opt in ('faulthandler', 'tracemalloc', 'importtime',
'frozen_modules', 'showrefcount', 'utf8'):
'frozen_modules', 'showrefcount', 'utf8', 'gil'):
if opt in xoptions:
value = xoptions[opt]
if value is True:
Expand Down Expand Up @@ -380,7 +386,7 @@ def _text_encoding():

def call(*popenargs, timeout=None, **kwargs):
"""Run command with arguments. Wait for command to complete or
timeout, then return the returncode attribute.
for timeout seconds, then return the returncode attribute.

The arguments are the same as for the Popen constructor. Example:

Expand Down Expand Up @@ -517,8 +523,8 @@ def run(*popenargs,
in the returncode attribute, and output & stderr attributes if those streams
were captured.

If timeout is given, and the process takes too long, a TimeoutExpired
exception will be raised.
If timeout (seconds) is given and the process takes too long,
a TimeoutExpired exception will be raised.

There is an optional argument "input", allowing you to
pass bytes or a string to the subprocess's stdin. If you use this argument
Expand Down Expand Up @@ -709,6 +715,9 @@ def _use_posix_spawn():
# os.posix_spawn() is not available
return False

if ((_env := os.environ.get('_PYTHON_SUBPROCESS_USE_POSIX_SPAWN')) in ('0', '1')):
return bool(int(_env))

if sys.platform in ('darwin', 'sunos5'):
# posix_spawn() is a syscall on both macOS and Solaris,
# and properly reports errors
Expand Down Expand Up @@ -744,6 +753,7 @@ def _use_posix_spawn():
# guarantee the given libc/syscall API will be used.
_USE_POSIX_SPAWN = _use_posix_spawn()
_USE_VFORK = True
_HAVE_POSIX_SPAWN_CLOSEFROM = hasattr(os, 'POSIX_SPAWN_CLOSEFROM')


class Popen:
Expand Down Expand Up @@ -834,6 +844,9 @@ def __init__(self, args, bufsize=-1, executable=None,
if not isinstance(bufsize, int):
raise TypeError("bufsize must be an integer")

if stdout is STDOUT:
raise ValueError("STDOUT can only be used for stderr")

if pipesize is None:
pipesize = -1 # Restore default
if not isinstance(pipesize, int):
Expand Down Expand Up @@ -1224,8 +1237,11 @@ def communicate(self, input=None, timeout=None):

finally:
self._communication_started = True

sts = self.wait(timeout=self._remaining_time(endtime))
try:
sts = self.wait(timeout=self._remaining_time(endtime))
except TimeoutExpired as exc:
exc.timeout = timeout
raise

return (stdout, stderr)

Expand Down Expand Up @@ -1600,6 +1616,10 @@ def _readerthread(self, fh, buffer):
fh.close()


def _writerthread(self, input):
self._stdin_write(input)


def _communicate(self, input, endtime, orig_timeout):
# Start reader threads feeding into a list hanging off of this
# object, unless they've already been started.
Expand All @@ -1618,8 +1638,23 @@ def _communicate(self, input, endtime, orig_timeout):
self.stderr_thread.daemon = True
self.stderr_thread.start()

if self.stdin:
self._stdin_write(input)
# Start writer thread to send input to stdin, unless already
# started. The thread writes input and closes stdin when done,
# or continues in the background on timeout.
if self.stdin and not hasattr(self, "_stdin_thread"):
self._stdin_thread = \
threading.Thread(target=self._writerthread,
args=(input,))
self._stdin_thread.daemon = True
self._stdin_thread.start()

# Wait for the writer thread, or time out. If we time out, the
# thread remains writing and the fd left open in case the user
# calls communicate again.
if hasattr(self, "_stdin_thread"):
self._stdin_thread.join(self._remaining_time(endtime))
if self._stdin_thread.is_alive():
raise TimeoutExpired(self.args, orig_timeout)

# Wait for the reader threads, or time out. If we time out, the
# threads remain reading and the fds left open in case the user
Expand Down Expand Up @@ -1749,14 +1784,11 @@ def _get_handles(self, stdin, stdout, stderr):
errread, errwrite)


def _posix_spawn(self, args, executable, env, restore_signals,
def _posix_spawn(self, args, executable, env, restore_signals, close_fds,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite):
"""Execute program using os.posix_spawn()."""
if env is None:
env = os.environ

kwargs = {}
if restore_signals:
# See _Py_RestoreSignals() in Python/pylifecycle.c
Expand All @@ -1778,6 +1810,10 @@ def _posix_spawn(self, args, executable, env, restore_signals,
):
if fd != -1:
file_actions.append((os.POSIX_SPAWN_DUP2, fd, fd2))

if close_fds:
file_actions.append((os.POSIX_SPAWN_CLOSEFROM, 3))

if file_actions:
kwargs['file_actions'] = file_actions

Expand Down Expand Up @@ -1825,7 +1861,7 @@ def _execute_child(self, args, executable, preexec_fn, close_fds,
if (_USE_POSIX_SPAWN
and os.path.dirname(executable)
and preexec_fn is None
and not close_fds
and (not close_fds or _HAVE_POSIX_SPAWN_CLOSEFROM)
and not pass_fds
and cwd is None
and (p2cread == -1 or p2cread > 2)
Expand All @@ -1837,7 +1873,7 @@ def _execute_child(self, args, executable, preexec_fn, close_fds,
and gids is None
and uid is None
and umask < 0):
self._posix_spawn(args, executable, env, restore_signals,
self._posix_spawn(args, executable, env, restore_signals, close_fds,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite)
Expand Down Expand Up @@ -1958,20 +1994,16 @@ def _execute_child(self, args, executable, preexec_fn, close_fds,
raise child_exception_type(err_msg)


def _handle_exitstatus(self, sts,
_waitstatus_to_exitcode=_waitstatus_to_exitcode,
_WIFSTOPPED=_WIFSTOPPED,
_WSTOPSIG=_WSTOPSIG):
def _handle_exitstatus(self, sts, _del_safe=_del_safe):
"""All callers to this function MUST hold self._waitpid_lock."""
# This method is called (indirectly) by __del__, so it cannot
# refer to anything outside of its local scope.
if _WIFSTOPPED(sts):
self.returncode = -_WSTOPSIG(sts)
if _del_safe.WIFSTOPPED(sts):
self.returncode = -_del_safe.WSTOPSIG(sts)
else:
self.returncode = _waitstatus_to_exitcode(sts)
self.returncode = _del_safe.waitstatus_to_exitcode(sts)

def _internal_poll(self, _deadstate=None, _waitpid=_waitpid,
_WNOHANG=_WNOHANG, _ECHILD=errno.ECHILD):
def _internal_poll(self, _deadstate=None, _del_safe=_del_safe):
"""Check if child process has terminated. Returns returncode
attribute.

Expand All @@ -1987,13 +2019,13 @@ def _internal_poll(self, _deadstate=None, _waitpid=_waitpid,
try:
if self.returncode is not None:
return self.returncode # Another thread waited.
pid, sts = _waitpid(self.pid, _WNOHANG)
pid, sts = _del_safe.waitpid(self.pid, _del_safe.WNOHANG)
if pid == self.pid:
self._handle_exitstatus(sts)
except OSError as e:
if _deadstate is not None:
self.returncode = _deadstate
elif e.errno == _ECHILD:
elif e.errno == _del_safe.ECHILD:
# This happens if SIGCLD is set to be ignored or
# waiting for child processes has otherwise been
# disabled for our process. This child is dead, we
Expand Down Expand Up @@ -2067,6 +2099,10 @@ def _communicate(self, input, endtime, orig_timeout):
self.stdin.flush()
except BrokenPipeError:
pass # communicate() must ignore BrokenPipeError.
except ValueError:
# ignore ValueError: I/O operation on closed file.
if not self.stdin.closed:
raise
if not input:
try:
self.stdin.close()
Expand All @@ -2092,10 +2128,13 @@ def _communicate(self, input, endtime, orig_timeout):
self._save_input(input)

if self._input:
input_view = memoryview(self._input)
if not isinstance(self._input, memoryview):
input_view = memoryview(self._input)
else:
input_view = self._input.cast("b") # byte input required

with _PopenSelector() as selector:
if self.stdin and input:
if self.stdin and not self.stdin.closed and self._input:
selector.register(self.stdin, selectors.EVENT_WRITE)
if self.stdout and not self.stdout.closed:
selector.register(self.stdout, selectors.EVENT_READ)
Expand Down Expand Up @@ -2128,7 +2167,7 @@ def _communicate(self, input, endtime, orig_timeout):
selector.unregister(key.fileobj)
key.fileobj.close()
else:
if self._input_offset >= len(self._input):
if self._input_offset >= len(input_view):
selector.unregister(key.fileobj)
key.fileobj.close()
elif key.fileobj in (self.stdout, self.stderr):
Expand All @@ -2137,8 +2176,11 @@ def _communicate(self, input, endtime, orig_timeout):
selector.unregister(key.fileobj)
key.fileobj.close()
self._fileobj2output[key.fileobj].append(data)

self.wait(timeout=self._remaining_time(endtime))
try:
self.wait(timeout=self._remaining_time(endtime))
except TimeoutExpired as exc:
exc.timeout = orig_timeout
raise

# All data exchanged. Translate lists into strings.
if stdout is not None:
Expand Down
3 changes: 0 additions & 3 deletions Lib/test/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,6 @@ def testIteration(self):
class COtherFileTests(OtherFileTests, unittest.TestCase):
open = io.open

@unittest.expectedFailure # TODO: RUSTPYTHON
def testSetBufferSize(self):
return super().testSetBufferSize()

class PyOtherFileTests(OtherFileTests, unittest.TestCase):
open = staticmethod(pyio.open)
Expand Down
Loading
Loading