Commit 2ed8fb0b authored by Julien Muchembled's avatar Julien Muchembled

Fix resource leaks when nodes and threaded tests end

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2823 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent bf255041
...@@ -79,6 +79,12 @@ class Application(object): ...@@ -79,6 +79,12 @@ class Application(object):
self.reset() self.reset()
registerLiveDebugger(on_log=self.log) registerLiveDebugger(on_log=self.log)
def close(self):
self.listening_conn = None
self.nm.close()
self.em.close()
del self.__dict__
def reset(self): def reset(self):
self.bootstrapped = False self.bootstrapped = False
self.master_conn = None self.master_conn = None
......
...@@ -113,6 +113,7 @@ class MasterEventHandler(EventHandler): ...@@ -113,6 +113,7 @@ class MasterEventHandler(EventHandler):
def _connectionLost(self, conn): def _connectionLost(self, conn):
app = self.app app = self.app
if app.listening_conn: # if running
assert app.master_conn in (conn, None) assert app.master_conn in (conn, None)
app.dispatcher.clear() app.dispatcher.clear()
app.reset() app.reset()
......
...@@ -30,6 +30,11 @@ class EpollEventManager(object): ...@@ -30,6 +30,11 @@ class EpollEventManager(object):
self.epoll = Epoll() self.epoll = Epoll()
self._pending_processing = [] self._pending_processing = []
def close(self):
for c in self.connection_dict.values():
c.close()
del self.__dict__
def getConnectionList(self): def getConnectionList(self):
# XXX: use index # XXX: use index
return [x for x in self.connection_dict.values() if not x.isAborted()] return [x for x in self.connection_dict.values() if not x.isAborted()]
......
...@@ -26,6 +26,8 @@ from neo.lib import attributeTracker ...@@ -26,6 +26,8 @@ from neo.lib import attributeTracker
class Node(object): class Node(object):
"""This class represents a node.""" """This class represents a node."""
_connection = None
def __init__(self, manager, address=None, uuid=None, def __init__(self, manager, address=None, uuid=None,
state=NodeStates.UNKNOWN): state=NodeStates.UNKNOWN):
self._state = state self._state = state
...@@ -33,7 +35,6 @@ class Node(object): ...@@ -33,7 +35,6 @@ class Node(object):
self._uuid = uuid self._uuid = uuid
self._manager = manager self._manager = manager
self._last_state_change = time() self._last_state_change = time()
self._connection = None
manager.add(self) manager.add(self)
def notify(self, packet): def notify(self, packet):
...@@ -88,7 +89,7 @@ class Node(object): ...@@ -88,7 +89,7 @@ class Node(object):
Callback from node's connection when closed Callback from node's connection when closed
""" """
assert self._connection is not None assert self._connection is not None
self._connection = None del self._connection
self._manager._updateIdentified(self) self._manager._updateIdentified(self)
def setConnection(self, connection): def setConnection(self, connection):
...@@ -261,6 +262,8 @@ class NodeManager(object): ...@@ -261,6 +262,8 @@ class NodeManager(object):
self._state_dict = {} self._state_dict = {}
self._identified_dict = {} self._identified_dict = {}
close = __init__
def add(self, node): def add(self, node):
if node in self._node_set: if node in self._node_set:
neo.lib.logging.warning('adding a known node %r, ignoring', node) neo.lib.logging.warning('adding a known node %r, ignoring', node)
......
...@@ -94,6 +94,12 @@ class Application(object): ...@@ -94,6 +94,12 @@ class Application(object):
registerLiveDebugger(on_log=self.log) registerLiveDebugger(on_log=self.log)
def close(self):
self.listening_conn = None
self.nm.close()
self.em.close()
del self.__dict__
def log(self): def log(self):
self.em.log() self.em.log()
self.nm.log() self.nm.log()
......
...@@ -31,6 +31,7 @@ class ClientServiceHandler(MasterHandler): ...@@ -31,6 +31,7 @@ class ClientServiceHandler(MasterHandler):
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
# cancel its transactions and forgot the node # cancel its transactions and forgot the node
app = self.app app = self.app
if app.listening_conn: # if running
node = app.nm.getByUUID(conn.getUUID()) node = app.nm.getByUUID(conn.getUUID())
assert node is not None assert node is not None
app.tm.abortFor(node) app.tm.abortFor(node)
......
...@@ -39,6 +39,9 @@ class TerminalNeoCTL(object): ...@@ -39,6 +39,9 @@ class TerminalNeoCTL(object):
def __init__(self, address): def __init__(self, address):
self.neoctl = NeoCTL(address) self.neoctl = NeoCTL(address)
def __del__(self):
self.neoctl.close()
# Utility methods (could be functions) # Utility methods (could be functions)
def asNodeState(self, value): def asNodeState(self, value):
return NodeStates.getByName(value.upper()) return NodeStates.getByName(value.upper())
......
...@@ -38,6 +38,10 @@ class NeoCTL(object): ...@@ -38,6 +38,10 @@ class NeoCTL(object):
self.handler = CommandEventHandler(self) self.handler = CommandEventHandler(self)
self.response_queue = [] self.response_queue = []
def close(self):
self.em.close()
del self.__dict__
def __getConnection(self): def __getConnection(self):
if not self.connected: if not self.connected:
self.connection = ClientConnection(self.em, self.handler, self.connection = ClientConnection(self.em, self.handler,
......
...@@ -88,6 +88,12 @@ class Application(object): ...@@ -88,6 +88,12 @@ class Application(object):
registerLiveDebugger(on_log=self.log) registerLiveDebugger(on_log=self.log)
def close(self):
self.listening_conn = None
self.nm.close()
self.em.close()
del self.__dict__
def _poll(self): def _poll(self):
self.em.poll(1) self.em.poll(1)
......
...@@ -26,6 +26,7 @@ from neo.lib.protocol import NodeStates, NodeTypes, Packets, Errors ...@@ -26,6 +26,7 @@ from neo.lib.protocol import NodeStates, NodeTypes, Packets, Errors
class BaseMasterHandler(EventHandler): class BaseMasterHandler(EventHandler):
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
if self.app.listening_conn: # if running
raise PrimaryFailure('connection lost') raise PrimaryFailure('connection lost')
def stopOperation(self, conn): def stopOperation(self, conn):
......
...@@ -36,6 +36,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -36,6 +36,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
uuid = conn.getUUID() uuid = conn.getUUID()
node = self.app.nm.getByUUID(uuid) node = self.app.nm.getByUUID(uuid)
if self.app.listening_conn: # if running
assert node is not None, conn assert node is not None, conn
self.app.nm.remove(node) self.app.nm.remove(node)
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import os, random, socket, sys, tempfile, threading, time, types import os, random, socket, sys, tempfile, threading, time, types, weakref
from collections import deque from collections import deque
from functools import wraps from functools import wraps
from Queue import Queue, Empty from Queue import Queue, Empty
...@@ -367,8 +367,9 @@ class NEOCluster(object): ...@@ -367,8 +367,9 @@ class NEOCluster(object):
ip = getVirtualIp('master') ip = getVirtualIp('master')
self.master_nodes = ' '.join('%s:%s' % (ip, i) self.master_nodes = ' '.join('%s:%s' % (ip, i)
for i in xrange(master_count)) for i in xrange(master_count))
kw = dict(cluster=self, getReplicas=replicas, getPartitions=partitions, weak_self = weakref.proxy(self)
getAdapter=adapter, getReset=clear_databases) kw = dict(cluster=weak_self, getReplicas=replicas, getAdapter=adapter,
getPartitions=partitions, getReset=clear_databases)
self.master_list = [MasterApplication(address=(ip, i), **kw) self.master_list = [MasterApplication(address=(ip, i), **kw)
for i in xrange(master_count)] for i in xrange(master_count)]
ip = getVirtualIp('storage') ip = getVirtualIp('storage')
...@@ -383,8 +384,8 @@ class NEOCluster(object): ...@@ -383,8 +384,8 @@ class NEOCluster(object):
for i, x in enumerate(db_list)] for i, x in enumerate(db_list)]
ip = getVirtualIp('admin') ip = getVirtualIp('admin')
self.admin_list = [AdminApplication(address=(ip, 0), **kw)] self.admin_list = [AdminApplication(address=(ip, 0), **kw)]
self.client = ClientApplication(self) self.client = ClientApplication(weak_self)
self.neoctl = NeoCTL(self) self.neoctl = NeoCTL(weak_self)
# A few shortcuts that work when there's only 1 master/storage/admin # A few shortcuts that work when there's only 1 master/storage/admin
@property @property
...@@ -493,6 +494,13 @@ class NEOCluster(object): ...@@ -493,6 +494,13 @@ class NEOCluster(object):
txn = transaction.TransactionManager() txn = transaction.TransactionManager()
return txn, self.db.open(transaction_manager=txn) return txn, self.db.open(transaction_manager=txn)
def __del__(self):
self.neoctl.close()
for node_type in 'admin', 'storage', 'master':
for node in getattr(self, node_type + '_list'):
node.close()
self.client.em.close()
class NEOThreadedTest(NeoTestBase): class NEOThreadedTest(NeoTestBase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment