Commit 7494de84 authored by Julien Muchembled's avatar Julien Muchembled

client: merge ConnectionPool inside Application

parents 305dda86 693aaf79
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import heapq import heapq
import random
import time import time
try: try:
...@@ -35,17 +36,21 @@ from neo.lib.protocol import NodeTypes, Packets, \ ...@@ -35,17 +36,21 @@ from neo.lib.protocol import NodeTypes, Packets, \
from neo.lib.util import makeChecksum, dump from neo.lib.util import makeChecksum, dump
from neo.lib.locking import Empty, Lock from neo.lib.locking import Empty, Lock
from neo.lib.connection import MTClientConnection, ConnectionClosed from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.exception import NodeNotReady
from .exception import (NEOStorageError, NEOStorageCreationUndoneError, from .exception import (NEOStorageError, NEOStorageCreationUndoneError,
NEOStorageReadRetry, NEOStorageNotFoundError, NEOPrimaryMasterLost) NEOStorageReadRetry, NEOStorageNotFoundError, NEOPrimaryMasterLost)
from .handlers import storage, master from .handlers import storage, master
from neo.lib.threaded_app import ThreadedApplication from neo.lib.threaded_app import ThreadedApplication
from .cache import ClientCache from .cache import ClientCache
from .pool import ConnectionPool
from .transactions import TransactionContainer from .transactions import TransactionContainer
from neo.lib.util import p64, u64, parseMasterList from neo.lib.util import p64, u64, parseMasterList
CHECKED_SERIAL = object() CHECKED_SERIAL = object()
# How long before we might retry a connection to a node to which connection
# failed in the past.
MAX_FAILURE_AGE = 600
try: try:
from Signals.Signals import SignalHandler from Signals.Signals import SignalHandler
except ImportError: except ImportError:
...@@ -68,7 +73,6 @@ class Application(ThreadedApplication): ...@@ -68,7 +73,6 @@ class Application(ThreadedApplication):
name, **kw) name, **kw)
# Internal Attributes common to all thread # Internal Attributes common to all thread
self._db = None self._db = None
self.cp = ConnectionPool(self)
self.primary_master_node = None self.primary_master_node = None
self.trying_master_node = None self.trying_master_node = None
...@@ -102,6 +106,9 @@ class Application(ThreadedApplication): ...@@ -102,6 +106,9 @@ class Application(ThreadedApplication):
# _connecting_to_master_node is used to prevent simultaneous master # _connecting_to_master_node is used to prevent simultaneous master
# node connection attempts # node connection attempts
self._connecting_to_master_node = Lock() self._connecting_to_master_node = Lock()
# same for storage nodes
self._connecting_to_storage_node = Lock()
self._node_failure_dict = {}
self.compress = getCompress(compress) self.compress = getCompress(compress)
def __getattr__(self, attr): def __getattr__(self, attr):
...@@ -246,6 +253,53 @@ class Application(ThreadedApplication): ...@@ -246,6 +253,53 @@ class Application(ThreadedApplication):
logging.info("Connected and ready") logging.info("Connected and ready")
return conn return conn
def getStorageConnection(self, node):
    """Return an identified connection to the given storage node.

    If there is no connection and the node is running, or if a
    connection exists but is not identified yet, try to (re)establish
    one via _connectToStorageNode, which may return None on failure.
    """
    conn = node._connection # XXX
    if node.isRunning() if conn is None else not node._identified:
        with self._connecting_to_storage_node:
            # Re-read under the lock: another thread may have connected
            # in the meantime (double-checked locking).
            conn = node._connection # XXX
            if conn is None:
                return self._connectToStorageNode(node)
    return conn
def _connectToStorageNode(self, node):
    """Open and identify a new connection to the given storage node.

    Returns the connection on success, or None on failure (connection
    closed, or node not ready). On failure the node is recorded in
    _node_failure_dict so that getCellSortKey deprioritizes it until
    the recorded deadline. Raises NEOPrimaryMasterLost if there is no
    connection to the primary master.
    """
    if self.master_conn is None:
        raise NEOPrimaryMasterLost
    conn = MTClientConnection(self, self.storage_event_handler, node,
        dispatcher=self.dispatcher)
    p = Packets.RequestIdentification(NodeTypes.CLIENT,
        self.uuid, None, self.name, (), self.id_timestamp)
    try:
        self._ask(conn, p, handler=self.storage_bootstrap_handler)
    except ConnectionClosed:
        logging.error('Connection to %r failed', node)
    except NodeNotReady:
        logging.info('%r not ready', node)
    else:
        logging.info('Connected %r', node)
        # Make sure this node will be considered for the next reads
        # even if there was a previous recent failure.
        self._node_failure_dict.pop(node.getUUID(), None)
        return conn
    # Only reached on the exception paths above (the else clause returns):
    # remember the failure until time.time() + MAX_FAILURE_AGE.
    self._node_failure_dict[node.getUUID()] = time.time() + MAX_FAILURE_AGE
def getCellSortKey(self, cell, random=random.random):
    """Sort key used to order cells for read accesses.

    Nodes that failed recently sort last (keyed by their failure
    deadline); all other nodes get a random key so that read load is
    spread evenly, whether or not they are already connected.
    """
    uuid = cell.getUUID()
    deadline = self._node_failure_dict.get(uuid)
    if deadline:
        if time.time() < deadline:
            # Still within the failure window: order by failure deadline.
            return deadline
        # The entry expired. Use pop() rather than 'del': no lock is
        # held here, so a concurrent thread may already have removed it.
        self._node_failure_dict.pop(uuid, None)
    # A random key, connected or not, is a trivial and quite efficient
    # way to distribute the load evenly. On write accesses, a client
    # connects to all nodes of touched cells, but before that, or for a
    # read-only client, it should not restrict itself to the first
    # connected nodes.
    return random()
def registerDB(self, db, limit): def registerDB(self, db, limit):
self._db = db self._db = db
...@@ -274,7 +328,6 @@ class Application(ThreadedApplication): ...@@ -274,7 +328,6 @@ class Application(ThreadedApplication):
return int(u64(self.last_oid)) return int(u64(self.last_oid))
def _askStorageForRead(self, object_id, packet, askStorage=None): def _askStorageForRead(self, object_id, packet, askStorage=None):
cp = self.cp
pt = self.pt pt = self.pt
# BBB: On Py2, it can be a subclass of bytes (binary from zodbpickle). # BBB: On Py2, it can be a subclass of bytes (binary from zodbpickle).
if isinstance(object_id, bytes): if isinstance(object_id, bytes):
...@@ -287,10 +340,10 @@ class Application(ThreadedApplication): ...@@ -287,10 +340,10 @@ class Application(ThreadedApplication):
failed = 0 failed = 0
while 1: while 1:
cell_list = pt.getCellList(object_id, True) cell_list = pt.getCellList(object_id, True)
cell_list.sort(key=cp.getCellSortKey) cell_list.sort(key=self.getCellSortKey)
for cell in cell_list: for cell in cell_list:
node = cell.getNode() node = cell.getNode()
conn = cp.getConnForNode(node) conn = self.getStorageConnection(node)
if conn is not None: if conn is not None:
try: try:
return askStorage(conn, packet) return askStorage(conn, packet)
...@@ -725,8 +778,8 @@ class Application(ThreadedApplication): ...@@ -725,8 +778,8 @@ class Application(ThreadedApplication):
# Ask storage the undo serial (serial at which object's previous data # Ask storage the undo serial (serial at which object's previous data
# is) # is)
getCellList = self.pt.getCellList getCellList = self.pt.getCellList
getCellSortKey = self.cp.getCellSortKey getCellSortKey = self.getCellSortKey
getConnForNode = self.cp.getConnForNode getConnForNode = self.getStorageConnection
queue = self._thread_container.queue queue = self._thread_container.queue
ttid = txn_context.ttid ttid = txn_context.ttid
undo_object_tid_dict = {} undo_object_tid_dict = {}
...@@ -816,7 +869,7 @@ class Application(ThreadedApplication): ...@@ -816,7 +869,7 @@ class Application(ThreadedApplication):
packet = Packets.AskTIDs(first, last, INVALID_PARTITION) packet = Packets.AskTIDs(first, last, INVALID_PARTITION)
tid_set = set() tid_set = set()
for storage_node in self.pt.getNodeSet(True): for storage_node in self.pt.getNodeSet(True):
conn = self.cp.getConnForNode(storage_node) conn = self.getStorageConnection(storage_node)
if conn is None: if conn is None:
continue continue
conn.ask(packet, queue=queue, tid_set=tid_set) conn.ask(packet, queue=queue, tid_set=tid_set)
......
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random, time
from neo.lib import logging
from neo.lib.locking import Lock
from neo.lib.protocol import NodeTypes, Packets
from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.exception import NodeNotReady
from .exception import NEOPrimaryMasterLost
# How long before we might retry a connection to a node to which connection
# failed in the past.
MAX_FAILURE_AGE = 600
class ConnectionPool(object):
    """This class manages a pool of connections to storage nodes."""
    # XXX: This is not a pool anymore.

    def __init__(self, app):
        # app: the client Application that owns this pool.
        self.app = app
        # Define a lock in order to create one connection to
        # a storage node at a time to avoid multiple connections
        # to the same node.
        self._lock = Lock()
        # Maps a node UUID to the epoch time until which the node is
        # considered failed (set to time.time() + MAX_FAILURE_AGE when a
        # connection attempt fails).
        self.node_failure_dict = {}

    def _initNodeConnection(self, node):
        """Init a connection to a given storage node.

        Returns the identified connection on success, or None on failure
        (connection closed, or node not ready). On failure the node is
        recorded in node_failure_dict so getCellSortKey deprioritizes it.
        Raises NEOPrimaryMasterLost if there is no master connection.
        """
        app = self.app
        if app.master_conn is None:
            raise NEOPrimaryMasterLost
        conn = MTClientConnection(app, app.storage_event_handler, node,
            dispatcher=app.dispatcher)
        p = Packets.RequestIdentification(NodeTypes.CLIENT,
            app.uuid, None, app.name, (), app.id_timestamp)
        try:
            app._ask(conn, p, handler=app.storage_bootstrap_handler)
        except ConnectionClosed:
            logging.error('Connection to %r failed', node)
        except NodeNotReady:
            logging.info('%r not ready', node)
        else:
            logging.info('Connected %r', node)
            # Make sure this node will be considered for the next reads
            # even if there was a previous recent failure.
            self.node_failure_dict.pop(node.getUUID(), None)
            return conn
        # Only reached on the exception paths above (the else clause
        # returns): remember the failure until the deadline.
        self.node_failure_dict[node.getUUID()] = time.time() + MAX_FAILURE_AGE

    def getCellSortKey(self, cell, random=random.random):
        # Prefer a node that didn't fail recently.
        failure = self.node_failure_dict.get(cell.getUUID())
        if failure:
            if time.time() < failure:
                # Or order by date of connection failure.
                return failure
            # Do not use 'del' statement: we didn't lock, so another
            # thread might have removed uuid from node_failure_dict.
            self.node_failure_dict.pop(cell.getUUID(), None)
        # A random one, connected or not, is a trivial and quite efficient way
        # to distribute the load evenly. On write accesses, a client connects
        # to all nodes of touched cells, but before that, or if a client is
        # specialized to only do read-only accesses, it should not limit
        # itself to only use the first connected nodes.
        return random()

    def getConnForNode(self, node):
        """Return a locked connection object to a given node
        If no connection exists, create a new one"""
        conn = node._connection # XXX
        # Connect when there is no connection and the node is running, or
        # when a connection exists but is not identified yet.
        if node.isRunning() if conn is None else not node._identified:
            with self._lock:
                # Re-read under the lock: another thread may have
                # connected in the meantime (double-checked locking).
                conn = node._connection # XXX
                if conn is None:
                    return self._initNodeConnection(node)
        return conn

    def closeAll(self):
        """Close the connections to all known storage nodes, asking them
        to reconnect without delay."""
        with self._lock:
            for node in self.app.nm.getStorageList():
                conn = node._connection # XXX
                if conn is not None:
                    conn.setReconnectionNoDelay()
                    conn.close()
...@@ -67,7 +67,7 @@ class Transaction(object): ...@@ -67,7 +67,7 @@ class Transaction(object):
try: try:
conn = conn_dict[uuid] conn = conn_dict[uuid]
except KeyError: except KeyError:
conn = conn_dict[uuid] = app.cp.getConnForNode(node) conn = conn_dict[uuid] = app.getStorageConnection(node)
if self.locking_tid and 'oid' in kw: if self.locking_tid and 'oid' in kw:
# A deadlock happened but this node is not aware of it. # A deadlock happened but this node is not aware of it.
# Tell it to write-lock with the same locking tid as # Tell it to write-lock with the same locking tid as
......
...@@ -439,13 +439,20 @@ class ClientApplication(Node, neo.client.app.Application): ...@@ -439,13 +439,20 @@ class ClientApplication(Node, neo.client.app.Application):
conn = self._getMasterConnection() conn = self._getMasterConnection()
else: else:
assert isinstance(peer, StorageApplication) assert isinstance(peer, StorageApplication)
conn = self.cp.getConnForNode(self.nm.getByUUID(peer.uuid)) conn = self.getStorageConnection(self.nm.getByUUID(peer.uuid))
yield conn yield conn
def extraCellSortKey(self, key): def extraCellSortKey(self, key):
return Patch(self.cp, getCellSortKey=lambda orig, cell: return Patch(self, getCellSortKey=lambda orig, cell:
(orig(cell, lambda: key(cell)), random.random())) (orig(cell, lambda: key(cell)), random.random()))
def closeAllStorageConnections(self):
for node in self.nm.getStorageList():
conn = node._connection # XXX
if conn is not None:
conn.setReconnectionNoDelay()
conn.close()
class NeoCTL(neo.neoctl.app.NeoCTL): class NeoCTL(neo.neoctl.app.NeoCTL):
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
...@@ -1045,8 +1052,9 @@ class NEOThreadedTest(NeoTestBase): ...@@ -1045,8 +1052,9 @@ class NEOThreadedTest(NeoTestBase):
@staticmethod @staticmethod
def noConnection(jar, storage): def noConnection(jar, storage):
return Patch(jar.db().storage.app.cp, getConnForNode=lambda orig, node: return Patch(jar.db().storage.app,
None if node.getUUID() == storage.uuid else orig(node)) getStorageConnection=lambda orig, node:
None if node.getUUID() == storage.uuid else orig(node))
@staticmethod @staticmethod
def getConnectionApp(conn): def getConnectionApp(conn):
......
...@@ -468,7 +468,7 @@ class Test(NEOThreadedTest): ...@@ -468,7 +468,7 @@ class Test(NEOThreadedTest):
s0, s1 = cluster.client.nm.getStorageList() s0, s1 = cluster.client.nm.getStorageList()
conn = s0.getConnection() conn = s0.getConnection()
self.assertFalse(conn.isClosed()) self.assertFalse(conn.isClosed())
getCellSortKey = cluster.client.cp.getCellSortKey getCellSortKey = cluster.client.getCellSortKey
self.assertEqual(getCellSortKey(s0, good), 0) self.assertEqual(getCellSortKey(s0, good), 0)
cluster.neoctl.dropNode(s0.getUUID()) cluster.neoctl.dropNode(s0.getUUID())
self.assertEqual([s1], cluster.client.nm.getStorageList()) self.assertEqual([s1], cluster.client.nm.getStorageList())
...@@ -744,20 +744,20 @@ class Test(NEOThreadedTest): ...@@ -744,20 +744,20 @@ class Test(NEOThreadedTest):
def testStorageReconnectDuringStore(self, cluster): def testStorageReconnectDuringStore(self, cluster):
t, c = cluster.getTransaction() t, c = cluster.getTransaction()
c.root()[0] = 'ok' c.root()[0] = 'ok'
cluster.client.cp.closeAll() cluster.client.closeAllStorageConnections()
t.commit() # store request t.commit() # store request
@with_cluster(storage_count=2, partitions=2) @with_cluster(storage_count=2, partitions=2)
def testStorageReconnectDuringTransactionLog(self, cluster): def testStorageReconnectDuringTransactionLog(self, cluster):
t, c = cluster.getTransaction() t, c = cluster.getTransaction()
cluster.client.cp.closeAll() cluster.client.closeAllStorageConnections()
tid, (t1,) = cluster.client.transactionLog( tid, (t1,) = cluster.client.transactionLog(
ZERO_TID, c.db().lastTransaction(), 10) ZERO_TID, c.db().lastTransaction(), 10)
@with_cluster(storage_count=2, partitions=2) @with_cluster(storage_count=2, partitions=2)
def testStorageReconnectDuringUndoLog(self, cluster): def testStorageReconnectDuringUndoLog(self, cluster):
t, c = cluster.getTransaction() t, c = cluster.getTransaction()
cluster.client.cp.closeAll() cluster.client.closeAllStorageConnections()
t1, = cluster.client.undoLog(0, 10) t1, = cluster.client.undoLog(0, 10)
@with_cluster(storage_count=2, replicas=1) @with_cluster(storage_count=2, replicas=1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment