Commit db2d4929 authored by Xavier Thompson's avatar Xavier Thompson

SlapPopen: Improve logging and fix stderr and timeouts

See merge request nexedi/slapos.core!458
parents b0100fa6 4bdf69f8
...@@ -59,7 +59,11 @@ from slapos import manager as slapmanager ...@@ -59,7 +59,11 @@ from slapos import manager as slapmanager
from slapos.slap.slap import NotFoundError from slapos.slap.slap import NotFoundError
from slapos.slap.slap import ServerError from slapos.slap.slap import ServerError
from slapos.slap.slap import COMPUTER_PARTITION_REQUEST_LIST_TEMPLATE_FILENAME from slapos.slap.slap import COMPUTER_PARTITION_REQUEST_LIST_TEMPLATE_FILENAME
from slapos.util import mkdir_p, chownDirectory, string_to_boolean, listifdir from slapos.util import (mkdir_p,
chownDirectory,
string_to_boolean,
listifdir,
unicode2str)
from slapos.grid.exception import BuildoutFailedError from slapos.grid.exception import BuildoutFailedError
from slapos.grid.SlapObject import Software, Partition from slapos.grid.SlapObject import Software, Partition
from slapos.grid.svcbackend import (launchSupervisord, from slapos.grid.svcbackend import (launchSupervisord,
...@@ -68,7 +72,6 @@ from slapos.grid.svcbackend import (launchSupervisord, ...@@ -68,7 +72,6 @@ from slapos.grid.svcbackend import (launchSupervisord,
_getSupervisordSocketPath) _getSupervisordSocketPath)
from slapos.grid.utils import (md5digest, from slapos.grid.utils import (md5digest,
dropPrivileges, dropPrivileges,
killProcessTree,
SlapPopen, SlapPopen,
updateFile) updateFile)
from slapos.grid.promise import PromiseLauncher, PromiseError from slapos.grid.promise import PromiseLauncher, PromiseError
...@@ -738,28 +741,22 @@ stderr_logfile_backups=1 ...@@ -738,28 +741,22 @@ stderr_logfile_backups=1
os.dup2(1, 2) os.dup2(1, 2)
dropPrivileges(uid, gid, logger=self.logger) dropPrivileges(uid, gid, logger=self.logger)
os.dup2(err, 2) os.dup2(err, 2)
try: process = SlapPopen(
process = SlapPopen( command,
command, preexec_fn=preexec_fn,
preexec_fn=preexec_fn, cwd=instance_path,
cwd=instance_path, universal_newlines=True,
universal_newlines=True, stdout=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
stderr=subprocess.PIPE, logger=self.logger,
logger=self.logger, timeout=timeout,
timeout=timeout, )
) if process.returncode == 2:
stderr = process.stderr.read() raise PromiseError(unicode2str(process.error))
if process.returncode == 2: elif process.returncode:
raise PromiseError(stderr) raise Exception(unicode2str(process.error))
elif process.returncode: elif process.error:
raise Exception(stderr) self.logger.warn('Unexpected promise runner output:\n%s', process.error)
elif stderr:
self.logger.warn('Unexpected promise runner output:\n%s', stderr)
except subprocess.TimeoutExpired:
killProcessTree(process.pid, self.logger)
# If this happens, it might be that the timeout margin is too small.
raise Exception('Promise runner timed out')
else: else:
return PromiseLauncher(config=promise_config, logger=self.logger).run() return PromiseLauncher(config=promise_config, logger=self.logger).run()
......
...@@ -33,9 +33,9 @@ import hashlib ...@@ -33,9 +33,9 @@ import hashlib
import os import os
import pkg_resources import pkg_resources
import pwd import pwd
import select
import stat import stat
import sys import sys
import threading
import logging import logging
import psutil import psutil
import time import time
...@@ -93,24 +93,27 @@ LOCALE_ENVIRONMENT_REMOVE_LIST = [ ...@@ -93,24 +93,27 @@ LOCALE_ENVIRONMENT_REMOVE_LIST = [
] ]
def logAndAccumulateOutput(process_stdout, buffer, logger): class LineLogger(object):
"""Read process output and place the output in `buffer`, logging the lines
one by one as they are emitted.
""" """
current_output = '' Logger that takes chunks cut arbitrarily and logs them back line by line.
while 1: """
data = os.read(process_stdout.fileno(), 4096) def __init__(self, logger):
if not data: self.logger = logger
return self.current = ''
data = data.decode('utf-8', 'replace')
buffer.append(data) def update(self, data):
current_output += data lines = (self.current + data).splitlines()
for current_output_line in current_output.splitlines(True): self.current = lines.pop()
if current_output_line.endswith('\n'): for line in lines:
logger.info(current_output_line.rstrip('\n')) self.logger.info(line)
current_output = '' if data.endswith('\n'):
else: self.logger.info(self.current)
current_output = current_output_line self.current = ''
def flush(self):
if self.current:
self.logger.info(self.current)
self.current = ''
class SlapPopen(subprocess.Popen): class SlapPopen(subprocess.Popen):
...@@ -145,24 +148,67 @@ class SlapPopen(subprocess.Popen): ...@@ -145,24 +148,67 @@ class SlapPopen(subprocess.Popen):
if debug: if debug:
self.wait() self.wait()
self.output = '(output not captured in debug mode)' self.output = '(output not captured in debug mode)'
self.error = '(error not captured in debug mode)'
return return
self.stdin.flush() self.stdin.flush()
self.stdin.close() self.stdin.close()
self.stdin = None self.stdin = None
output_lines = [] stderr_fileno = stdout_fileno = None
buffers = {}
if kwargs['stdout'] is subprocess.PIPE:
line_logger = LineLogger(logger)
stdout_fileno = self.stdout.fileno()
buffers[stdout_fileno] = []
if kwargs['stderr'] is subprocess.PIPE:
stderr_fileno = self.stderr.fileno()
buffers[stderr_fileno] = []
poll = select.poll()
for fd in buffers:
poll.register(fd)
active = len(buffers)
if timeout is not None:
deadline = time.time() + timeout
while active:
for fd, _ in poll.poll(timeout):
data = os.read(fd, 4096).decode('utf-8', 'replace')
if data:
if fd == stdout_fileno:
line_logger.update(data)
buffers[fd].append(data)
else:
if fd == stdout_fileno:
line_logger.flush()
poll.unregister(fd)
active -= 1
if timeout is not None:
timeout = deadline - time.time()
if timeout <= 0:
timeout = 0
break
# BBB: reading output in a separate thread is not needed on python 3,
# iterating on self.stdout seems enough.
t = threading.Thread(
target=logAndAccumulateOutput,
args=(self.stdout, output_lines, logger))
t.start()
try: try:
self.wait(timeout=timeout) self.wait(timeout=timeout)
except subprocess.TimeoutExpired as e:
for p in killProcessTree(self.pid, logger):
p.wait(timeout=10) # arbitrary timeout, wait until process is killed
self.poll() # set returncode (and avoid still-running warning)
e.output = e.stdout = ''.join(buffers.get(stdout_fileno, ()))
e.stderr = ''.join(buffers.get(stderr_fileno, ()))
raise
finally: finally:
t.join() for s in (self.stdout, self.stderr):
self.output = ''.join(output_lines) if s:
try:
s.close()
except OSError:
pass
self.output = ''.join(buffers.get(stdout_fileno, ()))
self.error = ''.join(buffers.get(stderr_fileno, ()))
def md5digest(url): def md5digest(url):
...@@ -449,7 +495,7 @@ def killProcessTree(pid, logger): ...@@ -449,7 +495,7 @@ def killProcessTree(pid, logger):
process = psutil.Process(pid) process = psutil.Process(pid)
process.suspend() process.suspend()
except psutil.Error: except psutil.Error:
return return ()
process_list = [process] process_list = [process]
running_process_list = process.children(recursive=True) running_process_list = process.children(recursive=True)
...@@ -469,3 +515,5 @@ def killProcessTree(pid, logger): ...@@ -469,3 +515,5 @@ def killProcessTree(pid, logger):
process.kill() process.kill()
except psutil.Error as e: except psutil.Error as e:
logger.debug("Process kill: %s" % e) logger.debug("Process kill: %s" % e)
return process_list
...@@ -27,12 +27,18 @@ ...@@ -27,12 +27,18 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import logging import logging
import os import os
import subprocess import psutil
import sys import sys
import tempfile import tempfile
import textwrap import textwrap
import time
import unittest import unittest
if sys.version_info >= (3,):
import subprocess
else:
import subprocess32 as subprocess
import mock import mock
import slapos.grid.utils import slapos.grid.utils
...@@ -169,6 +175,232 @@ class SlapPopenTestCase(unittest.TestCase): ...@@ -169,6 +175,232 @@ class SlapPopenTestCase(unittest.TestCase):
for fd in (child_stdin_r, child_stdout_r, child_stdout_w, stdin_backup, stdout_backup): for fd in (child_stdin_r, child_stdout_r, child_stdout_w, stdin_backup, stdout_backup):
os.close(fd) os.close(fd)
def test_stderr(self):
self.script.write(textwrap.dedent("""\
#!/bin/sh
>&2 echo "hello"
exit 123
""").encode())
self.script.close()
logger = mock.MagicMock()
program = slapos.grid.utils.SlapPopen(
self.script.name,
stdout=None,
stderr=subprocess.PIPE,
logger=logger)
# error code, and error output are returned
self.assertEqual(123, program.returncode)
self.assertEqual('hello\n', program.error)
self.assertEqual('', program.output)
# no output, nothing is logged "live"
self.assertFalse(logger.info.called)
def test_stdout_and_stderr(self):
self.script.write(textwrap.dedent("""\
#!/bin/sh
echo "hello"
>&2 echo "world"
exit 123
""").encode())
self.script.close()
logger = mock.MagicMock()
program = slapos.grid.utils.SlapPopen(
self.script.name,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
logger=logger)
# error code, stderr and stdout are returned
self.assertEqual(123, program.returncode)
self.assertEqual('hello\n', program.output)
self.assertEqual('world\n', program.error)
# only stdout is logged
logger.info.assert_called_once_with('hello')
def test_timeout_stdout_multiline(self):
self.script.write(textwrap.dedent("""\
#!/bin/sh
for i in $(seq 100)
do
echo .
sleep 0.1
done
""").encode())
self.script.close()
logger = mock.MagicMock()
start = time.time()
with self.assertRaises(subprocess.TimeoutExpired) as cm:
program = slapos.grid.utils.SlapPopen(
self.script.name,
timeout=1,
logger=logger)
# the timeout was respected
elapsed = time.time() - start
self.assertLess(elapsed, 5)
self.assertGreaterEqual(elapsed, 1)
# the output before timeout is captured
self.assertEqual(cm.exception.output, '.\n' * 10)
# each line before timeout is logged "live" as well
self.assertEqual(logger.info.call_args_list, [mock.call('.')] * 10)
def test_timeout_stdout_oneline(self):
self.script.write(textwrap.dedent("""\
#!/bin/sh
for i in $(seq 100)
do
echo -n .
sleep 0.1
done
""").encode())
self.script.close()
logger = mock.MagicMock()
start = time.time()
with self.assertRaises(subprocess.TimeoutExpired) as cm:
program = slapos.grid.utils.SlapPopen(
self.script.name,
timeout=1,
logger=logger)
# the timeout was respected
elapsed = time.time() - start
self.assertLess(elapsed, 5)
self.assertGreaterEqual(elapsed, 1)
# the output before timeout is captured
self.assertEqual(cm.exception.output, '.' * 10)
# endline is never reached, so nothing is logged "live"
self.assertFalse(logger.info.called)
def test_timeout_stdout_and_stderr(self):
self.script.write(textwrap.dedent("""\
#!/bin/sh
for i in $(seq 100)
do
>&2 echo -n -
echo -n .
sleep 0.1
done
""").encode())
self.script.close()
logger = mock.MagicMock()
start = time.time()
with self.assertRaises(subprocess.TimeoutExpired) as cm:
program = slapos.grid.utils.SlapPopen(
self.script.name,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=1,
logger=logger)
# the timeout was respected
elapsed = time.time() - start
self.assertLess(elapsed, 5)
self.assertGreaterEqual(elapsed, 1)
# the output before timeout is captured
self.assertEqual(cm.exception.output, '.' * 10)
self.assertEqual(cm.exception.stderr, '-' * 10)
# endline is never reached, so nothing is logged "live"
self.assertFalse(logger.info.called)
def test_timeout_no_stdout_no_stderr(self):
self.script.write(b'#!/bin/sh\nsleep 20')
self.script.close()
logger = mock.MagicMock()
start = time.time()
with self.assertRaises(subprocess.TimeoutExpired) as cm:
program = slapos.grid.utils.SlapPopen(
self.script.name,
timeout=1,
logger=logger)
# the timeout was respected
elapsed = time.time() - start
self.assertLess(elapsed, 5)
self.assertGreaterEqual(elapsed, 1)
# no output
self.assertEqual(cm.exception.output, '')
self.assertEqual(cm.exception.stderr, '')
# nothing is logged "live"
self.assertFalse(logger.info.called)
def test_timeout_killed(self):
self.script.write(b'#!/bin/sh\necho -n $$\nsleep 20')
self.script.close()
logger = mock.MagicMock()
start = time.time()
with self.assertRaises(subprocess.TimeoutExpired) as cm:
program = slapos.grid.utils.SlapPopen(
self.script.name,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=1,
logger=logger)
# the timeout was respected
elapsed = time.time() - start
self.assertLess(elapsed, 5)
self.assertGreaterEqual(elapsed, 1)
# output pid
pid = int(cm.exception.output)
self.assertEqual(cm.exception.stderr, '')
# subprocess has been killed
self.assertFalse(psutil.pid_exists(pid))
# endline is never reached, so nothing is logged "live"
self.assertFalse(logger.info.called)
def test_timeout_killed_grandchild(self):
self.script.write(textwrap.dedent("""\
#!/bin/sh
(echo $(exec /bin/sh -c 'echo "$PPID"'); sleep 20)
""").encode())
self.script.close()
logger = mock.MagicMock()
start = time.time()
with self.assertRaises(subprocess.TimeoutExpired) as cm:
program = slapos.grid.utils.SlapPopen(
self.script.name,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=1,
logger=logger)
# the timeout was respected
elapsed = time.time() - start
self.assertLess(elapsed, 5)
self.assertGreaterEqual(elapsed, 1)
# output pid
pid = int(cm.exception.output)
self.assertEqual(cm.exception.stderr, '')
# sub-subprocess has been killed
self.assertFalse(psutil.pid_exists(pid))
# the pid is logged "live"
logger.info.assert_called_once_with(str(pid))
class DummySystemExit(Exception): class DummySystemExit(Exception):
"""Dummy exception raised instead of SystemExit so that if something goes """Dummy exception raised instead of SystemExit so that if something goes
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment