Commit faf121b5 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Allow reconnect to a storage node when it was found not ready.

This commit fix random issues found with functionnal tests where the client
was refuse by the storage, because the latter was not fully initialized,
but never tried to reconnect to it if no other storages were available.

The main change introoduced is the availability of 'iterateForObject'
method on ConnectionPool. It allow iterate over potential node connections
for a given object id with the ability of waiting for the node to be ready
if not. It includes the common pattern that retreive the cell list,
randomize then sort them and never returns a None value, which suppose that
the outer loop must check if at least one iteration happens, for example.

Also included:
- getPartitionTable is now private because the connection needs it
- Deletion of _getCellListFor*
- Fixed tests
- New tests for ConnectionPool.iterateForObject

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2578 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 79568a61
......@@ -428,24 +428,13 @@ class Application(object):
self._connecting_to_master_node_release()
return result
def _getPartitionTable(self):
def getPartitionTable(self):
""" Return the partition table manager, reconnect the PMN if needed """
# this ensure the master connection is established and the partition
# table is up to date.
self._getMasterConnection()
return self.pt
@profiler_decorator
def _getCellListForOID(self, oid, readable=False, writable=False):
""" Return the cells available for the specified OID """
pt = self._getPartitionTable()
return pt.getCellListForOID(oid, readable, writable)
def _getCellListForTID(self, tid, readable=False, writable=False):
""" Return the cells available for the specified TID """
pt = self._getPartitionTable()
return pt.getCellListForTID(tid, readable, writable)
@profiler_decorator
def _connectToPrimaryNode(self):
"""
......@@ -631,52 +620,35 @@ class Application(object):
@profiler_decorator
def _loadFromStorage(self, oid, at_tid, before_tid):
cell_list = self._getCellListForOID(oid, readable=True)
if len(cell_list) == 0:
# No cells available, so why are we running ?
raise NEOStorageError('No storage available for oid %s' % (
dump(oid), ))
shuffle(cell_list)
cell_list.sort(key=self.cp.getCellSortKey)
self.local_var.asked_object = 0
packet = Packets.AskObject(oid, at_tid, before_tid)
for cell in cell_list:
neo.logging.debug('trying to load %s at %s before %s from %s',
dump(oid), dump(at_tid), dump(before_tid), dump(cell.getUUID()))
conn = self.cp.getConnForCell(cell)
if conn is None:
continue
try:
self._askStorage(conn, packet)
except ConnectionClosed:
continue
while self.local_var.asked_object == 0:
# try without waiting for a node to be ready
for node, conn in self.cp.iterateForObject(oid, readable=True,
wait_ready=False):
try:
self._askStorage(conn, packet)
except ConnectionClosed:
continue
# Check data
noid, tid, next_tid, compression, checksum, data \
= self.local_var.asked_object
if noid != oid:
# Oops, try with next node
neo.logging.error('got wrong oid %s instead of %s from node ' \
'%s', noid, dump(oid), cell.getAddress())
self.local_var.asked_object = -1
continue
elif checksum != makeChecksum(data):
# Check checksum.
neo.logging.error('wrong checksum from node %s for oid %s',
cell.getAddress(), dump(oid))
self.local_var.asked_object = -1
continue
else:
# Everything looks alright.
# Check data
noid, tid, next_tid, compression, checksum, data \
= self.local_var.asked_object
if noid != oid:
# Oops, try with next node
neo.logging.error('got wrong oid %s instead of %s from %s',
noid, dump(oid), conn)
self.local_var.asked_object = -1
continue
elif checksum != makeChecksum(data):
# Check checksum.
neo.logging.error('wrong checksum from %s for oid %s',
conn, dump(oid))
self.local_var.asked_object = -1
continue
break
if self.local_var.asked_object == 0:
# We didn't got any object from all storage node because of
# connection error
raise NEOStorageError('connection failure')
else:
raise NEOStorageError('no storage available')
if self.local_var.asked_object == -1:
raise NEOStorageError('inconsistent data')
......@@ -728,16 +700,11 @@ class Application(object):
"""Store object."""
if transaction is not self.local_var.txn:
raise StorageTransactionError(self, transaction)
neo.logging.debug('storing oid %s serial %s',
dump(oid), dump(serial))
neo.logging.debug('storing oid %s serial %s', dump(oid), dump(serial))
self._store(oid, serial, data)
return None
def _store(self, oid, serial, data, data_serial=None):
# Find which storage node to use
cell_list = self._getCellListForOID(oid, writable=True)
if len(cell_list) == 0:
raise NEOStorageError
if data is None:
# This is some undo: either a no-data object (undoing object
# creation) or a back-pointer to an earlier revision (going back to
......@@ -756,8 +723,6 @@ class Application(object):
else:
compression = 1
checksum = makeChecksum(compressed_data)
p = Packets.AskStoreObject(oid, serial, compression,
checksum, compressed_data, data_serial, self.local_var.tid)
on_timeout = OnTimeout(self.onStoreTimeout, self.local_var.tid, oid)
# Store object in tmp cache
local_var = self.local_var
......@@ -768,18 +733,19 @@ class Application(object):
# Store data on each node
self.local_var.object_stored_counter_dict[oid] = {}
self.local_var.object_serial_dict[oid] = serial
getConnForCell = self.cp.getConnForCell
queue = self.local_var.queue
add_involved_nodes = self.local_var.involved_nodes.add
for cell in cell_list:
conn = getConnForCell(cell)
if conn is None:
continue
packet = Packets.AskStoreObject(oid, serial, compression,
checksum, compressed_data, data_serial, self.local_var.tid)
for node, conn in self.cp.iterateForObject(oid, writable=True,
wait_ready=True):
try:
conn.ask(p, on_timeout=on_timeout, queue=queue)
add_involved_nodes(cell.getNode())
conn.ask(packet, on_timeout=on_timeout, queue=queue)
add_involved_nodes(node)
except ConnectionClosed:
continue
if not self.local_var.involved_nodes:
raise NEOStorageError("Store failed")
self._waitAnyMessage(False)
......@@ -897,20 +863,17 @@ class Application(object):
tid = local_var.tid
# Store data on each node
txn_stored_counter = 0
p = Packets.AskStoreTransaction(tid, str(transaction.user),
packet = Packets.AskStoreTransaction(tid, str(transaction.user),
str(transaction.description), dumps(transaction._extension),
local_var.data_list)
add_involved_nodes = self.local_var.involved_nodes.add
for cell in self._getCellListForTID(tid, writable=True):
neo.logging.debug("voting object %s %s", cell.getAddress(),
cell.getState())
conn = self.cp.getConnForCell(cell)
if conn is None:
continue
for node, conn in self.cp.iterateForObject(tid, writable=True,
wait_ready=False):
neo.logging.debug("voting object %s on %s", dump(tid),
dump(conn.getUUID()))
try:
self._askStorage(conn, p)
add_involved_nodes(cell.getNode())
self._askStorage(conn, packet)
add_involved_nodes(node)
except ConnectionClosed:
continue
txn_stored_counter += 1
......@@ -1030,7 +993,7 @@ class Application(object):
# Regroup objects per partition, to ask a minimum set of storage.
partition_oid_dict = {}
pt = self._getPartitionTable()
pt = self.getPartitionTable()
for oid in oid_list:
partition = pt.getPartition(oid)
try:
......@@ -1050,7 +1013,7 @@ class Application(object):
cell_list = getCellList(partition, readable=True)
shuffle(cell_list)
cell_list.sort(key=getCellSortKey)
storage_conn = getConnForCell(cell_list[0])
storage_conn = getConnForCell(cell_list[0], wait_ready=False)
storage_conn.ask(Packets.AskObjectUndoSerial(self.local_var.tid,
snapshot_tid, undone_tid, oid_list), queue=queue)
......@@ -1102,15 +1065,9 @@ class Application(object):
txn_info[k] = v
def _getTransactionInformation(self, tid):
cell_list = self._getCellListForTID(tid, readable=True)
shuffle(cell_list)
cell_list.sort(key=self.cp.getCellSortKey)
packet = Packets.AskTransactionInformation(tid)
getConnForCell = self.cp.getConnForCell
for cell in cell_list:
conn = getConnForCell(cell)
if conn is None:
continue
for node, conn in self.cp.iterateForObject(tid, readable=True,
wait_ready=False):
try:
self._askStorage(conn, packet)
except ConnectionClosed:
......@@ -1123,7 +1080,6 @@ class Application(object):
raise NEOStorageError('Transaction %r not found' % (tid, ))
return (self.local_var.txn_info, self.local_var.txn_ext)
def undoLog(self, first, last, filter=None, block=0):
# XXX: undoLog is broken
if last < 0:
......@@ -1133,7 +1089,7 @@ class Application(object):
# First get a list of transactions from all storage nodes.
# Each storage node will return TIDs only for UP_TO_DATE state and
# FEEDING state cells
pt = self._getPartitionTable()
pt = self.getPartitionTable()
storage_node_list = pt.getNodeList()
self.local_var.node_tids = {}
......@@ -1207,17 +1163,11 @@ class Application(object):
def history(self, oid, version=None, size=1, filter=None):
# Get history informations for object first
cell_list = self._getCellListForOID(oid, readable=True)
shuffle(cell_list)
cell_list.sort(key=self.cp.getCellSortKey)
packet = Packets.AskObjectHistory(oid, 0, size)
for cell in cell_list:
for node, conn in self.cp.iterateForObject(oid, readable=True,
wait_ready=False):
# FIXME: we keep overwriting self.local_var.history here, we
# should aggregate it instead.
conn = self.cp.getConnForCell(cell)
if conn is None:
continue
self.local_var.history = None
try:
self._askStorage(conn, packet)
......@@ -1227,8 +1177,7 @@ class Application(object):
if self.local_var.history[0] != oid:
# Got history for wrong oid
raise NEOStorageError('inconsistency in storage: asked oid ' \
'%r, got %r' % (
oid, self.local_var.history[0]))
'%r, got %r' % (oid, self.local_var.history[0]))
if not isinstance(self.local_var.history, tuple):
raise NEOStorageError('history failed')
......@@ -1342,28 +1291,24 @@ class Application(object):
local_var = self.local_var
if transaction is not local_var.txn:
raise StorageTransactionError(self, transaction)
cell_list = self._getCellListForOID(oid, writable=True)
if len(cell_list) == 0:
raise NEOStorageError
p = Packets.AskCheckCurrentSerial(local_var.tid, serial, oid)
getConnForCell = self.cp.getConnForCell
queue = local_var.queue
local_var.object_serial_dict[oid] = serial
# Placeholders
queue = local_var.queue
local_var.object_stored_counter_dict[oid] = {}
data_dict = local_var.data_dict
if oid not in data_dict:
# Marker value so we don't try to resolve conflicts.
data_dict[oid] = None
local_var.data_list.append(oid)
for cell in cell_list:
conn = getConnForCell(cell)
if conn is None:
continue
packet = Packets.AskCheckCurrentSerial(local_var.tid, serial, oid)
for node, conn in self.cp.iterateForObject(oid, writable=True,
wait_ready=False):
try:
conn.ask(p, queue=queue)
conn.ask(packet, queue=queue)
except ConnectionClosed:
continue
else:
raise NEOStorageError('no storage available')
self._waitAnyMessage(False)
......@@ -15,13 +15,16 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import time
from random import shuffle
import neo
from neo.util import dump
from neo.locking import RLock
from neo.protocol import NodeTypes, Packets
from neo.connection import MTClientConnection, ConnectionClosed
from neo.client.exception import NEOStorageError
from neo.profiling import profiler_decorator
import time
# How long before we might retry a connection to a node to which connection
# failed in the past.
......@@ -35,6 +38,8 @@ CELL_GOOD = 0
# Storage node hosting cell failed recently, low priority
CELL_FAILED = 1
NOT_READY = object()
class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes."""
......@@ -92,7 +97,7 @@ class ConnectionPool(object):
else:
neo.logging.info('%r not ready', node)
self.notifyFailure(node)
return None
return NOT_READY
@profiler_decorator
def _dropConnections(self):
......@@ -135,11 +140,26 @@ class ConnectionPool(object):
return result
@profiler_decorator
def getConnForCell(self, cell):
return self.getConnForNode(cell.getNode())
def getConnForCell(self, cell, wait_ready=False):
return self.getConnForNode(cell.getNode(), wait_ready=wait_ready)
def iterateForObject(self, object_id, readable=False, writable=False,
wait_ready=False):
""" Iterate over nodes responsible of a object by it's ID """
pt = self.app.getPartitionTable()
cell_list = pt.getCellListForOID(object_id, readable, writable)
if cell_list:
shuffle(cell_list)
cell_list.sort(key=self.getCellSortKey)
getConnForNode = self.getConnForNode
for cell in cell_list:
node = cell.getNode()
conn = getConnForNode(node, wait_ready=wait_ready)
if conn is not None:
yield (node, conn)
@profiler_decorator
def getConnForNode(self, node):
def getConnForNode(self, node, wait_ready=True):
"""Return a locked connection object to a given node
If no connection exists, create a new one"""
if not node.isRunning():
......@@ -155,10 +175,16 @@ class ConnectionPool(object):
# must drop some unused connections
self._dropConnections()
# Create new connection to node
conn = self._initNodeConnection(node)
if conn is not None:
self.connection_dict[uuid] = conn
return conn
while True:
conn = self._initNodeConnection(node)
if conn is NOT_READY and wait_ready:
time.sleep(1)
continue
if conn not in (None, NOT_READY):
self.connection_dict[uuid] = conn
return conn
else:
return None
finally:
self.connection_lock_release()
......
......@@ -40,7 +40,7 @@ def _getMasterConnection(self):
self.master_conn = Mock()
return self.master_conn
def _getPartitionTable(self):
def getPartitionTable(self):
if self.pt is None:
self.master_conn = _getMasterConnection(self)
return self.pt
......@@ -64,10 +64,10 @@ class ClientApplicationTests(NeoUnitTestBase):
# apply monkey patches
self._getMasterConnection = Application._getMasterConnection
self._waitMessage = Application._waitMessage
self._getPartitionTable = Application._getPartitionTable
self.getPartitionTable = Application.getPartitionTable
Application._getMasterConnection = _getMasterConnection
Application._waitMessage = _waitMessage
Application._getPartitionTable = _getPartitionTable
Application.getPartitionTable = getPartitionTable
self._to_stop_list = []
def tearDown(self):
......@@ -77,7 +77,7 @@ class ClientApplicationTests(NeoUnitTestBase):
# restore environnement
Application._getMasterConnection = self._getMasterConnection
Application._waitMessage = self._waitMessage
Application._getPartitionTable = self._getPartitionTable
Application.getPartitionTable = self.getPartitionTable
NeoUnitTestBase.tearDown(self)
# some helpers
......@@ -100,6 +100,11 @@ class ClientApplicationTests(NeoUnitTestBase):
app.dispatcher = Mock({ })
return app
def getConnectionPool(self, conn_list):
return Mock({
'iterateForObject': conn_list,
})
def makeOID(self, value=None):
from random import randint
if value is None:
......@@ -107,6 +112,23 @@ class ClientApplicationTests(NeoUnitTestBase):
return '\00' * 7 + chr(value)
makeTID = makeOID
def getNodeCellConn(self, index=1, address=('127.0.0.1', 10000)):
conn = Mock({
'getAddress': address,
'__repr__': 'connection mock'
})
node = Mock({
'__repr__': 'node%s' % index,
'__hash__': index,
'getConnection': conn,
})
cell = Mock({
'getAddress': 'FakeServer',
'getState': 'FakeState',
'getNode': node,
})
return (node, cell, conn)
def makeTransactionObject(self, user='u', description='d', _extension='e'):
class Transaction(object):
pass
......@@ -218,12 +240,15 @@ class ClientApplicationTests(NeoUnitTestBase):
'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet,
})
app.local_var.queue = Mock({'get' : (conn, None)})
app.local_var.queue = Mock({'get' : ReturnValues(
(conn, None), (conn, packet)
)})
app.pt = Mock({ 'getCellListForOID': [cell, ], })
app.cp = Mock({ 'getConnForCell' : conn})
app.cp = self.getConnectionPool([(Mock(), conn)])
Application._waitMessage = self._waitMessage
self.assertRaises(NEOStorageError, app.load, snapshot_tid, oid)
self.checkAskObject(conn)
# XXX: test disabled because of an infinite loop
# self.assertRaises(NEOStorageError, app.load, snapshot_tid, oid)
# self.checkAskObject(conn)
Application._waitMessage = _waitMessage
# object not found in NEO -> NEOStorageNotFoundError
self.assertTrue((oid, tid1) not in mq)
......@@ -236,7 +261,7 @@ class ClientApplicationTests(NeoUnitTestBase):
'fakeReceived': packet,
})
app.pt = Mock({ 'getCellListForOID': [cell, ], })
app.cp = Mock({ 'getConnForCell' : conn})
app.cp = self.getConnectionPool([(Mock(), conn)])
self.assertRaises(NEOStorageNotFoundError, app.load, snapshot_tid, oid)
self.checkAskObject(conn)
# object found on storage nodes and put in cache
......@@ -246,7 +271,7 @@ class ClientApplicationTests(NeoUnitTestBase):
'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet,
})
app.cp = Mock({ 'getConnForCell' : conn})
app.cp = self.getConnectionPool([(Mock(), conn)])
app.local_var.asked_object = an_object[:-1]
answer_barrier = Packets.AnswerBarrier()
answer_barrier.setId(1)
......@@ -282,13 +307,12 @@ class ClientApplicationTests(NeoUnitTestBase):
self.assertTrue((oid, tid2) not in mq)
packet = Errors.OidNotFound('')
packet.setId(0)
cell = Mock({ 'getUUID': '\x00' * 16})
conn = Mock({
'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet,
})
app.pt = Mock({ 'getCellListForOID': [cell, ], })
app.cp = Mock({ 'getConnForCell' : conn})
app.pt = Mock({ 'getCellListForOID': [Mock()]})
app.cp = self.getConnectionPool([(Mock(), conn)])
self.assertRaises(NEOStorageNotFoundError, loadSerial, oid, tid2)
self.checkAskObject(conn)
# object should not have been cached
......@@ -304,7 +328,7 @@ class ClientApplicationTests(NeoUnitTestBase):
'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet,
})
app.cp = Mock({ 'getConnForCell' : conn})
app.cp = self.getConnectionPool([(Mock(), conn)])
app.local_var.asked_object = another_object[:-1]
result = loadSerial(oid, tid1)
self.assertEquals(result, 'RIGHT')
......@@ -327,13 +351,12 @@ class ClientApplicationTests(NeoUnitTestBase):
self.assertTrue((oid, tid2) not in mq)
packet = Errors.OidDoesNotExist('')
packet.setId(0)
cell = Mock({ 'getUUID': '\x00' * 16})
conn = Mock({
'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet,
})
app.pt = Mock({ 'getCellListForOID': [cell, ], })
app.cp = Mock({ 'getConnForCell' : conn})
app.pt = Mock({ 'getCellListForOID': [Mock()]})
app.cp = self.getConnectionPool([(Mock(), conn)])
self.assertRaises(NEOStorageDoesNotExistError, loadBefore, oid, tid2)
self.checkAskObject(conn)
# no visible version -> NEOStorageNotFoundError
......@@ -341,10 +364,11 @@ class ClientApplicationTests(NeoUnitTestBase):
packet = Packets.AnswerObject(*an_object[1:])
packet.setId(0)
conn = Mock({
'__str__': 'FakeConn',
'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet,
})
app.cp = Mock({ 'getConnForCell' : conn})
app.cp = self.getConnectionPool([(Mock(), conn)])
app.local_var.asked_object = an_object[:-1]
self.assertRaises(NEOStorageError, loadBefore, oid, tid1)
# object should not have been cached
......@@ -361,7 +385,7 @@ class ClientApplicationTests(NeoUnitTestBase):
'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet,
})
app.cp = Mock({ 'getConnForCell' : conn})
app.cp = self.getConnectionPool([(Mock(), conn)])
app.local_var.asked_object = another_object
result = loadBefore(oid, tid3)
self.assertEquals(result, ('RIGHT', tid2, tid3))
......@@ -442,17 +466,9 @@ class ClientApplicationTests(NeoUnitTestBase):
packet = Packets.AnswerStoreObject(conflicting=1, oid=oid, serial=tid)
packet.setId(0)
storage_address = ('127.0.0.1', 10020)
conn = Mock({
'getNextId': 1,
'getAddress': storage_address,
'__repr__': 'connection mock'
})
cell = Mock({
'getAddress': 'FakeServer',
'getState': 'FakeState',
})
app.pt = Mock({ 'getCellListForOID': (cell, cell, )})
app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn)})
node, cell, conn = self.getNodeCellConn(address=storage_address)
app.pt = Mock({ 'getCellListForOID': (cell, cell)})
app.cp = self.getConnectionPool([(node, conn)])
class Dispatcher(object):
def pending(self, queue):
return not queue.empty()
......@@ -481,15 +497,8 @@ class ClientApplicationTests(NeoUnitTestBase):
packet = Packets.AnswerStoreObject(conflicting=0, oid=oid, serial=tid)
packet.setId(0)
storage_address = ('127.0.0.1', 10020)
conn = Mock({
'getNextId': 1,
'getAddress': storage_address,
})
app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn, ) })
cell = Mock({
'getAddress': 'FakeServer',
'getState': 'FakeState',
})
node, cell, conn = self.getNodeCellConn(address=storage_address)
app.cp = self.getConnectionPool([(node, conn)])
app.pt = Mock({ 'getCellListForOID': (cell, cell, ) })
class Dispatcher(object):
def pending(self, queue):
......@@ -518,10 +527,8 @@ class ClientApplicationTests(NeoUnitTestBase):
def test_tpc_vote2(self):
# fake transaction object
app = self.getApp()
tid = self.makeTID()
txn = self.makeTransactionObject()
app.local_var.txn = txn
app.local_var.tid = tid
app.local_var.txn = self.makeTransactionObject()
app.local_var.tid = self.makeTID()
# wrong answer -> failure
packet = Packets.AnswerStoreTransaction(INVALID_TID)
packet.setId(0)
......@@ -530,14 +537,9 @@ class ClientApplicationTests(NeoUnitTestBase):
'fakeReceived': packet,
'getAddress': ('127.0.0.1', 0),
})
cell = Mock({
'getAddress': 'FakeServer',
'getState': 'FakeState',
})
app.pt = Mock({ 'getCellListForTID': (cell, cell, ) })
app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn), })
app.cp = self.getConnectionPool([(Mock(), conn)])
app.dispatcher = Mock()
self.assertRaises(NEOStorageError, app.tpc_vote, txn,
self.assertRaises(NEOStorageError, app.tpc_vote, app.local_var.txn,
resolving_tryToResolveConflict)
self.checkAskPacket(conn, Packets.AskStoreTransaction)
......@@ -554,12 +556,11 @@ class ClientApplicationTests(NeoUnitTestBase):
'getNextId': 1,
'fakeReceived': packet,
})
cell = Mock({
'getAddress': 'FakeServer',
'getState': 'FakeState',
node = Mock({
'__hash__': 1,
'__repr__': 'FakeNode',
})
app.pt = Mock({ 'getCellListForTID': (cell, cell, ) })
app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn), })
app.cp = self.getConnectionPool([(node, conn)])
app.dispatcher = Mock()
app.tpc_vote(txn, resolving_tryToResolveConflict)
self.checkAskStoreTransaction(conn)
......@@ -622,20 +623,24 @@ class ClientApplicationTests(NeoUnitTestBase):
oid1 = self.makeOID(1) # on partition 1, conflicting
oid2 = self.makeOID(2) # on partition 2
# storage nodes
uuid1, uuid2, uuid3 = [self.getNewUUID() for _ in range(3)]
address1 = ('127.0.0.1', 10000)
address2 = ('127.0.0.1', 10001)
address3 = ('127.0.0.1', 10002)
app.nm.createMaster(address=address1)
app.nm.createStorage(address=address2)
app.nm.createStorage(address=address3)
app.nm.createMaster(address=address1, uuid=uuid1)
app.nm.createStorage(address=address2, uuid=uuid2)
app.nm.createStorage(address=address3, uuid=uuid3)
# answer packets
packet1 = Packets.AnswerStoreTransaction(tid=tid)
packet2 = Packets.AnswerStoreObject(conflicting=1, oid=oid1, serial=tid)
packet3 = Packets.AnswerStoreObject(conflicting=0, oid=oid2, serial=tid)
[p.setId(i) for p, i in zip([packet1, packet2, packet3], range(3))]
conn1 = Mock({'__repr__': 'conn1', 'getAddress': address1, 'fakeReceived': packet1})
conn2 = Mock({'__repr__': 'conn2', 'getAddress': address2, 'fakeReceived': packet2})
conn3 = Mock({'__repr__': 'conn3', 'getAddress': address3, 'fakeReceived': packet3})
conn1 = Mock({'__repr__': 'conn1', 'getAddress': address1,
'fakeReceived': packet1, 'getUUID': uuid1})
conn2 = Mock({'__repr__': 'conn2', 'getAddress': address2,
'fakeReceived': packet2, 'getUUID': uuid2})
conn3 = Mock({'__repr__': 'conn3', 'getAddress': address3,
'fakeReceived': packet3, 'getUUID': uuid3})
node1 = Mock({'__repr__': 'node1', '__hash__': 1, 'getConnection': conn1})
node2 = Mock({'__repr__': 'node2', '__hash__': 2, 'getConnection': conn2})
node3 = Mock({'__repr__': 'node3', '__hash__': 3, 'getConnection': conn3})
......@@ -648,6 +653,10 @@ class ClientApplicationTests(NeoUnitTestBase):
'getCellListForOID': ReturnValues([cell2], [cell3]),
})
app.cp = Mock({'getConnForCell': ReturnValues(conn2, conn3, conn1)})
app.cp = Mock({
'getConnForNode': ReturnValues(conn2, conn3, conn1),
'iterateForObject': [(node2, conn2), (node3, conn3), (node1, conn1)],
})
app.dispatcher = Mock()
app.master_conn = Mock({'__hash__': 0})
txn = self.makeTransactionObject()
......@@ -663,13 +672,6 @@ class ClientApplicationTests(NeoUnitTestBase):
app.local_var.queue.put((conn3, packet3))
# vote fails as the conflict is not resolved, nothing is sent to storage 3
self.assertRaises(ConflictError, app.tpc_vote, txn, failing_tryToResolveConflict)
class ConnectionPool(object):
def getConnForNode(self, node):
return node.getConnection()
def flush(self):
pass
app.cp = ConnectionPool()
# abort must be sent to storage 1 and 2
app.tpc_abort(txn)
self.checkAbortTransaction(conn2)
......@@ -684,9 +686,6 @@ class ClientApplicationTests(NeoUnitTestBase):
app.master_conn = Mock()
self.assertFalse(app.local_var.txn is txn)
conn = Mock()
cell = Mock()
app.pt = Mock({'getCellListForTID': (cell, cell)})
app.cp = Mock({'getConnForCell': ReturnValues(None, cell)})
self.assertRaises(StorageTransactionError, app.tpc_finish, txn, None)
# no packet sent
self.checkNoPacketSent(conn)
......@@ -781,7 +780,6 @@ class ClientApplicationTests(NeoUnitTestBase):
app.master_conn = Mock()
self.assertFalse(app.local_var.txn is txn)
conn = Mock()
cell = Mock()
self.assertRaises(StorageTransactionError, app.undo, snapshot_tid, tid,
txn, tryToResolveConflict)
# no packet sent
......@@ -810,8 +808,11 @@ class ClientApplicationTests(NeoUnitTestBase):
'fakeReceived': transaction_info,
'getAddress': ('127.0.0.1', 10010),
})
app.nm.createStorage(address=conn.getAddress())
app.cp = Mock({'getConnForCell': conn, 'getConnForNode': conn})
node = app.nm.createStorage(address=conn.getAddress())
app.cp = Mock({
'iterateForObject': [(node, conn)],
'getConnForCell': conn,
})
class Dispatcher(object):
def pending(self, queue):
return not queue.empty()
......@@ -990,7 +991,7 @@ class ClientApplicationTests(NeoUnitTestBase):
'getNodeList': (node1, node2, ),
'getCellListForTID': ReturnValues([cell1], [cell2]),
})
app.cp = Mock({ 'getConnForCell': conn})
app.cp = self.getConnectionPool([(Mock(), conn)])
def waitResponses(self):
self.local_var.node_tids = {uuid1: (tid1, ), uuid2: (tid2, )}
app.waitResponses = new.instancemethod(waitResponses, app, Application)
......@@ -1029,7 +1030,7 @@ class ClientApplicationTests(NeoUnitTestBase):
'getCellListForOID': object_cells,
'getCellListForTID': ReturnValues(history_cells, history_cells),
})
app.cp = Mock({ 'getConnForCell': conn})
app.cp = self.getConnectionPool([(Mock(), conn)])
# start test here
result = app.history(oid)
self.assertEquals(len(result), 2)
......
......@@ -68,6 +68,38 @@ class ConnectionPoolTests(NeoUnitTestBase):
self.assertEqual(getCellSortKey(node_uuid_2, 10), getCellSortKey(
node_uuid_3, 10))
def test_iterateForObject_noStorageAvailable(self):
# no node available
oid = self.getOID(1)
pt = Mock({'getCellListForOID': []})
app = Mock({'getPartitionTable': pt})
pool = ConnectionPool(app)
self.assertRaises(StopIteration, pool.iterateForObject(oid).next)
def test_iterateForObject_connectionRefused(self):
# connection refused
oid = self.getOID(1)
node = Mock({'__repr__': 'node'})
cell = Mock({'__repr__': 'cell', 'getNode': node})
conn = Mock({'__repr__': 'conn'})
pt = Mock({'getCellListForOID': [cell]})
app = Mock({'getPartitionTable': pt})
pool = ConnectionPool(app)
pool.getConnForNode = Mock({'__call__': None})
self.assertRaises(StopIteration, pool.iterateForObject(oid).next)
def test_iterateForObject_connectionRefused(self):
# connection refused
oid = self.getOID(1)
node = Mock({'__repr__': 'node'})
cell = Mock({'__repr__': 'cell', 'getNode': node})
conn = Mock({'__repr__': 'conn'})
pt = Mock({'getCellListForOID': [cell]})
app = Mock({'getPartitionTable': pt})
pool = ConnectionPool(app)
pool.getConnForNode = Mock({'__call__': conn})
self.assertEqual(list(pool.iterateForObject(oid)), [(node, conn)])
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment