#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

from cPickle import dumps, loads
from zlib import compress as real_compress, decompress
from neo.lib.locking import Empty
from random import shuffle
import heapq
import time
import os

from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.POSException import ReadConflictError
from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp

import neo.lib
from neo.lib.protocol import NodeTypes, Packets, \
    INVALID_PARTITION, ZERO_HASH, ZERO_TID
from neo.lib.event import EventManager
from neo.lib.util import makeChecksum as real_makeChecksum, dump
from neo.lib.locking import Lock
from neo.lib.connection import MTClientConnection, OnTimeout, ConnectionClosed
from neo.lib.node import NodeManager
from neo.lib.connector import getConnectorHandler
from .exception import NEOStorageError, NEOStorageCreationUndoneError
from .exception import NEOStorageNotFoundError
from neo.lib.exception import NeoException
from .handlers import storage, master
from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from .poll import ThreadedPoll, psThreadedPoll
from .iterator import Iterator
from .cache import ClientCache
from .pool import ConnectionPool
from neo.lib.util import u64, parseMasterList
from neo.lib.profiling import profiler_decorator, PROFILING_ENABLED
from neo.lib.debug import register as registerLiveDebugger
from .container import ThreadContainer, TransactionContainer

if PROFILING_ENABLED:
    # Those functions require a "real" python function wrapper before they can
    # be decorated.
    @profiler_decorator
    def compress(data):
        return real_compress(data)

    @profiler_decorator
    def makeChecksum(data):
        return real_makeChecksum(data)
else:
    # If profiling is disabled, directly use original functions.
    compress = real_compress
    makeChecksum = real_makeChecksum

CHECKED_SERIAL = object()


class Application(object):
    """The client node application."""

    def __init__(self, master_nodes, name, compress=True,
            dynamic_master_list=None, **kw):
        # Start polling thread
        self.em = EventManager()
        self.poll_thread = ThreadedPoll(self.em, name=name)
        psThreadedPoll()
        # Internal attributes common to all threads
        self._db = None
        self.name = name
        master_addresses, connector_name = parseMasterList(master_nodes)
        self.connector_handler = getConnectorHandler(connector_name)
        self.dispatcher = Dispatcher(self.poll_thread)
        self.nm = NodeManager(dynamic_master_list)
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None

        # load master node list
        for address in master_addresses:
            self.nm.createMaster(address=address)

        # no self-assigned UUID, primary master will supply us one
        self.uuid = None
        self._cache = ClientCache()
        self.new_oid_list = []
        self.last_oid = '\0' * 8
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)
        # Internal attributes, distinct for each thread
        self._thread_container = ThreadContainer()
        self._txn_container = TransactionContainer()
        # Lock definitions:
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock prevents concurrent OID generation requests
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        lock = Lock()
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        self._connecting_to_master_node_acquire = lock.acquire
        self._connecting_to_master_node_release = lock.release
        # _nm ensures exclusive access to the node manager
        lock = Lock()
        self._nm_acquire = lock.acquire
        self._nm_release = lock.release
        self.compress = compress
        registerLiveDebugger(on_log=self.log)

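    # The following accessors expose the per-thread state kept in
    # self._thread_container: the 'answer' slot that packet handlers fill
    # and the queue on which packets addressed to this thread are received.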
    def getHandlerData(self):
        return self._thread_container.get()['answer']

    def setHandlerData(self, data):
        self._thread_container.get()['answer'] = data

    def _getThreadQueue(self):
        return self._thread_container.get()['queue']

    def log(self):
        self.em.log()
        self.nm.log()
        if self.pt is not None:
            self.pt.log()

    @profiler_decorator
    def _handlePacket(self, conn, packet, kw={}, handler=None):
        """
          conn
            The connection which received the packet (forwarded to handler).
          packet
            The packet to handle.
          handler
            The handler to use to handle packet.
            If not given, it will be guessed from the connection's node type.
        """
        if handler is None:
            # Guess the handler to use based on the type of node on the
            # connection
            node = self.nm.getByAddress(conn.getAddress())
            if node is None:
                raise ValueError, 'Expecting an answer from a node ' \
                    'whose type is not known... Is this right?'
            if node.isStorage():
                handler = self.storage_handler
            elif node.isMaster():
                handler = self.primary_handler
            else:
                raise ValueError, 'Unknown node type: %r' % (node.__class__, )
        conn.lock()
        try:
            handler.dispatch(conn, packet, kw)
        finally:
            conn.unlock()

    @profiler_decorator
    def _waitAnyMessage(self, queue, block=True):
        """
          Handle all pending packets.
          block
            If True (default), will block until at least one packet was
            received.
        """
        pending = self.dispatcher.pending
        get = queue.get
        _handlePacket = self._handlePacket
        while pending(queue):
            try:
                conn, packet, kw = get(block)
            except Empty:
                break
            if packet is None or isinstance(packet, ForgottenPacket):
                # connection was closed or some packet was forgotten
                continue
            block = False
            try:
                _handlePacket(conn, packet, kw)
            except ConnectionClosed:
                pass

    def _waitAnyTransactionMessage(self, txn_context, block=True):
        """
        Just like _waitAnyMessage, but for per-transaction exchanges, rather
        than per-thread.
        """
        queue = txn_context['queue']
        self.setHandlerData(txn_context)
        try:
            self._waitAnyMessage(queue, block=block)
        finally:
            # Don't leave access to thread context, even if a raise happens.
            self.setHandlerData(None)

    @profiler_decorator
    def _ask(self, conn, packet, handler=None):
        self.setHandlerData(None)
        queue = self._getThreadQueue()
        msg_id = conn.ask(packet, queue=queue)
        get = queue.get
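        # Wait until the answer to our own request arrives. Packets that
        # belong to other requests on the same queue are still dispatched
        # to their default handler on the way.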
        _handlePacket = self._handlePacket
        while True:
            qconn, qpacket, kw = get(True)
            is_forgotten = isinstance(qpacket, ForgottenPacket)
            if conn is qconn:
                # check fake packet
                if qpacket is None:
                    raise ConnectionClosed
                if msg_id == qpacket.getId():
                    if is_forgotten:
                        raise ValueError, 'ForgottenPacket for an ' \
                            'explicitly expected packet.'
                    _handlePacket(qconn, qpacket, kw, handler)
                    break
            if not is_forgotten and qpacket is not None:
                _handlePacket(qconn, qpacket, kw)
        return self.getHandlerData()

    @profiler_decorator
    def _askStorage(self, conn, packet):
        """ Send a request to a storage node and process its answer """
        return self._ask(conn, packet, handler=self.storage_handler)

    @profiler_decorator
    def _askPrimary(self, packet):
        """ Send a request to the primary master and process its answer """
        return self._ask(self._getMasterConnection(), packet,
            handler=self.primary_handler)

    @profiler_decorator
    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
        # acquire the lock to allow only one thread to connect to the primary
        result = self.master_conn
        if result is None:
            self._connecting_to_master_node_acquire()
            try:
                self.new_oid_list = []
                result = self._connectToPrimaryNode()
            finally:
                self._connecting_to_master_node_release()
        return result

    def getPartitionTable(self):
        """ Return the partition table manager, reconnect the PMN if needed """
        # this ensures the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

    @profiler_decorator
    def _connectToPrimaryNode(self):
        """
            Look up the current primary master node
        """
        neo.lib.logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        packet = Packets.AskPrimary()
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        time.sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.em,
                        self.notifications_handler,
                        node=self.trying_master_node,
                        connector=self.connector_handler(),
                        dispatcher=self.dispatcher)
                # Query for primary master node
                if conn.getConnector() is None:
                    # This happens if a connection could not be established.
                    neo.lib.logging.error(
                        'Connection to master node %s failed',
                        self.trying_master_node)
                    continue
                try:
                    self._ask(conn, packet,
                        handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                connected = self.primary_master_node is not None and \
                        self.primary_master_node is self.trying_master_node
            neo.lib.logging.info(
                'Connected to %s' % (self.primary_master_node, ))
            try:
                ready = self.identifyToPrimaryNode(conn)
            except ConnectionClosed:
                neo.lib.logging.error('Connection to %s lost',
                    self.trying_master_node)
                self.primary_master_node = None
        neo.lib.logging.info("Connected and ready")
        return conn

    def identifyToPrimaryNode(self, conn):
        """
            Request identification and the information required to be
            operational.
            Might raise ConnectionClosed so that the new primary can be
            looked up again.
        """
        neo.lib.logging.info('Initializing from master')
        ask = self._ask
        handler = self.primary_bootstrap_handler
        # Identify to primary master and request initial data
        p = Packets.RequestIdentification(NodeTypes.CLIENT, self.uuid, None,
            self.name)
        assert self.master_conn is None, self.master_conn
        while self.master_conn is None:
            ask(conn, p, handler=handler)
            if conn.getUUID() is None:
                # Node identification was refused by the master; it is still
                # considered the primary as long as we are connected to it.
                time.sleep(1)
        ask(conn, Packets.AskNodeInformation(), handler=handler)
        ask(conn, Packets.AskPartitionTable(), handler=handler)
        return self.pt.operational()

    def registerDB(self, db, limit):
        self._db = db

    def getDB(self):
        return self._db

    @profiler_decorator
    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
                # Get a new list of OIDs from the master node. A local pool
                # of OIDs is kept so that the master is not asked for a new
                # OID on every call.
                self._askPrimary(Packets.AskNewOIDs(100))
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
            self.last_oid = self.new_oid_list.pop(0)
            return self.last_oid
        finally:
            self._oid_lock_release()

    def getStorageSize(self):
        # return the last OID used, this is inaccurate
        return int(u64(self.last_oid))

    @profiler_decorator
    def load(self, oid, tid=None, before_tid=None):
        """
        Internal method which manages load, loadSerial and loadBefore.
        OID and TID (serial) parameters are expected packed.
        oid
            OID of object to get.
        tid
            If given, the exact serial at which OID is desired.
            before_tid should be None.
        before_tid
            If given, the excluded upper bound serial at which OID is desired.
            tid should be None.

        Return value: (3-tuple)
        - Object data (None if object creation was undone).
        - Serial of given data.
        - Next serial at which object exists, or None. Only set when tid
          parameter is not None.

        Exceptions:
            NEOStorageError
                technical problem
            NEOStorageNotFoundError
                object exists but no data satisfies given parameters
            NEOStorageDoesNotExistError
                object doesn't exist
            NEOStorageCreationUndoneError
                object existed, but its creation was undone

        Note that loadSerial is used during conflict resolution to load the
        object's current version, which is not visible to us normally (it was
        committed after our snapshot was taken).
        """
        # TODO:
        # - rename parameters (here? and in handlers & packet definitions)

        self._load_lock_acquire()
        try:
            result = self._loadFromCache(oid, tid, before_tid)
            if not result:
                result = self._loadFromStorage(oid, tid, before_tid)
                self._cache_lock_acquire()
                try:
                    self._cache.store(oid, *result)
                finally:
                    self._cache_lock_release()
            return result
        finally:
            self._load_lock_release()

    @profiler_decorator
    def _loadFromStorage(self, oid, at_tid, before_tid):
        packet = Packets.AskObject(oid, at_tid, before_tid)
        for node, conn in self.cp.iterateForObject(oid, readable=True):
            try:
                noid, tid, next_tid, compression, checksum, data \
                    = self._askStorage(conn, packet)
            except ConnectionClosed:
                continue

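            # An empty answer with a null checksum means the object exists
            # but its creation was undone; otherwise the data must match
            # its checksum.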
            if data or checksum != ZERO_HASH:
                if checksum != makeChecksum(data):
                    neo.lib.logging.error('wrong checksum from %s for oid %s',
                        conn, dump(oid))
                    continue
                if compression:
                    data = decompress(data)
                return data, tid, next_tid
            raise NEOStorageCreationUndoneError(dump(oid))
        # We could not get the object from any storage node because of
        # connection errors.
        raise NEOStorageError('connection failure')

    @profiler_decorator
    def _loadFromCache(self, oid, at_tid=None, before_tid=None):
        """
        Load from local cache, return None if not found.
        """
        self._cache_lock_acquire()
        try:
            if at_tid:
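                # The cache lookup expects an exclusive upper bound, so extend
                # at_tid with an extra character so that the revision stored
                # exactly at at_tid can be returned (checked by the assert
                # below).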
                result = self._cache.load(oid, at_tid + '*')
                assert not result or result[1] == at_tid
                return result
            return self._cache.load(oid, before_tid)
        finally:
            self._cache_lock_release()

    @profiler_decorator
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        txn_container = self._txn_container
        # First get a transaction, only one is allowed at a time
        if txn_container.get(transaction) is not None:
            # We have already begun the same transaction
            raise StorageTransactionError('Duplicate tpc_begin calls')
        txn_context = txn_container.new(transaction)
        # use the given TID or request a new one from the master
        answer_ttid = self._askPrimary(Packets.AskBeginTransaction(tid))
        if answer_ttid is None:
            raise NEOStorageError('tpc_begin failed')
        assert tid in (None, answer_ttid), (tid, answer_ttid)
        txn_context['txn'] = transaction
        txn_context['ttid'] = answer_ttid

    @profiler_decorator
    def store(self, oid, serial, data, version, transaction):
        """Store object."""
        txn_context = self._txn_container.get(transaction)
        if txn_context is None:
            raise StorageTransactionError(self, transaction)
        neo.lib.logging.debug(
            'storing oid %s serial %s', dump(oid), dump(serial))
        self._store(txn_context, oid, serial, data)
        return None

    def _store(self, txn_context, oid, serial, data, data_serial=None,
            unlock=False):
        ttid = txn_context['ttid']
        if data is None:
            # This is some undo: either a no-data object (undoing object
            # creation) or a back-pointer to an earlier revision (going back to
            # an older object revision).
            compressed_data = ''
            compression = 0
            checksum = ZERO_HASH
        else:
            assert data_serial is None
            compression = self.compress
            compressed_data = data
            size = len(data)
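            # Compress only when enabled, and only keep the compressed form
            # if it is actually smaller than the original data.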
            if self.compress:
                compressed_data = compress(data)
                if size < len(compressed_data):
                    compressed_data = data
                    compression = 0
                else:
                    compression = 1
            checksum = makeChecksum(compressed_data)
            txn_context['data_size'] += size
        on_timeout = OnTimeout(self.onStoreTimeout, txn_context, oid)
        # Store object in tmp cache
        txn_context['data_dict'][oid] = data
        # Store data on each node
        txn_context['object_stored_counter_dict'][oid] = {}
        object_base_serial_dict = txn_context['object_base_serial_dict']
        if oid not in object_base_serial_dict:
            object_base_serial_dict[oid] = serial
        txn_context['object_serial_dict'][oid] = serial
        queue = txn_context['queue']
        involved_nodes = txn_context['involved_nodes']
        add_involved_nodes = involved_nodes.add
        packet = Packets.AskStoreObject(oid, serial, compression,
            checksum, compressed_data, data_serial, ttid, unlock)
        for node, conn in self.cp.iterateForObject(oid):
            try:
                conn.ask(packet, on_timeout=on_timeout, queue=queue)
                add_involved_nodes(node)
            except ConnectionClosed:
                continue
        if not involved_nodes:
            raise NEOStorageError("Store failed")

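        # Back-pressure: while the amount of data pending in this transaction
        # exceeds the cache size, process incoming answers before accepting
        # more stores.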
        while txn_context['data_size'] >= self._cache._max_size:
            self._waitAnyTransactionMessage(txn_context)
        self._waitAnyTransactionMessage(txn_context, False)

    def onStoreTimeout(self, conn, msg_id, txn_context, oid):
        # NOTE: this method is called from the poll thread, don't use
        #       thread-specific values!
        txn_context.setdefault('timeout_dict', {})[oid] = msg_id
        # Ask the storage if someone locks the object.
        # By sending a message with a smaller timeout,
        # the connection will be kept open.
        conn.ask(Packets.AskHasLock(txn_context['ttid'], oid),
                 timeout=5, queue=txn_context['queue'])

    @profiler_decorator
    def _handleConflicts(self, txn_context, tryToResolveConflict):
        result = []
        append = result.append
        # Check for conflicts
        data_dict = txn_context['data_dict']
        object_base_serial_dict = txn_context['object_base_serial_dict']
        object_serial_dict = txn_context['object_serial_dict']
        conflict_serial_dict = txn_context['conflict_serial_dict'].copy()
        txn_context['conflict_serial_dict'].clear()
        resolved_conflict_serial_dict = txn_context[
            'resolved_conflict_serial_dict']
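        # For each conflicting oid, either take the deadlock-avoidance path
        # (signalled by ZERO_TID among the conflict serials) or try
        # application-level conflict resolution below.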
        for oid, conflict_serial_set in conflict_serial_dict.iteritems():
            conflict_serial = max(conflict_serial_set)
            serial = object_serial_dict[oid]
            if ZERO_TID in conflict_serial_set:
              if 1:
                # XXX: disable deadlock avoidance code until it is fixed
                neo.lib.logging.info('Deadlock avoidance on %r:%r',
                    dump(oid), dump(serial))
                # 'data' parameter of ConflictError is only used to report the
                # class of the object. It doesn't matter if 'data' is None
                # because the transaction is too big.
                try:
                    data = data_dict[oid]
                except KeyError:
                    data = txn_context['cache_dict'][oid]
              else:
                # The storage refused to let us take the object lock, to avoid
                # a possible deadlock. TID is actually used for some kind of
                # "locking priority": when a higher value has the lock,
                # this means we stored objects "too late", and we would
                # otherwise cause a deadlock.
                # To recover, we must ask storages to release locks we
                # hold (to let possibly-competing transactions acquire
                # them), and requeue our already-sent store requests.
                # XXX: currently, brute-force is implemented: we send
                # object data again.
                # WARNING: not maintained code
                neo.lib.logging.info('Deadlock avoidance triggered on %r:%r',
                    dump(oid), dump(serial))
                for store_oid, store_data in data_dict.iteritems():
                    store_serial = object_serial_dict[store_oid]
                    if store_data is CHECKED_SERIAL:
                        self._checkCurrentSerialInTransaction(txn_context,
                            store_oid, store_serial)
                    else:
                        if store_data is None:
                            # Some undo
                            neo.lib.logging.warning('Deadlock avoidance cannot'
                                ' reliably work with undo, this must be '
                                'implemented.')
                            conflict_serial = ZERO_TID
                            break
                        self._store(txn_context, store_oid, store_serial,
                            store_data, unlock=True)
                else:
                    continue
            else:
                data = data_dict.pop(oid)
                if data is CHECKED_SERIAL:
                    raise ReadConflictError(oid=oid, serials=(conflict_serial,
                        serial))
                if data: # XXX: can 'data' be None ???
                    txn_context['data_size'] -= len(data)
                resolved_serial_set = resolved_conflict_serial_dict.setdefault(
                    oid, set())
                if resolved_serial_set and conflict_serial <= max(
                        resolved_serial_set):
                    # A later serial has already been resolved, skip.
                    resolved_serial_set.update(conflict_serial_set)
                    continue
                new_data = tryToResolveConflict(oid, conflict_serial,
                    serial, data)
                if new_data is not None:
                    neo.lib.logging.info('Conflict resolution succeeded for ' \
                        '%r:%r with %r', dump(oid), dump(serial),
                        dump(conflict_serial))
                    # Mark this conflict as resolved
                    resolved_serial_set.update(conflict_serial_set)
                    # Base serial changes too, as we resolved a conflict
                    object_base_serial_dict[oid] = conflict_serial
                    # Try to store again
                    self._store(txn_context, oid, conflict_serial, new_data)
                    append(oid)
                    continue
                else:
                    neo.lib.logging.info('Conflict resolution failed for ' \
                        '%r:%r with %r', dump(oid), dump(serial),
                        dump(conflict_serial))
            raise ConflictError(oid=oid, serials=(txn_context['ttid'],
                serial), data=data)
        return result

    @profiler_decorator
    def waitResponses(self, queue):
        """Wait for all requests to be answered (or their connection to be
        detected as closed)"""
        pending = self.dispatcher.pending
        _waitAnyMessage = self._waitAnyMessage
        while pending(queue):
            _waitAnyMessage(queue)

    @profiler_decorator
    def waitStoreResponses(self, txn_context, tryToResolveConflict):
        result = []
        append = result.append
        resolved_oid_set = set()
        update = resolved_oid_set.update
        ttid = txn_context['ttid']
        _handleConflicts = self._handleConflicts
        queue = txn_context['queue']
        conflict_serial_dict = txn_context['conflict_serial_dict']
        pending = self.dispatcher.pending
        _waitAnyTransactionMessage = self._waitAnyTransactionMessage
        while pending(queue) or conflict_serial_dict:
            # Note: handler data can be overwritten by _handleConflicts
            # so we must set it for each iteration.
            _waitAnyTransactionMessage(txn_context)
            if conflict_serial_dict:
                conflicts = _handleConflicts(txn_context,
                    tryToResolveConflict)
                if conflicts:
                    update(conflicts)

        # Check for never-stored objects, and update result for all others
        for oid, store_dict in \
                txn_context['object_stored_counter_dict'].iteritems():
            if not store_dict:
                neo.lib.logging.error('tpc_store failed')
                raise NEOStorageError('tpc_store failed')
            elif oid in resolved_oid_set:
                append((oid, ResolvedSerial))
        return result

    @profiler_decorator
    def tpc_vote(self, transaction, tryToResolveConflict):
        """Store current transaction."""
        txn_context = self._txn_container.get(transaction)
        if txn_context is None or transaction is not txn_context['txn']:
            raise StorageTransactionError(self, transaction)

        result = self.waitStoreResponses(txn_context, tryToResolveConflict)

        ttid = txn_context['ttid']
        # Store the transaction metadata on each involved storage node
        txn_stored_counter = 0
        assert not txn_context['data_dict'], txn_context
        packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
            str(transaction.description), dumps(transaction._extension),
            txn_context['cache_dict'])
        add_involved_nodes = txn_context['involved_nodes'].add
        for node, conn in self.cp.iterateForObject(ttid):
            neo.lib.logging.debug("voting object %s on %s", dump(ttid),
                dump(conn.getUUID()))
            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                continue
            add_involved_nodes(node)
            txn_stored_counter += 1

        # check that at least one storage node accepted
        if txn_stored_counter == 0:
            neo.lib.logging.error('tpc_vote failed')
            raise NEOStorageError('tpc_vote failed')
        # Check if the master connection is still alive.
        # This is just here to lower the probability of a problem being
        # detected only in tpc_finish, as we should do our best to detect
        # problems before tpc_finish.
        self._getMasterConnection()

        txn_context['txn_voted'] = True
        return result

    @profiler_decorator
    def tpc_abort(self, transaction):
        """Abort current transaction."""
        txn_container = self._txn_container
        txn_context = txn_container.get(transaction)
        if txn_context is None:
            return

        ttid = txn_context['ttid']
        p = Packets.AbortTransaction(ttid)
        getConnForNode = self.cp.getConnForNode
        # cancel the transaction on all those nodes
        for node in txn_context['involved_nodes']:
            conn = getConnForNode(node)
            if conn is None:
                continue
            try:
                conn.notify(p)
            except:
                neo.lib.logging.error(
                    'Exception in tpc_abort while notifying '
                    'storage node %r of abortion, ignoring.',
                    conn, exc_info=1)
        self._getMasterConnection().notify(p)
        queue = txn_context['queue']
        # We don't need to flush queue, as it won't be reused by future
        # transactions (deleted on next line & indexed by transaction object
        # instance).
        self.dispatcher.forget_queue(queue, flush_queue=False)
        txn_container.delete(transaction)

    @profiler_decorator
    def tpc_finish(self, transaction, tryToResolveConflict, f=None):
        """Finish current transaction."""
        txn_container = self._txn_container
        txn_context = txn_container.get(transaction)
        if txn_context is None:
            raise StorageTransactionError('tpc_finish called for wrong '
                'transaction')
        if not txn_context['txn_voted']:
            self.tpc_vote(transaction, tryToResolveConflict)
        self._load_lock_acquire()
        try:
            # Call finish on master
            cache_dict = txn_context['cache_dict']
            tid = self._askPrimary(Packets.AskFinishTransaction(
                txn_context['ttid'], cache_dict))

            # Call function given by ZODB
            if f is not None:
                f(tid)

            # Update cache
            self._cache_lock_acquire()
            try:
                cache = self._cache
                for oid, data in cache_dict.iteritems():
                    if data is CHECKED_SERIAL:
                        # this is just a remnant of a
                        # checkCurrentSerialInTransaction call, ignore (no data
                        # was modified).
                        continue
                    # Update ex-latest value in cache
                    cache.invalidate(oid, tid)
                    if data is not None:
                        # Store in cache with no next_tid
                        cache.store(oid, data, tid, None)
            finally:
                self._cache_lock_release()
            txn_container.delete(transaction)
            return tid
        finally:
            self._load_lock_release()

    def undo(self, snapshot_tid, undone_tid, txn, tryToResolveConflict):
        txn_context = self._txn_container.get(txn)
        if txn_context is None:
            raise StorageTransactionError(self, undone_tid)

        txn_info, txn_ext = self._getTransactionInformation(undone_tid)
        txn_oid_list = txn_info['oids']

        # Regroup objects per partition, to ask a minimum set of storage
        # nodes.
        partition_oid_dict = {}
        pt = self.getPartitionTable()
        for oid in txn_oid_list:
            partition = pt.getPartition(oid)
            try:
                oid_list = partition_oid_dict[partition]
            except KeyError:
                oid_list = partition_oid_dict[partition] = []
            oid_list.append(oid)

        # Ask the storage nodes for the undo serials (the serials at which
        # each object's previous data is stored)
        getCellList = pt.getCellList
        getCellSortKey = self.cp.getCellSortKey
        getConnForCell = self.cp.getConnForCell
        queue = self._getThreadQueue()
        ttid = txn_context['ttid']
        undo_object_tid_dict = {}
        for partition, oid_list in partition_oid_dict.iteritems():
            cell_list = getCellList(partition, readable=True)
            # We do want to shuffle before getting one with the smallest
            # key, so that all cells with the same (smallest) key have an
            # identical chance to be chosen.
            shuffle(cell_list)
            storage_conn = getConnForCell(min(cell_list, key=getCellSortKey))
            storage_conn.ask(Packets.AskObjectUndoSerial(ttid,
                snapshot_tid, undone_tid, oid_list),
                queue=queue, undo_object_tid_dict=undo_object_tid_dict)

        # Wait for all AnswerObjectUndoSerial. We might get OidNotFoundError,
        # meaning that objects in transaction's oid_list do not exist any
        # longer. This is the symptom of a pack, so forbid undoing the
        # transaction when it happens.
        try:
            self.waitResponses(queue)
        except NEOStorageNotFoundError:
            self.dispatcher.forget_queue(queue)
            raise UndoError('non-undoable transaction')

        # Send undo data to all storage nodes.
        for oid in txn_oid_list:
            current_serial, undo_serial, is_current = undo_object_tid_dict[oid]
            if is_current:
                data = None
            else:
                # Serial being undone is not the latest version for this
                # object. This is an undo conflict, try to resolve it.
                try:
                    # Load the latest version we are supposed to see
                    data = self.load(oid, current_serial)[0]
                    # Load the version we were undoing to
                    undo_data = self.load(oid, undo_serial)[0]
                except NEOStorageNotFoundError:
                    raise UndoError('Object not found while resolving undo '
                        'conflict')
                # Resolve conflict
                try:
                    data = tryToResolveConflict(oid, current_serial,
                        undone_tid, undo_data, data)
                except ConflictError:
                    data = None
                if data is None:
                    raise UndoError('Some data were modified by a later ' \
                        'transaction', oid)
                undo_serial = None
            self._store(txn_context, oid, current_serial, data, undo_serial)

        return None, txn_oid_list

    def _insertMetadata(self, txn_info, extension):
        for k, v in loads(extension).items():
            txn_info[k] = v

    def _getTransactionInformation(self, tid):
        packet = Packets.AskTransactionInformation(tid)
        for node, conn in self.cp.iterateForObject(tid, readable=True):
            try:
                txn_info, txn_ext = self._askStorage(conn, packet)
            except ConnectionClosed:
                continue
            except NEOStorageNotFoundError:
                # TID not found
                continue
            break
        else:
            raise NEOStorageError('Transaction %r not found' % (tid, ))
        return (txn_info, txn_ext)

    def undoLog(self, first, last, filter=None, block=0):
        # XXX: undoLog is broken
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        pt = self.getPartitionTable()
        storage_node_list = pt.getNodeList()

        queue = self._getThreadQueue()
        packet = Packets.AskTIDs(first, last, INVALID_PARTITION)
        tid_set = set()
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue
            conn.ask(packet, queue=queue, tid_set=tid_set)

        # Wait for answers from all storages.
        self.waitResponses(queue)

        # Reorder tids
        ordered_tids = sorted(tid_set, reverse=True)
        neo.lib.logging.debug("UndoLog tids %s", map(dump, ordered_tids))
        # For each transaction, get info
        undo_info = []
        append = undo_info.append
        for tid in ordered_tids:
            (txn_info, txn_ext) = self._getTransactionInformation(tid)
            if filter is None or filter(txn_info):
                txn_info.pop('packed')
                txn_info.pop("oids")
                self._insertMetadata(txn_info, txn_ext)
                append(txn_info)
                if len(undo_info) >= last - first:
                    break
        # Check that we return at least one element, otherwise call
        # again with an extended range
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter,
                    block=1)
        return undo_info

    def transactionLog(self, start, stop, limit):
        tid_list = []
        # request a tid list for each partition
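        # The per-partition answers are merged into a single sorted list,
        # truncated to 'limit' entries; 'stop' is then tightened so that
        # later partitions do not return TIDs that would be discarded anyway.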
        for offset in xrange(self.pt.getPartitions()):
            p = Packets.AskTIDsFrom(start, stop, limit, [offset])
            for node, conn in self.cp.iterateForObject(offset, readable=True):
                try:
                    r = self._askStorage(conn, p)
                    break
                except ConnectionClosed:
                    pass
            else:
                raise NEOStorageError('transactionLog failed')
            if r:
                tid_list = list(heapq.merge(tid_list, r))
                if len(tid_list) >= limit:
                    del tid_list[limit:]
                    stop = tid_list[-1]
        # request transaction information
        txn_list = []
        append = txn_list.append
        tid = None
        for tid in tid_list:
            (txn_info, txn_ext) = self._getTransactionInformation(tid)
            txn_info['ext'] = loads(txn_ext)
            append(txn_info)
        return (tid, txn_list)

    def history(self, oid, size=1, filter=None):
        # Get history information for the object first
        packet = Packets.AskObjectHistory(oid, 0, size)
        for node, conn in self.cp.iterateForObject(oid, readable=True):
            try:
                history_list = self._askStorage(conn, packet)
            except ConnectionClosed:
                continue
            # Now that we have object information, get transaction information
            result = []
            # history_list is already sorted descending (by the storage)
            for serial, size in history_list:
                txn_info, txn_ext = self._getTransactionInformation(serial)
                # create history dict
                txn_info.pop('id')
                txn_info.pop('oids')
                txn_info.pop('packed')
                txn_info['tid'] = serial
                txn_info['version'] = ''
                txn_info['size'] = size
                if filter is None or filter(txn_info):
                    result.append(txn_info)
                self._insertMetadata(txn_info, txn_ext)
            return result

    @profiler_decorator
    def importFrom(self, source, start, stop, tryToResolveConflict,
            preindex=None):
        if preindex is None:
            preindex = {}
        transaction_iter = source.iterator(start, stop)
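        # Replay each transaction from the source storage through the
        # regular two-phase commit API, forcing the original TID.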
        for transaction in transaction_iter:
            tid = transaction.tid
            self.tpc_begin(transaction, tid, transaction.status)
            for r in transaction:
                oid = r.oid
                pre = preindex.get(oid)
                # TODO: bypass conflict resolution, locks...
                self.store(oid, pre, r.data, r.version, transaction)
                preindex[oid] = tid
            conflicted = self.tpc_vote(transaction, tryToResolveConflict)
            assert not conflicted, conflicted
            real_tid = self.tpc_finish(transaction, tryToResolveConflict)
            assert real_tid == tid, (real_tid, tid)
        transaction_iter.close()

    def iterator(self, start, stop):
        if start is None:
            start = ZERO_TID
        return Iterator(self, start, stop)

    def lastTransaction(self):
        return self._askPrimary(Packets.AskLastTransaction())

    def abortVersion(self, src, transaction):
        if self._txn_container.get(transaction) is None:
            raise StorageTransactionError(self, transaction)
        return '', []

    def commitVersion(self, src, dest, transaction):
        if self._txn_container.get(transaction) is None:
            raise StorageTransactionError(self, transaction)
        return '', []

    def __del__(self):
        """Close all connections."""
        # Due to a bug in ZODB, close is not always called when shutting
        # down Zope, so use __del__ to close connections
        for conn in self.em.getConnectionList():
            conn.close()
        self.cp.flush()
        self.master_conn = None
        # Stop polling thread
        neo.lib.logging.debug('Stopping %s', self.poll_thread)
        self.poll_thread.stop()
        psThreadedPoll()
    close = __del__

    def pack(self, t):
        tid = repr(TimeStamp(*time.gmtime(t)[:5] + (t % 60, )))
        if tid == ZERO_TID:
            raise NEOStorageError('Invalid pack time')
        self._askPrimary(Packets.AskPack(tid))
        # XXX: this is only needed to make ZODB unit tests pass.
        # It should not be otherwise required (clients should be free to load
        # old data as long as it is available in cache, even if it was pruned
        # by a pack), so don't bother invalidating on other clients.
        self._cache_lock_acquire()
        try:
            self._cache.clear()
        finally:
            self._cache_lock_release()

    def getLastTID(self, oid):
        return self.load(oid)[1]

    def checkCurrentSerialInTransaction(self, oid, serial, transaction):
        txn_context = self._txn_container.get(transaction)
        if txn_context is None:
            raise StorageTransactionError(self, transaction)
        self._checkCurrentSerialInTransaction(txn_context, oid, serial)

    def _checkCurrentSerialInTransaction(self, txn_context, oid, serial):
        ttid = txn_context['ttid']
        txn_context['object_serial_dict'][oid] = serial
        # Placeholders
        queue = txn_context['queue']
        txn_context['object_stored_counter_dict'][oid] = {}
        # ZODB.Connection calls 'checkCurrentSerialInTransaction' after stores,
        # and skips oids that have been successfully stored.
        assert oid not in txn_context['cache_dict'], (oid, txn_context)
        txn_context['data_dict'].setdefault(oid, CHECKED_SERIAL)
        packet = Packets.AskCheckCurrentSerial(ttid, serial, oid)
        for node, conn in self.cp.iterateForObject(oid):
            try:
                conn.ask(packet, queue=queue)
            except ConnectionClosed:
                continue
        self._waitAnyTransactionMessage(txn_context, False)