Commit 7f75ea32 authored by Martín Ferrari's avatar Martín Ferrari

Some fixes; better debugging; better handling of errors in Slave

parent 1ede6065
...@@ -6,7 +6,7 @@ try: ...@@ -6,7 +6,7 @@ try:
from yaml import CDumper as Dumper from yaml import CDumper as Dumper
except ImportError: except ImportError:
from yaml import Loader, Dumper from yaml import Loader, Dumper
import base64, os, passfd, re, signal, socket, sys, unshare, yaml import base64, os, passfd, re, signal, socket, sys, traceback, unshare, yaml
import netns.subprocess import netns.subprocess
# ============================================================================ # ============================================================================
...@@ -64,25 +64,25 @@ class Server(object): ...@@ -64,25 +64,25 @@ class Server(object):
"""Class that implements the communication protocol and dispatches calls to """Class that implements the communication protocol and dispatches calls to
the required functions. Also works as the main loop for the slave the required functions. Also works as the main loop for the slave
process.""" process."""
def __init__(self, fd): def __init__(self, fd, debug = False):
# Dictionary of valid commands # Dictionary of valid commands
self.commands = _proto_commands self.commands = _proto_commands
# Flag to stop the server # Flag to stop the server
self.closed = False self.closed = False
# Print debug info # Print debug info
self.debug = True self.debug = debug
# Dictionary to keep track of started processes # Dictionary to keep track of started processes
self._children = dict() self._children = dict()
# Buffer and flag for PROC mode # Buffer and flag for PROC mode
self._proc = None self._proc = None
if hasattr(fd, "readline"): if hasattr(fd, "readline"):
self.f = fd self._fd = fd
else: else:
if hasattr(fd, "makefile"): if hasattr(fd, "makefile"):
self.f = fd.makefile("r+", 1) # line buffered self._fd = fd.makefile("r+", 1) # line buffered
else: else:
self.f = os.fdopen(fd, "r+", 1) self._fd = os.fdopen(fd, "r+", 1)
def reply(self, code, text): def reply(self, code, text):
"Send back a reply to the client; handle multiline messages" "Send back a reply to the client; handle multiline messages"
...@@ -93,13 +93,20 @@ class Server(object): ...@@ -93,13 +93,20 @@ class Server(object):
for i in text: for i in text:
clean.extend(i.splitlines()) clean.extend(i.splitlines())
for i in range(len(clean) - 1): for i in range(len(clean) - 1):
self.f.write(str(code) + "-" + clean[i] + "\n") s = str(code) + "-" + clean[i] + "\n"
self.f.write(str(code) + " " + clean[-1] + "\n") self._fd.write(s)
if self.debug:
sys.stderr.write("Reply: %s" % s)
s = str(code) + " " + clean[-1] + "\n"
self._fd.write(s)
if self.debug:
sys.stderr.write("Reply: %s" % s)
return return
def readline(self): def readline(self):
"Read a line from the socket and detect connection break-up." "Read a line from the socket and detect connection break-up."
line = self.f.readline() line = self._fd.readline()
if not line: if not line:
self.closed = True self.closed = True
return None return None
...@@ -111,7 +118,7 @@ class Server(object): ...@@ -111,7 +118,7 @@ class Server(object):
res = "" res = ""
while True: while True:
line = self.f.readline() line = self._fd.readline()
if not line: if not line:
self.closed = True self.closed = True
return None return None
...@@ -198,6 +205,8 @@ class Server(object): ...@@ -198,6 +205,8 @@ class Server(object):
j += 1 j += 1
func = getattr(self, funcname) func = getattr(self, funcname)
if self.debug:
sys.stderr.write("Command: %s, args: %s\n" % (cmdname, args))
return (func, cmdname, args) return (func, cmdname, args)
def run(self): def run(self):
...@@ -208,11 +217,9 @@ class Server(object): ...@@ -208,11 +217,9 @@ class Server(object):
cmd = self.readcmd() cmd = self.readcmd()
if cmd == None: if cmd == None:
continue continue
if self.debug:
sys.stderr.write("Command: %s, args: %s\n" % (cmd[1], cmd[2]))
cmd[0](cmd[1], *cmd[2]) cmd[0](cmd[1], *cmd[2])
try: try:
self.f.close() self._fd.close()
except: except:
pass pass
# FIXME: cleanup # FIXME: cleanup
...@@ -257,16 +264,25 @@ class Server(object): ...@@ -257,16 +264,25 @@ class Server(object):
self.reply(354, self.reply(354,
"Pass the file descriptor now, with '%s\\n' as payload." % "Pass the file descriptor now, with '%s\\n' as payload." %
cmdname) cmdname)
if cmdname == 'PROC SIN':
mode = 'r'
else:
mode = 'w'
try: try:
fd, payload = passfd.recvfd(len(cmdname) + 1) fd, payload = passfd.recvfd(self._fd, len(cmdname) + 1, mode)
assert payload[0:len(cmdname)] == cmdname except BaseException, e: # FIXME
except: self.reply(500, "Error receiving FD: %s" % str(e))
self.reply(500, "Invalid FD or payload.")
raise
return return
if payload[0:len(cmdname)] != cmdname:
self.reply(500, "Invalid payload: %s." % payload)
return
m = {'PROC SIN': 'stdin', 'PROC SOUT': 'stdout', 'PROC SERR': 'stderr'} m = {'PROC SIN': 'stdin', 'PROC SOUT': 'stdout', 'PROC SERR': 'stderr'}
self._proc[m[cmdname]] = fd self._proc[m[cmdname]] = fd
self.reply(200, 'FD saved as %d.' % m[cmdname]) self.reply(200, 'FD saved as %s.' % m[cmdname])
# Same code for the three commands # Same code for the three commands
do_PROC_SOUT = do_PROC_SERR = do_PROC_SIN do_PROC_SOUT = do_PROC_SERR = do_PROC_SIN
...@@ -274,7 +290,7 @@ class Server(object): ...@@ -274,7 +290,7 @@ class Server(object):
def do_PROC_RUN(self, cmdname): def do_PROC_RUN(self, cmdname):
try: try:
chld = netns.subprocess.Subprocess(**self._proc) chld = netns.subprocess.Subprocess(**self._proc)
except BaseException, e: except BaseException, e: # FIXME
self.reply(500, "Failure starting process: %s" % str(e)) self.reply(500, "Failure starting process: %s" % str(e))
self._proc = None self._proc = None
self.commands = _proto_commands self.commands = _proto_commands
...@@ -333,7 +349,7 @@ class Server(object): ...@@ -333,7 +349,7 @@ class Server(object):
# Client-side protocol implementation, and slave process creation # Client-side protocol implementation, and slave process creation
# #
# Handle the creation of the child; parent gets (fd, pid), child never returns # Handle the creation of the child; parent gets (fd, pid), child never returns
def _start_child(): def _start_child(debug = False):
# Create socket pair to communicate # Create socket pair to communicate
(s0, s1) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0) (s0, s1) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0)
# Spawn a child that will run in a loop # Spawn a child that will run in a loop
...@@ -344,31 +360,29 @@ def _start_child(): ...@@ -344,31 +360,29 @@ def _start_child():
try: try:
s0.close() s0.close()
srv = Server(s1) srv = Server(s1, debug)
#unshare.unshare(unshare.CLONE_NEWNET) #unshare.unshare(unshare.CLONE_NEWNET)
srv.run()
except BaseException, e: except BaseException, e:
s = "Slave node aborting: %s\n" % str(e) s = "Slave node aborting: %s\n" % str(e)
sep = "=" * 70 + "\n"
sys.stderr.write(s + sep)
traceback.print_exc(file=sys.stdout)
sys.stderr.write(sep)
try: try:
# try to pass the error to parent, if possible # try to pass the error to parent, if possible
s1.send("500 " + s) s1.send("500 " + s)
except: except:
pass pass
sys.stderr.write(s)
os._exit(1) os._exit(1)
# Try block just in case... os._exit(0)
try:
srv.run()
except:
os._exit(1)
else:
os._exit(0)
# NOTREACHED # NOTREACHED
class Slave(object): class Slave(object):
"""Class to create and manage slave processes; it is at the same time a """Class to create and manage slave processes; it is at the same time a
client implementation for the communication protocol.""" client implementation for the communication protocol."""
def __init__(self, fd = None, pid = None): def __init__(self, debug = False, fd = None, pid = None):
"""When called without arguments, it will fork, create a new network """When called without arguments, it will fork, create a new network
namespace and enter a loop to serve requests from the master. The namespace and enter a loop to serve requests from the master. The
parent process will return an object which is used to control the slave parent process will return an object which is used to control the slave
...@@ -387,7 +401,7 @@ class Slave(object): ...@@ -387,7 +401,7 @@ class Slave(object):
else: else:
fd = os.fdopen(fd, "r+", 1) fd = os.fdopen(fd, "r+", 1)
else: else:
f, pid = _start_child() f, pid = _start_child(debug)
fd = f.makefile("r+", 1) # line buffered fd = f.makefile("r+", 1) # line buffered
self._pid = pid self._pid = pid
...@@ -435,7 +449,7 @@ class Slave(object): ...@@ -435,7 +449,7 @@ class Slave(object):
"Pass a file descriptor" "Pass a file descriptor"
self._send_cmd("PROC", type) self._send_cmd("PROC", type)
self._read_and_check_reply(3) self._read_and_check_reply(3)
passfd.sendfd(self._fd.fileno(), fd, "PROC " + type) passfd.sendfd(self._fd, fd, "PROC " + type)
self._read_and_check_reply() self._read_and_check_reply()
def popen(self, uid, gid, file, argv = None, cwd = None, env = None, def popen(self, uid, gid, file, argv = None, cwd = None, env = None,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment