#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

from thread import get_ident
from cPickle import dumps, loads
from zlib import compress as real_compress, decompress
from neo.lib.locking import Queue, Empty
from random import shuffle
import time
import os

from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.POSException import ReadConflictError
from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp

import neo.lib
from neo.lib.protocol import NodeTypes, Packets, INVALID_PARTITION, ZERO_TID
from neo.lib.event import EventManager
from neo.lib.util import makeChecksum as real_makeChecksum, dump
from neo.lib.locking import Lock
from neo.lib.connection import MTClientConnection, OnTimeout, ConnectionClosed
from neo.lib.node import NodeManager
from neo.lib.connector import getConnectorHandler
from neo.client.exception import NEOStorageError, NEOStorageCreationUndoneError
from neo.client.exception import NEOStorageNotFoundError
from neo.lib.exception import NeoException
from neo.client.handlers import storage, master
from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from neo.client.poll import ThreadedPoll, psThreadedPoll
from neo.client.iterator import Iterator
from neo.client.mq import MQ, MQIndex
from neo.client.pool import ConnectionPool
from neo.lib.util import u64, parseMasterList
from neo.lib.profiling import profiler_decorator, PROFILING_ENABLED
from neo.lib.live_debug import register as registerLiveDebugger

if PROFILING_ENABLED:
    # Those functions require a "real" python function wrapper before they can
    # be decorated.
    @profiler_decorator
    def compress(data):
        return real_compress(data)

    @profiler_decorator
    def makeChecksum(data):
        return real_makeChecksum(data)
else:
    # If profiling is disabled, directly use original functions.
    compress = real_compress
    makeChecksum = real_makeChecksum

class ThreadContext(object):
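    """
    Per-thread storage for the client Application: attribute reads and writes
    are transparently redirected to a dictionary keyed by the calling thread's
    identifier, so each thread sees its own transaction state.
    """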

    def __init__(self):
        super(ThreadContext, self).__setattr__('_threads_dict', {})

    def __getThreadData(self):
        thread_id = get_ident()
        try:
            result = self._threads_dict[thread_id]
        except KeyError:
            self.clear(thread_id)
            result = self._threads_dict[thread_id]
        return result

    def __getattr__(self, name):
        thread_data = self.__getThreadData()
        try:
            return thread_data[name]
        except KeyError:
            raise AttributeError, name

    def __setattr__(self, name, value):
        thread_data = self.__getThreadData()
        thread_data[name] = value

    def clear(self, thread_id=None):
        if thread_id is None:
            thread_id = get_ident()
        thread_dict = self._threads_dict.get(thread_id)
        if thread_dict is None:
            queue = Queue(0)
        else:
            queue = thread_dict['queue']
        self._threads_dict[thread_id] = {
            'tid': None,
            'txn': None,
            'data_dict': {},
            'data_list': [],
            'object_base_serial_dict': {},
            'object_serial_dict': {},
            'object_stored_counter_dict': {},
            'conflict_serial_dict': {},
            'resolved_conflict_serial_dict': {},
            'txn_voted': False,
            'queue': queue,
            'txn_info': 0,
            'history': None,
            'node_tids': {},
            'node_ready': False,
            'asked_object': 0,
            'undo_object_tid_dict': {},
            'involved_nodes': set(),
            'last_transaction': None,
        }

class RevisionIndex(MQIndex):
    """
    This cache index allows accessing a specifig revision of a cached object.
    It requires cache key to be a 2-tuple, composed of oid and revision.

    Note: it is expected that rather few revisions are held in cache, with few
    lookups for old revisions, so they are held in a simple sorted list
    Note2: all methods here must be called with cache lock acquired.
    """
    def __init__(self):
        # key: oid
        # value: tid list, from highest to lowest
        self._oid_dict = {}
        # key: oid
        # value: tid list, from lowest to highest
        self._invalidated = {}

    def clear(self):
        self._oid_dict.clear()
        self._invalidated.clear()

    def remove(self, key):
        oid_dict = self._oid_dict
        oid, tid = key
        tid_list = oid_dict[oid]
        tid_list.remove(tid)
        if not tid_list:
            # No more serial known for this object, drop entirely
            del oid_dict[oid]
            self._invalidated.pop(oid, None)

    def add(self, key):
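        # Record this (oid, tid) cache entry in the per-oid revision list
        # (kept sorted from highest to lowest) and drop any matching pending
        # invalidation.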
        oid_dict = self._oid_dict
        oid, tid = key
        try:
            serial_list = oid_dict[oid]
        except KeyError:
            serial_list = oid_dict[oid] = []
        else:
            assert tid not in serial_list
        if not(serial_list) or tid > serial_list[0]:
            serial_list.insert(0, tid)
        else:
            serial_list.insert(0, tid)
            serial_list.sort(reverse=True)
        invalidated = self._invalidated
        try:
            tid_list = invalidated[oid]
        except KeyError:
            pass
        else:
            try:
                tid_list.remove(tid)
            except ValueError:
                pass
            else:
                if not tid_list:
                    del invalidated[oid]

    def invalidate(self, oid_list, tid):
        """
        Mark object invalidated by given transaction.
        Must be called with increasing TID values (which is standard for
        ZODB).
        """
        invalidated = self._invalidated
        oid_dict = self._oid_dict
        for oid in (x for x in oid_list if x in oid_dict):
            try:
                tid_list = invalidated[oid]
            except KeyError:
                tid_list = invalidated[oid] = []
            assert not tid_list or tid > tid_list[-1], (dump(oid), dump(tid),
                dump(tid_list[-1]))
            tid_list.append(tid)

    def getSerialBefore(self, oid, tid):
        """
        Get the most recent tid in cache strictly lower than the given tid.
        """
        # WARNING: return-intensive to save on indentation
        oid_list = self._oid_dict.get(oid)
        if oid_list is None:
            # Unknown oid
            return None
        for result in oid_list:
            if result < tid:
                # Candidate found
                break
        else:
            # No candidate in cache.
            return None
        # Check if there is a chance that an intermediate revision would
        # exist, while missing from cache.
        try:
            inv_tid_list = self._invalidated[oid]
        except KeyError:
            return result
        # Remember: inv_tid_list is sorted in ascending order.
        for inv_tid in inv_tid_list:
            if tid < inv_tid:
                # We don't care about invalidations past requested TID.
                break
            elif result < inv_tid < tid:
                # An invalidation was received between candidate revision,
                # and before requested TID: there is a matching revision we
                # don't know of, so we cannot answer.
                return None
        return result

    def getLatestSerial(self, oid):
        """
        Get the latest tid for given object.
        """
        result = self._oid_dict.get(oid)
        if result is not None:
            result = result[0]
            try:
                tid_list = self._invalidated[oid]
            except KeyError:
                pass
            else:
                if result < tid_list[-1]:
                    # An invalidation happened from a transaction later than
                    # our most recent view of this object, so we cannot answer.
                    result = None
        return result

    def getSerialList(self, oid):
        """
        Get the list of all serials cache knows about for given object.
        """
        return self._oid_dict.get(oid, [])[:]

class Application(object):
    """The client node application."""

    def __init__(self, master_nodes, name, connector=None, compress=True,
            **kw):
        # Start polling thread
        self.em = EventManager()
        self.poll_thread = ThreadedPoll(self.em, name=name)
        psThreadedPoll()
        # Internal attributes common to all threads
        self._db = None
        self.name = name
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher(self.poll_thread)
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None

        # load master node list
        for address in parseMasterList(master_nodes):
            self.nm.createMaster(address=address)

        # no self-assigned UUID, primary master will supply us one
        self.uuid = None
        self.mq_cache = MQ()
        self.cache_revision_index = RevisionIndex()
        self.mq_cache.addIndex(self.cache_revision_index)
        self.new_oid_list = []
        self.last_oid = '\0' * 8
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)
        # Internal attributes distinct between threads
        self.local_var = ThreadContext()
        # Lock definitions:
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock prevents multiple OID generation requests from running
        # at the same time
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        lock = Lock()
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        self._connecting_to_master_node_acquire = lock.acquire
        self._connecting_to_master_node_release = lock.release
        # _nm ensures exclusive access to the node manager
        lock = Lock()
        self._nm_acquire = lock.acquire
        self._nm_release = lock.release
        self.compress = compress
        registerLiveDebugger(on_log=self.log)

    def log(self):
        self.em.log()
        self.nm.log()
        if self.pt is not None:
            self.pt.log()

    @profiler_decorator
    def _handlePacket(self, conn, packet, handler=None):
        """
          conn
            The connection which received the packet (forwarded to handler).
          packet
            The packet to handle.
          handler
            The handler to use to handle packet.
            If not given, it will be guessed from the connection's node type.
        """
        if handler is None:
            # Guess the handler to use based on the type of node on the
            # connection
            node = self.nm.getByAddress(conn.getAddress())
            if node is None:
                raise ValueError, 'Expecting an answer from a node ' \
                    'whose type is not known... Is this right ?'
            if node.isStorage():
                handler = self.storage_handler
            elif node.isMaster():
                handler = self.primary_handler
            else:
                raise ValueError, 'Unknown node type: %r' % (node.__class__, )
        conn.lock()
        try:
            handler.dispatch(conn, packet)
        finally:
            conn.unlock()

    @profiler_decorator
    def _waitAnyMessage(self, block=True):
        """
          Handle all pending packets.
          block
            If True (default), will block until at least one packet was
            received.
        """
        pending = self.dispatcher.pending
        queue = self.local_var.queue
        get = queue.get
        _handlePacket = self._handlePacket
        while pending(queue):
            try:
                conn, packet = get(block)
            except Empty:
                break
            if packet is None or isinstance(packet, ForgottenPacket):
                # connection was closed or some packet was forgotten
                continue
            block = False
            try:
                _handlePacket(conn, packet)
            except ConnectionClosed:
                pass

    @profiler_decorator
    def _waitMessage(self, target_conn, msg_id, handler=None):
        """Wait for a message returned by the dispatcher in queues."""
        get = self.local_var.queue.get
        _handlePacket = self._handlePacket
        while True:
            conn, packet = get(True)
            is_forgotten = isinstance(packet, ForgottenPacket)
            if target_conn is conn:
                # check fake packet
                if packet is None:
                    raise ConnectionClosed
                if msg_id == packet.getId():
                    if is_forgotten:
                        raise ValueError, 'ForgottenPacket for an ' \
                            'explicitly expected packet.'
                    _handlePacket(conn, packet, handler=handler)
                    break
            if not is_forgotten and packet is not None:
                _handlePacket(conn, packet)

    @profiler_decorator
    def _askStorage(self, conn, packet):
        """ Send a request to a storage node and process its answer """
        msg_id = conn.ask(packet, queue=self.local_var.queue)
        self._waitMessage(conn, msg_id, self.storage_handler)

    @profiler_decorator
    def _askPrimary(self, packet):
        """ Send a request to the primary master and process its answer """
        conn = self._getMasterConnection()
        msg_id = conn.ask(packet, queue=self.local_var.queue)
        self._waitMessage(conn, msg_id, self.primary_handler)

    @profiler_decorator
    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
        # acquire the lock to allow only one thread to connect to the primary
        result = self.master_conn
        if result is None:
            self._connecting_to_master_node_acquire()
            try:
                self.new_oid_list = []
                result = self._connectToPrimaryNode()
                self.master_conn = result
            finally:
                self._connecting_to_master_node_release()
        return result

    def getPartitionTable(self):
        """ Return the partition table manager; reconnect the primary master if needed """
        # this ensures the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

    @profiler_decorator
    def _connectToPrimaryNode(self):
        """
            Look up the current primary master node.
        """
        neo.lib.logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        queue = self.local_var.queue
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        time.sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.em,
                        self.notifications_handler,
                        addr=self.trying_master_node.getAddress(),
                        connector=self.connector_handler(),
                        dispatcher=self.dispatcher)
                # Query for primary master node
                if conn.getConnector() is None:
                    # This happens if a connection could not be established.
                    neo.lib.logging.error(
                                    'Connection to master node %s failed',
                                  self.trying_master_node)
                    continue
                try:
                    msg_id = conn.ask(Packets.AskPrimary(), queue=queue)
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                connected = self.primary_master_node is not None and \
                        self.primary_master_node is self.trying_master_node
            neo.lib.logging.info(
                            'Connected to %s' % (self.primary_master_node, ))
            try:
                ready = self.identifyToPrimaryNode(conn)
            except ConnectionClosed:
                neo.lib.logging.error('Connection to %s lost',
                    self.trying_master_node)
                self.primary_master_node = None
                continue
        neo.lib.logging.info("Connected and ready")
        return conn

    def identifyToPrimaryNode(self, conn):
        """
            Request identification and required information to be operational.
            Might raise ConnectionClosed so that the new primary can be
            looked up again.
        """
        neo.lib.logging.info('Initializing from master')
        queue = self.local_var.queue
        # Identify to primary master and request initial data
        while conn.getUUID() is None:
            p = Packets.RequestIdentification(NodeTypes.CLIENT, self.uuid,
                    None, self.name)
            self._waitMessage(conn, conn.ask(p, queue=queue),
                    handler=self.primary_bootstrap_handler)
            if conn.getUUID() is None:
                # Node identification was refused by master, it is considered
                # as the primary as long as we are connected to it.
                time.sleep(1)
        if self.uuid is not None:
            msg_id = conn.ask(Packets.AskNodeInformation(), queue=queue)
            self._waitMessage(conn, msg_id,
                    handler=self.primary_bootstrap_handler)
            msg_id = conn.ask(Packets.AskPartitionTable(), queue=queue)
            self._waitMessage(conn, msg_id,
                    handler=self.primary_bootstrap_handler)
        return self.pt.operational()

    def registerDB(self, db, limit):
        self._db = db

    def getDB(self):
        return self._db

    @profiler_decorator
    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
                # Get a new list of OIDs from the master node: a batch is
                # requested here to avoid asking the master for new OIDs one
                # by one too often.
                self._askPrimary(Packets.AskNewOIDs(100))
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
            self.last_oid = self.new_oid_list.pop(0)
            return self.last_oid
        finally:
            self._oid_lock_release()

    def getStorageSize(self):
        # return the last OID used, this is inaccurate
        return int(u64(self.last_oid))

    @profiler_decorator
    def load(self, snapshot_tid, oid, serial=None, tid=None):
        """
        Internal method which manages load, loadSerial and loadBefore.
        OID and TID (serial) parameters are expected to be packed.
        snapshot_tid
            First TID not visible to current transaction.
            Set to None for no limit.
        oid
            OID of object to get.
        serial
            If given, the exact serial at which OID is desired.
            tid should be None.
        tid
            If given, the excluded upper bound serial at which OID is desired.
            serial should be None.

        Return value: (3-tuple)
        - Object data (None if object creation was undone).
        - Serial of given data.
        - Next serial at which object exists, or None. Only set when tid
          parameter is not None.

        Exceptions:
            NEOStorageError
                technical problem
            NEOStorageNotFoundError
                object exists but no data satisfies given parameters
            NEOStorageDoesNotExistError
                object doesn't exist
            NEOStorageCreationUndoneError
                object existed, but its creation was undone
        """
        # TODO:
        # - rename parameters (here and in handlers & packet definitions)
        if snapshot_tid is not None:
            if serial is None:
                if tid is None:
                    tid = snapshot_tid
                else:
                    tid = min(tid, snapshot_tid)
            # XXX: we must not clamp serial with snapshot_tid, as loadSerial is
            # used during conflict resolution to load object's current version,
            # which is not visible to us normally (it was committed after our
            # snapshot was taken).

        self._load_lock_acquire()
        try:
            try:
                result = self._loadFromCache(oid, serial, tid)
            except KeyError:
                pass
            else:
                return result
            data, start_serial, end_serial = self._loadFromStorage(oid, serial,
                tid)
            self._cache_lock_acquire()
            try:
                self.mq_cache[(oid, start_serial)] = data, end_serial
            finally:
                self._cache_lock_release()
            if data == '':
                raise NEOStorageCreationUndoneError(dump(oid))
            return data, start_serial, end_serial
        finally:
            self._load_lock_release()

    @profiler_decorator
    def _loadFromStorage(self, oid, at_tid, before_tid):
        self.local_var.asked_object = 0
        packet = Packets.AskObject(oid, at_tid, before_tid)
        for node, conn in self.cp.iterateForObject(oid, readable=True):
            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                continue

            # Check data
            noid, tid, next_tid, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                neo.lib.logging.error('got wrong oid %s instead of %s from %s',
                    noid, dump(oid), conn)
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                neo.lib.logging.error('wrong checksum from %s for oid %s',
                              conn, dump(oid))
                self.local_var.asked_object = -1
                continue
            break
        if self.local_var.asked_object == -1:
            raise NEOStorageError('inconsistent data')

        # Uncompress data
        if compression:
            data = decompress(data)
        return data, tid, next_tid

    @profiler_decorator
    def _loadFromCache(self, oid, at_tid, before_tid):
        """
        Load from local cache, raising KeyError if not found.
        """
        self._cache_lock_acquire()
        try:
            if at_tid is not None:
                tid = at_tid
            elif before_tid is not None:
                tid = self.cache_revision_index.getSerialBefore(oid,
                    before_tid)
            else:
                tid = self.cache_revision_index.getLatestSerial(oid)
            if tid is None:
                raise KeyError
            # Raises KeyError on miss
            data, next_tid = self.mq_cache[(oid, tid)]
            return (data, tid, next_tid)
        finally:
            self._cache_lock_release()

    @profiler_decorator
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
        if self.local_var.txn is transaction:
            # We have already begun the same transaction
            raise StorageTransactionError('Duplicate tpc_begin calls')
        if self.local_var.txn is not None:
            raise NeoException, 'local_var is not clean in tpc_begin'
        # use the given TID or request a new one from the master
        self._askPrimary(Packets.AskBeginTransaction(tid))
        if self.local_var.tid is None:
            raise NEOStorageError('tpc_begin failed')
        assert tid in (None, self.local_var.tid), (tid, self.local_var.tid)
        self.local_var.txn = transaction

    @profiler_decorator
    def store(self, oid, serial, data, version, transaction):
        """Store object."""
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        neo.lib.logging.debug(
                        'storing oid %s serial %s', dump(oid), dump(serial))
        self._store(oid, serial, data)
        return None

    def _store(self, oid, serial, data, data_serial=None, unlock=False):
        if data is None:
            # This is some undo: either a no-data object (undoing object
            # creation) or a back-pointer to an earlier revision (going back to
            # an older object revision).
            data = compressed_data = ''
            compression = 0
        else:
            assert data_serial is None
            compression = self.compress
            compressed_data = data
            if self.compress:
                compressed_data = compress(data)
                if len(compressed_data) > len(data):
                    compressed_data = data
                    compression = 0
                else:
                    compression = 1
        checksum = makeChecksum(compressed_data)
        on_timeout = OnTimeout(self.onStoreTimeout, self.local_var.tid, oid)
        # Store object in tmp cache
        local_var = self.local_var
        data_dict = local_var.data_dict
        if oid not in data_dict:
            local_var.data_list.append(oid)
        data_dict[oid] = data
        # Store data on each node
        self.local_var.object_stored_counter_dict[oid] = {}
        object_base_serial_dict = local_var.object_base_serial_dict
        if oid not in object_base_serial_dict:
            object_base_serial_dict[oid] = serial
        self.local_var.object_serial_dict[oid] = serial
        queue = self.local_var.queue
        add_involved_nodes = self.local_var.involved_nodes.add
        packet = Packets.AskStoreObject(oid, serial, compression,
            checksum, compressed_data, data_serial, self.local_var.tid, unlock)
        for node, conn in self.cp.iterateForObject(oid, writable=True):
            try:
                conn.ask(packet, on_timeout=on_timeout, queue=queue)
                add_involved_nodes(node)
            except ConnectionClosed:
                continue
        if not self.local_var.involved_nodes:
            raise NEOStorageError("Store failed")

        self._waitAnyMessage(False)

    def onStoreTimeout(self, conn, msg_id, ttid, oid):
        # NOTE: this method is called from the poll thread, don't use
        # local_var!
        # Stop expecting the timed-out store request.
        queue = self.dispatcher.forget(conn, msg_id)
        # Ask the storage whether someone holds a lock on the object.
        # Shorten the timeout to react earlier to an unresponsive storage.
        conn.ask(Packets.AskHasLock(ttid, oid), timeout=5, queue=queue)
        return True

    @profiler_decorator
    def _handleConflicts(self, tryToResolveConflict):
        result = []
        append = result.append
        local_var = self.local_var
        # Check for conflicts
        data_dict = local_var.data_dict
        object_base_serial_dict = local_var.object_base_serial_dict
        object_serial_dict = local_var.object_serial_dict
        conflict_serial_dict = local_var.conflict_serial_dict.copy()
        local_var.conflict_serial_dict.clear()
        resolved_conflict_serial_dict = local_var.resolved_conflict_serial_dict
        for oid, conflict_serial_set in conflict_serial_dict.iteritems():
            resolved_serial_set = resolved_conflict_serial_dict.setdefault(
                oid, set())
            conflict_serial = max(conflict_serial_set)
            if resolved_serial_set and conflict_serial <= max(
                    resolved_serial_set):
                # A later serial has already been resolved, skip.
                resolved_serial_set.update(conflict_serial_set)
                continue
            serial = object_serial_dict[oid]
            data = data_dict[oid]
            tid = local_var.tid
            resolved = False
            if conflict_serial == ZERO_TID:
                # Storage refused us from taking object lock, to avoid a
                # possible deadlock. TID is actually used for some kind of
                # "locking priority": when a higher value has the lock,
                # this means we stored objects "too late", and we would
                # otherwise cause a deadlock.
                # To recover, we must ask storages to release locks we
                # hold (to let possibly-competing transactions acquire
                # them), and requeue our already-sent store requests.
                # XXX: currently, brute-force is implemented: we send
                # object data again.
                neo.lib.logging.info('Deadlock avoidance triggered on %r:%r',
                    dump(oid), dump(serial))
                for store_oid, store_data in \
                        local_var.data_dict.iteritems():
                    store_serial = object_serial_dict[store_oid]
                    if store_data is None:
                        self.checkCurrentSerialInTransaction(store_oid,
                            store_serial)
                    else:
                        if store_data == '':
                            # Some undo
                            neo.lib.logging.warning('Deadlock avoidance cannot'
                                ' reliably work with undo, this must be '
                                'implemented.')
                            break
                        self._store(store_oid, store_serial, store_data,
                            unlock=True)
                else:
                    resolved = True
            elif data is not None:
                new_data = tryToResolveConflict(oid, conflict_serial,
                    serial, data)
                if new_data is not None:
                    neo.lib.logging.info('Conflict resolution succeeded for ' \
                        '%r:%r with %r', dump(oid), dump(serial),
                        dump(conflict_serial))
                    # Mark this conflict as resolved
                    resolved_serial_set.update(conflict_serial_set)
                    # Base serial changes too, as we resolved a conflict
                    object_base_serial_dict[oid] = conflict_serial
                    # Try to store again
                    self._store(oid, conflict_serial, new_data)
                    append(oid)
                    resolved = True
                else:
                    neo.lib.logging.info('Conflict resolution failed for ' \
                        '%r:%r with %r', dump(oid), dump(serial),
                        dump(conflict_serial))
            if not resolved:
                # XXX: Is it really required to remove from data_dict ?
                del data_dict[oid]
                local_var.data_list.remove(oid)
                if data is None:
                    exc = ReadConflictError(oid=oid, serials=(conflict_serial,
                        serial))
                else:
                    exc = ConflictError(oid=oid, serials=(tid, serial),
                        data=data)
                raise exc
        return result

    @profiler_decorator
    def waitResponses(self):
        """Wait for all requests to be answered (or their connection to be
        dected as closed)"""
        queue = self.local_var.queue
        pending = self.dispatcher.pending
        _waitAnyMessage = self._waitAnyMessage
        while pending(queue):
            _waitAnyMessage()

    @profiler_decorator
    def waitStoreResponses(self, tryToResolveConflict):
        result = []
        append = result.append
        resolved_oid_set = set()
        update = resolved_oid_set.update
        local_var = self.local_var
        tid = local_var.tid
        _handleConflicts = self._handleConflicts
        conflict_serial_dict = local_var.conflict_serial_dict
        queue = local_var.queue
        pending = self.dispatcher.pending
        _waitAnyMessage = self._waitAnyMessage
        while pending(queue) or conflict_serial_dict:
            _waitAnyMessage()
            if conflict_serial_dict:
                conflicts = _handleConflicts(tryToResolveConflict)
                if conflicts:
                    update(conflicts)

        # Check for never-stored objects, and update result for all others
        for oid, store_dict in \
            local_var.object_stored_counter_dict.iteritems():
            if not store_dict:
                neo.lib.logging.error('tpc_store failed')
                raise NEOStorageError('tpc_store failed')
            elif oid in resolved_oid_set:
                append((oid, ResolvedSerial))
        return result

    @profiler_decorator
    def tpc_vote(self, transaction, tryToResolveConflict):
        """Store current transaction."""
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)

        result = self.waitStoreResponses(tryToResolveConflict)

        tid = local_var.tid
        # Store the transaction metadata on each node
        txn_stored_counter = 0
        packet = Packets.AskStoreTransaction(tid, str(transaction.user),
            str(transaction.description), dumps(transaction._extension),
            local_var.data_list)
        add_involved_nodes = self.local_var.involved_nodes.add
        for node, conn in self.cp.iterateForObject(tid, writable=True):
            neo.lib.logging.debug("voting object %s on %s", dump(tid),
                dump(conn.getUUID()))
            try:
                self._askStorage(conn, packet)
                add_involved_nodes(node)
            except ConnectionClosed:
                continue
            txn_stored_counter += 1

        # check that at least one storage node accepted
        if txn_stored_counter == 0:
            neo.lib.logging.error('tpc_vote failed')
            raise NEOStorageError('tpc_vote failed')
        # Check if master connection is still alive.
        # This is just here to lower the probability of detecting a problem
        # in tpc_finish, as we should do our best to detect problems before
        # tpc_finish.
        self._getMasterConnection()

        local_var.txn_voted = True
        return result

    @profiler_decorator
    def tpc_abort(self, transaction):
        """Abort current transaction."""
        if transaction is not self.local_var.txn:
            return

        tid = self.local_var.tid
        p = Packets.AbortTransaction(tid)
        getConnForNode = self.cp.getConnForNode
        # cancel the transaction on all those nodes
        for node in self.local_var.involved_nodes:
            conn = getConnForNode(node)
            if conn is None:
                continue
            try:
                conn.notify(p)
            except:
                neo.lib.logging.error(
                    'Exception in tpc_abort while notifying ' \
                    'storage node %r of abortion, ignoring.',
                    conn, exc_info=1)
        self._getMasterConnection().notify(p)

        # Just wait for responses to arrive. If any leads to an exception,
        # log it and continue: we *must* eat all answers to not disturb the
        # next transaction.
        queue = self.local_var.queue
        pending = self.dispatcher.pending
        _waitAnyMessage = self._waitAnyMessage
        while pending(queue):
            try:
                _waitAnyMessage()
            except:
                neo.lib.logging.error(
                    'Exception in tpc_abort while ' \
                    'handling pending answers, ignoring.',
                    exc_info=1)

        self.local_var.clear()

    @profiler_decorator
    def tpc_finish(self, transaction, tryToResolveConflict, f=None):
        """Finish current transaction."""
        local_var = self.local_var
        if local_var.txn is not transaction:
            raise StorageTransactionError('tpc_finish called for wrong '
                'transaction')
        if not local_var.txn_voted:
            self.tpc_vote(transaction, tryToResolveConflict)
        self._load_lock_acquire()
        try:
            # Call finish on master
            oid_list = local_var.data_list
            p = Packets.AskFinishTransaction(local_var.tid, oid_list)
            self._askPrimary(p)

            # From now on, self.local_var.tid holds the "real" TID.
            tid = local_var.tid
            # Call function given by ZODB
            if f is not None:
                f(tid)

            # Update cache
            self._cache_lock_acquire()
            try:
                mq_cache = self.mq_cache
                update = mq_cache.update
                def updateNextSerial(value):
                    data, next_tid = value
                    assert next_tid is None, (dump(oid), dump(base_tid),
                        dump(next_tid))
                    return (data, tid)
                get_baseTID = local_var.object_base_serial_dict.get
                for oid, data in local_var.data_dict.iteritems():
                    if data is None:
                        # this is just a remnant of a
                        # checkCurrentSerialInTransaction call, ignore (no data
                        # was modified).
                        continue
                    # Update ex-latest value in cache
                    base_tid = get_baseTID(oid)
                    try:
                        update((oid, base_tid), updateNextSerial)
                    except KeyError:
                        pass
                    if data == '':
                        self.cache_revision_index.invalidate([oid], tid)
                    else:
                        # Store in cache with no next_tid
                        mq_cache[(oid, tid)] = (data, None)
            finally:
                self._cache_lock_release()
            local_var.clear()
            return tid
        finally:
            self._load_lock_release()

    def undo(self, snapshot_tid, undone_tid, txn, tryToResolveConflict):
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, undone_tid)

        txn_info, txn_ext = self._getTransactionInformation(undone_tid)
        txn_oid_list = txn_info['oids']

        # Regroup objects per partition, to ask a minimal set of storage nodes.
        partition_oid_dict = {}
        pt = self.getPartitionTable()
        for oid in txn_oid_list:
            partition = pt.getPartition(oid)
            try:
                oid_list = partition_oid_dict[partition]
            except KeyError:
                oid_list = partition_oid_dict[partition] = []
            oid_list.append(oid)

        # Ask storages for the undo serial (the serial at which the object's
        # previous data can be found)
        getCellList = pt.getCellList
        getCellSortKey = self.cp.getCellSortKey
        getConnForCell = self.cp.getConnForCell
        queue = self.local_var.queue
        undo_object_tid_dict = self.local_var.undo_object_tid_dict = {}
        for partition, oid_list in partition_oid_dict.iteritems():
            cell_list = getCellList(partition, readable=True)
            shuffle(cell_list)
            cell_list.sort(key=getCellSortKey)
            storage_conn = getConnForCell(cell_list[0])
            storage_conn.ask(Packets.AskObjectUndoSerial(self.local_var.tid,
                snapshot_tid, undone_tid, oid_list), queue=queue)

        # Wait for all AnswerObjectUndoSerial. We might get OidNotFoundError,
        # meaning that objects in transaction's oid_list do not exist any
        # longer. This is the symptom of a pack, so forbid undoing the
        # transaction when it happens, but still keep waiting for answers.
        failed = False
        while True:
            try:
                self.waitResponses()
            except NEOStorageNotFoundError:
                failed = True
            else:
                break
        if failed:
            raise UndoError('non-undoable transaction')

        # Send undo data to all storage nodes.
        for oid in txn_oid_list:
            current_serial, undo_serial, is_current = undo_object_tid_dict[oid]
            if is_current:
                data = None
            else:
                # Serial being undone is not the latest version for this
                # object. This is an undo conflict, try to resolve it.
                try:
                    # Load the latest version we are supposed to see
                    data = self.load(snapshot_tid, oid, serial=current_serial)[0]
                    # Load the version we were undoing to
                    undo_data = self.load(snapshot_tid, oid, serial=undo_serial)[0]
                except NEOStorageNotFoundError:
                    raise UndoError('Object not found while resolving undo '
                        'conflict')
                # Resolve conflict
                try:
                    data = tryToResolveConflict(oid, current_serial,
                        undone_tid, undo_data, data)
                except ConflictError:
                    data = None
                if data is None:
                    raise UndoError('Some data were modified by a later ' \
                        'transaction', oid)
                undo_serial = None
            self._store(oid, current_serial, data, undo_serial)

    def _insertMetadata(self, txn_info, extension):
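        # Unpickle the transaction extension and merge its keys into txn_info.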
        for k, v in loads(extension).items():
            txn_info[k] = v

    def _getTransactionInformation(self, tid):
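        # Ask readable storage nodes holding this tid for the transaction
        # metadata, trying the next node when one fails or does not know it.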
        packet = Packets.AskTransactionInformation(tid)
        for node, conn in self.cp.iterateForObject(tid, readable=True):
            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                continue
            except NEOStorageNotFoundError:
                # TID not found
                continue
            break
        else:
            raise NEOStorageError('Transaction %r not found' % (tid, ))
        return (self.local_var.txn_info, self.local_var.txn_ext)

    def undoLog(self, first, last, filter=None, block=0):
        # XXX: undoLog is broken
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        pt = self.getPartitionTable()
        storage_node_list = pt.getNodeList()

        self.local_var.node_tids = {}
        queue = self.local_var.queue
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue
            conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION), queue=queue)

        # Wait for answers from all storages.
        self.waitResponses()

        # Reorder tids
        ordered_tids = set()
        update = ordered_tids.update
        for tid_list in self.local_var.node_tids.itervalues():
            update(tid_list)
        ordered_tids = list(ordered_tids)
        ordered_tids.sort(reverse=True)
        neo.lib.logging.debug(
                        "UndoLog tids %s", [dump(x) for x in ordered_tids])
        # For each transaction, get info
        undo_info = []
        append = undo_info.append
        for tid in ordered_tids:
            (txn_info, txn_ext) = self._getTransactionInformation(tid)
            if filter is None or filter(self.local_var.txn_info):
                txn_info = self.local_var.txn_info
                txn_info.pop('packed')
                txn_info.pop("oids")
                self._insertMetadata(txn_info, self.local_var.txn_ext)
                append(txn_info)
                if len(undo_info) >= last - first:
                    break
        # Check that we return at least one element, otherwise call
        # again with an extended offset
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter,
                    block=1)
        return undo_info

    def transactionLog(self, start, stop, limit):
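        # Ask one connected node per partition for TIDs (AskTIDsFrom), wait
        # for all answers, then fetch and return each transaction's metadata
        # and extension.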
        node_map = self.pt.getNodeMap()
        node_list = node_map.keys()
        node_list.sort(key=self.cp.getCellSortKey)
        partition_set = set(range(self.pt.getPartitions()))
        queue = self.local_var.queue
        # request a tid list for each partition
        self.local_var.tids_from = set()
        for node in node_list:
            conn = self.cp.getConnForNode(node)
            request_set = set(node_map[node]) & partition_set
            if conn is None or not request_set:
                continue
            partition_set -= set(request_set)
            packet = Packets.AskTIDsFrom(start, stop, limit, request_set)
            conn.ask(packet, queue=queue)
            if not partition_set:
                break
        assert not partition_set
        self.waitResponses()
        # request transaction information
        txn_list = []
        append = txn_list.append
        tid = None
        for tid in sorted(self.local_var.tids_from):
            (txn_info, txn_ext) = self._getTransactionInformation(tid)
            txn_info['ext'] = loads(self.local_var.txn_ext)
            append(txn_info)
        return (tid, txn_list)

    def history(self, oid, version=None, size=1, filter=None):
        # Get history information for the object first
        packet = Packets.AskObjectHistory(oid, 0, size)
        for node, conn in self.cp.iterateForObject(oid, readable=True):
            # FIXME: we keep overwriting self.local_var.history here, we
            # should aggregate it instead.
            self.local_var.history = None
            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                continue

            if self.local_var.history[0] != oid:
                # Got history for wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                      '%r, got %r' % (oid, self.local_var.history[0]))

        if not isinstance(self.local_var.history, tuple):
            raise NEOStorageError('history failed')

        # Now that we have object information, get txn information
        history_list = []
        for serial, size in self.local_var.history[1]:
            txn_info, txn_ext = self._getTransactionInformation(serial)
            # create history dict
            txn_info.pop('id')
            txn_info.pop('oids')
            txn_info.pop('packed')
            txn_info['tid'] = serial
            txn_info['version'] = ''
            txn_info['size'] = size
            if filter is None or filter(txn_info):
                history_list.append(txn_info)
            self._insertMetadata(txn_info, txn_ext)

        return history_list

    @profiler_decorator
    def importFrom(self, source, start, stop, tryToResolveConflict):
        serials = {}
        transaction_iter = source.iterator(start, stop)
        for transaction in transaction_iter:
            tid = transaction.tid
            self.tpc_begin(transaction, tid, transaction.status)
            for r in transaction:
                oid = r.oid
                pre = serials.get(oid, None)
                # TODO: bypass conflict resolution, locks...
                self.store(oid, pre, r.data, r.version, transaction)
                serials[oid] = tid
            conflicted = self.tpc_vote(transaction, tryToResolveConflict)
            assert not conflicted, conflicted
            real_tid = self.tpc_finish(transaction, tryToResolveConflict)
            assert real_tid == tid, (real_tid, tid)
        transaction_iter.close()

    def iterator(self, start, stop):
        if start is None:
            start = ZERO_TID
        return Iterator(self, start, stop)

    def lastTransaction(self):
        self._askPrimary(Packets.AskLastTransaction())
        return self.local_var.last_transaction

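    # ZODB version API: versions are not supported, so these methods only
    # check the transaction and return empty results.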
    def abortVersion(self, src, transaction):
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        return '', []

    def commitVersion(self, src, dest, transaction):
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        return '', []

    def __del__(self):
        """Clear all connections."""
        # Due to a bug in ZODB, close is not always called when shutting
        # down Zope, so use __del__ to close connections.
        for conn in self.em.getConnectionList():
            conn.close()
        self.cp.flush()
        self.master_conn = None
        # Stop polling thread
        neo.lib.logging.debug('Stopping %s', self.poll_thread)
        self.poll_thread.stop()
        psThreadedPoll()
    close = __del__

    def invalidationBarrier(self):
        self._askPrimary(Packets.AskBarrier())

    def setNodeReady(self):
        self.local_var.node_ready = True

    def setNodeNotReady(self):
        self.local_var.node_ready = False

    def isNodeReady(self):
        return self.local_var.node_ready

    def setTID(self, value):
        self.local_var.tid = value

    def getTID(self):
        return self.local_var.tid

    def pack(self, t):
        tid = repr(TimeStamp(*time.gmtime(t)[:5] + (t % 60, )))
        if tid == ZERO_TID:
            raise NEOStorageError('Invalid pack time')
        self._askPrimary(Packets.AskPack(tid))
        # XXX: this is only needed to make ZODB unit tests pass.
        # It should not be otherwise required (clients should be free to load
        # old data as long as it is available in cache, even if it was pruned
        # by a pack), so don't bother invalidating on other clients.
        self._cache_lock_acquire()
        try:
            self.mq_cache.clear()
        finally:
            self._cache_lock_release()

    def getLastTID(self, oid):
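        # Load the object with no snapshot limit and return its last serial.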
        return self.load(None, oid)[1]

    def checkCurrentSerialInTransaction(self, oid, serial, transaction):
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)
        local_var.object_serial_dict[oid] = serial
        # Placeholders
        queue = local_var.queue
        local_var.object_stored_counter_dict[oid] = {}
        data_dict = local_var.data_dict
        if oid not in data_dict:
            # Marker value so we don't try to resolve conflicts.
            data_dict[oid] = None
            local_var.data_list.append(oid)
        packet = Packets.AskCheckCurrentSerial(local_var.tid, serial, oid)
        for node, conn in self.cp.iterateForObject(oid, writable=True):
            try:
                conn.ask(packet, queue=queue)
            except ConnectionClosed:
                continue

        self._waitAnyMessage(False)