Commit 6a75a654 authored by Julien Muchembled

client: speed up cell sorting on read-access

parent 0e57eb05
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
from cPickle import dumps, loads from cPickle import dumps, loads
from zlib import compress, decompress from zlib import compress, decompress
from random import shuffle
import heapq import heapq
import time import time
...@@ -278,10 +277,6 @@ class Application(ThreadedApplication): ...@@ -278,10 +277,6 @@ class Application(ThreadedApplication):
failed = 0 failed = 0
while 1: while 1:
cell_list = pt.getCellList(object_id, True) cell_list = pt.getCellList(object_id, True)
# Shuffle to randomise node to access...
shuffle(cell_list)
# ...and sort with non-unique keys, to prioritise ranges of
# randomised entries.
cell_list.sort(key=cp.getCellSortKey) cell_list.sort(key=cp.getCellSortKey)
for cell in cell_list: for cell in cell_list:
node = cell.getNode() node = cell.getNode()
...@@ -731,10 +726,6 @@ class Application(ThreadedApplication): ...@@ -731,10 +726,6 @@ class Application(ThreadedApplication):
# only between the client and the storage, the latter would # only between the client and the storage, the latter would
# still be readable until we commit. # still be readable until we commit.
if txn_context.involved_nodes.get(cell.getUUID(), 0) < 2] if txn_context.involved_nodes.get(cell.getUUID(), 0) < 2]
# We do want to shuffle before getting one with the smallest
# key, so that all cells with the same (smallest) key have an
# identical chance to be chosen.
shuffle(cell_list)
storage_conn = getConnForNode( storage_conn = getConnForNode(
min(cell_list, key=getCellSortKey).getNode()) min(cell_list, key=getCellSortKey).getNode())
storage_conn.ask(Packets.AskObjectUndoSerial(ttid, storage_conn.ask(Packets.AskObjectUndoSerial(ttid,
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import time import random, time
from neo.lib import logging from neo.lib import logging
from neo.lib.locking import Lock from neo.lib.locking import Lock
from neo.lib.protocol import NodeTypes, Packets from neo.lib.protocol import NodeTypes, Packets
...@@ -26,14 +26,6 @@ from .exception import NEOPrimaryMasterLost ...@@ -26,14 +26,6 @@ from .exception import NEOPrimaryMasterLost
# failed in the past. # failed in the past.
MAX_FAILURE_AGE = 600 MAX_FAILURE_AGE = 600
# Cell list sort keys, only for read access
# We are connected to storage node hosting cell, high priority
CELL_CONNECTED = -1
# normal priority
CELL_GOOD = 0
# Storage node hosting cell failed recently, low priority
CELL_FAILED = 1
class ConnectionPool(object): class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes.""" """This class manages a pool of connections to storage nodes."""
...@@ -66,21 +58,24 @@ class ConnectionPool(object): ...@@ -66,21 +58,24 @@ class ConnectionPool(object):
else: else:
logging.info('Connected %r', node) logging.info('Connected %r', node)
return conn return conn
self.notifyFailure(node)
def notifyFailure(self, node):
self.node_failure_dict[node.getUUID()] = time.time() + MAX_FAILURE_AGE self.node_failure_dict[node.getUUID()] = time.time() + MAX_FAILURE_AGE
def getCellSortKey(self, cell): def getCellSortKey(self, cell, random=random.random):
# The use of 'random' shuffles cells to randomise the node to access.
uuid = cell.getUUID() uuid = cell.getUUID()
# First, prefer a connected node.
if uuid in self.connection_dict: if uuid in self.connection_dict:
return CELL_CONNECTED return random()
# Then one that didn't fail recently.
failure = self.node_failure_dict.get(uuid) failure = self.node_failure_dict.get(uuid)
if failure: if failure:
if time.time() < failure: if time.time() < failure:
return CELL_FAILED # At last, order by date of connection failure.
return failure
# Do not use 'del' statement: we didn't lock, so another
# thread might have removed uuid from node_failure_dict.
self.node_failure_dict.pop(uuid, None) self.node_failure_dict.pop(uuid, None)
return CELL_GOOD return 1 + random()
def getConnForNode(self, node): def getConnForNode(self, node):
"""Return a locked connection object to a given node """Return a locked connection object to a given node
......
...@@ -66,7 +66,6 @@ UNIT_TEST_MODULES = [ ...@@ -66,7 +66,6 @@ UNIT_TEST_MODULES = [
# client application # client application
'neo.tests.client.testClientApp', 'neo.tests.client.testClientApp',
'neo.tests.client.testMasterHandler', 'neo.tests.client.testMasterHandler',
'neo.tests.client.testConnectionPool',
# light functional tests # light functional tests
'neo.tests.threaded.test', 'neo.tests.threaded.test',
'neo.tests.threaded.testImporter', 'neo.tests.threaded.testImporter',
......
#
# Copyright (C) 2009-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time, unittest
from ..mock import Mock
from .. import NeoUnitTestBase
from neo.client.app import ConnectionPool
from neo.client import pool
# Unit tests for the read-access cell sort key computed by ConnectionPool.
class ConnectionPoolTests(NeoUnitTestBase):
# TODO: test getConnForNode (requires splitting complex functionalities)
# Check the relative order of the keys returned by getCellSortKey for three
# nodes: one present in connection_dict, one unknown to the pool, and one
# for which a failure was recorded through notifyFailure.
def test_CellSortKey(self):
cp = ConnectionPool(None)
node_uuid_1 = self.getStorageUUID()
node_uuid_2 = self.getStorageUUID()
node_uuid_3 = self.getStorageUUID()
# We are connected to node 1
# (only membership in connection_dict is set up here, hence the None value)
cp.connection_dict[node_uuid_1] = None
# Helper: call func with a mock object exposing getUUID, while the 'time'
# name of the pool module is replaced by a mock exposing time().
# NOTE(review): presumably Mock({'name': value}) builds an object whose
# method 'name' returns value - confirm against the project's mock module.
def uuid_now(func, uuid, now):
pool.time = Mock({'time': now})
try:
return func(Mock({'getUUID': uuid}))
finally:
# Always restore the real time module, even if func raises.
pool.time = time
# A connection to node 3 failed, will be forgotten at 5
uuid_now(cp.notifyFailure, node_uuid_3, 5 - pool.MAX_FAILURE_AGE)
# Shortcut taking (uuid, now) and returning the sort key at that mocked time.
def getCellSortKey(*args):
return uuid_now(cp.getCellSortKey, *args)
# At 0, key values are not ambiguous
# Expected order: node 1 < node 2 < node 3.
self.assertTrue(getCellSortKey(node_uuid_1, 0) < getCellSortKey(
node_uuid_2, 0) < getCellSortKey(node_uuid_3, 0))
# At 10, nodes 2 and 3 have the same key value
# Node 1 must still sort before node 2.
self.assertTrue(getCellSortKey(node_uuid_1, 10) < getCellSortKey(
node_uuid_2, 10))
self.assertEqual(getCellSortKey(node_uuid_2, 10), getCellSortKey(
node_uuid_3, 10))
# Run the tests when this module is executed as a script.
if __name__ == '__main__':
unittest.main()
...@@ -446,7 +446,7 @@ class ClientApplication(Node, neo.client.app.Application): ...@@ -446,7 +446,7 @@ class ClientApplication(Node, neo.client.app.Application):
def extraCellSortKey(self, key): def extraCellSortKey(self, key):
return Patch(self.cp, getCellSortKey=lambda orig, cell: return Patch(self.cp, getCellSortKey=lambda orig, cell:
(orig(cell), key(cell))) (orig(cell, lambda: key(cell)), random.random()))
class NeoCTL(neo.neoctl.app.NeoCTL): class NeoCTL(neo.neoctl.app.NeoCTL):
......
...@@ -41,7 +41,6 @@ from . import ClientApplication, ConnectionFilter, LockLock, NEOThreadedTest, \ ...@@ -41,7 +41,6 @@ from . import ClientApplication, ConnectionFilter, LockLock, NEOThreadedTest, \
RandomConflictDict, ThreadId, with_cluster RandomConflictDict, ThreadId, with_cluster
from neo.lib.util import add64, makeChecksum, p64, u64 from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError
from neo.client.pool import CELL_CONNECTED, CELL_GOOD
from neo.client.transactions import Transaction from neo.client.transactions import Transaction
from neo.master.handlers.client import ClientServiceHandler from neo.master.handlers.client import ClientServiceHandler
from neo.storage.handlers.client import ClientOperationHandler from neo.storage.handlers.client import ClientOperationHandler
...@@ -467,11 +466,11 @@ class Test(NEOThreadedTest): ...@@ -467,11 +466,11 @@ class Test(NEOThreadedTest):
conn = s0.getConnection() conn = s0.getConnection()
self.assertFalse(conn.isClosed()) self.assertFalse(conn.isClosed())
getCellSortKey = cluster.client.cp.getCellSortKey getCellSortKey = cluster.client.cp.getCellSortKey
self.assertEqual(getCellSortKey(s0), CELL_CONNECTED) self.assertEqual(getCellSortKey(s0, int), 0)
cluster.neoctl.dropNode(s0.getUUID()) cluster.neoctl.dropNode(s0.getUUID())
self.assertEqual([s1], cluster.client.nm.getStorageList()) self.assertEqual([s1], cluster.client.nm.getStorageList())
self.assertTrue(conn.isClosed()) self.assertTrue(conn.isClosed())
self.assertEqual(getCellSortKey(s0), CELL_GOOD) self.assertEqual(getCellSortKey(s0, int), 1)
# XXX: the test originally checked that 'unregister' method # XXX: the test originally checked that 'unregister' method
# was called (even if it's useless in this case), # was called (even if it's useless in this case),
# but we would need an API to do that easily. # but we would need an API to do that easily.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment