testClient.py 10.9 KB
Newer Older
1
#
Julien Muchembled's avatar
Julien Muchembled committed
2
# Copyright (C) 2009-2017  Nexedi SA
3 4 5 6 7 8 9 10 11 12 13 14
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17
import os
18 19 20
import unittest
import transaction
import ZODB
Olivier Cros's avatar
Olivier Cros committed
21 22
import socket

23
from struct import pack
24
from neo.lib.util import makeChecksum, u64
25
from ZODB.FileStorage import FileStorage
26
from ZODB.POSException import ConflictError
27
from ZODB.tests.StorageTestBase import zodb_pickle
28
from persistent import Persistent
29
from . import NEOCluster, NEOFunctionalTest
30

31 32 33 34 35 36 37 38 39 40 41 42 43
TREE_SIZE = 6

class Tree(Persistent):
    """ A simple binary tree """

    def __init__(self, depth):
        self.depth = depth
        if depth <= 0:
            return
        depth -= 1
        self.right = Tree(depth)
        self.left = Tree(depth)

44

Julien Muchembled's avatar
Julien Muchembled committed
45
# simple persistent object with conflict resolution
46 47 48 49 50 51 52 53 54 55
class PCounter(Persistent):

    _value = 0

    def value(self):
        return self._value

    def inc(self):
        self._value += 1

56 57 58

class PCounterWithResolution(PCounter):

59 60 61 62
    def _p_resolveConflict(self, old, saved, new):
        new['_value'] = saved['_value'] + new['_value']
        return new

63 64
class PObject(Persistent):
    pass
65 66 67 68 69 70

class ClientTests(NEOFunctionalTest):

    def setUp(self):
        NEOFunctionalTest.setUp(self)
        self.neo = NEOCluster(
71
            ['test_neo1', 'test_neo2', 'test_neo3', 'test_neo4'],
72
            partitions=3,
73
            replicas=2,
74
            master_count=1,
75 76 77
            temp_dir=self.getTempDirectory()
        )

78
    def _tearDown(self, success):
79 80
        self.neo.stop()
        del self.neo
81
        NEOFunctionalTest._tearDown(self, success)
82

83 84 85 86 87 88 89 90 91 92
    def __setup(self):
        # start cluster
        self.neo.setupDB()
        self.neo.start()
        self.neo.expectClusterRunning()
        self.db = ZODB.DB(self.neo.getZODBStorage())

    def makeTransaction(self):
        # create a transaction a get the root object
        txn = transaction.TransactionManager()
93 94
        conn = self.db.open(transaction_manager=txn)
        return (txn, conn)
95

96
    def testConflictResolutionTriggered1(self):
97 98
        """ Check that ConflictError is raised on write conflict """
        # create the initial objects
99
        self.__setup()
100 101
        t, c = self.makeTransaction()
        c.root()['without_resolution'] = PCounter()
102 103
        t.commit()

104
        # first with no conflict resolution
105 106 107 108
        t1, c1 = self.makeTransaction()
        t2, c2 = self.makeTransaction()
        o1 = c1.root()['without_resolution']
        o2 = c2.root()['without_resolution']
109 110
        self.assertEqual(o1.value(), 0)
        self.assertEqual(o2.value(), 0)
111 112 113
        o1.inc()
        o2.inc()
        o2.inc()
114 115 116 117 118
        t1.commit()
        self.assertEqual(o1.value(), 1)
        self.assertEqual(o2.value(), 2)
        self.assertRaises(ConflictError, t2.commit)

119 120 121 122
    def testIsolationAtZopeLevel(self):
        """ Check transaction isolation within zope connection """
        self.__setup()
        t, c = self.makeTransaction()
Vincent Pelletier's avatar
Vincent Pelletier committed
123 124 125
        root = c.root()
        root['item'] = 0
        root['other'] = 'bla'
126 127 128
        t.commit()
        t1, c1 = self.makeTransaction()
        t2, c2 = self.makeTransaction()
Vincent Pelletier's avatar
Vincent Pelletier committed
129 130
        # Makes c2 take a snapshot of database state
        c2.root()['other']
131 132
        c1.root()['item'] = 1
        t1.commit()
Julien Muchembled's avatar
Julien Muchembled committed
133
        # load object from zope cache
134 135 136 137 138 139 140
        self.assertEqual(c1.root()['item'], 1)
        self.assertEqual(c2.root()['item'], 0)

    def testIsolationWithoutZopeCache(self):
        """ Check isolation with zope cache cleared """
        self.__setup()
        t, c = self.makeTransaction()
Vincent Pelletier's avatar
Vincent Pelletier committed
141 142 143
        root = c.root()
        root['item'] = 0
        root['other'] = 'bla'
144 145 146
        t.commit()
        t1, c1 = self.makeTransaction()
        t2, c2 = self.makeTransaction()
Vincent Pelletier's avatar
Vincent Pelletier committed
147 148
        # Makes c2 take a snapshot of database state
        c2.root()['other']
149 150 151 152 153 154 155 156
        c1.root()['item'] = 1
        t1.commit()
        # clear zope cache to force re-ask NEO
        c1.cacheMinimize()
        c2.cacheMinimize()
        self.assertEqual(c1.root()['item'], 1)
        self.assertEqual(c2.root()['item'], 0)

157 158 159 160 161 162 163 164 165 166 167 168 169
    def __checkTree(self, tree, depth=TREE_SIZE):
        self.assertTrue(isinstance(tree, Tree))
        self.assertEqual(depth, tree.depth)
        depth -= 1
        if depth <= 0:
            return
        self.__checkTree(tree.right, depth)
        self.__checkTree(tree.left, depth)

    def __getDataFS(self, reset=False):
        name = os.path.join(self.getTempDirectory(), 'data.fs')
        if reset and os.path.exists(name):
            os.remove(name)
170
        return FileStorage(file_name=name)
171

172 173 174 175 176 177 178 179
    def __populate(self, db, tree_size=TREE_SIZE):
        if isinstance(db.storage, FileStorage):
            from base64 import b64encode as undo_tid
        else:
            undo_tid = lambda x: x
        def undo(tid=None):
            db.undo(undo_tid(tid or db.lastTransaction()))
            transaction.commit()
180 181 182
        conn = db.open()
        root = conn.root()
        root['trees'] = Tree(tree_size)
183 184 185
        ob = root['trees'].right
        left = ob.left
        del ob.left
186
        transaction.commit()
187 188 189 190 191 192 193 194 195 196 197 198
        ob._p_changed = 1
        transaction.commit()
        t2 = db.lastTransaction()
        ob.left = left
        transaction.commit()
        undo()
        t4 = db.lastTransaction()
        undo(t2)
        undo()
        undo(t4)
        undo()
        undo()
199 200 201 202 203
        conn.close()

    def testImport(self):

        # source database
204 205
        dfs_storage  = self.__getDataFS()
        dfs_db = ZODB.DB(dfs_storage)
206 207 208 209 210 211 212 213
        self.__populate(dfs_db)

        # create a neo storage
        self.neo.start()
        neo_storage = self.neo.getZODBStorage()

        # copy data fs to neo
        neo_storage.copyTransactionsFrom(dfs_storage, verbose=0)
214
        dfs_db.close()
215 216 217 218 219

        # check neo content
        (neo_db, neo_conn) = self.neo.getZODBConnection()
        self.__checkTree(neo_conn.root()['trees'])

220 221 222 223 224 225
    def __dump(self, storage):
        return {u64(t.tid): [(u64(o.oid), o.data_txn and u64(o.data_txn),
                              None if o.data is None else makeChecksum(o.data))
                             for o in t]
                for t in storage.iterator()}

226
    def testExport(self):
227 228 229 230

        # create a neo storage
        self.neo.start()
        (neo_db, neo_conn) = self.neo.getZODBConnection()
231
        self.__populate(neo_db)
232
        dump = self.__dump(neo_db.storage)
233 234

        # copy neo to data fs
235
        dfs_storage  = self.__getDataFS(reset=True)
236
        neo_storage = self.neo.getZODBStorage()
237
        dfs_storage.copyTransactionsFrom(neo_storage)
238 239

        # check data fs content
240 241
        dfs_db = ZODB.DB(dfs_storage)
        root = dfs_db.open().root()
242 243

        self.__checkTree(root['trees'])
