Commit 164d04bf authored by Alain Takoudjou

grid.promise: avoid blocking process while sending or receiving message from queue

If the message size is too big, the promise process can hang on queue.put and the parent process will not be able to read the message.
This will finally result in a deadlock situation.
parent 2925af95
......@@ -49,6 +49,11 @@ PROMISE_PERIOD_FILE_NAME = '%s.periodicity'
class BaseResult(object):
def __init__(self, problem=False, message=None, date=None):
self.__problem = problem
# The promise message should be very short,
# a huge message size can freeze the process Pipe
# XXX this is important to prevent process deadlock
if len(message) > 5000:
message = '...%s' % message[-5000:]
self.__message = message
self.__date = date
if self.__date is None:
......@@ -398,6 +403,19 @@ class GenericPromise(object):
return module(problem=False, message=message)
return module(problem=True, message=message)
def __sendResult(self, result_item, retry_amount=3, retry_delay=0.5):
    """Send result to queue without blocking, retrying while the queue is full.

    result_item: object handed to the queue's put_nowait().
    retry_amount: maximum number of put attempts.
    retry_delay: seconds to wait between two attempts.

    Raises Queue.Full if every attempt found the queue full. If
    retry_amount is lower than 1, nothing is sent and nothing is raised.
    """
    last_error = None
    for attempt in range(retry_amount):
        try:
            self.__queue.put_nowait(result_item)
        except Queue.Full as e:
            last_error = e
            # waiting is only useful when another attempt follows
            if attempt + 1 < retry_amount:
                time.sleep(retry_delay)
        else:
            # delivered: failures of earlier attempts must not be reported
            return
    if last_error is not None:
        raise last_error
def _test(self, result_count=1, failure_amount=1, latest_minute=0):
"""
Default promise test method
......@@ -471,9 +489,9 @@ class GenericPromise(object):
self.__logger_buffer.close()
# send the result of this promise
self.__queue.put(PromiseQueueResult(
self.__sendResult(PromiseQueueResult(
path=self.__promise_path,
name=self.__name,
title=self.__title,
item=result
), True)
))
......@@ -49,7 +49,7 @@ class WrapPromise(GenericPromise):
@staticmethod
def terminate(name, logger, process, signum, frame):
if signum in [signal.SIGINT, signal.SIGTERM] and process:
if signum in [signal.SIGINT, signal.SIGTERM] and process.poll() is None:
logger.info("Terminating promise process %r" % name)
try:
# make sure we kill the process on timeout
......
......@@ -963,6 +963,29 @@ echo "success"
self.launcher.run()
self.assertTrue(self.called)
def test_runpromise_wrapped_will_timeout_two(self):
    # Two identical legacy bash promises: each prints a line, sleeps for
    # 20 seconds, prints "success" and exits with status 1.
    script_body = """#!/bin/bash
echo "some data from promise"
sleep 20
echo "success"
exit 1
"""
    for promise_name in ("my_bash_promise", "my_second_bash_promise"):
        script_path = os.path.join(self.legacy_promise_dir, promise_name)
        with open(script_path, 'w') as script_file:
            script_file.write(script_body)
        os.chmod(script_path, 0o744)

    # The launcher timeout (0.5s) is far below the 20s sleep of the scripts,
    # so running the promises is expected to end with a PromiseError.
    self.configureLauncher(timeout=0.5)
    with self.assertRaises(PromiseError):
        self.launcher.run()
class TestSlapOSGenericPromise(TestSlapOSPromiseMixin):
def initialisePromise(self, promise_content="", success=True, timeout=60):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment