Commit d9b41a14 authored by Julien Muchembled

Partially fix race conditions when allocating ports

This fixes errors like:
- ConnectorException: makeListeningConnection on ('127.0.0.1', 33157) failed: 98:Address already in use
  Error executing './neomaster ...
- "error: [Errno 22] Invalid argument" in __allocatePort

A system-wide lock is used to prevent conflicts when several NEO test suites are
run simultaneously.

However, errors can still happen if other software allocates the same ports.
To minimize the probability of this, allocated ports are kept open as long as
possible: they are closed just before spawning subprocesses.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2665 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 8b9d037f
......@@ -32,7 +32,7 @@ from neo.neoctl.neoctl import NeoCTL, NotReadyException
from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates
from neo.lib.util import dump, SOCKET_CONNECTORS_DICT
from neo.tests import DB_ADMIN, DB_PASSWD, NeoTestBase, buildUrlFromString, \
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, SocketLock
from neo.client.Storage import Storage
NEO_MASTER = 'neomaster'
......@@ -54,6 +54,39 @@ class AlreadyStopped(Exception):
class NotFound(Exception):
    """Exception type used by the functional test helpers.

    NOTE(review): the raise sites are outside this excerpt; presumably it
    signals that a looked-up item does not exist -- confirm against callers.
    """
class PortAllocator(object):
    """Reserve free TCP ports and keep them reserved until they are needed.

    Every allocated port stays bound to an open socket held by the allocator.
    release() closes those sockets; callers invoke it just before spawning
    the subprocesses that will listen on the ports.

    A lock shared by all instances (SocketLock, imported from neo.tests) is
    acquired on allocation if not already held, and released by reset() once
    no allocator is registered anymore. Its purpose is to prevent conflicts
    when several NEO test suites run simultaneously. Other software can
    still grab a port between release() and the moment a subprocess binds it.
    """

    # Lock shared by every allocator instance.
    lock = SocketLock('neo.PortAllocator')
    # Allocators that have allocated ports and have not been reset yet.
    allocator_set = set()

    def __init__(self):
        # Bound sockets that keep the allocated ports reserved.
        self.socket_list = []

    def allocate(self, address_type, local_ip):
        """Bind a new socket to a system-chosen port and return that port.

        address_type -- socket address family (e.g. socket.AF_INET)
        local_ip -- local address to bind to

        The socket stays open (and the port reserved) until release() or
        reset() is called.
        """
        s = socket.socket(address_type, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if not self.lock.locked():
            self.lock.acquire()
        # Register every allocator holding ports, so that the shared lock is
        # only dropped once the last of them has been reset.
        self.allocator_set.add(self)
        # Track the socket before binding, so that it gets closed by
        # release()/reset() even if bind() fails.
        self.socket_list.append(s)
        s.bind((local_ip, 0))
        return s.getsockname()[1]

    def release(self):
        """Close all reserved sockets, freeing the ports for their real users.

        Safe to call several times; the allocator can allocate again after.
        """
        sockets = self.socket_list
        # Start over with an empty list rather than None, so that a repeated
        # release() or a later allocate() keeps working.
        self.socket_list = []
        for s in sockets:
            s.close()

    def reset(self):
        """Unregister this allocator, drop the shared lock if it was the last
        registered one, and close any socket still open.
        """
        if self.lock.locked():
            self.allocator_set.discard(self)
            if not self.allocator_set:
                self.lock.release()
        self.release()
class NEOProcess(object):
pid = 0
......@@ -185,18 +218,19 @@ class NEOCluster(object):
self.db_password = db_password
self.db_list = db_list
self.address_type = address_type
self.local_ip = IP_VERSION_FORMAT_DICT[self.address_type]
self.local_ip = local_ip = IP_VERSION_FORMAT_DICT[self.address_type]
if clear_databases:
self.setupDB()
self.process_dict = {}
self.port_set = set()
if temp_dir is None:
temp_dir = tempfile.mkdtemp(prefix='neo_')
print 'Using temp directory %r.' % (temp_dir, )
self.temp_dir = temp_dir
admin_port = self.__allocatePort()
self.port_allocator = PortAllocator()
admin_port = self.port_allocator.allocate(address_type, local_ip)
self.cluster_name = 'neo_%s' % (random.randint(0, 100), )
master_node_list = [self.__allocatePort() for i in xrange(master_node_count)]
master_node_list = [self.port_allocator.allocate(address_type, local_ip)
for i in xrange(master_node_count)]
self.master_nodes = '/'.join('%s:%s' % (
buildUrlFromString(self.local_ip), x, )
for x in master_node_list)
......@@ -246,19 +280,6 @@ class NEOCluster(object):
self.process_dict.setdefault(command, []).append(
NEOProcess(command, uuid, arguments))
def __allocatePort(self):
    """Return a free TCP port on self.local_ip not yet recorded in port_set.

    The system chooses the port (bind to port 0). The chosen port is added
    to self.port_set before being returned. The probing socket is closed
    before returning, so another program may still take the port before it
    is actually used.
    """
    port_set = self.port_set
    while True:
        # A socket can only be bound once: binding an already-bound socket
        # again fails with EINVAL, so each attempt uses a fresh socket.
        s = socket.socket(self.address_type, socket.SOCK_STREAM)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((self.local_ip, 0))
            port = s.getsockname()[1]
        finally:
            # Never leak the probing socket, even if bind() fails.
            s.close()
        if port not in port_set:
            break
    port_set.add(port)
    return port
def __allocateUUID(self):
uuid = os.urandom(16)
self.uuid_set.add(uuid)
......@@ -300,6 +321,7 @@ class NEOCluster(object):
def run(self, except_storages=()):
""" Start cluster processes except some storage nodes """
assert len(self.process_dict)
self.port_allocator.release()
for process_list in self.process_dict.itervalues():
for process in process_list:
if process not in except_storages:
......@@ -315,6 +337,7 @@ class NEOCluster(object):
time.sleep(0.5)
else:
break
self.port_allocator.reset()
def start(self, except_storages=()):
""" Do a complete start of a cluster """
......@@ -582,6 +605,10 @@ class NEOCluster(object):
def __del__(self):
    """Best-effort cleanup when the cluster object is garbage-collected.

    Removes the temporary directory when cleanup_on_delete is set, and
    always resets the port allocator so that reserved sockets and the
    shared allocation lock are not left behind.
    """
    try:
        if self.cleanup_on_delete:
            os.removedirs(self.temp_dir)
    finally:
        # Reset the allocator even if removing the directory failed.
        try:
            self.port_allocator.reset()
        except AttributeError:
            # port_allocator may not exist (e.g. __init__ did not get far
            # enough to create it); nothing to reset then.
            pass
class NEOFunctionalTest(NeoTestBase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment