Commit aac495e4 authored by Julien Muchembled's avatar Julien Muchembled

qa: more reliable use of extra clients in threaded tests

testExternalInvalidation is split to minimize reindentation.
parent 82027ac9
...@@ -725,13 +725,29 @@ class NEOCluster(object): ...@@ -725,13 +725,29 @@ class NEOCluster(object):
assert state in (ClusterStates.RUNNING, ClusterStates.BACKINGUP), state assert state in (ClusterStates.RUNNING, ClusterStates.BACKINGUP), state
self.enableStorageList(storage_list) self.enableStorageList(storage_list)
def newClient(self): def _newClient(self):
return ClientApplication(name=self.name, master_nodes=self.master_nodes, return ClientApplication(name=self.name, master_nodes=self.master_nodes,
compress=self.compress, ssl=self.SSL) compress=self.compress, ssl=self.SSL)
@contextmanager
def newClient(self, with_db=False):
    """Yield a temporary extra client, closing it reliably on exit.

    By default, yields a raw ClientApplication; with with_db=True, the
    client is wrapped in a ZODB.DB (via getZODBStorage) and that DB is
    yielded instead. In both cases, the client's poll thread is joined
    after close, so the test cannot leak a running thread.
    """
    x = self._newClient()
    try:
        # Grab the poll thread now, before x may be rebound to a DB below.
        t = x.poll_thread
        closed = []
        if with_db:
            x = ZODB.DB(storage=self.getZODBStorage(client=x))
        else:
            # XXX: Do nothing in finally if the caller already closed it.
            #      close() is wrapped so that a caller-side close is
            #      recorded in 'closed', making the finally close a no-op.
            x.close = lambda: closed.append(x.__class__.close(x))
        yield x
    finally:
        # Close only if the caller did not already do it, then wait for
        # the client's poll thread to terminate.
        closed or x.close()
        self.join((t,))
@cached_property @cached_property
def client(self): def client(self):
client = self.newClient() client = self._newClient()
# Make sure client won't be reused after it was closed. # Make sure client won't be reused after it was closed.
def close(): def close():
client = self.client client = self.client
...@@ -825,9 +841,9 @@ class NEOCluster(object): ...@@ -825,9 +841,9 @@ class NEOCluster(object):
for o in oid_list: for o in oid_list:
tid_dict[o] = i tid_dict[o] = i
def getTransaction(self): def getTransaction(self, db=None):
txn = transaction.TransactionManager() txn = transaction.TransactionManager()
return txn, self.db.open(transaction_manager=txn) return txn, (self.db if db is None else db).open(txn)
def __del__(self, __print_exc=traceback.print_exc): def __del__(self, __print_exc=traceback.print_exc):
try: try:
......
...@@ -889,8 +889,12 @@ class Test(NEOThreadedTest): ...@@ -889,8 +889,12 @@ class Test(NEOThreadedTest):
def testExternalInvalidation(self): def testExternalInvalidation(self):
cluster = NEOCluster() cluster = NEOCluster()
try: try:
self._testExternalInvalidation(cluster)
finally:
cluster.stop()
def _testExternalInvalidation(self, cluster):
cluster.start() cluster.start()
cache = cluster.client._cache
# Initialize objects # Initialize objects
t1, c1 = cluster.getTransaction() t1, c1 = cluster.getTransaction()
c1.root()['x'] = x1 = PCounter() c1.root()['x'] = x1 = PCounter()
...@@ -905,7 +909,8 @@ class Test(NEOThreadedTest): ...@@ -905,7 +909,8 @@ class Test(NEOThreadedTest):
# (at this time, we still have x=0 and y=1) # (at this time, we still have x=0 and y=1)
t2, c2 = cluster.getTransaction() t2, c2 = cluster.getTransaction()
# Copy y to x using a different Master-Client connection # Copy y to x using a different Master-Client connection
client = cluster.newClient() with cluster.newClient() as client:
cache = cluster.client._cache
txn = transaction.Transaction() txn = transaction.Transaction()
client.tpc_begin(txn) client.tpc_begin(txn)
client.store(x1._p_oid, x1._p_serial, y, '', txn) client.store(x1._p_oid, x1._p_serial, y, '', txn)
...@@ -984,9 +989,6 @@ class Test(NEOThreadedTest): ...@@ -984,9 +989,6 @@ class Test(NEOThreadedTest):
self.assertFalse(invalidations(c1)) self.assertFalse(invalidations(c1))
self.assertEqual(x1.value, 1) self.assertEqual(x1.value, 1)
finally:
cluster.stop()
def testReadVerifyingStorage(self): def testReadVerifyingStorage(self):
cluster = NEOCluster(storage_count=2, partitions=2) cluster = NEOCluster(storage_count=2, partitions=2)
try: try:
...@@ -995,11 +997,8 @@ class Test(NEOThreadedTest): ...@@ -995,11 +997,8 @@ class Test(NEOThreadedTest):
c1.root()['x'] = x = PCounter() c1.root()['x'] = x = PCounter()
t1.commit() t1.commit()
# We need a second client for external invalidations. # We need a second client for external invalidations.
t2 = transaction.TransactionManager() with cluster.newClient(1) as db:
db = DB(storage=cluster.getZODBStorage(client=cluster.newClient())) t2, c2 = cluster.getTransaction(db)
try:
c2 = db.open(t2)
t2.begin()
r = c2.root() r = c2.root()
r['y'] = None r['y'] = None
r['x']._p_activate() r['x']._p_activate()
...@@ -1012,8 +1011,6 @@ class Test(NEOThreadedTest): ...@@ -1012,8 +1011,6 @@ class Test(NEOThreadedTest):
t2.commit() t2.commit()
for storage in cluster.storage_list: for storage in cluster.storage_list:
self.assertFalse(storage.tm._transaction_dict) self.assertFalse(storage.tm._transaction_dict)
finally:
db.close()
# Check we didn't get an invalidation, which would cause an # Check we didn't get an invalidation, which would cause an
# assertion failure in the cache. Connection does the same check in # assertion failure in the cache. Connection does the same check in
# _setstate_noncurrent so this could be also done by starting a # _setstate_noncurrent so this could be also done by starting a
...@@ -1048,14 +1045,11 @@ class Test(NEOThreadedTest): ...@@ -1048,14 +1045,11 @@ class Test(NEOThreadedTest):
self.tic() self.tic()
# modify x with another client # modify x with another client
client = cluster.newClient() with cluster.newClient() as client:
try:
txn = transaction.Transaction() txn = transaction.Transaction()
client.tpc_begin(txn) client.tpc_begin(txn)
client.store(x1._p_oid, x1._p_serial, y, '', txn) client.store(x1._p_oid, x1._p_serial, y, '', txn)
tid = client.tpc_finish(txn, None) tid = client.tpc_finish(txn, None)
finally:
client.close()
self.tic() self.tic()
# Check reconnection to the master and storage. # Check reconnection to the master and storage.
...@@ -1226,14 +1220,9 @@ class Test(NEOThreadedTest): ...@@ -1226,14 +1220,9 @@ class Test(NEOThreadedTest):
with cluster.master.filterConnection(cluster.storage) as m2s: with cluster.master.filterConnection(cluster.storage) as m2s:
m2s.add(delayNotifyInformation) m2s.add(delayNotifyInformation)
cluster.client.master_conn.close() cluster.client.master_conn.close()
client = cluster.newClient() with cluster.newClient() as client, Patch(
p = Patch(client.storage_bootstrap_handler, notReady=notReady) client.storage_bootstrap_handler, notReady=notReady):
try:
p.apply()
x = client.load(ZERO_TID) x = client.load(ZERO_TID)
finally:
del p
client.close()
self.assertNotIn(delayNotifyInformation, m2s) self.assertNotIn(delayNotifyInformation, m2s)
finally: finally:
cluster.stop() cluster.stop()
...@@ -1392,8 +1381,7 @@ class Test(NEOThreadedTest): ...@@ -1392,8 +1381,7 @@ class Test(NEOThreadedTest):
self.assertRaises(TransientError, getattr, c, "root") self.assertRaises(TransientError, getattr, c, "root")
uuid = cluster.client.uuid uuid = cluster.client.uuid
# Let's use a second client to steal the node id of the first one. # Let's use a second client to steal the node id of the first one.
client = cluster.newClient() with cluster.newClient() as client:
try:
client.sync() client.sync()
self.assertEqual(uuid, client.uuid) self.assertEqual(uuid, client.uuid)
# The client reconnects successfully to the master and storage, # The client reconnects successfully to the master and storage,
...@@ -1405,8 +1393,6 @@ class Test(NEOThreadedTest): ...@@ -1405,8 +1393,6 @@ class Test(NEOThreadedTest):
self.assertNotEqual(uuid, cluster.client.uuid) self.assertNotEqual(uuid, cluster.client.uuid)
# Second reconnection, for a successful load. # Second reconnection, for a successful load.
c.root c.root
finally:
client.close()
finally: finally:
cluster.stop() cluster.stop()
...@@ -1450,15 +1436,12 @@ class Test(NEOThreadedTest): ...@@ -1450,15 +1436,12 @@ class Test(NEOThreadedTest):
s2c, = s2c s2c, = s2c
m2c, = cluster.master.getConnectionList(cluster.client) m2c, = cluster.master.getConnectionList(cluster.client)
m2c.close() m2c.close()
Cb = cluster.newClient() with cluster.newClient() as Cb:
try:
Cb.pt # only connect to the master Cb.pt # only connect to the master
del s2c.readable del s2c.readable
self.assertRaises(NEOPrimaryMasterLost, t.join) self.assertRaises(NEOPrimaryMasterLost, t.join)
self.assertTrue(s2c.isClosed()) self.assertTrue(s2c.isClosed())
connectToStorage(Cb) connectToStorage(Cb)
finally:
Cb.close()
finally: finally:
cluster.stop() cluster.stop()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment