Commit 574b64df authored by Julien Muchembled's avatar Julien Muchembled

master: simplify code

parent eb3a3937
...@@ -559,23 +559,22 @@ class Application(BaseApplication): ...@@ -559,23 +559,22 @@ class Application(BaseApplication):
# I have received all the lock answers now: # I have received all the lock answers now:
# - send a Notify Transaction Finished to the initiated client node # - send a Notify Transaction Finished to the initiated client node
# - Invalidate Objects to the other client nodes # - Invalidate Objects to the other client nodes
ttid = txn.getTTID() ttid = txn.ttid
tid = txn.getTID() tid = txn.tid
transaction_node = txn.getNode() transaction_node = txn.node
invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList()) invalidate_objects = Packets.InvalidateObjects(tid, txn.oid_list)
client_list = self.nm.getClientList(only_identified=True) client_list = self.nm.getClientList(only_identified=True)
for client_node in client_list: for client_node in client_list:
if client_node is transaction_node: if client_node is transaction_node:
client_node.send(Packets.AnswerTransactionFinished(ttid, tid), client_node.send(Packets.AnswerTransactionFinished(ttid, tid),
msg_id=txn.getMessageId()) msg_id=txn.msg_id)
else: else:
client_node.send(invalidate_objects) client_node.send(invalidate_objects)
# Unlock Information to relevant storage nodes. # Unlock Information to relevant storage nodes.
notify_unlock = Packets.NotifyUnlockInformation(ttid) notify_unlock = Packets.NotifyUnlockInformation(ttid)
getByUUID = self.nm.getByUUID getByUUID = self.nm.getByUUID
txn_storage_list = txn.getUUIDList() for storage_uuid in txn.involved:
for storage_uuid in txn_storage_list:
getByUUID(storage_uuid).send(notify_unlock) getByUUID(storage_uuid).send(notify_unlock)
# Notify storage nodes about new pack order if any. # Notify storage nodes about new pack order if any.
......
...@@ -27,22 +27,18 @@ class Transaction(object): ...@@ -27,22 +27,18 @@ class Transaction(object):
""" """
A pending transaction A pending transaction
""" """
_tid = None tid = None
_msg_id = None oid_list = ()
_oid_list = None failed = \
_failed = frozenset() involved = frozenset()
_prepared = False
# uuid dict hold flag to known who has locked the transaction
_uuid_set = None
_lock_wait_uuid_set = None
def __init__(self, node, storage_readiness, ttid): def __init__(self, node, storage_readiness, ttid):
""" """
Prepare the transaction, set OIDs and UUIDs related to it Prepare the transaction, set OIDs and UUIDs related to it
""" """
self._node = node self.node = node
self._storage_readiness = storage_readiness self.storage_readiness = storage_readiness
self._ttid = ttid self.ttid = ttid
self._birth = time() self._birth = time()
# store storage uuids that must be notified at commit # store storage uuids that must be notified at commit
self._notification_set = set() self._notification_set = set()
...@@ -50,57 +46,14 @@ class Transaction(object): ...@@ -50,57 +46,14 @@ class Transaction(object):
def __repr__(self): def __repr__(self):
return "<%s(client=%r, tid=%r, oids=%r, storages=%r, age=%.2fs) at %x>" % ( return "<%s(client=%r, tid=%r, oids=%r, storages=%r, age=%.2fs) at %x>" % (
self.__class__.__name__, self.__class__.__name__,
self._node, self.node,
dump(self._tid), dump(self.tid),
map(dump, self._oid_list or ()), map(dump, self.oid_list),
map(uuid_str, self._uuid_set or ()), map(uuid_str, self.involved),
time() - self._birth, time() - self._birth,
id(self), id(self),
) )
def getNode(self):
"""
Return the node that had began the transaction
"""
return self._node
def getTTID(self):
"""
Return the temporary transaction ID.
"""
return self._ttid
def getTID(self):
"""
Return the transaction ID
"""
return self._tid
def getMessageId(self):
"""
Returns the packet ID to use in the answer
"""
return self._msg_id
def getUUIDList(self):
"""
Returns the list of node's UUID that lock the transaction
"""
return list(self._uuid_set)
def getOIDList(self):
"""
Returns the list of OIDs used in the transaction
"""
return list(self._oid_list)
def isPrepared(self):
"""
Returns True if the commit has been requested by the client
"""
return self._prepared
def registerForNotification(self, uuid): def registerForNotification(self, uuid):
""" """
Register a node that requires a notification at commit Register a node that requires a notification at commit
...@@ -114,14 +67,12 @@ class Transaction(object): ...@@ -114,14 +67,12 @@ class Transaction(object):
""" """
return list(self._notification_set) return list(self._notification_set)
def prepare(self, tid, oid_list, uuid_set, msg_id): def prepare(self, tid, oid_list, involved, msg_id):
self.tid = tid
self._tid = tid self.oid_list = oid_list
self._oid_list = oid_list self.msg_id = msg_id
self._msg_id = msg_id self.involved = involved
self._uuid_set = uuid_set self.locking = involved.copy()
self._lock_wait_uuid_set = uuid_set.copy()
self._prepared = True
def storageLost(self, uuid): def storageLost(self, uuid):
""" """
...@@ -133,16 +84,17 @@ class Transaction(object): ...@@ -133,16 +84,17 @@ class Transaction(object):
# XXX: We might lose information that a storage successfully locked # XXX: We might lose information that a storage successfully locked
# data but was later found to be disconnected. This loss has no impact # data but was later found to be disconnected. This loss has no impact
# on current code, but it might be disturbing to reader or future code. # on current code, but it might be disturbing to reader or future code.
if self._prepared: if self.tid:
self._lock_wait_uuid_set.discard(uuid) locking = self.locking
self._uuid_set.discard(uuid) locking.discard(uuid)
return self.locked() self.involved.discard(uuid)
return not locking
return False return False
def clientLost(self, node): def clientLost(self, node):
if self._node is node: if self.node is node:
if self._prepared: if self.tid:
self._node = None # orphan self.node = None # orphan
else: else:
return True # abort return True # abort
else: else:
...@@ -154,14 +106,9 @@ class Transaction(object): ...@@ -154,14 +106,9 @@ class Transaction(object):
Define that a node has locked the transaction Define that a node has locked the transaction
Returns true if all nodes are locked Returns true if all nodes are locked
""" """
self._lock_wait_uuid_set.remove(uuid) locking = self.locking
return self.locked() locking.remove(uuid)
return not locking
def locked(self):
"""
Returns true if all nodes are locked
"""
return not self._lock_wait_uuid_set
class TransactionManager(EventQueue): class TransactionManager(EventQueue):
...@@ -302,7 +249,7 @@ class TransactionManager(EventQueue): ...@@ -302,7 +249,7 @@ class TransactionManager(EventQueue):
txn = self[ttid] txn = self[ttid]
# The client does not know which nodes are not expected to have # The client does not know which nodes are not expected to have
# transactions in full. Let's filter out them. # transactions in full. Let's filter out them.
failed = app.getStorageReadySet(txn._storage_readiness) failed = app.getStorageReadySet(txn.storage_readiness)
failed.intersection_update(uuid_list) failed.intersection_update(uuid_list)
if failed: if failed:
operational = app.pt.operational operational = app.pt.operational
...@@ -312,16 +259,15 @@ class TransactionManager(EventQueue): ...@@ -312,16 +259,15 @@ class TransactionManager(EventQueue):
return False return False
all_failed = failed.copy() all_failed = failed.copy()
for t in self._ttid_dict.itervalues(): for t in self._ttid_dict.itervalues():
all_failed |= t._failed all_failed |= t.failed
if not operational(all_failed): if not operational(all_failed):
# Other transactions were voted and unless they're aborted, # Other transactions were voted and unless they're aborted,
# we won't be able to finish this one, because that would make # we won't be able to finish this one, because that would make
# the cluster non-operational. Let's tell the caller to retry # the cluster non-operational. Let's retry later.
# later.
raise DelayEvent raise DelayEvent
# Allow the client to finish the transaction, # Allow the client to finish the transaction,
# even if this will disconnect storage nodes. # even if this will disconnect storage nodes.
txn._failed = failed txn.failed = failed
return True return True
def prepare(self, app, ttid, oid_list, checked_list, msg_id): def prepare(self, app, ttid, oid_list, checked_list, msg_id):
...@@ -331,22 +277,22 @@ class TransactionManager(EventQueue): ...@@ -331,22 +277,22 @@ class TransactionManager(EventQueue):
txn = self[ttid] txn = self[ttid]
pt = app.pt pt = app.pt
failed = txn._failed failed = txn.failed
if failed and not pt.operational(failed): if failed and not pt.operational(failed):
return None, None return None, None
ready = app.getStorageReadySet(txn._storage_readiness) ready = app.getStorageReadySet(txn.storage_readiness)
getPartition = pt.getPartition getPartition = pt.getPartition
partition_set = set(map(getPartition, oid_list)) partition_set = set(map(getPartition, oid_list))
partition_set.update(map(getPartition, checked_list)) partition_set.update(map(getPartition, checked_list))
partition_set.add(getPartition(ttid)) partition_set.add(getPartition(ttid))
node_list = [] node_list = []
uuid_set = set() involved = set()
for partition in partition_set: for partition in partition_set:
for cell in pt.getCellList(partition): for cell in pt.getCellList(partition):
node = cell.getNode() node = cell.getNode()
if node.isIdentified(): if node.isIdentified():
uuid = node.getUUID() uuid = node.getUUID()
if uuid in uuid_set: if uuid in involved:
continue continue
if uuid in failed: if uuid in failed:
# This will commit a new PT with outdated cells before # This will commit a new PT with outdated cells before
...@@ -354,7 +300,7 @@ class TransactionManager(EventQueue): ...@@ -354,7 +300,7 @@ class TransactionManager(EventQueue):
# the verification phase. # the verification phase.
node.getConnection().close() node.getConnection().close()
elif uuid in ready: elif uuid in ready:
uuid_set.add(uuid) involved.add(uuid)
node_list.append(node) node_list.append(node)
# A node that was not ready at the beginning of the transaction # A node that was not ready at the beginning of the transaction
# can't have readable cells. And if we're still operational without # can't have readable cells. And if we're still operational without
...@@ -369,8 +315,8 @@ class TransactionManager(EventQueue): ...@@ -369,8 +315,8 @@ class TransactionManager(EventQueue):
tid = self._nextTID(ttid, pt.getPartitions()) tid = self._nextTID(ttid, pt.getPartitions())
self._queue.append(ttid) self._queue.append(ttid)
logging.debug('Finish TXN %s for %s (was %s)', logging.debug('Finish TXN %s for %s (was %s)',
dump(tid), txn.getNode(), dump(ttid)) dump(tid), txn.node, dump(ttid))
txn.prepare(tid, oid_list, uuid_set, msg_id) txn.prepare(tid, oid_list, involved, msg_id)
# check if greater and foreign OID was stored # check if greater and foreign OID was stored
if oid_list: if oid_list:
self.setLastOID(max(oid_list)) self.setLastOID(max(oid_list))
...@@ -382,7 +328,7 @@ class TransactionManager(EventQueue): ...@@ -382,7 +328,7 @@ class TransactionManager(EventQueue):
""" """
logging.debug('Abort TXN %s for %s', dump(ttid), uuid_str(uuid)) logging.debug('Abort TXN %s for %s', dump(ttid), uuid_str(uuid))
txn = self[ttid] txn = self[ttid]
if txn.isPrepared(): if txn.tid:
raise ProtocolError("commit already requested for ttid %s" raise ProtocolError("commit already requested for ttid %s"
% dump(ttid)) % dump(ttid))
del self[ttid] del self[ttid]
...@@ -424,7 +370,7 @@ class TransactionManager(EventQueue): ...@@ -424,7 +370,7 @@ class TransactionManager(EventQueue):
while queue: while queue:
ttid = queue[0] ttid = queue[0]
txn = self._ttid_dict[ttid] txn = self._ttid_dict[ttid]
if not txn.locked(): if txn.locking:
break break
del queue[0], self._ttid_dict[ttid] del queue[0], self._ttid_dict[ttid]
self._on_commit(txn) self._on_commit(txn)
...@@ -433,7 +379,7 @@ class TransactionManager(EventQueue): ...@@ -433,7 +379,7 @@ class TransactionManager(EventQueue):
def clientLost(self, node): def clientLost(self, node):
for txn in self._ttid_dict.values(): for txn in self._ttid_dict.values():
if txn.clientLost(node): if txn.clientLost(node):
tid = txn.getTTID() tid = txn.ttid
del self[tid] del self[tid]
yield tid, txn.getNotificationUUIDList() yield tid, txn.getNotificationUUIDList()
......
...@@ -220,7 +220,7 @@ class ReplicationTests(NEOThreadedTest): ...@@ -220,7 +220,7 @@ class ReplicationTests(NEOThreadedTest):
counts[0] += 1 counts[0] += 1
if counts[0] > 1: if counts[0] > 1:
node_list = orig.im_self.nm.getClientList(only_identified=True) node_list = orig.im_self.nm.getClientList(only_identified=True)
node_list.remove(txn.getNode()) node_list.remove(txn.node)
node_list[0].getConnection().close() node_list[0].getConnection().close()
return orig(txn) return orig(txn)
with NEOCluster(partitions=np, replicas=0, storage_count=1) as upstream: with NEOCluster(partitions=np, replicas=0, storage_count=1) as upstream:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment