Commit ed966e80 authored by Julien Muchembled's avatar Julien Muchembled

storage: fix possible KeyError when notifying about replicated partitions

parent fec9a3a5
...@@ -53,10 +53,11 @@ class Transaction(object): ...@@ -53,10 +53,11 @@ class Transaction(object):
self.lockless = set() self.lockless = set()
def __repr__(self): def __repr__(self):
return "<%s(tid=%r, uuid=%r, age=%.2fs) at 0x%x>" \ return "<%s(%s, locking_tid=%s, tid=%s, age=%.2fs) at 0x%x>" % (
% (self.__class__.__name__, self.__class__.__name__,
dump(self.tid),
uuid_str(self.uuid), uuid_str(self.uuid),
dump(self.locking_tid),
dump(self.tid),
time() - self._birth, time() - self._birth,
id(self)) id(self))
...@@ -120,7 +121,9 @@ class TransactionManager(EventQueue): ...@@ -120,7 +121,9 @@ class TransactionManager(EventQueue):
self._transaction_dict.iteritems()): self._transaction_dict.iteritems()):
if txn.locking_tid == MAX_TID: if txn.locking_tid == MAX_TID:
break # all remaining transactions are resolving a deadlock break # all remaining transactions are resolving a deadlock
for oid in txn.lockless.intersection(txn.serial_dict): assert txn.lockless.issubset(txn.serial_dict), (
txn.lockless, txn.serial_dict)
for oid in txn.lockless:
partition = getPartition(oid) partition = getPartition(oid)
if partition in replicated: if partition in replicated:
if store_lock_dict.get(oid, ttid) != ttid: if store_lock_dict.get(oid, ttid) != ttid:
...@@ -138,7 +141,9 @@ class TransactionManager(EventQueue): ...@@ -138,7 +141,9 @@ class TransactionManager(EventQueue):
partition, replicated.pop(partition))) partition, replicated.pop(partition)))
for oid, ttid in store_lock_dict.iteritems(): for oid, ttid in store_lock_dict.iteritems():
if getPartition(oid) in notify: if getPartition(oid) in notify:
self._transaction_dict[ttid].lockless.remove(oid) # Use 'discard' instead of 'remove', for oids that were
# locked after that the partition was replicated.
self._transaction_dict[ttid].lockless.discard(oid)
def register(self, conn, ttid): def register(self, conn, ttid):
""" """
......
...@@ -1935,6 +1935,49 @@ class Test(NEOThreadedTest): ...@@ -1935,6 +1935,49 @@ class Test(NEOThreadedTest):
3: [4, 'StoreTransaction'], 3: [4, 'StoreTransaction'],
}) })
@with_cluster(replicas=1)
def testNotifyReplicated2(self, cluster):
s0, s1 = cluster.storage_list
s1.stop()
cluster.join((s1,))
s1.resetNode()
t1, c1 = cluster.getTransaction()
r = c1.root()
for x in 'ab':
r[x] = PCounterWithResolution()
t1.commit()
r['a'].value += 1
r['b'].value += 2
t2, c2 = cluster.getTransaction()
r = c2.root()
r['a'].value += 3
r['b'].value += 4
thread = self.newPausedThread(t2.commit)
def t2_b(*args, **kw):
self.assertPartitionTable(cluster, 'UO')
f.remove(delay)
self.tic()
self.assertPartitionTable(cluster, 'UO')
yield 0
def t2_vote(*args, **kw):
self.tic()
self.assertPartitionTable(cluster, 'UU')
yield 0
with ConnectionFilter() as f, \
self.thread_switcher((thread,),
(1, 0, 1, 1, t2_b, 0, 0, 1, t2_vote, 0, 0),
('tpc_begin', 'tpc_begin', 1, 1, 2, 2,
'RebaseTransaction', 'RebaseTransaction', 'StoreTransaction',
'AnswerRebaseTransaction', 'AnswerRebaseTransaction',
)) as end:
delay = f.delayAskFetchTransactions()
s1.start()
self.tic()
t1.commit()
thread.join()
t2.begin()
self.assertEqual([4, 6], [r[x].value for x in 'ab'])
@with_cluster(storage_count=2, partitions=2) @with_cluster(storage_count=2, partitions=2)
def testDeadlockAvoidanceBeforeInvolvingAnotherNode(self, cluster): def testDeadlockAvoidanceBeforeInvolvingAnotherNode(self, cluster):
t1, c1 = cluster.getTransaction() t1, c1 = cluster.getTransaction()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment