#
# Copyright (C) 2012-2017  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import random, sys, threading, time
import transaction
from ZODB.POSException import ReadOnlyError, POSKeyError
import unittest
from collections import defaultdict
from functools import wraps
from neo.lib import logging
from neo.client.exception import NEOStorageError
from neo.master.handlers.backup import BackupHandler
from neo.storage.checker import CHECK_COUNT
from neo.storage.replicator import Replicator
from neo.lib.connector import SocketConnector
from neo.lib.connection import ClientConnection
from neo.lib.protocol import CellStates, ClusterStates, Packets, \
    ZERO_OID, ZERO_TID, MAX_TID, uuid_str
from neo.lib.util import p64, u64
from .. import expectedFailure, Patch, TransactionalResource
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, \
    predictable_random, with_cluster
from .test import PCounter, PCounterWithResolution # XXX


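# Test helper: build an upstream cluster and a backup cluster of it, put the
# latter in backup mode (STARTING_BACKUP) and pass it to the wrapped test.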
def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
    def decorator(wrapped):
        def wrapper(self):
            with NEOCluster(partitions, **upstream_kw) as upstream:
                upstream.start()
                with NEOCluster(partitions, upstream=upstream,
                                **backup_kw) as backup:
                    backup.start()
                    backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
                    self.tic()
                    wrapped(self, backup)
        return wraps(wrapped)(wrapper)
    return decorator


class ReplicationTests(NEOThreadedTest):

    def checksumPartition(self, storage, partition, max_tid=MAX_TID):
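        """
        Return checksums (TID range & serial range) of the given partition,
        up to max_tid.
        """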
        dm = storage.dm
        args = partition, None, ZERO_TID, max_tid
        return dm.checkTIDRange(*args), \
            dm.checkSerialRange(min_oid=ZERO_OID, *args)

    def checkPartitionReplicated(self, source, destination, partition, **kw):
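        """
        Check that a partition contains the same data on 2 storage nodes,
        by comparing checksums.
        """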
        self.assertEqual(self.checksumPartition(source, partition, **kw),
                         self.checksumPartition(destination, partition, **kw))

    def checkBackup(self, cluster, **kw):
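        """
        Check that the backup cluster has all the data of its upstream
        cluster, comparing each assigned partition with a random readable
        upstream cell. Return the number of checked (storage, partition)
        pairs.
        """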
        upstream_pt = cluster.upstream.primary_master.pt
        pt = cluster.primary_master.pt
        np = pt.getPartitions()
        self.assertEqual(np, upstream_pt.getPartitions())
        checked = 0
        source_dict = {x.uuid: x for x in cluster.upstream.storage_list}
        for storage in cluster.storage_list:
            self.assertFalse(storage.dm._uncommitted_data)
            self.assertEqual(np, storage.pt.getPartitions())
            for partition in pt.getAssignedPartitionList(storage.uuid):
                cell_list = upstream_pt.getCellList(partition, readable=True)
                source = source_dict[random.choice(cell_list).getUUID()]
                self.checkPartitionReplicated(source, storage, partition, **kw)
                checked += 1
        return checked

    def checkReplicas(self, cluster):
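        """Check that, for each partition, all cells contain the same data."""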
        pt = cluster.primary_master.pt
        storage_dict = {x.uuid: x for x in cluster.storage_list}
        for offset in xrange(pt.getPartitions()):
            checksum_list = [
                self.checksumPartition(storage_dict[x.getUUID()], offset)
                for x in pt.getCellList(offset)]
            self.assertEqual(1, len(set(checksum_list)),
                             (offset, checksum_list))

    def testBackupNormalCase(self):
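        """
        Check a backup cluster in the normal case: it follows the upstream
        cluster closely, survives a restart, and still holds consistent data
        up to its last_tid when backup is stopped during replication.
        """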
        np = 7
        nr = 2
        check_dict = dict.fromkeys(xrange(np))
        with NEOCluster(partitions=np, replicas=nr-1, storage_count=3
                        ) as upstream:
            upstream.start()
            importZODB = upstream.importZODB()
            importZODB(3)
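            # Filter predicate: delay Replicate packets that make a backup
            # storage replicate from another backup storage (no upstream name
            # and all sources known), i.e. replication of secondary cells
            # within the backup cluster.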
            def delaySecondary(conn, packet):
                if isinstance(packet, Packets.Replicate):
                    tid, upstream_name, source_dict = packet.decode()
                    return not upstream_name and all(source_dict.itervalues())
            with NEOCluster(partitions=np, replicas=nr-1, storage_count=5,
                            upstream=upstream) as backup:
                backup.start()
                # Initialize & catch up.
                backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
                self.tic()
                self.assertEqual(np*nr, self.checkBackup(backup))
                # Normal case, following upstream cluster closely.
                importZODB(17)
                self.tic()
                self.assertEqual(np*nr, self.checkBackup(backup))

                # Check that a backup cluster can be restarted.
                backup.stop()
                backup.start()
                self.assertEqual(backup.neoctl.getClusterState(),
                                 ClusterStates.BACKINGUP)
                importZODB(17)
                self.tic()
                self.assertEqual(np*nr, self.checkBackup(backup))
                backup.neoctl.checkReplicas(check_dict, ZERO_TID, None)
                self.tic()
                # Stop backing up, nothing truncated.
                backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
                self.tic()
                self.assertEqual(np*nr, self.checkBackup(backup))
                self.assertEqual(backup.neoctl.getClusterState(),
                                 ClusterStates.RUNNING)

                backup.stop()
                backup.start()
                backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
                self.tic()
                with backup.master.filterConnection(*backup.storage_list) as f:
                    f.add(delaySecondary)
                    while not f.filtered_count:
                        importZODB(1)
                    self.tic()
                    backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
                    self.tic()
                self.tic()
                self.assertEqual(np*nr, self.checkBackup(backup,
                    max_tid=backup.last_tid))

                backup.stop()
                backup.start()
                backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
                self.tic()
                with ConnectionFilter() as f:
                    f.delayAddObject(lambda conn: conn.getUUID() is None)
                    while not f.filtered_count:
                        importZODB(1)
                    self.tic()
                    backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
                    self.tic()
                self.tic()
                self.assertEqual(np*nr, self.checkBackup(backup,
                    max_tid=backup.last_tid))

    @predictable_random()
    def testBackupNodeLost(self):
        """Check backup cluster can recover after random connection loss

        - backup master disconnected from upstream master
        - primary storage disconnected from backup master
        - non-primary storage disconnected from backup master
        """
        np = 4
        check_dict = dict.fromkeys(xrange(np))
        from neo.master.backup_app import random
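        # Patches simulating the connection losses listed in the docstring:
        # - fetchObjects closes the storage's connection to the backup master
        #   when a second replication starts;
        # - onTransactionCommitted closes the upstream master's connection to
        #   the backup master after a second transaction is committed.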
        def fetchObjects(orig, min_tid=None, min_oid=ZERO_OID):
            if min_tid is None:
                counts[0] += 1
                if counts[0] > 1:
                    orig.im_self.app.master_conn.close()
            return orig(min_tid, min_oid)
        def onTransactionCommitted(orig, txn):
            counts[0] += 1
            if counts[0] > 1:
                node_list = orig.im_self.nm.getClientList(only_identified=True)
                node_list.remove(txn.getNode())
                node_list[0].getConnection().close()
            return orig(txn)
        with NEOCluster(partitions=np, replicas=0, storage_count=1) as upstream:
            upstream.start()
            importZODB = upstream.importZODB(random=random)
            # Do not start with an empty DB so that 'primary_dict' below is not
            # empty on the first iteration.
            importZODB(1)
            with NEOCluster(partitions=np, replicas=2, storage_count=4,
                            upstream=upstream) as backup:
                backup.start()
                backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
                self.tic()
                storage_list = [x.uuid for x in backup.storage_list]
                slave = set(xrange(len(storage_list))).difference
                for event in xrange(10):
                    counts = [0]
                    if event == 5:
                        p = Patch(upstream.master.tm,
                            _on_commit=onTransactionCommitted)
                    else:
                        primary_dict = defaultdict(list)
                        for k, v in sorted(backup.master.backup_app
                                           .primary_partition_dict.iteritems()):
                            primary_dict[storage_list.index(v._uuid)].append(k)
                        if event % 2:
                            storage = slave(primary_dict).pop()
                        else:
                            storage, partition_list = primary_dict.popitem()
                        # Populate until the chosen storage partially performs
                        # a second replication and then aborts.
                        p = Patch(backup.storage_list[storage].replicator,
                                  fetchObjects=fetchObjects)
                    with p:
                        importZODB(lambda x: counts[0] > 1)
                    if event > 5:
                        backup.neoctl.checkReplicas(check_dict, ZERO_TID, None)
                    self.tic()
                    self.assertEqual(np*3, self.checkBackup(backup))

    @backup_test()
    def testBackupUpstreamStorageDead(self, backup):
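        """
        Check that a backup storage node keeps trying to reconnect to the
        upstream storage node when the latter goes away, with reconnection
        attempts spaced by SocketConnector.CONNECT_LIMIT.
        """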
        upstream = backup.upstream
        with ConnectionFilter() as f:
            f.delayInvalidateObjects()
            upstream.importZODB()(1)
        count = [0]
        def _connect(orig, conn):
            count[0] += 1
            orig(conn)
        with Patch(ClientConnection, _connect=_connect):
            upstream.storage.listening_conn.close()
            self.tic(step=2)
            self.assertEqual(count[0], 0)
            t = SocketConnector.CONNECT_LIMIT = .5
            t += time.time()
            self.tic()
            # 1st attempt failed, 2nd is deferred
            self.assertEqual(count[0], 2)
            self.tic(check_timeout=(backup.storage,))
            # 2nd failed, 3rd deferred
            self.assertEqual(count[0], 4)
            self.assertLessEqual(t, time.time())

    @backup_test()
    def testBackupDelayedUnlockTransaction(self, backup):
        """
        Check that a backup storage node is put on hold by upstream if
        the requested transaction is still locked. Such a case happens when
        the backup cluster reacts very quickly to a new transaction.
        """
        upstream = backup.upstream
        with upstream.master.filterConnection(upstream.storage) as f:
            f.delayNotifyUnlockInformation()
            upstream.importZODB()(1)
            self.tic()
        self.tic()
        self.assertEqual(1, self.checkBackup(backup))

    @with_cluster()
    def testBackupEarlyInvalidation(self, upstream):
        """
        The backup master must ignore notifications before being fully
        initialized.
        """
        with NEOCluster(upstream=upstream) as backup:
            backup.start()
            with ConnectionFilter() as f:
                f.delayAskPartitionTable(lambda conn:
                    isinstance(conn.getHandler(), BackupHandler))
                backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
                upstream.importZODB()(1)
                self.tic()
            self.tic()
            self.assertTrue(backup.master.is_alive())

    @backup_test()
    def testBackupTid(self, backup):
        """
        Check that the backup cluster does not claim it has all the data just
        after it comes back, even though new transactions were committed
        during its absence.
        """
        importZODB = backup.upstream.importZODB()
        importZODB(1)
        self.tic()
        last_tid = backup.upstream.last_tid
        self.assertEqual(last_tid, backup.backup_tid)
        backup.stop()
        importZODB(1)
        with ConnectionFilter() as f:
            f.delayAskFetchTransactions()
            backup.start()
            self.assertEqual(last_tid, backup.backup_tid)
        self.tic()
        self.assertEqual(1, self.checkBackup(backup))

    @with_cluster(start_cluster=0, partitions=3, replicas=1, storage_count=3)
    def testSafeTweak(self, cluster):
        """
        Check that tweak always tries to keep a minimum of (replicas + 1)
        readable cells; otherwise, we would have reduced or no redundancy
        as long as replication has not finished.
        """
        def changePartitionTable(orig, *args):
            orig(*args)
            sys.exit()
        s0, s1, s2 = cluster.storage_list
        if 1:
            cluster.start([s0, s1])
            s2.start()
            self.tic()
            cluster.enableStorageList([s2])
            # 2 UP_TO_DATE cells become FEEDING:
            # they are dropped only when the replication is done,
            # so that 1 storage can still die without data loss.
            with Patch(s0.dm, changePartitionTable=changePartitionTable):
                cluster.neoctl.tweakPartitionTable()
                self.tic()
            self.assertEqual(cluster.neoctl.getClusterState(),
                             ClusterStates.RUNNING)

    @with_cluster(start_cluster=0, partitions=3, replicas=1, storage_count=3)
    def testReplicationAbortedBySource(self, cluster):
        """
        Check that a feeding node aborts replication when its partition is
        dropped, and that the out-of-date node finishes replicating from
        another source.
        Here are the different states of partitions over time:
          pt: 0: U|U|U
          pt: 0: UO.|U.O|FOO
          pt: 0: UU.|U.O|FOO
          pt: 0: UU.|U.U|FOO # nodes 1 & 2 replicate from node 0
          pt: 0: UU.|U.U|.OU # here node 0 lost partition 2
                             # and node 1 must switch to node 2
          pt: 0: UU.|U.U|.UU
        """
        def delayAskFetch(conn, packet):
            return isinstance(packet, delayed) and \
                   packet.decode()[0] == offset and \
                   conn in s1.getConnectionList(s0)
        def changePartitionTable(orig, ptid, cell_list):
            if (offset, s0.uuid, CellStates.DISCARDED) in cell_list:
                connection_filter.remove(delayAskFetch)
                # XXX: this is currently not done by
                #      default for performance reasons
                orig.im_self.dropPartitions((offset,))
            return orig(ptid, cell_list)
        np = cluster.num_partitions
        s0, s1, s2 = cluster.storage_list
        for delayed in Packets.AskFetchTransactions, Packets.AskFetchObjects:
            if cluster.started:
                cluster.stop(1)
            if 1:
                cluster.start([s0])
                cluster.populate([range(np*2)] * np)
                s1.start()
                s2.start()
                self.tic()
                cluster.neoctl.enableStorageList([s1.uuid, s2.uuid])
                cluster.neoctl.tweakPartitionTable()
                offset, = [offset for offset, row in enumerate(
                                      cluster.master.pt.partition_list)
                                  for cell in row if cell.isFeeding()]
                with ConnectionFilter() as connection_filter:
                    connection_filter.add(delayAskFetch,
                        Patch(s0.dm, changePartitionTable=changePartitionTable))
                    self.tic()
                    self.assertEqual(1, connection_filter.filtered_count)
                self.tic()
                self.checkPartitionReplicated(s1, s2, offset)

    @with_cluster(start_cluster=0, partitions=2, storage_count=2)
    def testClientReadingDuringTweak(self, cluster):
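        """
        Check that a client whose partition table is outdated (notification
        of the tweak being delayed) still loads an object successfully,
        after resynchronizing with the master.
        """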
        def sync(orig):
            m2c.remove(delay)
            orig()
        s0, s1 = cluster.storage_list
        if 1:
            cluster.start([s0])
            storage = cluster.getZODBStorage()
            oid = p64(1)
            txn = transaction.Transaction()
            storage.tpc_begin(txn)
            storage.store(oid, None, 'foo', '', txn)
            storage.tpc_finish(txn)
            storage._cache.clear()
            s1.start()
            self.tic()
            cluster.neoctl.enableStorageList([s1.uuid])
            cluster.neoctl.tweakPartitionTable()
            with cluster.master.filterConnection(cluster.client) as m2c:
                delay = m2c.delayNotifyPartitionChanges()
                self.tic()
                with Patch(cluster.client, sync=sync):
                    self.assertEqual('foo', storage.load(oid)[0])
                self.assertNotIn(delay, m2c)

    @with_cluster(start_cluster=False, storage_count=3, partitions=3)
    def testAbortingReplication(self, cluster):
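        """
        Check that all nodes survive when an ongoing replication is aborted
        by a change of the partition table, and that replicas end up
        consistent.
        """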
        s1, s2, s3 = cluster.storage_list
        cluster.start((s1, s2))
        t, c = cluster.getTransaction()
        r = c.root()
        for x in 'ab':
            r[x] = PCounter()
        t.commit()
        cluster.stop(replicas=1)
        cluster.start((s1, s2))
        with ConnectionFilter() as f:
            f.delayAddObject()
            cluster.neoctl.tweakPartitionTable()
            s3.start()
            self.tic()
            cluster.neoctl.enableStorageList((s3.uuid,))
            cluster.neoctl.tweakPartitionTable()
            self.tic()
        self.tic()
        for s in cluster.storage_list:
            self.assertTrue(s.is_alive())
        self.checkReplicas(cluster)

    @with_cluster(start_cluster=0, replicas=1, storage_count=4, partitions=2)
    def testTweakVsReplication(self, cluster, done=False):
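        """
        Check the interaction between a tweak of the partition table and an
        in-progress replication. In the 'done' variant, the delayed packet
        is NotifyReplicationDone instead of AnswerFetchObjects.
        """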
        S = cluster.storage_list
        cluster.start(S[:1])
        t, c = cluster.getTransaction()
        ob = c.root()[''] = PCounterWithResolution()
        t.commit()
        self.assertEqual(1, u64(ob._p_oid))
        for s in S[1:]:
            s.start()
        self.tic()
        def tweak():
            self.tic()
            self.assertFalse(delay_list)
            self.assertPartitionTable(cluster, 'UU|UO')
            f.delayAskFetchObjects()
            cluster.enableStorageList(S[2:])
            cluster.neoctl.tweakPartitionTable()
            self.tic()
            self.assertPartitionTable(cluster, 'UU..|F.OO')
        with ConnectionFilter() as f, cluster.moduloTID(1), \
             Patch(S[1].replicator,
                   _nextPartitionSortKey=lambda orig, offset: offset):
            delay_list = [1, 0]
            delay = (f.delayNotifyReplicationDone if done else
                     f.delayAnswerFetchObjects)(lambda _: delay_list.pop())
            cluster.enableStorageList((S[1],))
            cluster.neoctl.tweakPartitionTable()
            ob._p_changed = 1
            if done:
                tweak()
                t.commit()
            else:
                t2, c2 = cluster.getTransaction()
                c2.root()['']._p_changed = 1
                l = threading.Lock(); l.acquire()
                TransactionalResource(t2, 0, tpc_vote=lambda _: l.release())
                t2 = self.newPausedThread(t2.commit)
                self.tic()
                @TransactionalResource(t, 0)
                def tpc_vote(_):
                    t2.start()
                    l.acquire()
                    f.remove(delay)
                    tweak()
                t.commit()
                t2.join()
            cluster.neoctl.dropNode(S[2].uuid)
            cluster.neoctl.dropNode(S[3].uuid)
            cluster.neoctl.tweakPartitionTable()
            if done:
                f.remove(delay)
            self.tic()
            self.assertPartitionTable(cluster, 'UU|UO')
        self.tic()
        self.assertPartitionTable(cluster, 'UU|UU')
        self.checkReplicas(cluster)

    def testTweakVsReplicationDone(self):
        self.testTweakVsReplication(True)

    @with_cluster(start_cluster=0, storage_count=2, partitions=2)
    def testCommitVsDiscardedCell(self, cluster):
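        """
        Check that a transaction committed while a cell is being discarded
        (because its replica just became up-to-date) does not store data in
        the discarded cell.
        """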
        s0, s1 = cluster.storage_list
        cluster.start((s0,))
        t, c = cluster.getTransaction()
        ob = c.root()[''] = PCounterWithResolution()
        t.commit()
        self.assertEqual(1, u64(ob._p_oid))
        s1.start()
        self.tic()
        nonlocal_ = []
        with ConnectionFilter() as f:
            delay = f.delayNotifyReplicationDone()
            cluster.enableStorageList((s1,))
            cluster.neoctl.tweakPartitionTable()
            self.tic()
            self.assertPartitionTable(cluster, 'U.|FO')
            t2, c2 = cluster.getTransaction()
            c2.root()[''].value += 3
            l = threading.Lock(); l.acquire()
            @TransactionalResource(t2, 0)
            def tpc_vote(_):
                self.tic()
                l.release()
            t2 = self.newPausedThread(t2.commit)
            @TransactionalResource(t, 0, tpc_finish=lambda _:
                f.remove(nonlocal_.pop(0)))
            def tpc_vote(_):
                t2.start()
                l.acquire()
                nonlocal_.append(f.delayNotifyPartitionChanges())
                f.remove(delay)
                self.tic()
                self.assertPartitionTable(cluster, 'U.|.U', cluster.master)
                nonlocal_.append(cluster.master.pt.getID())
            ob.value += 2
            t.commit()
            t2.join()
        self.tic()
        self.assertPartitionTable(cluster, 'U.|.U')
        self.assertEqual(cluster.master.pt.getID(), nonlocal_.pop())
        t.begin()
        self.assertEqual(ob.value, 5)
        # get the second to last tid (for which ob=2)
        tid2 = s1.dm.getObject(ob._p_oid, None, ob._p_serial)[0]
        # s0 must not have committed anything for partition 1
        with s0.dm.replicated(1):
            self.assertFalse(s0.dm.getObject(ob._p_oid, tid2))

    @with_cluster(start_cluster=0, replicas=1)
    def testResumingReplication(self, cluster):
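        """
        Check that a storage node that was stopped before it could replicate
        anything resumes and completes replication once restarted.
        """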
        if 1:
            s0, s1 = cluster.storage_list
            cluster.start(storage_list=(s0,))
            t, c = cluster.getTransaction()
            r = c.root()
            r._p_changed = 1
            t.commit()
            s1.start()
            self.tic()
            with Patch(Replicator, connected=lambda *_: None):
                cluster.enableStorageList((s1,))
                cluster.neoctl.tweakPartitionTable()
                r._p_changed = 1
                t.commit()
                self.tic()
                s1.stop()
                cluster.join((s1,))
            t0, t1, t2 = c.db().storage.iterator()
            s1.resetNode()
            s1.start()
            self.tic()
            self.assertEqual([], cluster.getOutdatedCells())
            s0.stop()
            cluster.join((s0,))
            t0, t1, t2 = c.db().storage.iterator()

    @with_cluster(start_cluster=0, replicas=1, partitions=2)
    def testReplicationBlockedByUnfinished1(self, cluster,
                                            delay_replication=False):
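        """
        Check that replication of a partition is blocked by a transaction
        being committed on it and only completes after tpc_finish. The
        second variant additionally delays AnswerFetchObjects.
        """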
        s0, s1 = cluster.storage_list
        cluster.start(storage_list=(s0,))
        storage = cluster.getZODBStorage()
        oid = storage.new_oid()
        with ConnectionFilter() as f, cluster.moduloTID(1 - u64(oid) % 2):
            if delay_replication:
                delay_replication = f.delayAnswerFetchObjects()
            tid = None
            expected = 'U|U'
            for n in xrange(3):
                # On the second iteration, the transaction will block
                # replication until tpc_finish.
                # We do a last iteration as a quick check that the cluster
                # remains functional after such a scenario.
                txn = transaction.Transaction()
                storage.tpc_begin(txn)
                tid = storage.store(oid, tid, str(n), '', txn)
                if n == 1:
                    # Start the outdated storage.
                    s1.start()
                    self.tic()
                    cluster.enableStorageList((s1,))
                    cluster.neoctl.tweakPartitionTable()
                    expected = 'UO|UO'
                self.tic()
                self.assertPartitionTable(cluster, expected)
                storage.tpc_vote(txn)
                self.assertPartitionTable(cluster, expected)
                tid = storage.tpc_finish(txn)
                if n == 1:
                    if delay_replication:
                        self.tic()
                        self.assertPartitionTable(cluster, expected)
                        f.remove(delay_replication)
                        delay_replication = None
                    self.tic() # replication resumes and ends
                    expected = 'UU|UU'
                self.assertPartitionTable(cluster, expected)
            self.assertEqual(cluster.neoctl.getClusterState(),
                             ClusterStates.RUNNING)
        self.checkPartitionReplicated(s0, s1, 0)

    def testReplicationBlockedByUnfinished2(self):
        self.testReplicationBlockedByUnfinished1(True)

    @with_cluster(partitions=5, replicas=2, storage_count=3)
    def testCheckReplicas(self, cluster):
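        """
        Check the replica-checking feature: mismatching cells are marked
        CORRUPTED and the cluster stops being operational when too many
        cells of a partition are corrupted.
        """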
        from neo.storage import checker
        def corrupt(offset):
            s0, s1, s2 = (storage_dict[cell.getUUID()]
                for cell in cluster.master.pt.getCellList(offset, True))
            logging.info('corrupt partition %u of %s',
                         offset, uuid_str(s1.uuid))
            s1.dm.deleteObject(p64(np+offset), p64(corrupt_tid))
            return s0.uuid
        def check(expected_state, expected_count):
            self.assertEqual(expected_count, len([None
              for row in cluster.neoctl.getPartitionRowList()[1]
              for cell in row[1]
              if cell[1] == CellStates.CORRUPTED]))
            self.assertEqual(expected_state, cluster.neoctl.getClusterState())
        np = cluster.num_partitions
        tid_count = np * 3
        corrupt_tid = tid_count // 2
        check_dict = dict.fromkeys(xrange(np))
        with Patch(checker, CHECK_COUNT=2):
            cluster.populate([range(np*2)] * tid_count)
            storage_dict = {x.uuid: x for x in cluster.storage_list}
            cluster.neoctl.checkReplicas(check_dict, ZERO_TID, None)
            self.tic()
            check(ClusterStates.RUNNING, 0)
            source = corrupt(0)
            cluster.neoctl.checkReplicas(check_dict, p64(corrupt_tid+1), None)
            self.tic()
            check(ClusterStates.RUNNING, 0)
            cluster.neoctl.checkReplicas({0: source}, ZERO_TID, None)
            self.tic()
            check(ClusterStates.RUNNING, 1)
            corrupt(1)
            cluster.neoctl.checkReplicas(check_dict, p64(corrupt_tid+1), None)
            self.tic()
            check(ClusterStates.RUNNING, 1)
            cluster.neoctl.checkReplicas(check_dict, ZERO_TID, None)
            self.tic()
            check(ClusterStates.RECOVERING, 4)

    @backup_test()
    def testBackupReadOnlyAccess(self, backup):
        """Check backup cluster can be used in read-only mode by ZODB clients"""
        B = backup
        U = B.upstream
        Z = U.getZODBStorage()
        #Zb = B.getZODBStorage()    # XXX see below about invalidations

        oid_list = []
        tid_list = []

        # S -> Sb link stops working during [cutoff, recover) test iterations
        cutoff  = 4
        recover = 7
        def delayReplication(conn, packet):
            return isinstance(packet, Packets.AnswerFetchTransactions)

        with ConnectionFilter() as f:
            for i in xrange(10):
                if i == cutoff:
                    f.add(delayReplication)
                if i == recover:
                    # .remove() removes the filter and retransmits all packets
                    # that were queued once the first filtered packet was
                    # detected on a connection.
                    f.remove(delayReplication)

                # commit new data to U
                txn = transaction.Transaction()
                txn.note(u'test transaction %s' % i)
                Z.tpc_begin(txn)
                oid = Z.new_oid()
                Z.store(oid, None, '%s-%i' % (oid, i), '', txn)
                Z.tpc_vote(txn)
                tid = Z.tpc_finish(txn)
                oid_list.append(oid)
                tid_list.append(tid)

                # make sure data propagated to B  (depending on cutoff)
                self.tic()
                if cutoff <= i < recover:
                    self.assertLess(B.backup_tid, U.last_tid)
                else:
                    self.assertEqual(B.backup_tid, U.last_tid)
                self.assertEqual(B.last_tid,   U.last_tid)
                self.assertEqual(1, self.checkBackup(B, max_tid=B.backup_tid))

                # read data from B and verify it is what it should be
                # XXX we open a new ZODB storage every time because
                # invalidations are not yet implemented in read-only mode.
                Zb = B.getZODBStorage()
                for j, oid in enumerate(oid_list):
                    if cutoff <= i < recover and j >= cutoff:
                        self.assertRaises(POSKeyError, Zb.load, oid, '')
                    else:
                        data, serial = Zb.load(oid, '')
                        self.assertEqual(data, '%s-%s' % (oid, j))
                        self.assertEqual(serial, tid_list[j])

                # verify how the transaction log & friends behave under a
                # potentially not-yet-fully-fetched backup state (transactions
                # committed during [cutoff, recover) should not be there;
                # otherwise all transactions should be fully there)
                Zb = B.getZODBStorage()
                Btxn_list = list(Zb.iterator())
                self.assertEqual(len(Btxn_list), cutoff if cutoff <= i < recover
                                                 else i+1)
                for j, txn in enumerate(Btxn_list):
                    self.assertEqual(txn.tid, tid_list[j])
                    self.assertEqual(txn.description, 'test transaction %i' % j)
                    obj, = txn
                    self.assertEqual(obj.oid, oid_list[j])
                    self.assertEqual(obj.data, '%s-%s' % (obj.oid, j))

                # TODO test askObjectHistory once it is fixed

                # try to commit something to backup storage and make sure it is
                # really read-only
                Zb._cache._max_size = 0     # make store() work synchronously
                txn = transaction.Transaction()
                self.assertRaises(ReadOnlyError, Zb.tpc_begin, txn)
                self.assertRaises(ReadOnlyError, Zb.new_oid)
                self.assertRaises(ReadOnlyError, Zb.store, oid_list[-1],
                                            tid_list[-1], 'somedata', '', txn)
                # tpc_vote first checks whether there were store replies -
                # thus not ReadOnlyError
                self.assertRaises(NEOStorageError, Zb.tpc_vote, txn)

                # close the storage because the client app is otherwise shared
                # in threaded tests and we need to refresh last_tid on the
                # next run
                # (XXX see above about invalidations not working)
                Zb.close()


if __name__ == "__main__":
    unittest.main()