Commit 95fd0786 authored by Martín Ferrari's avatar Martín Ferrari

Popen complete!

parent 935ce6a2
......@@ -100,12 +100,12 @@ class Server(object):
s = str(code) + "-" + clean[i] + "\n"
self._fd.write(s)
if self.debug:
sys.stderr.write("Reply: %s" % s)
sys.stderr.write("<ans> %s" % s)
s = str(code) + " " + clean[-1] + "\n"
self._fd.write(s)
if self.debug:
sys.stderr.write("Reply: %s" % s)
sys.stderr.write("<ans> %s" % s)
return
def readline(self):
......@@ -114,6 +114,8 @@ class Server(object):
if not line:
self.closed = True
return None
if self.debug:
sys.stderr.write("<C> %s" % line)
return line.rstrip()
def readchunk(self, size):
......@@ -213,7 +215,7 @@ class Server(object):
func = getattr(self, funcname)
if self.debug:
sys.stderr.write("Command: %s, args: %s\n" % (cmdname, args))
sys.stderr.write("<cmd> %s, args: %s\n" % (cmdname, args))
return (func, cmdname, args)
def run(self):
......@@ -295,6 +297,7 @@ class Server(object):
def do_PROC_RUN(self, cmdname):
try:
# self._proc['close_fds'] = True # forced
chld = netns.subprocess.spawn(**self._proc)
except:
(t, v, tb) = sys.exc_info()
......@@ -515,9 +518,9 @@ class Client(object):
def _b64(text):
text = str(text)
if filter(lambda x: ord(x) > ord(" ") and ord(x) <= ord("z")
and x != "=", text):
return text
else:
if filter(lambda x: ord(x) <= ord(" ") or ord(x) > ord("z")
or x == "=", text):
return "=" + base64.b64encode(text)
else:
return text
#!/usr/bin/env python
# vim:ts=4:sw=4:et:ai:sts=4
import fcntl, grp, os, pickle, pwd, signal, sys, traceback
import fcntl, grp, os, pickle, pwd, signal, select, sys, traceback
def spawn(executable, argv = None, cwd = None, env = None,
stdin = None, stdout = None, stderr = None, user = None):
def spawn(executable, argv = None, cwd = None, env = None, stdin = None,
stdout = None, stderr = None, close_fds = False, user = None):
"""Forks and execs a program, with stdio redirection and user switching.
The program is specified by `executable', if it does not contain any slash,
the PATH environment variable is used to search for the file.
......@@ -27,6 +27,9 @@ def spawn(executable, argv = None, cwd = None, env = None,
Note that the original descriptors are not closed, and that piping should
be handled externally.
When close_fds is True, it closes all file descriptors bigger than 2. It
can also be an iterable of file descriptors to close after fork.
Exceptions occurred while trying to set up the environment or executing the
program are propagated to the parent."""
......@@ -66,10 +69,25 @@ def spawn(executable, argv = None, cwd = None, env = None,
os.dup2(userfd[i], i)
if userfd[i] != i and userfd[i] not in userfd[0:i]:
_eintr_wrapper(os.close, userfd[i]) # only in child!
# Set up special control pipe
_eintr_wrapper(os.close, r)
flags = fcntl.fcntl(w, fcntl.F_GETFD)
fcntl.fcntl(w, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
if close_fds == False:
pass
elif close_fds == True:
for i in xrange(3, MAXFD):
if i != w:
try:
os.close(i)
except:
pass
else:
for i in close_fds:
os.close(i)
if user != None:
# Change user
os.setgid(gid)
......@@ -137,6 +155,9 @@ def wait(pid):
"""Wait for process to die and return the exit code."""
return _eintr_wrapper(os.waitpid, pid, 0)[1]
# User-facing interfaces
class Subprocess(object):
# FIXME: this is the visible interface; documentation should move here.
"""OO-style interface to spawn(), but invoked through the controlling
......@@ -186,7 +207,7 @@ class Subprocess(object):
if os.WIFSIGNALED(self._returncode):
return -os.WTERMSIG(self._returncode)
if os.WIFEXITED(self._returncode):
return os.EXITSTATUS(self._returncode)
return os.WEXITSTATUS(self._returncode)
raise RuntimeError("Invalid return code")
# FIXME: do we have any other way to deal with this than having explicit
......@@ -203,16 +224,16 @@ class Popen(Subprocess):
self.stdin = self.stdout = self.stderr = None
fdmap = { "stdin": stdin, "stdout": stdout, "stderr": stderr }
# if PIPE: all should be closed at the end
for k, v in fdmap:
for k, v in fdmap.items():
if v == None:
continue
if v == PIPE:
r, w = os.pipe()
if k == "stdin":
setattr(self, k, os.fdopen(w, 'wb', bufsize))
self.stdin = os.fdopen(w, 'wb', bufsize)
fdmap[k] = r
else:
setattr(self, k, os.fdopen(w, 'rb', bufsize))
setattr(self, k, os.fdopen(r, 'rb', bufsize))
fdmap[k] = w
elif isinstance(v, int):
pass
......@@ -221,18 +242,69 @@ class Popen(Subprocess):
if stderr == STDOUT:
fdmap['stderr'] = fdmap['stdout']
#print fdmap
super(Popen, self).__init__(node, executable, argv = argv, cwd = cwd,
env = env, stdin = fdmap['stdin'], stdout = fdmap['stdout'],
stderr = fdmap['stderr'], user = user)
# Close pipes, they have been dup()ed to the child
for k, v in fdmap:
for k, v in fdmap.items():
if getattr(self, k) != None:
_eintr_wrapper(os.close, v)
# def comunicate(self, input = None)
#self.universal_newlines = False # compat with subprocess.communicate
# No need to reinvent the wheel
#communicate = subprocess.communicate
#_communicate = subprocess._communicate
def communicate(self, input = None):
# almost verbatim from stdlib version
wset = []
rset = []
err = None
out = None
if self.stdin != None:
self.stdin.flush()
if input:
wset.append(self.stdin)
else:
self.stdin.close()
if self.stdout != None:
rset.append(self.stdout)
out = []
if self.stderr != None:
rset.append(self.stderr)
err = []
offset = 0
while rset or wset:
r, w, x = select.select(rset, wset, [])
if self.stdin in w:
offset += os.write(self.stdin.fileno(),
#buffer(input, offset, select.PIPE_BUF))
buffer(input, offset, 512)) # XXX: py2.7
if offset >= len(input):
self.stdin.close()
wset = []
for i in self.stdout, self.stderr:
if i in r:
d = os.read(i.fileno(), 1024) # No need for eintr wrapper
if d == "":
i.close
rset.remove(i)
else:
if i == self.stdout:
out.append(d)
else:
err.append(d)
if out != None:
out = ''.join(out)
if err != None:
err = ''.join(err)
self.wait()
return (out, err)
# internal stuff, do not look!
def _eintr_wrapper(f, *args):
"Wraps some callable with a loop that retries on EINTR"
......@@ -245,6 +317,12 @@ def _eintr_wrapper(f, *args):
else:
raise
try:
MAXFD = os.sysconf("SC_OPEN_MAX")
except:
MAXFD = 256
# Used to print extra info in nested exceptions
def _custom_hook(t, v, tb):
if hasattr(v, "child_traceback"):
......
......@@ -4,6 +4,8 @@
import netns, netns.subprocess, test_util
import grp, os, pwd, signal, sys, unittest
from netns.subprocess import PIPE, STDOUT, spawn, Subprocess, Popen, wait
def _stat(path):
try:
return os.stat(user)
......@@ -69,79 +71,162 @@ class TestSubprocess(unittest.TestCase):
@test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
def test_spawn_chuser(self):
user = 'nobody'
pid = netns.subprocess.spawn('/bin/sleep', ['/bin/sleep', '100'],
user = user)
pid = spawn('/bin/sleep', ['/bin/sleep', '100'], user = user)
self._check_ownership(user, pid)
os.kill(pid, signal.SIGTERM)
self.assertEquals(netns.subprocess.wait(pid), signal.SIGTERM)
self.assertEquals(wait(pid), signal.SIGTERM)
@test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
def test_Subprocess_chuser(self):
node = netns.Node(nonetns = True)
user = 'nobody'
p = netns.subprocess.Subprocess(node, '/bin/sleep',
['/bin/sleep', '1000'], user = user)
p = Subprocess(node, '/bin/sleep', ['/bin/sleep', '1000'], user = user)
self._check_ownership(user, p.pid)
p.signal()
self.assertEquals(p.wait(), -signal.SIGTERM)
def test_spawn_basic(self):
# User does not exist
self.assertRaises(ValueError, netns.subprocess.spawn,
self.assertRaises(ValueError, spawn,
'/bin/sleep', ['/bin/sleep', '1000'], user = self.nouser)
self.assertRaises(ValueError, netns.subprocess.spawn,
self.assertRaises(ValueError, spawn,
'/bin/sleep', ['/bin/sleep', '1000'], user = self.nouid)
# Invalid CWD: it is a file
self.assertRaises(OSError, netns.subprocess.spawn,
'/bin/sleep', cwd = '/bin/sleep')
self.assertRaises(OSError, spawn, '/bin/sleep', cwd = '/bin/sleep')
# Invalid CWD: does not exist
self.assertRaises(OSError, netns.subprocess.spawn,
'/bin/sleep', cwd = self.nofile)
self.assertRaises(OSError, spawn, '/bin/sleep', cwd = self.nofile)
# Exec failure
self.assertRaises(OSError, netns.subprocess.spawn, self.nofile)
self.assertRaises(OSError, spawn, self.nofile)
# Test that the environment is cleared: sleep should not be found
# XXX: This should be a python bug: if I don't set PATH explicitly, it
# uses a default search path
self.assertRaises(OSError, netns.subprocess.spawn,
'sleep', env = {'PATH': ''})
self.assertRaises(OSError, spawn, 'sleep', env = {'PATH': ''})
r, w = os.pipe()
p = netns.subprocess.spawn('/bin/echo', ['echo', 'hello world'],
stdout = w)
p = spawn('/bin/echo', ['echo', 'hello world'], stdout = w)
os.close(w)
self.assertEquals(_readall(r), "hello world\n")
os.close(r)
r0, w0 = os.pipe()
r1, w1 = os.pipe()
p = spawn('/bin/cat', stdout = w0, stdin = r1, close_fds = [r0, w1])
os.close(w0)
os.close(r1)
os.write(w1, "hello world\n")
os.close(w1)
self.assertEquals(_readall(r0), "hello world\n")
os.close(r0)
def test_Subprocess_basic(self):
node = netns.Node(nonetns = True) #, debug = True)
node = netns.Node(nonetns = True)
# User does not exist
self.assertRaises(RuntimeError, netns.subprocess.Subprocess, node,
self.assertRaises(RuntimeError, Subprocess, node,
'/bin/sleep', ['/bin/sleep', '1000'], user = self.nouser)
self.assertRaises(RuntimeError, netns.subprocess.Subprocess, node,
self.assertRaises(RuntimeError, Subprocess, node,
'/bin/sleep', ['/bin/sleep', '1000'], user = self.nouid)
# Invalid CWD: it is a file
self.assertRaises(RuntimeError, netns.subprocess.Subprocess, node,
self.assertRaises(RuntimeError, Subprocess, node,
'/bin/sleep', cwd = '/bin/sleep')
# Invalid CWD: does not exist
self.assertRaises(RuntimeError, netns.subprocess.Subprocess, node,
self.assertRaises(RuntimeError, Subprocess, node,
'/bin/sleep', cwd = self.nofile)
# Exec failure
self.assertRaises(RuntimeError, netns.subprocess.Subprocess, node,
self.nofile)
self.assertRaises(RuntimeError, Subprocess, node, self.nofile)
# Test that the environment is cleared: sleep should not be found
# XXX: This should be a python bug: if I don't set PATH explicitly, it
# uses a default search path
self.assertRaises(RuntimeError, netns.subprocess.Subprocess, node,
self.assertRaises(RuntimeError, Subprocess, node,
'sleep', env = {'PATH': ''})
# FIXME: tests fds
#import pdb; pdb.set_trace()
r, w = os.pipe()
p = netns.subprocess.Subprocess(node, '/bin/echo',
['echo', 'hello world'], stdout = w)
p = Subprocess(node, '/bin/echo', ['echo', 'hello world'], stdout = w)
os.close(w)
self.assertEquals(_readall(r), "hello world\n")
os.close(r)
def test_Popen(self):
node = netns.Node(nonetns = True, debug=0)
# repeat test with Popen interface
r0, w0 = os.pipe()
r1, w1 = os.pipe()
p = Popen(node, '/bin/cat', stdout = w0, stdin = r1)
os.close(w0)
os.close(r1)
os.write(w1, "hello world\n")
os.close(w1)
self.assertEquals(_readall(r0), "hello world\n")
os.close(r0)
# pipes
p = Popen(node, '/bin/cat', stdin = PIPE, stdout = PIPE)
p.stdin.write("hello world\n")
p.stdin.close()
self.assertEquals(p.stdout.readlines(), ["hello world\n"])
self.assertEquals(p.stderr, None)
self.assertEquals(p.wait(), 0)
p = Popen(node, '/bin/cat', stdin = PIPE, stdout = PIPE)
self.assertEquals(p.communicate("hello world\n"),
("hello world\n", None))
#
p = Popen(node, '/bin/sh', ['sh', '-c', 'cat >&2'],
stdin = PIPE, stderr = PIPE)
p.stdin.write("hello world\n")
p.stdin.close()
self.assertEquals(p.stderr.readlines(), ["hello world\n"])
self.assertEquals(p.stdout, None)
self.assertEquals(p.wait(), 0)
p = Popen(node, '/bin/sh', ['sh', '-c', 'cat >&2'],
stdin = PIPE, stderr = PIPE)
self.assertEquals(p.communicate("hello world\n"),
(None, "hello world\n"))
#
p = Popen(node, '/bin/sh', ['sh', '-c', 'cat >&2'],
stdin = PIPE, stdout = PIPE, stderr = STDOUT)
p.stdin.write("hello world\n")
p.stdin.close()
self.assertEquals(p.stdout.readlines(), ["hello world\n"])
self.assertEquals(p.stderr, None)
self.assertEquals(p.wait(), 0)
p = Popen(node, '/bin/sh', ['sh', '-c', 'cat >&2'],
stdin = PIPE, stdout = PIPE, stderr = STDOUT)
self.assertEquals(p.communicate("hello world\n"),
("hello world\n", None))
#
p = Popen(node, 'tee', ['tee', '/dev/stderr'],
stdin = PIPE, stdout = PIPE, stderr = STDOUT)
p.stdin.write("hello world\n")
p.stdin.close()
self.assertEquals(p.stdout.readlines(), ["hello world\n"] * 2)
self.assertEquals(p.stderr, None)
self.assertEquals(p.wait(), 0)
p = Popen(node, 'tee', ['tee', '/dev/stderr'],
stdin = PIPE, stdout = PIPE, stderr = STDOUT)
self.assertEquals(p.communicate("hello world\n"),
("hello world\n" * 2, None))
#
p = Popen(node, 'tee', ['tee', '/dev/stderr'],
stdin = PIPE, stdout = PIPE, stderr = PIPE)
p.stdin.write("hello world\n")
p.stdin.close()
self.assertEquals(p.stdout.readlines(), ["hello world\n"])
self.assertEquals(p.stderr.readlines(), ["hello world\n"])
self.assertEquals(p.wait(), 0)
p = Popen(node, 'tee', ['tee', '/dev/stderr'],
stdin = PIPE, stdout = PIPE, stderr = PIPE)
self.assertEquals(p.communicate("hello world\n"),
("hello world\n",) * 2)
# FIXME: tests for Popen!
if __name__ == '__main__':
unittest.main()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment