diff --git a/product/TIDStorage/bin/tidstorage.py b/product/TIDStorage/bin/tidstorage.py index 4b4a5ca7d0eb47ba031ba568b90e93cb20c6d9f1..7eeef2084a044b9f66ad8ee8fcd3fe516d335dd1 100755 --- a/product/TIDStorage/bin/tidstorage.py +++ b/product/TIDStorage/bin/tidstorage.py @@ -179,6 +179,9 @@ class TIDServer(SocketServer.BaseRequestHandler): tid_dict = self._field_exchange.recv_dict() self._tid_storage.commit(identifier, tid_dict) + def bootstraped(self): + self._field_exchange.send_int(has_bootstraped and 1 or 0) + def handle(self): global tid_storage self._tid_storage = tid_storage @@ -187,7 +190,8 @@ class TIDServer(SocketServer.BaseRequestHandler): 'begin': self.begin, 'abort': self.abort, 'commit': self.commit, - 'dump': self.dump + 'dump': self.dump, + 'bootstraped': self.bootstraped, } self.log('Connected') try: @@ -327,38 +331,36 @@ class TIDStorage: storage_id_set = self._storage_id_to_storage_id_set_dict[storage_id] # Raises if not found storage_id_set.remove(storage_id) - if self._tid_file is not None: - now = time.time() - can_full_dump = has_bootstraped and (self._next_full_dump is not None) and (self._next_full_dump < now) - can_dump = (not can_full_dump) and (self._next_dump is not None) and (self._next_dump < now) - record_for_dump = can_dump or (self._next_dump is not None) - append_to_file = has_bootstraped and (can_dump or can_full_dump) - else: - append_to_file = record_for_dump = can_dump = can_full_dump = False - for key, value in self._storage_id_to_storage_id_set_dict.iteritems(): - if len(value) == 0 and key in self._transcient: - self._storage[key] = self._transcient.pop(key) - if record_for_dump: - self._since_last_burst.add(key) - if append_to_file: - if can_full_dump: - to_dump_dict = self._storage - dump_code = 'f' + if has_bootstraped: + if self._tid_file is not None: + now = time.time() + can_full_dump = (self._next_full_dump is not None) and (self._next_full_dump < now) + can_dump = (not can_full_dump) and (self._next_dump is not None) and (self._next_dump < now) + record_for_dump = can_dump or (self._next_dump is not None) + append_to_file = (can_dump or can_full_dump) else: - to_dump_dict = dict([(key, self._storage[key]) for key in self._since_last_burst]) - dump_code = 'd' - if len(to_dump_dict): - self._tid_file.write('%.02f %s %r\n' % (now, dump_code, to_dump_dict)) + append_to_file = record_for_dump = can_dump = can_full_dump = False + for key, value in self._storage_id_to_storage_id_set_dict.iteritems(): + if len(value) == 0 and key in self._transcient: + self._storage[key] = self._transcient.pop(key) + if record_for_dump: + self._since_last_burst.add(key) + if append_to_file: if can_full_dump: - self._next_full_dump = now + self._full_dump_period - if self._next_dump is not None: - self._next_dump = now + self._burst_period - self._since_last_burst.clear() - if not has_bootstraped: + to_dump_dict = self._storage + dump_code = 'f' + else: + to_dump_dict = dict([(key, self._storage[key]) for key in self._since_last_burst]) + dump_code = 'd' + if len(to_dump_dict): + self._tid_file.write('%.02f %s %r\n' % (now, dump_code, to_dump_dict)) + if can_full_dump: + self._next_full_dump = now + self._full_dump_period + if self._next_dump is not None: + self._next_dump = now + self._burst_period + self._since_last_burst.clear() + else: doBootstrap() - #if len(self._storage_id_to_transaction_id_list_dict) == 0: - # self._storage.update(self._transcient) - # self._transcient.clear() finally: self._storage_id_lock.release() @@ -369,6 +371,13 @@ class TIDStorage: finally: self._storage_id_lock.release() + def dump_transcient(self): + self._storage_id_lock.acquire() + try: + return self._transcient.copy() + finally: + self._storage_id_lock.release() + def begin(self, transaction_id, storage_id_list): self._storage_id_lock.acquire() try: @@ -433,12 +442,12 @@ class BootstrapContent(threading.Thread): else: storage_id_to_object_path_dict[key] = mountpoint target_storage_id_set = sets.ImmutableSet(storage_id_to_object_path_dict.keys()) - known_storage_id_set = sets.ImmutableSet(tid_storage.dump().keys()) + known_storage_id_set = sets.ImmutableSet(tid_storage.dump_transcient().keys()) to_check_storage_id_set = target_storage_id_set - known_storage_id_set while len(to_check_storage_id_set) and can_bootstrap: serialize_url = None for storage_id in to_check_storage_id_set: - if can_bootstrap and storage_id not in tid_storage.dump().keys(): + if can_bootstrap and storage_id not in tid_storage.dump_transcient().keys(): serialize_url = base_url % (storage_id_to_object_path_dict[storage_id], ) try: # Query a Zope, which will contact this process in return to store @@ -450,7 +459,7 @@ class BootstrapContent(threading.Thread): log('Opened %r: %r' % (serialize_url, page.read())) # Let some time for zope to contact TIDStorage back and fill the gaps. time.sleep(5) - known_storage_id_set = sets.ImmutableSet(tid_storage.dump().keys()) + known_storage_id_set = sets.ImmutableSet(tid_storage.dump_transcient().keys()) to_check_storage_id_set = target_storage_id_set - known_storage_id_set if len(to_check_storage_id_set): log('Bootstrap in progress... Mising storages: %r' % (to_check_storage_id_set, )) diff --git a/product/TIDStorage/tests/testTIDServer.py b/product/TIDStorage/tests/testTIDServer.py index 5d859c1c167e0f955b7ab23169caca102b498fde..a6b8acf05a50cfa2101ffd1f63b09678e1b0ce90 100755 --- a/product/TIDStorage/tests/testTIDServer.py +++ b/product/TIDStorage/tests/testTIDServer.py @@ -3,6 +3,7 @@ from ExchangeProtocol import ExchangeProtocol import traceback import socket +import time import sys assert len(sys.argv) == 3, 'Requires exactly 2 arguments: <address> <port>' @@ -53,6 +54,14 @@ class TIDClient: internal_storage_tid_dict['%s_%s' % (test_id, key)] = value self._exchange_protocol.send_dict(internal_storage_tid_dict) + def bootstraped(self): + self._exchange_protocol.send_field('bootstraped') + return self._exchange_protocol.recv_int() + + def waitForBootstrap(self): + while not self.bootstraped(): + time.sleep(0.1) + class TestTIDServerV2: def __init__(self, address, port): self._client = TIDClient((address, port)) @@ -60,13 +69,35 @@ class TestTIDServerV2: def assertEqual(self, value, target): assert value == target, 'value %r does not match target %r' % (value, target) - def testInitialValue(self, test_id): + def test_01_InitialValue(self, test_id): """ Check that the storage is empty """ self.assertEqual(self._client.dump_all(), {}) + + def test_02_Bootstrap(self, test_id): + """ + Trigger bootstrap and check that no value is visible until bootstrap is + done. + """ + t1_storage_tid_dict = {'s0': 1} + t2_storage_tid_dict = {'s1': 1} + self.assertEqual(self._client.dump(test_id), {}) + self._client.begin(test_id, 't1', t1_storage_tid_dict.keys()) + self.assertEqual(self._client.dump(test_id), {}) + self._client.commit(test_id, 't1', t1_storage_tid_dict) + # Bootstrap is runing on the server, nothing is visible yet. + self.assertEqual(self._client.dump(test_id), {}) + self._client.waitForBootstrap() + # Nothing is available yet, we need one more transaction to happen. + self.assertEqual(self._client.dump(test_id), {}) + self._client.begin(test_id, 't2', t2_storage_tid_dict.keys()) + self.assertEqual(self._client.dump(test_id), {}) + self._client.commit(test_id, 't2', t2_storage_tid_dict) + # Now everything must be available. + self.assertEqual(self._client.dump(test_id), {'s0': 1, 's1': 1}) - def testScenario1(self, test_id): + def test_03_Scenario1(self, test_id): """ Simple begin - commit case. """ @@ -77,7 +108,7 @@ class TestTIDServerV2: self._client.commit(test_id, 't1', storage_tid_dict) self.assertEqual(self._client.dump(test_id), storage_tid_dict) - def testScenario2(self, test_id): + def test_04_Scenario2(self, test_id): """ Simple begin - abort case. """ @@ -88,7 +119,7 @@ class TestTIDServerV2: self._client.abort(test_id, 't1') self.assertEqual(self._client.dump(test_id), {}) - def testScenario3(self, test_id): + def test_05_Scenario3(self, test_id): """ 2 concurent transactions impacting a common storage. Second transaction begins after first does, and commits before @@ -106,7 +137,7 @@ class TestTIDServerV2: self._client.commit(test_id, 't1', t1_storage_tid_dict) self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1, 's3': 1}) - def testScenario4(self, test_id): + def test_06_Scenario4(self, test_id): """ 3 concurent transactions. Transactions 1 and 2 impact same storage s1. @@ -132,7 +163,7 @@ class TestTIDServerV2: self._client.commit(test_id, 't1', t1_storage_tid_dict) self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2, 's3': 1}) - def testScenario4bis(self, test_id): + def test_07_Scenario4bis(self, test_id): """ 3 concurent transactions. Transactions 1 and 2 impact same storage s1. @@ -162,7 +193,7 @@ class TestTIDServerV2: self._client.abort(test_id, 't1') self.assertEqual(self._client.dump(test_id), {'s1': 2, 's3': 1}) - def testScenario5(self, test_id): + def test_08_Scenario5(self, test_id): """ 2 concurent transactions impacting a common storage. Second transaction begins after first does, and commits after @@ -180,7 +211,7 @@ class TestTIDServerV2: self._client.commit(test_id, 't2', t2_storage_tid_dict) self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1}) - def testScenario6(self, test_id): + def test_09_Scenario6(self, test_id): """ 2 concurent transactions impacting separate sets of storages. Check that the first commit impacts dump data immediately. @@ -197,7 +228,7 @@ class TestTIDServerV2: self._client.commit(test_id, 't2', t2_storage_tid_dict) self.assertEqual(self._client.dump(test_id), {'s1': 1, 's2': 1}) - def testScenario7(self, test_id): + def test_10_Scenario7(self, test_id): """ 3 concurent transactions. t1 and t2 impact a set of different storages. @@ -222,7 +253,7 @@ class TestTIDServerV2: self._client.commit(test_id, 't3', t3_storage_tid_dict) self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2}) - def testScenario8(self, test_id): + def test_11_Scenario8(self, test_id): """ Simple increase case. """