244 245 246 247 248 249 250 251 252 253 254 255
        dfs_db.close()
        self.neo.stop()

        self.neo = NEOCluster(db_list=['test_neo1'], partitions=3,
            importer=[("root", {
                "storage": "<filestorage>\npath %s\n</filestorage>"
                            % dfs_storage.getName()})],
            temp_dir=self.getTempDirectory())
        self.neo.start()
        neo_db, neo_conn = self.neo.getZODBConnection()
        self.__checkTree(neo_conn.root()['trees'])
        self.assertEqual(dump, self.__dump(neo_db.storage))
256

Olivier Cros's avatar
Olivier Cros committed
257 258
    def testIPv6Client(self):
        """ Test the connectivity of an IPv6 connection for neo client """
259

Olivier Cros's avatar
Olivier Cros committed
260
        def test():
261
            """
Olivier Cros's avatar
Olivier Cros committed
262 263 264
            Implement the IPv6Client test
            """
            self.neo = NEOCluster(['test_neo1'], replicas=0,
265
                temp_dir = self.getTempDirectory(),
Olivier Cros's avatar
Olivier Cros committed
266 267 268 269 270 271
                address_type = socket.AF_INET6
                )
            self.neo.start()
            db1, conn1 = self.neo.getZODBConnection()
            db2, conn2 = self.neo.getZODBConnection()
        self.runWithTimeout(40, test)
272

273 274 275 276 277 278 279 280 281 282 283 284 285 286
    def testDelayedLocksCancelled(self):
        """
            Hold a lock on an object, try to get another lock on the same
            object to delay it. Then cancel the second transaction and check
            that the lock is not hold when the first transaction ends
        """
        def test():
            self.neo = NEOCluster(['test_neo1'], replicas=0,
                temp_dir=self.getTempDirectory())
            self.neo.start()
            db1, conn1 = self.neo.getZODBConnection()
            db2, conn2 = self.neo.getZODBConnection()
            st1, st2 = conn1._storage, conn2._storage
            t1, t2 = transaction.Transaction(), transaction.Transaction()
287 288
            t1.user = t2.user = u'user'
            t1.description = t2.description = u'desc'
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
            oid = st1.new_oid()
            rev = '\0' * 8
            data = zodb_pickle(PObject())
            st1.tpc_begin(t1)
            st2.tpc_begin(t2)
            # t1 own the lock
            st1.store(oid, rev, data, '', t1)
            # t2 store is delayed
            st2.store(oid, rev, data, '', t2)
            # cancel t2, should cancel the store too
            st2.tpc_abort(t2)
            # finish t1, should release the lock
            st1.tpc_vote(t1)
            st1.tpc_finish(t1)
            db3, conn3 = self.neo.getZODBConnection()
            st3 = conn3._storage
            t3 = transaction.Transaction()
306 307
            t3.user = u'user'
            t3.description = u'desc'
308
            st3.tpc_begin(t3)
Julien Muchembled's avatar
Julien Muchembled committed
309
            # retrieve the last revision
310
            data, serial = st3.load(oid)
311 312 313 314 315
            # try to store again, should not be delayed
            st3.store(oid, serial, data, '', t3)
            # the vote should not timeout
            st3.tpc_vote(t3)
            st3.tpc_finish(t3)
316
        self.runWithTimeout(10, test)
317

318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
    def testGreaterOIDSaved(self):
        """
            Store an object with an OID greater than the last generated by the
            master. This OID must be intercepted at commit, used for next OID
            generations and persistently saved on storage nodes.
        """
        self.neo.start()
        db1, conn1 = self.neo.getZODBConnection()
        st1 = conn1._storage
        t1 = transaction.Transaction()
        rev = '\0' * 8
        data = zodb_pickle(PObject())
        my_oid = pack('!Q', 100000)
        # store an object with this OID
        st1.tpc_begin(t1)
        st1.store(my_oid, rev, data, '', t1)
        st1.tpc_vote(t1)
        st1.tpc_finish(t1)
        # request an oid, should be greater than mine
        oid = st1.new_oid()
        self.assertTrue(oid > my_oid)

340 341 342 343 344 345
def test_suite():
    return unittest.makeSuite(ClientTests)

if __name__ == "__main__":
    unittest.main(defaultTest="test_suite")