Commit a87f9a98 authored by Julien Muchembled's avatar Julien Muchembled

taskdistribution: really fix lock to avoid errors with concurrent RPC calls

parent 11f58766
...@@ -96,22 +96,14 @@ def patchRPCParser(error_handler): ...@@ -96,22 +96,14 @@ def patchRPCParser(error_handler):
parser_klass.feed = verbose_feed parser_klass.feed = verbose_feed
class RPCRetry(object): class RPCRetry(object):
def __init__(self, proxy, retry_time, logger, timeout=120): def __init__(self, proxy, retry_time, logger):
super(RPCRetry, self).__init__() super(RPCRetry, self).__init__()
self._proxy = proxy self._proxy = proxy
self._retry_time = retry_time self._retry_time = retry_time
self._logger = logger self._logger = logger
self.__rpc_lock = threading.Lock()
self.timeout = timeout
def _RPC(self, func_id, args=()): def _RPC(self, func_id, args=()):
default_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(self.timeout)
try:
with self.__rpc_lock:
return getattr(self._proxy, func_id)(*args) return getattr(self._proxy, func_id)(*args)
finally:
socket.setdefaulttimeout(default_timeout)
def _retryRPC(self, func_id, args=()): def _retryRPC(self, func_id, args=()):
retry_time = self._retry_time retry_time = self._retry_time
...@@ -359,6 +351,23 @@ class TestResultProxy(RPCRetry): ...@@ -359,6 +351,23 @@ class TestResultProxy(RPCRetry):
cond.notify() cond.notify()
self._watcher_thread.join() self._watcher_thread.join()
class ServerProxy(xmlrpclib.ServerProxy):
def __init__(self, *args, **kw):
xmlrpclib.ServerProxy.__init__(self, *args, **kw)
transport = self.__transport
def make_connection(*args, **kw):
conn = transport.__class__.make_connection(transport, *args, **kw)
assert hasattr(conn, 'timeout'), conn
conn.timeout = 120
return conn
transport.make_connection = make_connection
self.__rpc_lock = threading.Lock()
def __request(self, *args, **kw):
with self.__rpc_lock:
return xmlrpclib.ServerProxy.__request(self, *args, **kw)
class TaskDistributionTool(RPCRetry): class TaskDistributionTool(RPCRetry):
def __init__(self, portal_url, retry_time=64, logger=None): def __init__(self, portal_url, retry_time=64, logger=None):
""" """
...@@ -371,7 +380,7 @@ class TaskDistributionTool(RPCRetry): ...@@ -371,7 +380,7 @@ class TaskDistributionTool(RPCRetry):
if portal_url is None: if portal_url is None:
proxy = DummyTaskDistributionTool() proxy = DummyTaskDistributionTool()
else: else:
proxy = xmlrpclib.ServerProxy( proxy = ServerProxy(
portal_url, portal_url,
allow_none=True, allow_none=True,
).portal_task_distribution ).portal_task_distribution
...@@ -423,10 +432,7 @@ class TaskDistributor(RPCRetry): ...@@ -423,10 +432,7 @@ class TaskDistributor(RPCRetry):
if portal_url is None: if portal_url is None:
proxy = DummyTaskDistributionTool() proxy = DummyTaskDistributionTool()
else: else:
proxy = xmlrpclib.ServerProxy( proxy = ServerProxy(portal_url, allow_none=True)
portal_url,
allow_none=True,
)
super(TaskDistributor, self).__init__(proxy, retry_time,logger) super(TaskDistributor, self).__init__(proxy, retry_time,logger)
protocol_revision = self._retryRPC('getProtocolRevision') protocol_revision = self._retryRPC('getProtocolRevision')
if protocol_revision != 1: if protocol_revision != 1:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment