testClientApp.py 36.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
#
# Copyright (C) 2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 18

import unittest
19
from mock import Mock, ReturnValues
20
from ZODB.POSException import StorageTransactionError, UndoError, ConflictError
21
from neo.tests import NeoTestBase
22
from neo.client.app import Application
23 24
from neo.client.exception import NEOStorageError, NEOStorageNotFoundError, \
        NEOStorageConflictError
25
from neo import protocol
Grégory Wisniewski's avatar
Grégory Wisniewski committed
26
from neo.protocol import Packets, INVALID_TID, INVALID_SERIAL
27
from neo.util import makeChecksum
28
import neo.connection
29

30
def _getMasterConnection(self):
31
    if self.master_conn is None:
32 33 34 35 36 37
        self.uuid = 'C' * 16
        self.num_partitions = 10
        self.num_replicas = 1
        self.pt = Mock({
            'getCellListForID': (),
        })
38 39
        self.master_conn = Mock()
    return self.master_conn
40

41 42 43 44 45
def _getPartitionTable(self):
    if self.pt is None:
        self.master_conn = _getMasterConnection(self)
    return self.pt

46 47 48 49
def _waitMessage(self, conn=None, msg_id=None, handler=None):
    if conn is not None and handler is not None:
        handler.dispatch(conn, conn.fakeReceived())
    else:
50 51 52
        raise NotImplementedError


53
class ClientApplicationTests(NeoTestBase):
54

55
    def setUp(self):
        """Replace the network-facing Application methods by local stubs.

        The original methods are remembered on the test instance, under the
        same names, so that tearDown can put them back.
        """
        # apply monkey patches
        stubs = (
            ('_getMasterConnection', _getMasterConnection),
            ('_waitMessage', _waitMessage),
            ('_getPartitionTable', _getPartitionTable),
        )
        # save every original first, then install the replacements
        for name, _ in stubs:
            setattr(self, name, getattr(Application, name))
        for name, stub in stubs:
            setattr(Application, name, stub)
63 64 65 66 67

    def tearDown(self):
        """Put back the Application methods that setUp replaced."""
        # restore environment
        patched = ('_getMasterConnection', '_waitMessage',
                   '_getPartitionTable')
        for name in patched:
            setattr(Application, name, getattr(self, name))
69

70 71
    # some helpers

72 73 74 75 76 77 78 79
    def checkAskPacket(self, conn, packet_type, decode=False):
        """Check that exactly one packet of `packet_type` was asked on `conn`.

        Returns the packet itself, or the result of ``packet.decode()`` when
        `decode` is true.
        """
        ask_calls = conn.mockGetNamedCalls('ask')
        self.assertEquals(len(ask_calls), 1)
        # client connection got queue as first parameter
        packet = ask_calls[0].getParam(1)
        self.assertTrue(isinstance(packet, protocol.Packet))
        self.assertEquals(packet.getType(), packet_type)
        if not decode:
            return packet
        return packet.decode()

83 84 85
    def getApp(self, master_nodes='127.0.0.1:10010', name='test',
               connector='SocketConnector', **kw):
        """Build a client Application whose dispatcher is a Mock."""
        application = Application(master_nodes, name, connector, **kw)
        application.dispatcher = Mock({})
        return application
88

89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
    def makeOID(self, value=None):
        """Return an 8-character identifier: seven NUL characters followed
        by chr(value).  A random value in 0..255 is drawn when `value` is
        None."""
        import random
        last = random.randint(0, 255) if value is None else value
        return '\00' * 7 + chr(last)
    # TIDs are built exactly like OIDs in these tests
    makeTID = makeOID

    def makeTransactionObject(self, user='u', description='d', _extension='e'):
        """Return a bare object carrying the `user`, `description` and
        `_extension` attributes given as arguments."""
        class Transaction(object):
            pass
        txn = Transaction()
        txn.user, txn.description, txn._extension = \
            user, description, _extension
        return txn

104 105 106 107 108 109
    def beginTransaction(self, app, tid):
        """Begin a transaction on `app` with the given `tid` and return the
        fake transaction object that was used."""
        transaction = self.makeTransactionObject()
        app.tpc_begin(transaction, tid=tid)
        return transaction

    def storeObject(self, app, oid=None, data='DATA'):
        """Install on `app` the mocks describing a successful object store.

        A connection mock replaying a non-conflicting AnswerStoreObject for
        (`oid`, current tid) is placed behind `app.cp`, and `app.pt` is
        replaced by a mock returning two cells.  A random OID is generated
        when `oid` is None.

        NOTE: this helper only prepares mocks; it does not call app.store().
        `data` is accepted for the callers' convenience but is not used.

        Returns the OID that was used.
        """
        tid = app.local_var.tid
        if oid is None:
            oid = self.makeOID()
        packet = Packets.AnswerStoreObject(conflicting=0, oid=oid, serial=tid)
        conn = Mock({'getNextId': 1, 'fakeReceived': packet})
        cell = Mock({'getAddress': 'FakeServer', 'getState': 'FakeState'})
        app.cp = Mock({'getConnForCell': conn})
        app.pt = Mock({'getCellListForID': (cell, cell)})
        return oid

    def voteTransaction(self, app):
        """Run tpc_vote on the current transaction of `app`, with a storage
        connection mock that replays an AnswerStoreTransaction."""
        local_var = app.local_var
        tid, txn = local_var.tid, local_var.txn
        answer = Packets.AnswerStoreTransaction(tid=tid)
        conn = Mock({'getNextId': 1, 'fakeReceived': answer})
        cell = Mock({'getAddress': 'FakeServer', 'getState': 'FakeState'})
        app.pt = Mock({'getCellListForID': (cell, cell)})
        # first cell gives no connection, second one gives the mock
        app.cp = Mock({'getConnForCell': ReturnValues(None, conn)})
        app.tpc_vote(txn)

    def finishTransaction(self, app):
        """Run tpc_finish on the current transaction of `app`, with a master
        connection mock that replays a NotifyTransactionFinished."""
        local_var = app.local_var
        txn, tid = local_var.txn, local_var.tid
        notification = Packets.NotifyTransactionFinished(tid)
        app.master_conn = Mock({
            'getNextId': 1,
            'getAddress': ('127.0.0.1', 10010),
            'fakeReceived': notification,
        })
        app.tpc_finish(txn)

    # common checks

144
    def checkDispatcherRegisterCalled(self, app, conn):
        """Look up the 'register' calls recorded by the dispatcher mock.

        The assertions on those calls are currently disabled, so this check
        always passes; `conn` is unused for now.
        """
        register_calls = app.dispatcher.mockGetNamedCalls('register')
        # disabled assertions, kept for reference:
        #self.assertEquals(len(register_calls), 1)
        #self.assertEquals(register_calls[0].getParam(0), conn)
        #self.assertTrue(isinstance(register_calls[0].getParam(2), Queue))
149 150 151 152 153

    def test_getQueue(self):
        # a fresh application exposes local_var, and local_var holds a queue
        app = self.getApp()
        # Test sanity check
        local_var = getattr(app, 'local_var', None)
        self.assertTrue(local_var is not None)
        # Test that queue is created
        queue = getattr(app.local_var, 'queue', None)
        self.assertTrue(queue is not None)
156 157 158 159 160 161 162 163 164 165 166

    def test_registerDB(self):
        # getDB() must give back the very object passed to registerDB()
        app = self.getApp()
        database = []
        app.registerDB(database, None)
        self.assertTrue(app.getDB() is database)

    def test_new_oid(self):
        # the master answers with two OIDs: new_oid() returns one of them
        # and the other one stays in app.new_oid_list
        app = self.getApp()
        msg_id = 50
        oid_pool = ['\x00\x00\x00\x00\x00\x00\x00\x01',
                    '\x00\x00\x00\x00\x00\x00\x00\x02']
        answer = Packets.AnswerNewOIDs(oid_pool[:])
        app.master_conn = Mock({
            'getNextId': msg_id,
            '_addPacket': None,
            'expectMessage': None,
            'lock': None,
            'unlock': None,
            # Test-specific method
            'fakeReceived': answer,
        })
        allocated = app.new_oid()
        self.assertTrue(allocated in oid_pool)
        self.assertEqual(len(app.new_oid_list), 1)
        leftover = app.new_oid_list[0]
        self.assertTrue(leftover in oid_pool)
        self.assertNotEqual(leftover, allocated)

    def test_getSerial(self):
        # getSerial() asks the partition table on a cache miss and must not
        # touch it on a cache hit
        app = self.getApp()
        cache = app.mq_cache
        oid = self.makeOID()
        tid = self.makeTID()
        # cache cleared -> result from ZODB
        self.assertTrue(oid not in cache)
        app.pt = Mock({'getCellListForID': ()})
        app.local_var.history = (oid, [(tid, 0)])
        self.assertEquals(app.getSerial(oid), tid)
        lookups = app.pt.mockGetNamedCalls('getCellListForID')
        self.assertEquals(len(lookups), 1)
        # fill the cache -> hit
        cache.store(oid, (tid, ''))
        self.assertTrue(oid in cache)
        app.pt = Mock({'getCellListForID': ()})
        app.getSerial(oid)
        self.assertEquals(app.getSerial(oid), tid)
        lookups = app.pt.mockGetNamedCalls('getCellListForID')
        self.assertEquals(len(lookups), 0)
197
    
198
    def test_load(self):
        # Covers app.load() in four situations: storage connection closed,
        # object unknown to the storage, object found (and then cached),
        # and object served from the cache without any packet sent.
        app = self.getApp()
        mq = app.mq_cache
        oid = self.makeOID()
        tid1 = self.makeTID(1)
        tid2 = self.makeTID(2)
        an_object = (1, oid, tid1, tid2, 0, makeChecksum('OBJ'), 'OBJ')
        # connection to SN close
        self.assertTrue(oid not in mq)
        packet = protocol.oidNotFound('')
        cell = Mock({'getUUID': '\x00' * 16})
        conn = Mock({
            'getUUID': '\x10' * 16,
            'getAddress': ('127.0.0.1', 0),
            'fakeReceived': packet,
        })
        app.local_var.queue = Mock({'get_nowait': (conn, None)})
        app.pt = Mock({'getCellListForOID': (cell, )})
        app.cp = Mock({'getConnForCell': conn})
        app.local_var.asked_object = -1
        # this scenario runs with the genuine _waitMessage saved by setUp
        Application._waitMessage = self._waitMessage
        try:
            self.assertRaises(NEOStorageNotFoundError, app.load, oid)
            self.checkAskObject(conn)
        finally:
            # reinstall the stub even when an assertion above fails
            Application._waitMessage = _waitMessage
        # object not found in NEO -> NEOStorageNotFoundError
        self.assertTrue(oid not in mq)
        packet = protocol.oidNotFound('')
        cell = Mock({'getUUID': '\x00' * 16})
        conn = Mock({
            'getAddress': ('127.0.0.1', 0),
            'fakeReceived': packet,
        })
        app.pt = Mock({'getCellListForOID': (cell, )})
        app.cp = Mock({'getConnForCell': conn})
        app.local_var.asked_object = -1
        self.assertRaises(NEOStorageNotFoundError, app.load, oid)
        self.checkAskObject(conn)
        # object found on storage nodes and put in cache
        packet = Packets.AnswerObject(*an_object[1:])
        conn = Mock({
            'getAddress': ('127.0.0.1', 0),
            'fakeReceived': packet,
        })
        app.cp = Mock({'getConnForCell': conn})
        app.local_var.asked_object = an_object
        result = app.load(oid)
        self.assertEquals(result, ('OBJ', tid1))
        self.checkAskObject(conn)
        self.assertTrue(oid in mq)
        # object is now cached, try to reload it
        conn = Mock({
            'getAddress': ('127.0.0.1', 0),
        })
        # expose `conn` under the name used everywhere else in this test, so
        # that an unexpected network access would be seen by the check below
        app.cp = Mock({'getConnForCell': conn})
        result = app.load(oid)
        self.assertEquals(result, ('OBJ', tid1))
        self.checkNoPacketSent(conn)
254
        
255
    def test_loadSerial(self):
        # loadSerial() must raise when the storage does not know the object
        # and must fetch from the storage even when the cache holds an entry
        def connReplaying(packet):
            # storage connection mock handing `packet` to the handler
            return Mock({
                'getAddress': ('127.0.0.1', 0),
                'fakeReceived': packet,
            })
        app = self.getApp()
        mq = app.mq_cache
        oid = self.makeOID()
        tid1 = self.makeTID(1)
        tid2 = self.makeTID(2)
        # object not found in NEO -> NEOStorageNotFoundError
        self.assertTrue(oid not in mq)
        not_found = protocol.oidNotFound('')
        cell = Mock({'getUUID': '\x00' * 16})
        conn = connReplaying(not_found)
        app.pt = Mock({'getCellListForID': (cell, )})
        app.cp = Mock({'getConnForCell': conn})
        app.local_var.asked_object = -1
        self.assertRaises(NEOStorageNotFoundError, app.loadSerial, oid, tid2)
        self.checkAskObject(conn)
        # object should not have been cached
        self.assertFalse(oid in mq)
        # now a cached version exists but should not be hit
        mq.store(oid, (tid1, 'WRONG'))
        self.assertTrue(oid in mq)
        another_object = (1, oid, tid2, INVALID_SERIAL, 0,
                          makeChecksum('RIGHT'), 'RIGHT')
        found = Packets.AnswerObject(*another_object[1:])
        conn = connReplaying(found)
        app.cp = Mock({'getConnForCell': conn})
        app.local_var.asked_object = another_object
        result = app.loadSerial(oid, tid1)
        self.assertEquals(result, 'RIGHT')
        self.checkAskObject(conn)
        self.assertTrue(oid in mq)
291 292

    def test_loadBefore(self):
        # loadBefore(): unknown object -> error, no previous revision ->
        # None, and a cached entry must not short-circuit the storage lookup
        def connReplaying(packet):
            # storage connection mock handing `packet` to the handler
            return Mock({
                'getAddress': ('127.0.0.1', 0),
                'fakeReceived': packet,
            })
        app = self.getApp()
        mq = app.mq_cache
        oid = self.makeOID()
        tid1 = self.makeTID(1)
        tid2 = self.makeTID(2)
        # object not found in NEO -> NEOStorageNotFoundError
        self.assertTrue(oid not in mq)
        not_found = protocol.oidNotFound('')
        cell = Mock({'getUUID': '\x00' * 16})
        conn = connReplaying(not_found)
        app.pt = Mock({'getCellListForID': (cell, )})
        app.cp = Mock({'getConnForCell': conn})
        app.local_var.asked_object = -1
        self.assertRaises(NEOStorageNotFoundError, app.loadBefore, oid, tid2)
        self.checkAskObject(conn)
        # no previous versions -> return None
        an_object = (1, oid, tid2, INVALID_SERIAL, 0, makeChecksum(''), '')
        conn = connReplaying(Packets.AnswerObject(*an_object[1:]))
        app.cp = Mock({'getConnForCell': conn})
        app.local_var.asked_object = an_object
        result = app.loadBefore(oid, tid1)
        self.assertEquals(result, None)
        # object should not have been cached
        self.assertFalse(oid in mq)
        # as for loadSerial, the object is cached but should be loaded from db
        mq.store(oid, (tid1, 'WRONG'))
        self.assertTrue(oid in mq)
        another_object = (1, oid, tid1, tid2, 0,
                          makeChecksum('RIGHT'), 'RIGHT')
        conn = connReplaying(Packets.AnswerObject(*another_object[1:]))
        app.cp = Mock({'getConnForCell': conn})
        app.local_var.asked_object = another_object
        result = app.loadBefore(oid, tid1)
        self.assertEquals(result, ('RIGHT', tid1, tid2))
        self.checkAskObject(conn)
        self.assertTrue(oid in mq)
339 340

    def test_tpc_begin(self):
        # tpc_begin() with an explicit tid, then on an already started
        # transaction, then without tid (the tid is asked to the master)
        app = self.getApp()
        tid = self.makeTID()
        txn = Mock()
        # first, tid is supplied
        # (transaction state lives on app.local_var, as every later
        # assertion of this test shows, so the pre-conditions look there)
        self.assertNotEquals(getattr(app.local_var, 'tid', None), tid)
        self.assertNotEquals(getattr(app.local_var, 'txn', None), txn)
        app.tpc_begin(transaction=txn, tid=tid)
        self.assertTrue(app.local_var.txn is txn)
        self.assertEquals(app.local_var.tid, tid)
        # next, the transaction already begin -> do nothing
        app.tpc_begin(transaction=txn, tid=None)
        self.assertTrue(app.local_var.txn is txn)
        self.assertEquals(app.local_var.tid, tid)
        # cancel and start a transaction without tid
        app.local_var.txn = None
        app.local_var.tid = None
        # no connection -> NEOStorageError (wait until connected to primary)
        #self.assertRaises(NEOStorageError, app.tpc_begin, transaction=txn, tid=None)
        # ask a tid to pmn
        packet = Packets.AnswerBeginTransaction(tid=tid)
        app.master_conn = Mock({
            'getNextId': 1,
            'expectMessage': None,
            'lock': None,
            'unlock': None,
            'fakeReceived': packet,
        })
        app.dispatcher = Mock({})
        app.tpc_begin(transaction=txn, tid=None)
        self.checkAskNewTid(app.master_conn)
        self.checkDispatcherRegisterCalled(app, app.master_conn)
        # check attributes
        self.assertTrue(app.local_var.txn is txn)
        self.assertEquals(app.local_var.tid, tid)
375 376 377 378 379 380 381

    def test_store1(self):
        # store() error paths: foreign transaction, then empty cell list
        app = self.getApp()
        oid = self.makeOID(11)
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        store_args = (oid, tid, '', None, txn)
        # invalid transaction > StorageTransactionError
        old_txn = object()
        app.local_var.txn = old_txn
        self.assertTrue(app.local_var.txn is not txn)
        self.assertRaises(StorageTransactionError, app.store, *store_args)
        self.assertEquals(app.local_var.txn, old_txn)
        # check partition_id and an empty cell list -> NEOStorageError
        app.local_var.txn = txn
        app.local_var.tid = tid
        app.pt = Mock({'getCellListForID': ()})
        app.num_partitions = 2
        self.assertRaises(NEOStorageError, app.store, *store_args)
        lookups = app.pt.mockGetNamedCalls('getCellListForID')
        self.assertEquals(len(lookups), 1)
        self.assertEquals(lookups[0].getParam(0), oid) # oid=11
395 396 397 398 399 400 401

    def test_store2(self):
        # store() when the storage node reports a conflict
        app = self.getApp()
        oid = self.makeOID(11)
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        # build conflicting state
        app.local_var.txn = txn
        app.local_var.tid = tid
        conflict = Packets.AnswerStoreObject(conflicting=1, oid=oid,
                                             serial=tid)
        conn = Mock({'getNextId': 1, 'fakeReceived': conflict})
        cell = Mock({'getAddress': 'FakeServer', 'getState': 'FakeState'})
        app.pt = Mock({'getCellListForID': (cell, cell)})
        app.cp = Mock({'getConnForCell': ReturnValues(None, conn)})
        app.dispatcher = Mock({})
        app.local_var.object_stored = (oid, tid)
        app.local_var.data_dict[oid] = 'BEFORE'
        self.assertRaises(NEOStorageConflictError, app.store,
                          oid, tid, '', None, txn)
        self.assertTrue(oid not in app.local_var.data_dict)
        self.assertEquals(app.conflict_serial, tid)
        self.assertEquals(app.local_var.object_stored, (-1, tid))
        self.checkAskStoreObject(conn)
        self.checkDispatcherRegisterCalled(app, conn)
424 425 426 427 428 429 430

    def test_store3(self):
        # store() when the storage node accepts the object
        app = self.getApp()
        oid = self.makeOID(11)
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        # case with no conflict
        app.local_var.txn = txn
        app.local_var.tid = tid
        accepted = Packets.AnswerStoreObject(conflicting=0, oid=oid,
                                             serial=tid)
        conn = Mock({'getNextId': 1, 'fakeReceived': accepted})
        app.cp = Mock({'getConnForCell': ReturnValues(None, conn)})
        cell = Mock({'getAddress': 'FakeServer', 'getState': 'FakeState'})
        app.pt = Mock({'getCellListForID': (cell, cell)})
        app.dispatcher = Mock({})
        app.conflict_serial = None # reset by hand
        app.local_var.object_stored = ()
        app.store(oid, tid, 'DATA', None, txn)
        self.assertEquals(app.local_var.object_stored, (oid, tid))
        stored_data = app.local_var.data_dict.get(oid, None)
        self.assertEquals(stored_data, 'DATA')
        self.assertNotEquals(app.conflict_serial, tid)
        self.checkAskStoreObject(conn)
        self.checkDispatcherRegisterCalled(app, conn)
453 454 455 456 457 458

    def test_tpc_vote1(self):
        # tpc_vote() for a transaction that is not the current one
        app = self.getApp()
        oid = self.makeOID(11)
        txn = self.makeTransactionObject()
        # invalid transaction > StorageTransactionError
        old_txn = object()
        app.local_var.txn = old_txn
        self.assertTrue(app.local_var.txn is not txn)
        self.assertRaises(StorageTransactionError, app.tpc_vote, txn)
        self.assertEquals(app.local_var.txn, old_txn)
463 464 465 466 467 468

    def test_tpc_vote2(self):
        # tpc_vote() when the storage replies with an unexpected packet:
        # NEOStorageError is raised and the connection is aborted
        # fake transaction object
        app = self.getApp()
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        app.local_var.txn = txn
        app.local_var.tid = tid
        # wrong answer -> failure
        packet = Packets.AnswerNewOIDs(())
        conn = Mock({
            'getNextId': 1,
            'fakeReceived': packet,
            'getAddress': ('127.0.0.1', 0),
        })
        cell = Mock({
            'getAddress': 'FakeServer',
            'getState': 'FakeState',
        })
        app.pt = Mock({'getCellListForID': (cell, cell)})
        app.cp = Mock({'getConnForCell': ReturnValues(None, conn)})
        app.dispatcher = Mock()
        app.tpc_begin(txn, tid)
        self.assertRaises(NEOStorageError, app.tpc_vote, txn)
        self.assertEquals(len(conn.mockGetNamedCalls('abort')), 1)
        # same packet check as test_tpc_vote3; the previous inline version
        # used names (Packet, AskStoreTransaction) this module never imports
        self.checkAskStoreTransaction(conn)
493 494 495 496 497

    def test_tpc_vote3(self):
        # tpc_vote() when the storage acknowledges the transaction
        app = self.getApp()
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        app.local_var.txn = txn
        app.local_var.tid = tid
        # response -> OK
        answer = Packets.AnswerStoreTransaction(tid=tid)
        conn = Mock({'getNextId': 1, 'fakeReceived': answer})
        cell = Mock({'getAddress': 'FakeServer', 'getState': 'FakeState'})
        app.pt = Mock({'getCellListForID': (cell, cell)})
        app.cp = Mock({'getConnForCell': ReturnValues(None, conn)})
        app.dispatcher = Mock()
        app.tpc_begin(txn, tid)
        app.tpc_vote(txn)
        self.checkAskStoreTransaction(conn)
        self.checkDispatcherRegisterCalled(app, conn)
517 518 519 520 521 522

    def test_tpc_abort1(self):
        # ignore mismatch transaction
        app = self.getApp()
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        app.local_var.txn = old_txn = object()
        app.master_conn = Mock()
        app.local_var.tid = tid
        self.assertFalse(app.local_var.txn is txn)
        conn = Mock()
        cell = Mock()
        app.pt = Mock({'getCellListForID': (cell, cell)})
        # the pool must hand out `conn` (not a cell) so that the
        # "no packet sent" check below inspects an object the application
        # could actually have used
        app.cp = Mock({'getConnForCell': ReturnValues(None, conn)})
        app.tpc_abort(txn)
        # no packet sent
        self.checkNoPacketSent(conn)
        self.checkNoPacketSent(app.master_conn)
        self.assertEquals(app.local_var.txn, old_txn)
        self.assertEquals(app.local_var.tid, tid)
537 538 539 540 541 542 543 544 545

    def test_tpc_abort2(self):
        # 2 nodes : 1 transaction in the first, 2 objects in the second
        # connections to each node should received only one packet to abort
        # and transaction must also be aborted on the master node
        # for simplicity, just one cell per partition
        oid1, oid2 = self.makeOID(2), self.makeOID(4) # on partition 0
        app, tid = self.getApp(), self.makeTID(1)     # on partition 1
        txn = self.makeTransactionObject()
        app.local_var.txn, app.local_var.tid = txn, tid
        app.master_conn = Mock({'__hash__': 0})
        app.num_partitions = 2
        cell1 = Mock({'getNode': 'NODE1', '__hash__': 1})
        cell2 = Mock({'getNode': 'NODE2', '__hash__': 2})
        conn1, conn2 = Mock({'getNextId': 1}), Mock({'getNextId': 2})
        app.pt = Mock({
            'getCellListForID': ReturnValues((cell1, ), (cell1, ),
                                             (cell1, cell2)),
        })
        app.cp = Mock({'getConnForCell': ReturnValues(conn1, conn2)})
        # fake data
        app.local_var.data_dict = {oid1: '', oid2: ''}
        app.tpc_abort(txn)
        # will check if there was just one call/packet :
        # (packet types are reached through Packets, the only protocol
        # namespace this module imports them from)
        self.checkNotifyPacket(conn1, Packets.AbortTransaction)
        self.checkNotifyPacket(conn2, Packets.AbortTransaction)
        self.checkNotifyPacket(app.master_conn, Packets.AbortTransaction)
        self.assertEquals(app.local_var.tid, None)
        self.assertEquals(app.local_var.txn, None)
        self.assertEquals(app.local_var.data_dict, {})
        self.assertEquals(app.local_var.txn_voted, False)
        self.assertEquals(app.local_var.txn_finished, False)
566 567 568 569 570 571

    def test_tpc_finish1(self):
        # ignore mismatch transaction
        app = self.getApp()
        txn = self.makeTransactionObject()
        app.local_var.txn = old_txn = object()
        app.master_conn = Mock()
        self.assertFalse(app.local_var.txn is txn)
        conn = Mock()
        cell = Mock()
        app.pt = Mock({'getCellListForID': (cell, cell)})
        # the pool must hand out `conn` (not a cell) so that the
        # "no packet sent" check below inspects an object the application
        # could actually have used
        app.cp = Mock({'getConnForCell': ReturnValues(None, conn)})
        app.tpc_finish(txn)
        # no packet sent
        self.checkNoPacketSent(conn)
        self.checkNoPacketSent(app.master_conn)
        self.assertEquals(app.local_var.txn, old_txn)
584 585 586 587 588 589

    def test_tpc_finish2(self):
        # bad answer -> NEOStorageError
        app = self.getApp()
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        app.local_var.txn = txn
        app.local_var.tid = tid
        # test callable passed to tpc_finish
        self.f_called = False
        self.f_called_with_tid = None
        def hook(hook_tid):
            # record that the callback ran and which tid it received
            self.f_called = True
            self.f_called_with_tid = hook_tid
        bad_answer = Packets.AnswerBeginTransaction(INVALID_TID)
        app.master_conn = Mock({
            'getNextId': 1,
            'getAddress': ('127.0.0.1', 10000),
            'fakeReceived': bad_answer,
        })
        app.dispatcher = Mock({})
        app.local_var.txn_finished = False
        self.assertRaises(NEOStorageError, app.tpc_finish, txn, hook)
        self.assertTrue(self.f_called)
        self.assertEquals(self.f_called_with_tid, tid)
        self.checkFinishTransaction(app.master_conn)
        self.checkDispatcherRegisterCalled(app, app.master_conn)
610 611

    def test_tpc_finish3(self):
        # transaction is finished
        app = self.getApp()
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        app.local_var.txn = txn
        app.local_var.tid = tid
        self.f_called = False
        self.f_called_with_tid = None
        def hook(hook_tid):
            # record that the callback ran and which tid it received
            self.f_called = True
            self.f_called_with_tid = hook_tid
        notification = Packets.NotifyTransactionFinished(tid)
        app.master_conn = Mock({
            'getNextId': 1,
            'getAddress': ('127.0.0.1', 10010),
            'fakeReceived': notification,
        })
        app.dispatcher = Mock({})
        app.local_var.txn_finished = True
        app.tpc_finish(txn, hook)
        self.assertTrue(self.f_called)
        self.assertEquals(self.f_called_with_tid, tid)
        self.checkFinishTransaction(app.master_conn)
        #self.checkDispatcherRegisterCalled(app, app.master_conn)
        # the transaction state must be back to its initial values
        local_var = app.local_var
        self.assertEquals(local_var.tid, None)
        self.assertEquals(local_var.txn, None)
        self.assertEquals(local_var.data_dict, {})
        self.assertEquals(local_var.txn_voted, False)
        self.assertEquals(local_var.txn_finished, False)
640 641 642 643 644 645 646

    def test_undo1(self):
        # invalid transaction
        app = self.getApp()
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        wrapper = Mock()
        old_txn = object()
        app.local_var.txn = old_txn
        app.master_conn = Mock()
        self.assertFalse(app.local_var.txn is txn)
        conn = Mock()
        cell = Mock()
        self.assertRaises(StorageTransactionError, app.undo, tid, txn, wrapper)
        # no packet sent
        self.checkNoPacketSent(conn)
        self.checkNoPacketSent(app.master_conn)
        # nothing done
        resolutions = wrapper.mockGetNamedCalls('tryToResolveConflict')
        self.assertEquals(len(resolutions), 0)
        self.assertEquals(app.local_var.txn, old_txn)
659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687

    def test_undo2(self):
        # Four tests here :
        # undo txn1 where obj1 was created -> fail
        # undo txn2 where obj2 was modified in tid3 -> fail
        # undo txn3 where there is a conflict on obj2
        # undo txn3 where obj2 was altered from tid2 -> ok
        # txn4 is the transaction where the undo occurs
        app = self.getApp()
        app.num_partitions = 2
        oid1, oid2 = self.makeOID(1), self.makeOID(2)
        tid1, tid2 = self.makeTID(1), self.makeTID(2)
        tid3, tid4 = self.makeTID(3), self.makeTID(4)
        # commit version 1 of object 1
        self.beginTransaction(app, tid=tid1)
        self.storeObject(app, oid=oid1, data='O1V1')
        self.voteTransaction(app)
        self.finishTransaction(app)
        # commit version 1 of object 2
        self.beginTransaction(app, tid=tid2)
        self.storeObject(app, oid=oid2, data='O2V1')
        self.voteTransaction(app)
        self.finishTransaction(app)
        # commit version 2 of object 2
        self.beginTransaction(app, tid=tid3)
        self.storeObject(app, oid=oid2, data='O2V2')
        self.voteTransaction(app)
        self.finishTransaction(app)
        # undo 1 -> no previous revision
        u1p1 = Packets.AnswerTransactionInformation(tid1, '', '', '', (oid1, ))
        u1p2 = protocol.oidNotFound('oid not found')
        # undo 2 -> not end tid
        u2p1 = Packets.AnswerTransactionInformation(tid2, '', '', '', (oid2, ))
        u2p2 = Packets.AnswerObject(oid2, tid2, tid3, 0, makeChecksum('O2V1'), 'O2V1')
        # undo 3 -> conflict
        u3p1 = Packets.AnswerTransactionInformation(tid3, '', '', '', (oid2, ))
        u3p2 = Packets.AnswerObject(oid2, tid3, tid3, 0, makeChecksum('O2V2'), 'O2V2')
        u3p3 = Packets.AnswerStoreObject(conflicting=1, oid=oid2, serial=tid2)
        # undo 4 -> ok
        u4p1 = Packets.AnswerTransactionInformation(tid3, '', '', '', (oid2, ))
        u4p2 = Packets.AnswerObject(oid2, tid3, tid3, 0, makeChecksum('O2V2'), 'O2V2')
        u4p3 = Packets.AnswerStoreObject(conflicting=0, oid=oid2, serial=tid2)
        # test logic
        # packets are replayed in order, one group per undo attempt
        packets = (
            u1p1, u1p2,
            u2p1, u2p2,
            u3p1, u3p2, u3p3,
            u4p1, u4p2, u4p3,
        )
        conn = Mock({
            'getNextId': 1,
            'fakeReceived': ReturnValues(*packets),
            'getAddress': ('127.0.0.1', 10010),
        })
        cell = Mock({'getAddress': 'FakeServer', 'getState': 'FakeState'})
        app.pt = Mock({'getCellListForID': (cell, )})
        app.cp = Mock({'getConnForCell': conn})
        wrapper = Mock({'tryToResolveConflict': None})
        txn4 = self.beginTransaction(app, tid=tid4)
        # all start here
        self.assertRaises(UndoError, app.undo, tid1, txn4, wrapper)
        self.assertRaises(UndoError, app.undo, tid2, txn4, wrapper)
        self.assertRaises(ConflictError, app.undo, tid3, txn4, wrapper)
        self.assertEquals(len(wrapper.mockGetNamedCalls('tryToResolveConflict')), 1)
        self.assertEquals(app.undo(tid3, txn4, wrapper), (tid4, [oid2, ]))
        self.finishTransaction(app)

    def test_undoLog(self):
        """ call undoLog with faked TID answers and transaction packets and
        check that both transactions are listed in the result """
        app = self.getApp()
        app.num_partitions = 2
        uuid1, uuid2 = '\x00' * 15 + '\x01', '\x00' * 15 + '\x02'
        # two nodes, two partitions, two transactions, two objects:
        node1, node2 = Mock({}), Mock({})
        cell1, cell2 = Mock({}), Mock({})
        tid1, tid2 = self.makeTID(1), self.makeTID(2)
        oid1, oid2 = self.makeOID(1), self.makeOID(2)
        # TIDs packets are supplied by the _waitMessage hook below
        # TXN info packets, handed out one after the other by the connection
        p3 = Packets.AnswerTransactionInformation(tid1, '', '', '', (oid1, ))
        p4 = Packets.AnswerTransactionInformation(tid2, '', '', '', (oid2, ))
        conn = Mock({
            'getNextId': 1,
            'getUUID': ReturnValues(uuid1, uuid2),
            'fakeGetApp': app,
            'fakeReceived': ReturnValues(p3, p4),
            'getAddress': ('127.0.0.1', 10010),
        })
        app.pt = Mock({
            'getNodeList': (node1, node2, ),
            'getCellListForTID': ReturnValues([cell1], [cell2]),
        })
        # NOTE(review): the other tests of this file mock 'getConnForCell'
        # (no leading underscore) - confirm which name undoLog really calls
        app.cp = Mock({ '_getConnForCell': conn})
        _waitMessage_old = Application._waitMessage
        def _waitMessage(self, conn=None, msg_id=None, handler=None):
            # fake the TID answers of both nodes, then give the hand back to
            # the previous _waitMessage for the following calls
            self.local_var.node_tids = {uuid1: (tid1, ), uuid2: (tid2, )}
            Application._waitMessage = _waitMessage_old
        def txn_filter(info):
            return info['id'] > '\x00' * 8
        Application._waitMessage = _waitMessage
        try:
            result = app.undoLog(0, 4, filter=txn_filter)
        finally:
            # the hook only restores the method once it has been called:
            # make sure the patch never leaks into the other tests
            Application._waitMessage = _waitMessage_old
        self.assertEquals(result[0]['id'], tid1)
        self.assertEquals(result[1]['id'], tid2)

    def test_history(self):
        """ ask the history of one object and check the two entries of the
        result (tid and size) """
        app = self.getApp()
        oid = self.makeOID(1)
        tid1, tid2 = self.makeTID(1), self.makeTID(2)
        revisions = ((tid1, 42), (tid2, 42))
        # packets handed out by the faked connection: the object history
        # first, then the information of each transaction it lists
        answers = [Packets.AnswerObjectHistory(oid, revisions)]
        for tid in (tid1, tid2):
            answers.append(Packets.AnswerTransactionInformation(
                tid, 'u', 'd', 'e', (oid, )))
        # faked environment
        conn = Mock({
            'getNextId': 1,
            'fakeGetApp': app,
            'fakeReceived': ReturnValues(*answers),
            'getAddress': ('127.0.0.1', 10010),
        })
        cells_for_object = [Mock({})]
        cells_for_txn = [Mock({}), Mock({})]
        # the same transaction cell list is returned for the last two lookups
        cell_lookups = ReturnValues(cells_for_object, cells_for_txn,
            cells_for_txn)
        app.pt = Mock({'getCellListForID': cell_lookups})
        app.cp = Mock({'getConnForCell': conn})
        # start test here
        result = app.history(oid)
        self.assertEquals(len(result), 2)
        for position, tid in enumerate((tid1, tid2)):
            self.assertEquals(result[position]['tid'], tid)
        for position in (0, 1):
            self.assertEquals(result[position]['size'], 42)

    def test_connectToPrimaryNode(self):
        """ drive _connectToPrimaryNode through a chain of _waitMessage hooks,
        each one faking a step of the connection to the primary master """
        # here we have three master nodes :
        # the connection to the first will fail
        # the second will have changed
        # the third will not be ready
        # after the third, the partition table will be operational
        # (as if it was connected to the primary master node)
        # NOTE(review): only two master addresses are given to getApp below,
        # confirm whether "three master nodes" is still accurate
        from neo.tests import DoNothingConnector
        # will raise IndexError at the third iteration
        app = self.getApp('127.0.0.1:10010/127.0.0.1:10011')
        # TODO: test more connection failure cases
        # The hooks are defined from the last one called to the first one,
        # each hook installs the next one before returning.
        # hook 8 (seventh packet) : askNodeInformation succeeded
        all_passed = []
        def _waitMessage8(self, conn=None, msg_id=None, handler=None):
            all_passed.append(1)
        # hook 7 (sixth packet) : askPartitionTable succeeded
        def _waitMessage7(self, conn=None, msg_id=None, handler=None):
            app.pt = Mock({'operational': True})
            Application._waitMessage = _waitMessage8
        # hook 6 (fifth packet) : request node identification succeeded
        def _waitMessage6(self, conn=None, msg_id=None, handler=None):
            conn.setUUID('D' * 16)
            app.uuid = 'C' * 16
            Application._waitMessage = _waitMessage7
        # hook 5 (fourth iteration) : connection to primary master succeeded
        def _waitMessage5(self, conn=None, msg_id=None, handler=None):
            app.trying_master_node = app.primary_master_node = Mock({
                'getAddress': ('192.168.1.1', 10000),
                '__str__': 'Fake master node',
            })
            Application._waitMessage = _waitMessage6
        # hook 4 (third iteration) : node not ready
        def _waitMessage4(app, conn=None, msg_id=None, handler=None):
            app.setNodeNotReady()
            app.trying_master_node = None
            Application._waitMessage = _waitMessage5
        # hook 3 (second iteration) : master node changed
        def _waitMessage3(app, conn=None, msg_id=None, handler=None):
            app.primary_master_node = Mock({
                'getAddress': ('192.168.1.1', 10000),
                '__str__': 'Fake master node',
            })
            Application._waitMessage = _waitMessage4
        # hook 2 (first iteration) : connection failed
        def _waitMessage2(app, conn=None, msg_id=None, handler=None):
            app.trying_master_node = None
            Application._waitMessage = _waitMessage3
        # hook 1 : do nothing for the first call
        def _waitMessage1(app, conn=None, msg_id=None, handler=None):
            Application._waitMessage = _waitMessage2
        # faked environment
        app.connector_handler = DoNothingConnector
        app.em = Mock({})
        app.pt = Mock({ 'operational': False})
        # patch right before the try block so that the original method is
        # restored whatever happens once the hook chain is installed
        _waitMessage_old = Application._waitMessage
        Application._waitMessage = _waitMessage1
        try:
            app.master_conn = app._connectToPrimaryNode()
            self.assertEqual(len(all_passed), 1)
            self.assertTrue(app.master_conn is not None)
            self.assertTrue(app.pt.operational())
        finally:
            Application._waitMessage = _waitMessage_old

    def test_askStorage(self):
        """ _askStorage is a private method, it is exercised directly here """
        app = self.getApp('')
        app.dispatcher = Mock()
        conn = Mock()
        packet = Packets.AskBeginTransaction(None)
        # the flag is switched on by the hook replacing _waitMessage
        self.test_ok = False
        def record_wait(app, conn=None, msg_id=None, handler=None):
            self.test_ok = True
        saved_wait = Application._waitMessage
        Application._waitMessage = record_wait
        try:
            app._askStorage(conn, packet)
        finally:
            Application._waitMessage = saved_wait
        # the packet was sent...
        self.checkAskNewTid(conn)
        # ...the connection was unlocked once...
        unlock_calls = conn.mockGetNamedCalls('unlock')
        self.assertEquals(len(unlock_calls), 1)
        # ...the dispatcher was updated...
        self.checkDispatcherRegisterCalled(app, conn)
        # ...and _waitMessage was called
        self.assertTrue(self.test_ok)

    def test_askPrimary(self):
        """ _askPrimary is a private method, it is exercised directly here """
        app = self.getApp('')
        app.dispatcher = Mock()
        conn = Mock()
        app.master_conn = conn
        app.primary_handler = Mock()
        packet = Packets.AskBeginTransaction(None)
        # the flag is switched on by the hook replacing _waitMessage, which
        # also checks that the primary handler is the one handed over
        self.test_ok = False
        def check_handler_and_record(client, conn=None, msg_id=None,
                handler=None):
            self.assertTrue(handler is client.primary_handler)
            self.test_ok = True
        saved_wait = Application._waitMessage
        Application._waitMessage = check_handler_and_record
        try:
            app._askPrimary(packet)
        finally:
            Application._waitMessage = saved_wait
        # the packet was sent...
        self.checkAskNewTid(conn)
        # ...the connection was locked then unlocked, once each...
        for method_name in ('lock', 'unlock'):
            self.assertEquals(len(conn.mockGetNamedCalls(method_name)), 1)
        # ...the dispatcher was updated...
        self.checkDispatcherRegisterCalled(app, conn)
        # ...and _waitMessage was called
        self.assertTrue(self.test_ok)
        # check NEOStorageError is raised when the primary connection is lost
        app.master_conn = None
        # check disabled since we reconnect to pmn
        #self.assertRaises(NEOStorageError, app._askPrimary, packet)


# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()