Commit f47505e6 authored by Denis Bilenko's avatar Denis Bilenko

add gevent/subprocess.py

parent 6938a34b
import sys
import os
import errno
import types
import gc
import signal
import traceback
from gevent.event import Event
from gevent.hub import get_hub
from gevent.select import select
from gevent.fileobject import FileObject
from gevent.greenlet import Greenlet, joinall
spawn = Greenlet.spawn
__subprocess__ = __import__('subprocess')
# Standard functions and classes that this module re-implements in a gevent-aware way.
__implements__ = ['Popen']
# Standard functions and classes that this module re-imports.
__imports__ = ['PIPE',
'STDOUT',
'call',
'check_call',
'check_output',
'CalledProcessError',
# Windows:
'CREATE_NEW_CONSOLE',
'CREATE_NEW_PROCESS_GROUP',
'STD_INPUT_HANDLE',
'STD_OUTPUT_HANDLE',
'STD_ERROR_HANDLE',
'SW_HIDE',
'STARTF_USESTDHANDLES',
'STARTF_USESHOWWINDOW']
__extra__ = ['MAXFD',
'_eintr_retry_call',
'STARTUPINFO',
'pywintypes',
'_PIPE_BUF',
'_subprocess',
'list2cmdline']
for name in (__imports__ + __extra__):
try:
value = getattr(__subprocess__, name)
globals()[name] = value
except AttributeError:
if name in __imports__:
__imports__.remove(name)
else:
__extra__.remove(name)
__all__ = __implements__ + __imports__
mswindows = sys.platform == 'win32'
if mswindows:
import msvcrt
else:
import fcntl
import pickle
# When select or poll has indicated that the file is writable,
# we can write up to _PIPE_BUF bytes without risk of blocking.
# POSIX defines PIPE_BUF as >= 512.
_PIPE_BUF = getattr(select, 'PIPE_BUF', 512)
class Popen(object):
def __init__(self, args, bufsize=0, executable=None,
stdin=None, stdout=None, stderr=None,
preexec_fn=None, close_fds=False, shell=False,
cwd=None, env=None, universal_newlines=False,
startupinfo=None, creationflags=0, threadpool=None):
"""Create new Popen instance."""
if not isinstance(bufsize, (int, long)):
raise TypeError("bufsize must be an integer")
hub = get_hub()
if mswindows:
if preexec_fn is not None:
raise ValueError("preexec_fn is not supported on Windows "
"platforms")
if close_fds and (stdin is not None or stdout is not None or
stderr is not None):
raise ValueError("close_fds is not supported on Windows "
"platforms if you redirect stdin/stdout/stderr")
if threadpool is None:
threadpool = hub.threadpool
self.threadpool = threadpool
else:
# POSIX
if startupinfo is not None:
raise ValueError("startupinfo is only supported on Windows "
"platforms")
if creationflags != 0:
raise ValueError("creationflags is only supported on Windows "
"platforms")
assert threadpool is None
self._loop = hub.loop
self._event = Event()
self.stdin = None
self.stdout = None
self.stderr = None
self.pid = None
self.returncode = None
self.universal_newlines = universal_newlines
# Input and output objects. The general principle is like
# this:
#
# Parent Child
# ------ -----
# p2cwrite ---stdin---> p2cread
# c2pread <--stdout--- c2pwrite
# errread <--stderr--- errwrite
#
# On POSIX, the child objects are file descriptors. On
# Windows, these are Windows file handles. The parent objects
# are file descriptors on both platforms. The parent objects
# are None when not using PIPEs. The child objects are None
# when not redirecting.
(p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite) = self._get_handles(stdin, stdout, stderr)
self._execute_child(args, executable, preexec_fn, close_fds,
cwd, env, universal_newlines,
startupinfo, creationflags, shell,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite)
if mswindows:
if p2cwrite is not None:
p2cwrite = msvcrt.open_osfhandle(p2cwrite.Detach(), 0)
if c2pread is not None:
c2pread = msvcrt.open_osfhandle(c2pread.Detach(), 0)
if errread is not None:
errread = msvcrt.open_osfhandle(errread.Detach(), 0)
if p2cwrite is not None:
self.stdin = FileObject(p2cwrite, 'wb')
if c2pread is not None:
if universal_newlines:
self.stdout = FileObject(c2pread, 'rU')
else:
self.stdout = FileObject(c2pread, 'rb')
if errread is not None:
if universal_newlines:
self.stderr = FileObject(errread, 'rU')
else:
self.stderr = FileObject(errread, 'rb')
def _on_child(self, watcher):
watcher.stop()
self._handle_exitstatus(watcher.rstatus)
self._event.set()
def communicate(self, input=None):
"""Interact with process: Send data to stdin. Read data from
stdout and stderr, until end-of-file is reached. Wait for
process to terminate. The optional input argument should be a
string to be sent to the child process, or None, if no data
should be sent to the child.
communicate() returns a tuple (stdout, stderr)."""
greenlets = []
if self.stdin:
greenlets.append(spawn(write_and_close, self.stdin, input))
if self.stdout:
stdout = spawn(self.stdout.read)
greenlets.append(stdout)
else:
stdout = None
if self.stderr:
stderr = spawn(self.stderr.read)
greenlets.append(stderr)
else:
stderr = None
joinall(greenlets)
if self.stdout:
self.stdout.close()
if self.stderr:
self.stderr.close()
self.wait()
return (None if stdout is None else stdout.value or '',
None if stderr is None else stderr.value or '')
def poll(self):
return self._internal_poll()
if mswindows:
#
# Windows methods
#
def _get_handles(self, stdin, stdout, stderr):
"""Construct and return tuple with IO objects:
p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
"""
if stdin is None and stdout is None and stderr is None:
return (None, None, None, None, None, None)
p2cread, p2cwrite = None, None
c2pread, c2pwrite = None, None
errread, errwrite = None, None
if stdin is None:
p2cread = _subprocess.GetStdHandle(_subprocess.STD_INPUT_HANDLE)
if p2cread is None:
p2cread, _ = _subprocess.CreatePipe(None, 0)
elif stdin == PIPE:
p2cread, p2cwrite = _subprocess.CreatePipe(None, 0)
elif isinstance(stdin, int):
p2cread = msvcrt.get_osfhandle(stdin)
else:
# Assuming file-like object
p2cread = msvcrt.get_osfhandle(stdin.fileno())
p2cread = self._make_inheritable(p2cread)
if stdout is None:
c2pwrite = _subprocess.GetStdHandle(_subprocess.STD_OUTPUT_HANDLE)
if c2pwrite is None:
_, c2pwrite = _subprocess.CreatePipe(None, 0)
elif stdout == PIPE:
c2pread, c2pwrite = _subprocess.CreatePipe(None, 0)
elif isinstance(stdout, int):
c2pwrite = msvcrt.get_osfhandle(stdout)
else:
# Assuming file-like object
c2pwrite = msvcrt.get_osfhandle(stdout.fileno())
c2pwrite = self._make_inheritable(c2pwrite)
if stderr is None:
errwrite = _subprocess.GetStdHandle(_subprocess.STD_ERROR_HANDLE)
if errwrite is None:
_, errwrite = _subprocess.CreatePipe(None, 0)
elif stderr == PIPE:
errread, errwrite = _subprocess.CreatePipe(None, 0)
elif stderr == STDOUT:
errwrite = c2pwrite
elif isinstance(stderr, int):
errwrite = msvcrt.get_osfhandle(stderr)
else:
# Assuming file-like object
errwrite = msvcrt.get_osfhandle(stderr.fileno())
errwrite = self._make_inheritable(errwrite)
return (p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite)
def _make_inheritable(self, handle):
"""Return a duplicate of handle, which is inheritable"""
return _subprocess.DuplicateHandle(_subprocess.GetCurrentProcess(),
handle, _subprocess.GetCurrentProcess(), 0, 1,
_subprocess.DUPLICATE_SAME_ACCESS)
def _find_w9xpopen(self):
"""Find and return absolut path to w9xpopen.exe"""
w9xpopen = os.path.join(
os.path.dirname(_subprocess.GetModuleFileName(0)),
"w9xpopen.exe")
if not os.path.exists(w9xpopen):
# Eeek - file-not-found - possibly an embedding
# situation - see if we can locate it in sys.exec_prefix
w9xpopen = os.path.join(os.path.dirname(sys.exec_prefix),
"w9xpopen.exe")
if not os.path.exists(w9xpopen):
raise RuntimeError("Cannot locate w9xpopen.exe, which is "
"needed for Popen to work with your "
"shell or platform.")
return w9xpopen
def _execute_child(self, args, executable, preexec_fn, close_fds,
cwd, env, universal_newlines,
startupinfo, creationflags, shell,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite):
"""Execute program (MS Windows version)"""
if not isinstance(args, types.StringTypes):
args = list2cmdline(args)
# Process startup details
if startupinfo is None:
startupinfo = STARTUPINFO()
if None not in (p2cread, c2pwrite, errwrite):
startupinfo.dwFlags |= _subprocess.STARTF_USESTDHANDLES
startupinfo.hStdInput = p2cread
startupinfo.hStdOutput = c2pwrite
startupinfo.hStdError = errwrite
if shell:
startupinfo.dwFlags |= _subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = _subprocess.SW_HIDE
comspec = os.environ.get("COMSPEC", "cmd.exe")
args = '{} /c "{}"'.format (comspec, args)
if (_subprocess.GetVersion() >= 0x80000000 or
os.path.basename(comspec).lower() == "command.com"):
# Win9x, or using command.com on NT. We need to
# use the w9xpopen intermediate program. For more
# information, see KB Q150956
# (http://web.archive.org/web/20011105084002/http://support.microsoft.com/support/kb/articles/Q150/9/56.asp)
w9xpopen = self._find_w9xpopen()
args = '"%s" %s' % (w9xpopen, args)
# Not passing CREATE_NEW_CONSOLE has been known to
# cause random failures on win9x. Specifically a
# dialog: "Your program accessed mem currently in
# use at xxx" and a hopeful warning about the
# stability of your system. Cost is Ctrl+C wont
# kill children.
creationflags |= _subprocess.CREATE_NEW_CONSOLE
# Start the process
try:
hp, ht, pid, tid = _subprocess.CreateProcess(executable, args,
# no special security
None, None,
int(not close_fds),
creationflags,
env,
cwd,
startupinfo)
except pywintypes.error, e:
# Translate pywintypes.error to WindowsError, which is
# a subclass of OSError. FIXME: We should really
# translate errno using _sys_errlist (or similar), but
# how can this be done from Python?
raise WindowsError(*e.args)
finally:
# Child is launched. Close the parent's copy of those pipe
# handles that only the child should have open. You need
# to make sure that no handles to the write end of the
# output pipe are maintained in this process or else the
# pipe will not close when the child process exits and the
# ReadFile will hang.
if p2cread is not None:
p2cread.Close()
if c2pwrite is not None:
c2pwrite.Close()
if errwrite is not None:
errwrite.Close()
# Retain the process handle, but close the thread handle
self._handle = hp
self.pid = pid
ht.Close()
def _internal_poll(self, _deadstate=None,
_WaitForSingleObject=_subprocess.WaitForSingleObject,
_WAIT_OBJECT_0=_subprocess.WAIT_OBJECT_0,
_GetExitCodeProcess=_subprocess.GetExitCodeProcess):
"""Check if child process has terminated. Returns returncode
attribute.
This method is called by __del__, so it can only refer to objects
in its local scope.
"""
if self.returncode is None:
if _WaitForSingleObject(self._handle, 0) == _WAIT_OBJECT_0:
self.returncode = _GetExitCodeProcess(self._handle)
return self.returncode
def wait(self):
"""Wait for child process to terminate. Returns returncode
attribute."""
if self.returncode is None:
self.threadpool.apply_e(BaseException, _subprocess.WaitForSingleObject, (self._handle, _subprocess.INFINITE))
self.returncode = _subprocess.GetExitCodeProcess(self._handle)
return self.returncode
def send_signal(self, sig):
"""Send a signal to the process
"""
if sig == signal.SIGTERM:
self.terminate()
elif sig == signal.CTRL_C_EVENT:
os.kill(self.pid, signal.CTRL_C_EVENT)
elif sig == signal.CTRL_BREAK_EVENT:
os.kill(self.pid, signal.CTRL_BREAK_EVENT)
else:
raise ValueError("Unsupported signal: {}".format(sig))
def terminate(self):
"""Terminates the process
"""
_subprocess.TerminateProcess(self._handle, 1)
kill = terminate
else:
#
# POSIX methods
#
def _get_handles(self, stdin, stdout, stderr):
"""Construct and return tuple with IO objects:
p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
"""
p2cread, p2cwrite = None, None
c2pread, c2pwrite = None, None
errread, errwrite = None, None
if stdin is None:
pass
elif stdin == PIPE:
p2cread, p2cwrite = os.pipe()
elif isinstance(stdin, int):
p2cread = stdin
else:
# Assuming file-like object
p2cread = stdin.fileno()
if stdout is None:
pass
elif stdout == PIPE:
c2pread, c2pwrite = os.pipe()
elif isinstance(stdout, int):
c2pwrite = stdout
else:
# Assuming file-like object
c2pwrite = stdout.fileno()
if stderr is None:
pass
elif stderr == PIPE:
errread, errwrite = os.pipe()
elif stderr == STDOUT:
errwrite = c2pwrite
elif isinstance(stderr, int):
errwrite = stderr
else:
# Assuming file-like object
errwrite = stderr.fileno()
return (p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite)
def _set_cloexec_flag(self, fd, cloexec=True):
try:
cloexec_flag = fcntl.FD_CLOEXEC
except AttributeError:
cloexec_flag = 1
old = fcntl.fcntl(fd, fcntl.F_GETFD)
if cloexec:
fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
else:
fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
def _close_fds(self, but):
if hasattr(os, 'closerange'):
os.closerange(3, but)
os.closerange(but + 1, MAXFD)
else:
for i in xrange(3, MAXFD):
if i == but:
continue
try:
os.close(i)
except:
pass
def _execute_child(self, args, executable, preexec_fn, close_fds,
cwd, env, universal_newlines,
startupinfo, creationflags, shell,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite):
"""Execute program (POSIX version)"""
if isinstance(args, types.StringTypes):
args = [args]
else:
args = list(args)
if shell:
args = ["/bin/sh", "-c"] + args
if executable:
args[0] = executable
if executable is None:
executable = args[0]
# For transferring possible exec failure from child to parent
# The first char specifies the exception type: 0 means
# OSError, 1 means some other error.
errpipe_read, errpipe_write = os.pipe()
try:
try:
self._set_cloexec_flag(errpipe_write)
gc_was_enabled = gc.isenabled()
# Disable gc to avoid bug where gc -> file_dealloc ->
# write to stderr -> hang. http://bugs.python.org/issue1336
gc.disable()
try:
self.pid = os.fork()
except:
if gc_was_enabled:
gc.enable()
raise
if self.pid == 0:
# Child
try:
# Close parent's pipe ends
if p2cwrite is not None:
os.close(p2cwrite)
if c2pread is not None:
os.close(c2pread)
if errread is not None:
os.close(errread)
os.close(errpipe_read)
# Dup fds for child
def _dup2(a, b):
# dup2() removes the CLOEXEC flag but
# we must do it ourselves if dup2()
# would be a no-op (issue #10806).
if a == b:
self._set_cloexec_flag(a, False)
elif a is not None:
os.dup2(a, b)
_dup2(p2cread, 0)
_dup2(c2pwrite, 1)
_dup2(errwrite, 2)
# Close pipe fds. Make sure we don't close the
# same fd more than once, or standard fds.
closed = { None }
for fd in [p2cread, c2pwrite, errwrite]:
if fd not in closed and fd > 2:
os.close(fd)
closed.add(fd)
# Close all other fds, if asked for
if close_fds:
self._close_fds(but=errpipe_write)
if cwd is not None:
os.chdir(cwd)
if preexec_fn:
preexec_fn()
if env is None:
os.execvp(executable, args)
else:
os.execvpe(executable, args, env)
except:
exc_type, exc_value, tb = sys.exc_info()
# Save the traceback and attach it to the exception object
exc_lines = traceback.format_exception(exc_type,
exc_value,
tb)
exc_value.child_traceback = ''.join(exc_lines)
os.write(errpipe_write, pickle.dumps(exc_value))
# This exitcode won't be reported to applications, so it
# really doesn't matter what we return.
os._exit(255)
# Parent
self._watcher = self._loop.child(self.pid)
self._watcher.start(self._on_child, self._watcher)
if gc_was_enabled:
gc.enable()
finally:
# be sure the FD is closed no matter what
os.close(errpipe_write)
if p2cread is not None and p2cwrite is not None:
os.close(p2cread)
if c2pwrite is not None and c2pread is not None:
os.close(c2pwrite)
if errwrite is not None and errread is not None:
os.close(errwrite)
# Wait for exec to fail or succeed; possibly raising exception
# Exception limited to 1M
data = FileObject(errpipe_read, 'rb', close=False).read(1048576)
finally:
# be sure the FD is closed no matter what
try:
os.close(errpipe_read)
except EnvironmentError:
pass
if data != "":
self.wait()
child_exception = pickle.loads(data)
for fd in (p2cwrite, c2pread, errread):
if fd is not None:
os.close(fd)
raise child_exception
def _handle_exitstatus(self, sts, _WIFSIGNALED=os.WIFSIGNALED,
_WTERMSIG=os.WTERMSIG, _WIFEXITED=os.WIFEXITED,
_WEXITSTATUS=os.WEXITSTATUS):
# This method is called (indirectly) by __del__, so it cannot
# refer to anything outside of its local scope."""
if _WIFSIGNALED(sts):
self.returncode = -_WTERMSIG(sts)
elif _WIFEXITED(sts):
self.returncode = _WEXITSTATUS(sts)
else:
# Should never happen
raise RuntimeError("Unknown child exit status!")
def _internal_poll(self):
"""Check if child process has terminated. Returns returncode
attribute.
"""
return self.returncode
def wait(self):
"""Wait for child process to terminate. Returns returncode
attribute."""
if self.returncode is None:
self._event.wait()
return self.returncode
def send_signal(self, sig):
"""Send a signal to the process
"""
os.kill(self.pid, sig)
def terminate(self):
"""Terminate the process with SIGTERM
"""
self.send_signal(signal.SIGTERM)
def kill(self):
"""Kill the process with SIGKILL
"""
self.send_signal(signal.SIGKILL)
def write_and_close(fobj, data):
try:
if data:
fobj.write(data)
except (OSError, IOError), ex:
if ex.errno != errno.EPIPE and ex.errno != errno.EINVAL:
raise
finally:
try:
fobj.close()
except EnvironmentError:
pass
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment