"""Test suite for zdrun.py.""" import os import sys import time import signal import tempfile import unittest import socket from StringIO import StringIO import ZConfig from zdaemon import zdrun, zdctl class ConfiguredOptions: """Options class that loads configuration from a specified string. This always loads from the string, regardless of any -C option that may be given. """ def set_configuration(self, configuration): self.__configuration = configuration self.configfile = "<preloaded string>" def load_configfile(self): sio = StringIO(self.__configuration) cfg = ZConfig.loadConfigFile(self.schema, sio, self.zconfig_options) self.configroot, self.confighandlers = cfg class ConfiguredZDRunOptions(ConfiguredOptions, zdrun.ZDRunOptions): def __init__(self, configuration): zdrun.ZDRunOptions.__init__(self) self.set_configuration(configuration) class ZDaemonTests(unittest.TestCase): python = os.path.abspath(sys.executable) assert os.path.exists(python) here = os.path.abspath(os.path.dirname(__file__)) assert os.path.isdir(here) nokill = os.path.join(here, "nokill.py") assert os.path.exists(nokill) parent = os.path.dirname(here) zdrun = os.path.join(parent, "zdrun.py") assert os.path.exists(zdrun) ppath = os.pathsep.join(sys.path) def setUp(self): self.zdsock = tempfile.mktemp() self.new_stdout = StringIO() self.save_stdout = sys.stdout sys.stdout = self.new_stdout self.expect = "" def tearDown(self): sys.stdout = self.save_stdout for sig in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT, signal.SIGCHLD): signal.signal(sig, signal.SIG_DFL) try: os.unlink(self.zdsock) except os.error: pass output = self.new_stdout.getvalue() self.assertEqual(self.expect, output) def quoteargs(self, args): for i in range(len(args)): if " " in args[i]: args[i] = '"%s"' % args[i] return " ".join(args) def rundaemon(self, args): # Add quotes, in case some pathname contains spaces (e.g. Mac OS X) args = self.quoteargs(args) cmd = ('PYTHONPATH="%s" "%s" "%s" -d -s "%s" %s' % (self.ppath, self.python, self.zdrun, self.zdsock, args)) os.system(cmd) # When the daemon crashes, the following may help debug it: ##os.system("PYTHONPATH=%s %s %s -s %s %s &" % ## (self.ppath, self.python, self.zdrun, self.zdsock, args)) def _run(self, args, cmdclass=None): if type(args) is type(""): args = args.split() try: zdctl.main(["-s", self.zdsock] + args, cmdclass=cmdclass) except SystemExit: pass def testCmdclassOverride(self): class MyCmd(zdctl.ZDCmd): def do_sproing(self, rest): print rest self._run("-p echo sproing expected", cmdclass=MyCmd) self.expect = "expected\n" def testSystem(self): self.rundaemon(["echo", "-n"]) self.expect = "" ## def testInvoke(self): ## self._run("echo -n") ## self.expect = "" ## def testControl(self): ## self.rundaemon(["sleep", "1000"]) ## time.sleep(1) ## self._run("stop") ## time.sleep(1) ## self._run("exit") ## self.expect = "Sent SIGTERM\nExiting now\n" ## def testStop(self): ## self.rundaemon([self.python, self.nokill]) ## time.sleep(1) ## self._run("stop") ## time.sleep(1) ## self._run("exit") ## self.expect = "Sent SIGTERM\nSent SIGTERM; will exit later\n" def testHelp(self): self._run("-h") import __main__ self.expect = __main__.__doc__ def testOptionsSysArgv(self): # Check that options are parsed from sys.argv by default options = zdrun.ZDRunOptions() save_sys_argv = sys.argv try: sys.argv = ["A", "B", "C"] options.realize() finally: sys.argv = save_sys_argv self.assertEqual(options.options, []) self.assertEqual(options.args, ["B", "C"]) def testOptionsBasic(self): # Check basic option parsing options = zdrun.ZDRunOptions() options.realize(["B", "C"], "foo") self.assertEqual(options.options, []) self.assertEqual(options.args, ["B", "C"]) self.assertEqual(options.progname, "foo") def testOptionsHelp(self): # Check that -h behaves properly options = zdrun.ZDRunOptions() try: options.realize(["-h"], doc=zdrun.__doc__) except SystemExit, err: self.failIf(err.code) else: self.fail("SystemExit expected") self.expect = zdrun.__doc__ def testSubprocessBasic(self): # Check basic subprocess management: spawn, kill, wait options = zdrun.ZDRunOptions() options.realize(["sleep", "100"]) proc = zdrun.Subprocess(options) self.assertEqual(proc.pid, 0) pid = proc.spawn() self.assertEqual(proc.pid, pid) msg = proc.kill(signal.SIGTERM) self.assertEqual(msg, None) wpid, wsts = os.waitpid(pid, 0) self.assertEqual(wpid, pid) self.assertEqual(os.WIFSIGNALED(wsts), 1) self.assertEqual(os.WTERMSIG(wsts), signal.SIGTERM) proc.setstatus(wsts) self.assertEqual(proc.pid, 0) def testEventlogOverride(self): # Make sure runner.eventlog is used if it exists options = ConfiguredZDRunOptions("""\ <runner> program /bin/true <eventlog> level 42 </eventlog> </runner> <eventlog> level 35 </eventlog> """) options.realize(["/bin/true"]) self.assertEqual(options.config_logger.level, 42) def testEventlogWithoutOverride(self): # Make sure eventlog is used if runner.eventlog doesn't exist options = ConfiguredZDRunOptions("""\ <runner> program /bin/true </runner> <eventlog> level 35 </eventlog> """) options.realize(["/bin/true"]) self.assertEqual(options.config_logger.level, 35) def testRunIgnoresParentSignals(self): # Spawn a process which will in turn spawn a zdrun process. # We make sure that the zdrun process is still running even if # its parent process receives an interrupt signal (it should # not be passed to zdrun). zdrun_socket = os.path.join(self.here, 'testsock') zdctlpid = os.spawnvp( os.P_NOWAIT, sys.executable, [sys.executable, os.path.join(self.here, 'parent.py')] ) # Wait for it to start, but no longer than a minute. deadline = time.time() + 60 is_started = False while time.time() < deadline: response = send_action('status\n', zdrun_socket) if response is None: time.sleep(0.05) else: is_started = True break self.assert_(is_started, "spawned process failed to start in a minute") # Kill it, and wait a little to ensure it's dead. os.kill(zdctlpid, signal.SIGINT) time.sleep(0.25) # Make sure the child is still responsive. response = send_action('status\n', zdrun_socket) self.assert_(response is not None and '\n' in response) # Kill the process. send_action('exit\n', zdrun_socket) def testUmask(self): path = tempfile.mktemp() # With umask 666, we should create a file that we aren't able # to write. If access says no, assume that umask works. try: touch_cmd = "/bin/touch" if not os.path.exists(touch_cmd): touch_cmd = "/usr/bin/touch" # Mac OS X self.rundaemon(["-m", "666", touch_cmd, path]) for i in range(5): if not os.path.exists(path): time.sleep(0.1) self.assert_(os.path.exists(path)) self.assert_(not os.access(path, os.W_OK)) finally: if os.path.exists(path): os.remove(path) def send_action(action, sockname): """Send an action to the zdrun server and return the response. Return None if the server is not up or any other error happened. """ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: sock.connect(sockname) sock.send(action + "\n") sock.shutdown(1) # We're not writing any more response = "" while 1: data = sock.recv(1000) if not data: break response += data sock.close() return response except socket.error, msg: return None def test_suite(): suite = unittest.TestSuite() if os.name == "posix": suite.addTest(unittest.makeSuite(ZDaemonTests)) return suite if __name__ == '__main__': __file__ = sys.argv[0] unittest.main(defaultTest='test_suite')