#
# Copyright (C) 2006-2016  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from cPickle import dumps, loads
from zlib import compress, decompress
from random import shuffle
import heapq
import time
import weakref
from functools import partial

from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.POSException import ReadConflictError
from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp

from neo.lib import logging
from neo.lib.protocol import NodeTypes, Packets, \
    INVALID_PARTITION, MAX_TID, ZERO_HASH, ZERO_TID
from neo.lib.event import EventManager
from neo.lib.util import makeChecksum, dump
from neo.lib.locking import Empty, Lock, SimpleQueue
from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.node import NodeManager
from .exception import NEOStorageError, NEOStorageCreationUndoneError
from .exception import NEOStorageNotFoundError
from .handlers import storage, master
from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from neo.lib.threaded_app import ThreadedApplication
from .cache import ClientCache
from .pool import ConnectionPool
from neo.lib.util import p64, u64, parseMasterList
from neo.lib.debug import register as registerLiveDebugger

CHECKED_SERIAL = object()
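# Sentinel stored in a transaction's 'data_dict' for oids whose current
# serial is only checked (see checkCurrentSerialInTransaction), as opposed
# to oids whose data is actually stored.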

try:
    from Signals.Signals import SignalHandler
except ImportError:
    SignalHandler = None
if SignalHandler:
    import signal
    SignalHandler.registerHandler(signal.SIGUSR2, logging.reopen)


class TransactionContainer(dict):

    def pop(self, txn):
        return dict.pop(self, id(txn), None)

    def get(self, txn):
        try:
            return self[id(txn)]
        except KeyError:
            raise StorageTransactionError("unknown transaction %r" % txn)

    def new(self, txn):
        key = id(txn)
        if key in self:
            raise StorageTransactionError("commit of transaction %r"
                                          " already started" % txn)
        context = self[key] = {
            'queue': SimpleQueue(),
            'txn': txn,
            'ttid': None,
            'data_dict': {},
            'data_size': 0,
            'cache_dict': {},
            'cache_size': 0,
            'object_base_serial_dict': {},
            'object_serial_dict': {},
            'object_stored_counter_dict': {},
            'conflict_serial_dict': {},
            'resolved_conflict_serial_dict': {},
            'involved_nodes': set(),
            'checked_nodes': set(),
        }
        return context


class Application(ThreadedApplication):
    """The client node application."""

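    # Rough usage sketch (the master address and cluster name below are
    # illustrative, not defaults):
    #   app = Application('127.0.0.1:10000', 'neo')
    #   data, tid, next_tid = app.load('\0' * 8)   # load the root object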
    def __init__(self, master_nodes, name, compress=True, **kw):
        super(Application, self).__init__(parseMasterList(master_nodes),
                                          name, **kw)
        # Internal attributes common to all threads
        self._db = None
        self.cp = ConnectionPool(self)
        self.primary_master_node = None
        self.trying_master_node = None

        # no self-assigned UUID, primary master will supply us one
        self._cache = ClientCache()
        self._loading_oid = None
        self.new_oid_list = ()
        self.last_oid = '\0' * 8
        self.last_tid = None
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)
        self._txn_container = TransactionContainer()
        # Lock definitions:
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock is used to prevent concurrent OID generations
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        self._connecting_to_master_node = Lock()
        self.compress = compress

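    # The partition table ('pt') is received while bootstrapping from the
    # primary master, so accessing it on demand triggers that connection
    # (see __getattr__ below).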
    def __getattr__(self, attr):
        if attr == 'pt':
            self._getMasterConnection()
        return self.__getattribute__(attr)

    def log(self):
        super(Application, self).log()
        logging.info("%r", self._cache)

    @property
    def txn_contexts(self):
        # do not iterate lazily to avoid a race condition
        return self._txn_container.values

    def _waitAnyMessage(self, queue, block=True):
        """
          Handle all pending packets.
          block
            If True (default), will block until at least one packet was
            received.
        """
        pending = self.dispatcher.pending
        get = queue.get
        _handlePacket = self._handlePacket
        while pending(queue):
            try:
                conn, packet, kw = get(block)
            except Empty:
                break
            if packet is None or isinstance(packet, ForgottenPacket):
                # connection was closed or some packet was forgotten
                continue
            block = False
            try:
                _handlePacket(conn, packet, kw)
            except ConnectionClosed:
                pass

    def _waitAnyTransactionMessage(self, txn_context, block=True):
        """
        Just like _waitAnyMessage, but for per-transaction exchanges, rather
        than per-thread.
        """
        queue = txn_context['queue']
        self.setHandlerData(txn_context)
        try:
            self._waitAnyMessage(queue, block=block)
        finally:
            # Don't leave access to thread context, even if a raise happens.
            self.setHandlerData(None)

    def _askStorage(self, conn, packet, **kw):
        """ Send a request to a storage node and process its answer """
        return self._ask(conn, packet, handler=self.storage_handler, **kw)

    def _askPrimary(self, packet, **kw):
        """ Send a request to the primary master and process its answer """
        return self._ask(self._getMasterConnection(), packet,
            handler=self.primary_handler, **kw)

    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
        # For performance reasons, get 'master_conn' without locking.
        result = self.master_conn
        if result is None:
            # If not connected, 'master_conn' must be tested again while we have
            # the lock, to avoid concurrent threads reconnecting.
            with self._connecting_to_master_node:
                result = self.master_conn
                if result is None:
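                    # Drop the pool of preallocated OIDs: new_oid() will
                    # refill it from the (possibly new) primary master.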
                    self.new_oid_list = ()
                    result = self.master_conn = self._connectToPrimaryNode()
        return result

    def _connectToPrimaryNode(self):
        """
            Look up the current primary master node
        """
        logging.debug('connecting to primary master...')
        self.start()
        index = -1
        ask = self._ask
        handler = self.primary_bootstrap_handler
        while 1:
            # Get network connection to primary master
            while 1:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = self.nm.getMasterList()
                    index = (index + 1) % len(master_list)
                    self.trying_master_node = master_list[index]
                # Connect to master
                conn = MTClientConnection(self,
                        self.notifications_handler,
                        node=self.trying_master_node,
                        dispatcher=self.dispatcher)
                try:
                    ask(conn, Packets.RequestIdentification(
                            NodeTypes.CLIENT, self.uuid, None, self.name),
                        handler=handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                if self.primary_master_node is not None and \
                   self.primary_master_node is self.trying_master_node:
                    break
            logging.info('Connected to %s', self.primary_master_node)
            try:
                # Request identification and the information required to be
                # operational. Might raise ConnectionClosed so that the new
                # primary can be looked up again.
                logging.info('Initializing from master')
                ask(conn, Packets.AskNodeInformation(), handler=handler)
                ask(conn, Packets.AskPartitionTable(), handler=handler)
                ask(conn, Packets.AskLastTransaction(), handler=handler)
                if self.pt.operational():
                    break
            except ConnectionClosed:
                logging.error('Connection to %s lost', self.trying_master_node)
                self.primary_master_node = None
        logging.info("Connected and ready")
        return conn

    def registerDB(self, db, limit):
        self._db = db

    def getDB(self):
        return self._db

    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if not self.new_oid_list:
                # Get a new list of OIDs from the master node. A pool of OIDs
                # is kept here to avoid asking the master for new OIDs one by
                # one too often.
                self._askPrimary(Packets.AskNewOIDs(100))
                if not self.new_oid_list:
                    raise NEOStorageError('new_oid failed')
            self.last_oid = oid = self.new_oid_list.pop()
            return oid
        finally:
            self._oid_lock_release()

    def getObjectCount(self):
        # return the last OID used, this is inaccurate
        return int(u64(self.last_oid))

    def load(self, oid, tid=None, before_tid=None):
        """
        Internal method which manages load, loadSerial and loadBefore.
        OID and TID (serial) parameters are expected packed.
        oid
            OID of object to get.
        tid
            If given, the exact serial at which OID is desired.
            before_tid should be None.
        before_tid
            If given, the excluded upper bound serial at which OID is desired.
            tid should be None.

        Return value: (3-tuple)
        - Object data (None if object creation was undone).
        - Serial of given data.
        - Next serial at which object exists, or None. Only set when tid
          parameter is not None.

        Exceptions:
            NEOStorageError
                technical problem
            NEOStorageNotFoundError
                object exists but no data satisfies given parameters
            NEOStorageDoesNotExistError
                object doesn't exist
            NEOStorageCreationUndoneError
                object existed, but its creation was undone

        Note that loadSerial is used during conflict resolution to load
        object's current version, which is not visible to us normally (it was
        committed after our snapshot was taken).
        """
        # TODO:
        # - rename parameters (here? and in handlers & packet definitions)

        acquire = self._cache_lock_acquire
        release = self._cache_lock_release
        # XXX: Consider using a more fine-grained lock.
        self._load_lock_acquire()
        try:
            acquire()
            try:
                result = self._loadFromCache(oid, tid, before_tid)
                if result:
                    return result
                self._loading_oid = oid
            finally:
                release()
            # When not bound to a ZODB Connection, load() may be the
            # first method called and last_tid may still be None.
            # This happens, for example, when opening the DB.
            if not (tid or before_tid) and self.last_tid:
                # Do not get something more recent than the last invalidation
                # we got from master.
                before_tid = p64(u64(self.last_tid) + 1)
            data, tid, next_tid, _ = self._loadFromStorage(oid, tid, before_tid)
            acquire()
            try:
                if self._loading_oid:
                    # Common case (no race condition).
                    self._cache.store(oid, data, tid, next_tid)
                elif self._loading_invalidated:
                    # oid has just been invalidated.
                    if not next_tid:
                        next_tid = self._loading_invalidated
                    self._cache.store(oid, data, tid, next_tid)
                # Else, we just reconnected to the master.
            finally:
                release()
        finally:
            self._load_lock_release()
        return data, tid, next_tid

    def _loadFromStorage(self, oid, at_tid, before_tid):
        packet = Packets.AskObject(oid, at_tid, before_tid)
        for node, conn in self.cp.iterateForObject(oid, readable=True):
            try:
                tid, next_tid, compression, checksum, data, data_tid \
                    = self._askStorage(conn, packet)
            except ConnectionClosed:
                continue

            if data or checksum != ZERO_HASH:
                if checksum != makeChecksum(data):
                    logging.error('wrong checksum from %s for oid %s',
                              conn, dump(oid))
                    continue
                return (decompress(data) if compression else data,
                        tid, next_tid, data_tid)
            raise NEOStorageCreationUndoneError(dump(oid))
        raise NEOStorageError("storage down or corrupted data")

    def _loadFromCache(self, oid, at_tid=None, before_tid=None):
        """
        Load from local cache, return None if not found.
        """
        if at_tid:
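            # at_tid + '*' compares greater than at_tid but smaller than any
            # other 8-byte TID above it, so this lookup can only return the
            # revision whose serial is exactly at_tid (see the assert below).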
            result = self._cache.load(oid, at_tid + '*')
            assert not result or result[1] == at_tid
            return result
        return self._cache.load(oid, before_tid)

    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
        txn_context = self._txn_container.new(transaction)
        # use the given TID or request a new one from the master
        answer_ttid = self._askPrimary(Packets.AskBeginTransaction(tid))
        if answer_ttid is None:
            raise NEOStorageError('tpc_begin failed')
        assert tid in (None, answer_ttid), (tid, answer_ttid)
        txn_context['ttid'] = answer_ttid

    def store(self, oid, serial, data, version, transaction):
        """Store object."""
        logging.debug('storing oid %s serial %s', dump(oid), dump(serial))
        self._store(self._txn_container.get(transaction), oid, serial, data)

    def _store(self, txn_context, oid, serial, data, data_serial=None,
            unlock=False):
        ttid = txn_context['ttid']
        if data is None:
            # This is some undo: either a no-data object (undoing object
            # creation) or a back-pointer to an earlier revision (going back to
            # an older object revision).
            compressed_data = ''
            compression = 0
            checksum = ZERO_HASH
        else:
            assert data_serial is None
            size = len(data)
            if self.compress:
                compressed_data = compress(data)
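                # Keep the uncompressed form when compression does not
                # actually shrink the data.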
                if size < len(compressed_data):
                    compressed_data = data
                    compression = 0
                else:
                    compression = 1
            else:
                compression = 0
                compressed_data = data
            checksum = makeChecksum(compressed_data)
            txn_context['data_size'] += size
        on_timeout = partial(
            self.onStoreTimeout,
            txn_context=txn_context,
            oid=oid,
        )
        # Store object in tmp cache
        txn_context['data_dict'][oid] = data
        # Store data on each node
        txn_context['object_stored_counter_dict'][oid] = {}
        txn_context['object_base_serial_dict'].setdefault(oid, serial)
        txn_context['object_serial_dict'][oid] = serial
        queue = txn_context['queue']
        involved_nodes = txn_context['involved_nodes']
        add_involved_nodes = involved_nodes.add
        packet = Packets.AskStoreObject(oid, serial, compression,
            checksum, compressed_data, data_serial, ttid, unlock)
        for node, conn in self.cp.iterateForObject(oid):
            try:
                conn.ask(packet, on_timeout=on_timeout, queue=queue)
                add_involved_nodes(node)
            except ConnectionClosed:
                continue
        if not involved_nodes:
            raise NEOStorageError("Store failed")

        while txn_context['data_size'] >= self._cache._max_size:
            self._waitAnyTransactionMessage(txn_context)
        self._waitAnyTransactionMessage(txn_context, False)

    def onStoreTimeout(self, conn, msg_id, txn_context, oid):
        # NOTE: this method is called from the poll thread, don't use
        #       thread-specific values!
        txn_context.setdefault('timeout_dict', {})[oid] = msg_id
        # Ask the storage if someone locks the object.
        # By sending a message with a smaller timeout,
        # the connection will be kept open.
        conn.ask(Packets.AskHasLock(txn_context['ttid'], oid),
                 timeout=5, queue=txn_context['queue'])

    def _handleConflicts(self, txn_context, tryToResolveConflict):
        result = []
        append = result.append
        # Check for conflicts
        data_dict = txn_context['data_dict']
        object_base_serial_dict = txn_context['object_base_serial_dict']
        object_serial_dict = txn_context['object_serial_dict']
        conflict_serial_dict = txn_context['conflict_serial_dict'].copy()
        txn_context['conflict_serial_dict'].clear()
        resolved_conflict_serial_dict = txn_context[
            'resolved_conflict_serial_dict']
        for oid, conflict_serial_set in conflict_serial_dict.iteritems():
            conflict_serial = max(conflict_serial_set)
            serial = object_serial_dict[oid]
            if ZERO_TID in conflict_serial_set:
              if 1:
                # XXX: disable deadlock avoidance code until it is fixed
                logging.info('Deadlock avoidance on %r:%r',
                    dump(oid), dump(serial))
                # 'data' parameter of ConflictError is only used to report the
                # class of the object. It doesn't matter if 'data' is None
                # because the transaction is too big.
                try:
                    data = data_dict[oid]
                except KeyError:
                    data = txn_context['cache_dict'][oid]
              else:
                # Storage refused to let us take the object lock, to avoid a
                # possible deadlock. TID is actually used for some kind of
                # "locking priority": when a higher value has the lock,
                # this means we stored objects "too late", and we would
                # otherwise cause a deadlock.
                # To recover, we must ask storages to release locks we
                # hold (to let possibly-competing transactions acquire
                # them), and requeue our already-sent store requests.
                # XXX: currently, brute-force is implemented: we send
                # object data again.
                # WARNING: not maintained code
                logging.info('Deadlock avoidance triggered on %r:%r',
                    dump(oid), dump(serial))
                for store_oid, store_data in data_dict.iteritems():
                    store_serial = object_serial_dict[store_oid]
                    if store_data is CHECKED_SERIAL:
                        self._checkCurrentSerialInTransaction(txn_context,
                            store_oid, store_serial)
                    else:
                        if store_data is None:
                            # Some undo
                            logging.warning('Deadlock avoidance cannot reliably'
                                ' work with undo, this must be implemented.')
                            conflict_serial = ZERO_TID
                            break
                        self._store(txn_context, store_oid, store_serial,
                            store_data, unlock=True)
                else:
                    continue
            else:
                data = data_dict.pop(oid)
                if data is CHECKED_SERIAL:
                    raise ReadConflictError(oid=oid, serials=(conflict_serial,
                        serial))
                # TODO: data can be None if a conflict happens during undo
                if data:
                    txn_context['data_size'] -= len(data)
                resolved_serial_set = resolved_conflict_serial_dict.setdefault(
                    oid, set())
                if resolved_serial_set and conflict_serial <= max(
                        resolved_serial_set):
                    # A later serial has already been resolved, skip.
                    resolved_serial_set.update(conflict_serial_set)
                    continue
                try:
                    new_data = tryToResolveConflict(oid, conflict_serial,
                        serial, data)
                except ConflictError:
                    logging.info('Conflict resolution failed for '
                        '%r:%r with %r', dump(oid), dump(serial),
                        dump(conflict_serial))
                else:
                    logging.info('Conflict resolution succeeded for '
                        '%r:%r with %r', dump(oid), dump(serial),
                        dump(conflict_serial))
                    # Mark this conflict as resolved
                    resolved_serial_set.update(conflict_serial_set)
                    # Base serial changes too, as we resolved a conflict
                    object_base_serial_dict[oid] = conflict_serial
                    # Try to store again
                    self._store(txn_context, oid, conflict_serial, new_data)
                    append(oid)
                    continue
            # With recent ZODB, get_pickle_metadata (from ZODB.utils) does
            # not support empty values, so do not pass 'data' in this case.
            raise ConflictError(oid=oid, serials=(conflict_serial,
                serial), data=data or None)
        return result

    def waitResponses(self, queue):
        """Wait for all requests to be answered (or their connection to be
        detected as closed)"""
        pending = self.dispatcher.pending
        _waitAnyMessage = self._waitAnyMessage
        while pending(queue):
            _waitAnyMessage(queue)

    def waitStoreResponses(self, txn_context, tryToResolveConflict):
        result = []
        append = result.append
        resolved_oid_set = set()
        update = resolved_oid_set.update
        _handleConflicts = self._handleConflicts
        queue = txn_context['queue']
        conflict_serial_dict = txn_context['conflict_serial_dict']
        pending = self.dispatcher.pending
        _waitAnyTransactionMessage = self._waitAnyTransactionMessage
        while pending(queue) or conflict_serial_dict:
            # Note: handler data can be overwritten by _handleConflicts
            # so we must set it for each iteration.
            _waitAnyTransactionMessage(txn_context)
            if conflict_serial_dict:
                conflicts = _handleConflicts(txn_context,
                    tryToResolveConflict)
                if conflicts:
                    update(conflicts)

        # Check for never-stored objects, and update result for all others
        for oid, store_dict in \
                txn_context['object_stored_counter_dict'].iteritems():
            if not store_dict:
                logging.error('tpc_store failed')
                raise NEOStorageError('tpc_store failed')
            elif oid in resolved_oid_set:
                append((oid, ResolvedSerial))
        return result

    def tpc_vote(self, transaction, tryToResolveConflict):
        """Store current transaction."""
        txn_context = self._txn_container.get(transaction)
        result = self.waitStoreResponses(txn_context, tryToResolveConflict)

        ttid = txn_context['ttid']
        # Store data on each node
        assert not txn_context['data_dict'], txn_context
        packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
            str(transaction.description), dumps(transaction._extension),
            txn_context['cache_dict'])
        queue = txn_context['queue']
        trans_nodes = []
        for node, conn in self.cp.iterateForObject(ttid):
            logging.debug("voting transaction %s on %s", dump(ttid),
                dump(conn.getUUID()))
            try:
                conn.ask(packet, queue=queue)
            except ConnectionClosed:
                continue
            trans_nodes.append(node)
        # check at least one storage node accepted
        if trans_nodes:
            involved_nodes = txn_context['involved_nodes']
            packet = Packets.AskVoteTransaction(ttid)
            for node in involved_nodes.difference(trans_nodes):
                conn = self.cp.getConnForNode(node)
                if conn is not None:
                    try:
                        conn.ask(packet, queue=queue)
                    except ConnectionClosed:
                        pass
            involved_nodes.update(trans_nodes)
            self.waitResponses(queue)
            txn_context['voted'] = None
            # We must not go further if connection to master was lost since
            # tpc_begin, to lower the probability of failing during tpc_finish.
            if 'error' in txn_context:
                raise NEOStorageError(txn_context['error'])
            return result
        logging.error('tpc_vote failed')
        raise NEOStorageError('tpc_vote failed')

    def tpc_abort(self, transaction):
        """Abort current transaction."""
        txn_context = self._txn_container.pop(transaction)
        if txn_context is None:
            return
        p = Packets.AbortTransaction(txn_context['ttid'])
        # cancel transaction on all those nodes
        nodes = map(self.cp.getConnForNode,
            txn_context['involved_nodes'] |
            txn_context['checked_nodes'])
        nodes.append(self.master_conn)
        for conn in nodes:
            if conn is not None:
                try:
                    conn.notify(p)
                except ConnectionClosed:
                    pass
        # We don't need to flush queue, as it won't be reused by future
        # transactions (deleted on next line & indexed by transaction object
        # instance).
        self.dispatcher.forget_queue(txn_context['queue'], flush_queue=False)

    def tpc_finish(self, transaction, tryToResolveConflict, f=None):
        """Finish current transaction

        To avoid inconsistencies between several databases involved in the
        same transaction, an IStorage implementation must do its best not to
        fail in tpc_finish. In particular, making a transaction permanent
        should ideally be as simple as switching a bit permanently.

        In NEO, all the data (with the exception of the tid, simply because
        it is not known yet) is already flushed on disk at the end of the vote.
        During tpc_finish, all nodes storing the transaction metadata are asked
        to commit by saving the new tid and flushing again: for SQL backends,
        it's just an UPDATE of 1 cell. At last, the metadata is moved to
        a final place so that the new transaction is readable, but this is
        something that can always be replayed (during the verification phase)
        if any failure happens.
        """
        txn_container = self._txn_container
        if 'voted' not in txn_container.get(transaction):
            self.tpc_vote(transaction, tryToResolveConflict)
        checked_list = []
        self._load_lock_acquire()
        try:
            # Call finish on master
            txn_context = txn_container.pop(transaction)
            cache_dict = txn_context['cache_dict']
            checked_list = [oid for oid, data in cache_dict.iteritems()
                                if data is CHECKED_SERIAL]
            for oid in checked_list:
                del cache_dict[oid]
            ttid = txn_context['ttid']
            p = Packets.AskFinishTransaction(ttid, cache_dict, checked_list)
            try:
                tid = self._askPrimary(p, cache_dict=cache_dict, callback=f)
                assert tid
            except ConnectionClosed:
                tid = self._getFinalTID(ttid)
                if not tid:
                    raise
            return tid
        finally:
            self._load_lock_release()

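    # _getFinalTID is used when the connection to the master is lost during
    # tpc_finish: it asks the primary master (and, if the primary only
    # answers MAX_TID, the storage nodes) whether the transaction was
    # committed, and returns its final TID if it was.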
    def _getFinalTID(self, ttid):
        try:
            p = Packets.AskFinalTID(ttid)
            while 1:
                try:
                    tid = self._askPrimary(p)
                    break
                except ConnectionClosed:
                    pass
            if tid == MAX_TID:
                while 1:
                    for _, conn in self.cp.iterateForObject(
                            ttid, readable=True):
                        try:
                            return self._askStorage(conn, p)
                        except ConnectionClosed:
                            pass
                    self._getMasterConnection()
            elif tid:
                return tid
        except Exception:
            logging.exception("Failed to get final tid for TXN %s",
                              dump(ttid))

    def undo(self, undone_tid, txn, tryToResolveConflict):
        txn_context = self._txn_container.get(txn)
        txn_info, txn_ext = self._getTransactionInformation(undone_tid)
        txn_oid_list = txn_info['oids']

        # Regroup objects per partition, to ask a minimal set of storage nodes.
        partition_oid_dict = {}
        for oid in txn_oid_list:
            partition = self.pt.getPartition(oid)
            try:
                oid_list = partition_oid_dict[partition]
            except KeyError:
                oid_list = partition_oid_dict[partition] = []
            oid_list.append(oid)

        # Ask the storage nodes for the undo serial (the serial at which the
        # object's previous data is)
        getCellList = self.pt.getCellList
        getCellSortKey = self.cp.getCellSortKey
        getConnForCell = self.cp.getConnForCell
        queue = self._thread_container.queue
        ttid = txn_context['ttid']
        undo_object_tid_dict = {}
        snapshot_tid = p64(u64(self.last_tid) + 1)
        for partition, oid_list in partition_oid_dict.iteritems():
            cell_list = getCellList(partition, readable=True)
            # We do want to shuffle before getting one with the smallest
            # key, so that all cells with the same (smallest) key have an
            # identical chance to be chosen.
            shuffle(cell_list)
            storage_conn = getConnForCell(min(cell_list, key=getCellSortKey))
            storage_conn.ask(Packets.AskObjectUndoSerial(ttid,
                snapshot_tid, undone_tid, oid_list),
                queue=queue, undo_object_tid_dict=undo_object_tid_dict)

        # Wait for all AnswerObjectUndoSerial. We might get OidNotFoundError,
        # meaning that objects in transaction's oid_list do not exist any
        # longer. This is the symptom of a pack, so forbid undoing the
        # transaction when it happens.
        try:
            self.waitResponses(queue)
        except NEOStorageNotFoundError:
            self.dispatcher.forget_queue(queue)
            raise UndoError('non-undoable transaction')

        # Send undo data to all storage nodes.
        for oid in txn_oid_list:
            current_serial, undo_serial, is_current = undo_object_tid_dict[oid]
            if is_current:
                data = None
            else:
                # Serial being undone is not the latest version for this
                # object. This is an undo conflict, try to resolve it.
                try:
                    # Load the latest version we are supposed to see
                    data = self.load(oid, current_serial)[0]
                    # Load the version we were undoing to
                    undo_data = self.load(oid, undo_serial)[0]
                except NEOStorageNotFoundError:
                    raise UndoError('Object not found while resolving undo '
                        'conflict')
                # Resolve conflict
                try:
                    data = tryToResolveConflict(oid, current_serial,
                        undone_tid, undo_data, data)
                except ConflictError:
                    raise UndoError('Some data were modified by a later ' \
                        'transaction', oid)
                undo_serial = None
            self._store(txn_context, oid, current_serial, data, undo_serial)

        return None, txn_oid_list

    def _insertMetadata(self, txn_info, extension):
        for k, v in loads(extension).items():
            txn_info[k] = v

    def _getTransactionInformation(self, tid):
        packet = Packets.AskTransactionInformation(tid)
        for node, conn in self.cp.iterateForObject(tid, readable=True):
            try:
                txn_info, txn_ext = self._askStorage(conn, packet)
            except ConnectionClosed:
                continue
            except NEOStorageNotFoundError:
                # TID not found
                continue
            break
        else:
            raise NEOStorageError('Transaction %r not found' % (tid, ))
        return (txn_info, txn_ext)

    def undoLog(self, first, last, filter=None, block=0):
        # XXX: undoLog is broken
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        queue = self._thread_container.queue
        packet = Packets.AskTIDs(first, last, INVALID_PARTITION)
        tid_set = set()
        for storage_node in self.pt.getNodeSet(True):
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue
            conn.ask(packet, queue=queue, tid_set=tid_set)

        # Wait for answers from all storages.
        self.waitResponses(queue)

        # Reorder tids
        ordered_tids = sorted(tid_set, reverse=True)
        logging.debug("UndoLog tids %s", map(dump, ordered_tids))
        # For each transaction, get info
        undo_info = []
        append = undo_info.append
        for tid in ordered_tids:
            (txn_info, txn_ext) = self._getTransactionInformation(tid)
            if filter is None or filter(txn_info):
                txn_info.pop('packed')
                txn_info.pop("oids")
                self._insertMetadata(txn_info, txn_ext)
                append(txn_info)
                if len(undo_info) >= last - first:
                    break
        # Check that we return at least one element; otherwise call
        # again with an extended range.
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter,
                    block=1)
        return undo_info

    def transactionLog(self, start, stop, limit):
        tid_list = []
        # request a tid list for each partition
        for offset in xrange(self.pt.getPartitions()):
            p = Packets.AskTIDsFrom(start, stop, limit, offset)
            for node, conn in self.cp.iterateForObject(offset, readable=True):
                try:
                    r = self._askStorage(conn, p)
                    break
                except ConnectionClosed:
                    pass
            else:
                raise NEOStorageError('transactionLog failed')
            if r:
                tid_list = list(heapq.merge(tid_list, r))
                if len(tid_list) >= limit:
                    del tid_list[limit:]
                    stop = tid_list[-1]
        # request transaction information
        txn_list = []
        append = txn_list.append
        tid = None
        for tid in tid_list:
            (txn_info, txn_ext) = self._getTransactionInformation(tid)
            txn_info['ext'] = loads(txn_ext)
            append(txn_info)
        return (tid, txn_list)

    def history(self, oid, size=1, filter=None):
        # Get history information for the object first
        packet = Packets.AskObjectHistory(oid, 0, size)
        for node, conn in self.cp.iterateForObject(oid, readable=True):
            try:
                history_list = self._askStorage(conn, packet)
            except ConnectionClosed:
                continue
            # Now that we have object information, get transaction information
            result = []
            # history_list is already sorted descending (by the storage)
            for serial, size in history_list:
                txn_info, txn_ext = self._getTransactionInformation(serial)
                # create history dict
                txn_info.pop('id')
                txn_info.pop('oids')
                txn_info.pop('packed')
                txn_info['tid'] = serial
                txn_info['version'] = ''
                txn_info['size'] = size
                if filter is None or filter(txn_info):
                    result.append(txn_info)
                self._insertMetadata(txn_info, txn_ext)
            return result

    def importFrom(self, source, start, stop, tryToResolveConflict,
            preindex=None):
        # TODO: The main difference with the BaseStorage implementation is
        #       that preindex can't be filled with the result of 'store' (the
        #       tid is only known after 'tpc_finish'). This method could be
        #       dropped if we implemented IStorageRestoreable (a wrapper
        #       around source would still be required for partial import).
        if preindex is None:
            preindex = {}
        for transaction in source.iterator(start, stop):
            tid = transaction.tid
            self.tpc_begin(transaction, tid, transaction.status)
            for r in transaction:
                oid = r.oid
                pre = preindex.get(oid)
                self.store(oid, pre, r.data, r.version, transaction)
                preindex[oid] = tid
            conflicted = self.tpc_vote(transaction, tryToResolveConflict)
            assert not conflicted, conflicted
            real_tid = self.tpc_finish(transaction, tryToResolveConflict)
            assert real_tid == tid, (real_tid, tid)

    from .iterator import iterator

    def lastTransaction(self):
        self._askPrimary(Packets.AskLastTransaction())
        return self.last_tid

    def pack(self, t):
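        # Convert the given Unix time into a TID (TimeStamp serializes the
        # UTC time from time.gmtime()).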
        tid = TimeStamp(*time.gmtime(t)[:5] + (t % 60, )).raw()
        if tid == ZERO_TID:
            raise NEOStorageError('Invalid pack time')
        self._askPrimary(Packets.AskPack(tid))
        # XXX: this is only needed to make ZODB unit tests pass.
        # It should not be otherwise required (clients should be free to load
        # old data as long as it is available in cache, even if it was pruned
        # by a pack), so don't bother invalidating on other clients.
        self._cache_lock_acquire()
        try:
            self._cache.clear()
        finally:
            self._cache_lock_release()

    def getLastTID(self, oid):
        return self.load(oid)[1]

    def checkCurrentSerialInTransaction(self, oid, serial, transaction):
        self._checkCurrentSerialInTransaction(
            self._txn_container.get(transaction), oid, serial)

    def _checkCurrentSerialInTransaction(self, txn_context, oid, serial):
        ttid = txn_context['ttid']
        txn_context['object_serial_dict'][oid] = serial
        # Placeholders
        queue = txn_context['queue']
        txn_context['object_stored_counter_dict'][oid] = {}
        # ZODB.Connection calls 'checkCurrentSerialInTransaction' after stores,
        # and skips oids that have been successfully stored.
        assert oid not in txn_context['cache_dict'], (oid, txn_context)
        txn_context['data_dict'].setdefault(oid, CHECKED_SERIAL)
        checked_nodes = txn_context['checked_nodes']
        packet = Packets.AskCheckCurrentSerial(ttid, serial, oid)
        for node, conn in self.cp.iterateForObject(oid):
            try:
                conn.ask(packet, queue=queue)
            except ConnectionClosed:
                continue
            checked_nodes.add(node)
        if not checked_nodes:
            raise NEOStorageError("checkCurrent failed")
        self._waitAnyTransactionMessage(txn_context, False)