#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

from thread import get_ident
from cPickle import dumps, loads
from zlib import compress as real_compress, decompress
from neo.locking import Queue, Empty
from random import shuffle
import time

from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp

from neo import setupLog
setupLog('CLIENT', verbose=True)

from neo import logging
from neo.protocol import NodeTypes, Packets, INVALID_PARTITION, ZERO_TID
from neo.event import EventManager
from neo.util import makeChecksum as real_makeChecksum, dump
from neo.locking import Lock
from neo.connection import MTClientConnection, OnTimeout
from neo.node import NodeManager
from neo.connector import getConnectorHandler
from neo.client.exception import NEOStorageError, NEOStorageCreationUndoneError
from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
from neo.exception import NeoException
from neo.client.handlers import storage, master
from neo.dispatcher import Dispatcher, ForgottenPacket
from neo.client.poll import ThreadedPoll
from neo.client.iterator import Iterator
from neo.client.mq import MQ
from neo.client.pool import ConnectionPool
from neo.util import u64, parseMasterList
from neo.profiling import profiler_decorator, PROFILING_ENABLED
from neo.live_debug import register as registerLiveDebugger

if PROFILING_ENABLED:
    # Those functions require a "real" python function wrapper before they can
    # be decorated.
    @profiler_decorator
    def compress(data):
        return real_compress(data)

    @profiler_decorator
    def makeChecksum(data):
        return real_makeChecksum(data)
else:
    # If profiling is disabled, directly use original functions.
    compress = real_compress
    makeChecksum = real_makeChecksum

class ThreadContext(object):

    def __init__(self):
        super(ThreadContext, self).__setattr__('_threads_dict', {})

    def __getThreadData(self):
        thread_id = get_ident()
        try:
            result = self._threads_dict[thread_id]
        except KeyError:
            self.clear(thread_id)
            result = self._threads_dict[thread_id]
        return result

    def __getattr__(self, name):
        thread_data = self.__getThreadData()
        try:
            return thread_data[name]
        except KeyError:
            raise AttributeError, name

    def __setattr__(self, name, value):
        thread_data = self.__getThreadData()
        thread_data[name] = value

    def clear(self, thread_id=None):
        if thread_id is None:
            thread_id = get_ident()
        thread_dict = self._threads_dict.get(thread_id)
        if thread_dict is None:
            queue = Queue(0)
        else:
            queue = thread_dict['queue']
        self._threads_dict[thread_id] = {
            'tid': None,
            'txn': None,
            'data_dict': {},
            'data_list': [],
            'object_serial_dict': {},
            'object_stored_counter_dict': {},
            'conflict_serial_dict': {},
            'resolved_conflict_serial_dict': {},
            'object_stored': 0,
            'txn_voted': False,
            'txn_finished': False,
            'queue': queue,
            'txn_info': 0,
            'history': None,
            'node_tids': {},
            'node_ready': False,
            'asked_object': 0,
            'undo_object_tid_dict': {},
            'involved_nodes': set(),
        }


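# Note on ThreadContext: attribute access on an instance is redirected to a
# per-thread dict keyed by get_ident(), so every thread transparently gets its
# own 'tid', 'txn', 'queue', etc.  A minimal illustration (values are only
# examples):
#   ctx = ThreadContext()
#   ctx.tid = '\x00' * 8   # visible only to the calling thread
#   ctx.clear()            # reset this thread's entry, keeping its queue
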
class Application(object):
    """The client node application."""

    def __init__(self, master_nodes, name, connector=None, compress=True, **kw):
        # Start polling thread
        self.em = EventManager()
        self.poll_thread = ThreadedPoll(self.em)
        # Internal attributes common to all threads
        self._db = None
        self.name = name
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None

        # load master node list
        for address in parseMasterList(master_nodes):
            self.nm.createMaster(address=address)

        # no self-assigned UUID, primary master will supply us one
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.last_oid = '\0' * 8
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)
        # Internal attributes distinct between threads
        self.local_var = ThreadContext()
        # Lock definitions:
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock is used to prevent multiple OID generation requests
        # from running at the same time
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        lock = Lock()
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        self._connecting_to_master_node_acquire = lock.acquire
        self._connecting_to_master_node_release = lock.release
        # _nm ensures exclusive access to the node manager
        lock = Lock()
        self._nm_acquire = lock.acquire
        self._nm_release = lock.release
        self.compress = compress
        registerLiveDebugger(on_log=self.log)

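    # A minimal usage sketch of the storage API implemented below (the master
    # address, cluster name, 'data' pickle and 'tryToResolveConflict' callback
    # are illustrative placeholders, normally supplied by ZODB):
    #   app = Application('127.0.0.1:10000', name='main')
    #   txn = transaction.Transaction()
    #   app.tpc_begin(txn)
    #   oid = app.new_oid()
    #   app.store(oid, '\x00' * 8, data, '', txn)
    #   app.tpc_vote(txn, tryToResolveConflict)
    #   app.tpc_finish(txn, tryToResolveConflict)
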
    def log(self):
        self.em.log()
        self.nm.log()
        if self.pt is not None:
            self.pt.log()

    @profiler_decorator
    def _handlePacket(self, conn, packet, handler=None):
        """
          conn
            The connection which received the packet (forwarded to handler).
          packet
            The packet to handle.
          handler
            The handler to use to handle packet.
            If not given, it will be guessed from the connection's node type.
        """
        if handler is None:
            # Guess the handler to use based on the type of node on the
            # connection
            node = self.nm.getByAddress(conn.getAddress())
            if node is None:
                raise ValueError, 'Expecting an answer from a node ' \
                    'whose type is not known... Is this right?'
            if node.isStorage():
                handler = self.storage_handler
            elif node.isMaster():
                handler = self.primary_handler
            else:
                raise ValueError, 'Unknown node type: %r' % (node.__class__, )
        conn.lock()
        try:
            handler.dispatch(conn, packet)
        finally:
            conn.unlock()

    @profiler_decorator
    def _waitAnyMessage(self, block=True):
        """
          Handle all pending packets.
          block
            If True (default), will block until at least one packet was
            received.
        """
        pending = self.dispatcher.pending
        queue = self.local_var.queue
        get = queue.get
        _handlePacket = self._handlePacket
        while pending(queue):
            try:
                conn, packet = get(block)
            except Empty:
                break
            if packet is None or isinstance(packet, ForgottenPacket):
                # connection was closed or some packet was forgotten
                continue
            block = False
            try:
                _handlePacket(conn, packet)
            except ConnectionClosed:
                pass

    @profiler_decorator
    def _waitMessage(self, target_conn, msg_id, handler=None):
        """Wait for a message returned by the dispatcher in queues."""
        get = self.local_var.queue.get
        _handlePacket = self._handlePacket
        while True:
            conn, packet = get(True)
            is_forgotten = isinstance(packet, ForgottenPacket)
            if target_conn is conn:
                # check fake packet
                if packet is None:
                    raise ConnectionClosed
                if msg_id == packet.getId():
                    if is_forgotten:
                        raise ValueError, 'ForgottenPacket for an ' \
                            'explicitly expected packet.'
                    self._handlePacket(conn, packet, handler=handler)
                    break
            elif not is_forgotten and packet is not None:
                self._handlePacket(conn, packet)

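    # Request/answer flow used below: conn.ask() registers the request with
    # the dispatcher and tags it with this thread's queue; the poll thread
    # pushes the matching answer (or None if the connection closes) into that
    # queue, and _waitMessage()/_waitAnyMessage() pop it and run the handler.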
    @profiler_decorator
    def _askStorage(self, conn, packet):
        """ Send a request to a storage node and process its answer """
        msg_id = conn.ask(packet, queue=self.local_var.queue)
        self._waitMessage(conn, msg_id, self.storage_handler)

    @profiler_decorator
    def _askPrimary(self, packet):
        """ Send a request to the primary master and process its answer """
        conn = self._getMasterConnection()
        msg_id = conn.ask(packet, queue=self.local_var.queue)
        self._waitMessage(conn, msg_id, self.primary_handler)

    @profiler_decorator
    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
        # acquire the lock to allow only one thread to connect to the primary
        result = self.master_conn
        if result is None:
            self._connecting_to_master_node_acquire()
            try:
                self.new_oid_list = []
                result = self._connectToPrimaryNode()
                self.master_conn = result
            finally:
                self._connecting_to_master_node_release()
        return result

    def _getPartitionTable(self):
        """ Return the partition table manager, reconnect the PMN if needed """
        # this ensures the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

    @profiler_decorator
    def _getCellListForOID(self, oid, readable=False, writable=False):
        """ Return the cells available for the specified OID """
        pt = self._getPartitionTable()
        return pt.getCellListForOID(oid, readable, writable)

    def _getCellListForTID(self, tid, readable=False, writable=False):
        """ Return the cells available for the specified TID """
        pt = self._getPartitionTable()
        return pt.getCellListForTID(tid, readable, writable)

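    # Bootstrap sequence implemented below: try master nodes until one is
    # acknowledged as primary (AskPrimary), identify this client to it
    # (RequestIdentification), then fetch the node list and partition table
    # (AskNodeInformation, AskPartitionTable) until the table is operational.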
    @profiler_decorator
    def _connectToPrimaryNode(self):
        logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        queue = self.local_var.queue
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        time.sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.em,
                        self.notifications_handler,
                        addr=self.trying_master_node.getAddress(),
                        connector=self.connector_handler(),
                        dispatcher=self.dispatcher)
                # Query for primary master node
                if conn.getConnector() is None:
                    # This happens if a connection could not be established.
                    logging.error('Connection to master node %s failed',
                                  self.trying_master_node)
                    continue
                msg_id = conn.ask(Packets.AskPrimary(), queue=queue)
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                connected = self.primary_master_node is not None and \
                        self.primary_master_node is self.trying_master_node

            logging.info('connected to a primary master node')
            # Identify to primary master and request initial data
            while conn.getUUID() is None:
                if conn.getConnector() is None:
                    logging.error('Connection to master node %s lost',
                                  self.trying_master_node)
                    self.primary_master_node = None
                    break
                p = Packets.RequestIdentification(NodeTypes.CLIENT,
                        self.uuid, None, self.name)
                msg_id = conn.ask(p, queue=queue)
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    self.primary_master_node = None
                    break
                if conn.getUUID() is None:
                    # Node identification was refused by master.
                    time.sleep(1)
            if self.uuid is not None:
                msg_id = conn.ask(Packets.AskNodeInformation(), queue=queue)
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
                msg_id = conn.ask(Packets.AskPartitionTable(), queue=queue)
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
            ready = self.uuid is not None and self.pt is not None \
                                 and self.pt.operational()
        logging.info("connected to primary master node %s",
                self.primary_master_node)
        return conn

    def registerDB(self, db, limit):
        self._db = db

    def getDB(self):
        return self._db

    @profiler_decorator
    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
                # Get a new OID list from the master node.
                # We keep a list of OIDs here to avoid asking the master
                # for new OIDs one by one too often.
                self._askPrimary(Packets.AskNewOIDs(100))
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
            self.last_oid = self.new_oid_list.pop(0)
            return self.last_oid
        finally:
            self._oid_lock_release()

    def getStorageSize(self):
        # return the last OID used, this is inaccurate
        return int(u64(self.last_oid))

    @profiler_decorator
    def getSerial(self, oid):
        # Try in cache first
        self._cache_lock_acquire()
        try:
            if oid in self.mq_cache:
                return self.mq_cache[oid][0]
        finally:
            self._cache_lock_release()
        # history() returns the serial, so use it
        hist = self.history(oid, size=1, object_only=1)
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        return hist[1][0][0]


    @profiler_decorator
    def _load(self, oid, serial=None, tid=None, cache=0):
        """
        Internal method which manages load, loadSerial and loadBefore.
        OID and TID (serial) parameters are expected packed.
        oid
            OID of object to get.
        serial
            If given, the exact serial at which OID is desired.
            tid should be None.
        tid
            If given, the excluded upper bound serial at which OID is desired.
            serial should be None.
        cache
            Store data in cache for future lookups.

        Return value: (3-tuple)
        - Object data (None if object creation was undone).
        - Serial of given data.
        - Next serial at which object exists, or None. Only set when tid
          parameter is not None.

        Exceptions:
            NEOStorageError
                technical problem
            NEOStorageNotFoundError
                object exists but no data satisfies given parameters
            NEOStorageDoesNotExistError
                object doesn't exist
            NEOStorageCreationUndoneError
                object existed, but its creation was undone
        """
        # TODO:
        # - rename parameters (here and in handlers & packet definitions)
        cell_list = self._getCellListForOID(oid, readable=True)
        if len(cell_list) == 0:
            # No cells available, so why are we running ?
            raise NEOStorageError('No storage available for oid %s' % (
                dump(oid), ))

        shuffle(cell_list)
        cell_list.sort(key=self.cp.getCellSortKey)
        self.local_var.asked_object = 0
        packet = Packets.AskObject(oid, serial, tid)
        for cell in cell_list:
            logging.debug('trying to load %s at %s before %s from %s',
                dump(oid), dump(serial), dump(tid), dump(cell.getUUID()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                continue

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                logging.error('got wrong oid %s instead of %s from node %s',
                              noid, dump(oid), cell.getAddress())
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getAddress(), dump(oid))
                self.local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if self.local_var.asked_object == 0:
            # We did not get any object from any storage node because of
            # connection errors
            raise NEOStorageError('connection failure')

        if self.local_var.asked_object == -1:
            raise NEOStorageError('inconsistent data')

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        if data == '':
            raise NEOStorageCreationUndoneError(dump(oid))
        return data, start_serial, end_serial


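    # The three public loaders below are thin wrappers around _load():
    #   load(oid)               -> _load(oid, cache=1): latest revision, cached
    #   loadSerial(oid, serial) -> _load(oid, serial=serial): exact revision
    #   loadBefore(oid, tid)    -> _load(oid, tid=tid): last revision strictly
    #                              before tid, plus the next serial if any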
    @profiler_decorator
    def load(self, oid, version=None):
        """Load an object for a given oid."""
        # First try from cache
        self._load_lock_acquire()
        try:
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
                    serial, data = self.mq_cache[oid]
                    return data, serial
            finally:
                self._cache_lock_release()
            # Otherwise get it from storage node
            result = self._load(oid, cache=1)[:2]
            # Start a network barrier, so we get all invalidations *after* we
            # received data. This ensures we get any invalidation message that
            # would have been about the version we loaded.
            # Those invalidations are checked at ZODB level, so it decides if
            # loaded data can be handed to current transaction or if a separate
            # loadBefore call is required.
            # XXX: A better implementation is required to improve performance
            self.invalidationBarrier()
            return result
        finally:
            self._load_lock_release()


    @profiler_decorator
    def loadSerial(self, oid, serial):
        """Load an object for a given oid and serial."""
        # Do not try the cache, as it manages only up-to-date objects
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        return self._load(oid, serial=serial)[0]


    @profiler_decorator
    def loadBefore(self, oid, tid):
        """Load an object for a given oid before tid committed."""
        # Do not try the cache, as it manages only up-to-date objects
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        return self._load(oid, tid=tid)


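    # The two-phase commit entry points below follow the usual ZODB order:
    #   tpc_begin() -> store() (repeated) -> tpc_vote() -> tpc_finish(),
    # with tpc_abort() available at any point to cancel the transaction.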
    @profiler_decorator
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
        if self.local_var.txn is transaction:
            # We have already begun the same transaction
            raise StorageTransactionError('Duplicate tpc_begin calls')
        if self.local_var.txn is not None:
            raise NeoException, 'local_var is not clean in tpc_begin'
        # use the given TID or request a new one from the master
        self.local_var.tid = tid
        if tid is None:
            self._askPrimary(Packets.AskBeginTransaction())
            if self.local_var.tid is None:
                raise NEOStorageError('tpc_begin failed')
        self.local_var.txn = transaction

    @profiler_decorator
    def store(self, oid, serial, data, version, transaction):
        """Store object."""
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        self._store(oid, serial, data)
        return None

    def _store(self, oid, serial, data, data_serial=None):
        # Find which storage node to use
        cell_list = self._getCellListForOID(oid, writable=True)
        if len(cell_list) == 0:
            raise NEOStorageError
        if data is None or data_serial is not None:
            assert data is None or data_serial is None, data_serial
            # this is a George Bailey object, stored as an empty string
            data = compressed_data = ''
            compression = 0
        else:
            assert data_serial is None
            if self.compress:
                compressed_data = compress(data)
                if len(compressed_data) > len(data):
                    compressed_data = data
                    compression = 0
                else:
                    compression = 1
            else:
                # Compression is disabled: send data as-is.
                compressed_data = data
                compression = 0
        checksum = makeChecksum(compressed_data)
        p = Packets.AskStoreObject(oid, serial, compression,
                 checksum, compressed_data, data_serial, self.local_var.tid)
        on_timeout = OnTimeout(self.onStoreTimeout, self.local_var.tid, oid)
        # Store object in tmp cache
        local_var = self.local_var
        data_dict = local_var.data_dict
        if oid not in data_dict:
            local_var.data_list.append(oid)
        data_dict[oid] = data
        # Store data on each node
        self.local_var.object_stored_counter_dict[oid] = {}
        self.local_var.object_serial_dict[oid] = serial
        getConnForCell = self.cp.getConnForCell
        queue = self.local_var.queue
        add_involved_nodes = self.local_var.involved_nodes.add
        for cell in cell_list:
            conn = getConnForCell(cell)
            if conn is None:
                continue
            try:
                conn.ask(p, on_timeout=on_timeout, queue=queue)
                add_involved_nodes(cell.getNode())
            except ConnectionClosed:
                continue

        self._waitAnyMessage(False)

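    # Note on the compression path in _store() above: data is zlib-compressed
    # only when self.compress is set and the compressed form is actually
    # smaller; the 'compression' flag sent with AskStoreObject records which
    # form was transmitted, and _load() decompresses accordingly.
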
    def onStoreTimeout(self, conn, msg_id, tid, oid):
        # NOTE: this method is called from the poll thread, don't use
        # local_var !
        # Stop expecting the timed-out store request.
        queue = self.dispatcher.forget(conn, msg_id)
        # Ask the storage if someone holds a lock on the object.
        # Shorten the timeout to react earlier to an unresponsive storage.
        conn.ask(Packets.AskHasLock(tid, oid), timeout=5, queue=queue)
        return True

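    # Conflict handling below: storage answers may report conflicting serials
    # per OID (collected in local_var.conflict_serial_dict); for each one,
    # ZODB's tryToResolveConflict callback is invoked and, on success, the
    # resolved data is stored again, otherwise ConflictError is raised.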
    @profiler_decorator
    def _handleConflicts(self, tryToResolveConflict):
        result = []
        append = result.append
        local_var = self.local_var
        # Check for conflicts
        data_dict = local_var.data_dict
        object_serial_dict = local_var.object_serial_dict
        conflict_serial_dict = local_var.conflict_serial_dict
        resolved_conflict_serial_dict = local_var.resolved_conflict_serial_dict
        for oid, conflict_serial_set in conflict_serial_dict.items():
            resolved_serial_set = resolved_conflict_serial_dict.setdefault(
                oid, set())
            conflict_serial = max(conflict_serial_set)
            if resolved_serial_set and conflict_serial <= max(resolved_serial_set):
                # A later serial has already been resolved, skip.
                resolved_serial_set.update(conflict_serial_dict.pop(oid))
                continue
            serial = object_serial_dict[oid]
            data = data_dict[oid]
            tid = local_var.tid
            resolved = False
            if conflict_serial <= tid:
                new_data = tryToResolveConflict(oid, conflict_serial, serial,
                    data)
                if new_data is not None:
                    logging.info('Conflict resolution succeeded for %r:%r with %r',
                        dump(oid), dump(serial), dump(conflict_serial))
                    # Mark this conflict as resolved
                    resolved_serial_set.update(conflict_serial_dict.pop(oid))
                    # Try to store again
                    self._store(oid, conflict_serial, new_data)
                    append(oid)
                    resolved = True
                else:
                    logging.info('Conflict resolution failed for %r:%r with %r',
                        dump(oid), dump(serial), dump(conflict_serial))
            else:
                logging.info('Conflict reported for %r:%r with later ' \
                    'transaction %r, cannot resolve conflict.', dump(oid),
                    dump(serial), dump(conflict_serial))
            if not resolved:
                # XXX: Is it really required to remove from data_dict ?
                del data_dict[oid]
                local_var.data_list.remove(oid)
                raise ConflictError(oid=oid,
                    serials=(tid, serial), data=data)
        return result

    @profiler_decorator
    def waitResponses(self):
        """Wait for all requests to be answered (or their connection to be
        detected as closed)"""
        queue = self.local_var.queue
        pending = self.dispatcher.pending
        _waitAnyMessage = self._waitAnyMessage
        while pending(queue):
            _waitAnyMessage()

    @profiler_decorator
    def waitStoreResponses(self, tryToResolveConflict):
        result = []
        append = result.append
        resolved_oid_set = set()
        update = resolved_oid_set.update
        local_var = self.local_var
        tid = local_var.tid
        _handleConflicts = self._handleConflicts
        while True:
            self.waitResponses()
            conflicts = _handleConflicts(tryToResolveConflict)
            if conflicts:
                update(conflicts)
            else:
                # No more conflict resolutions to do, no more pending store
                # requests
                break

        # Check for never-stored objects, and update result for all others
        for oid, store_dict in \
            local_var.object_stored_counter_dict.iteritems():
            if not store_dict:
                raise NEOStorageError('tpc_store failed')
            elif oid in resolved_oid_set:
                append((oid, ResolvedSerial))
            else:
                append((oid, tid))
        return result

    @profiler_decorator
    def tpc_vote(self, transaction, tryToResolveConflict):
        """Store current transaction."""
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)

        result = self.waitStoreResponses(tryToResolveConflict)

        tid = local_var.tid
        # Ask each storage node to store the transaction metadata
        voted_counter = 0
        p = Packets.AskStoreTransaction(tid, str(transaction.user),
            str(transaction.description), dumps(transaction._extension),
            local_var.data_list)
        add_involved_nodes = self.local_var.involved_nodes.add
        for cell in self._getCellListForTID(tid, writable=True):
            logging.debug("voting object %s %s", cell.getAddress(),
                cell.getState())
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            local_var.txn_voted = False
            try:
                self._askStorage(conn, p)
                add_involved_nodes(cell.getNode())
            except ConnectionClosed:
                continue

            if not self.isTransactionVoted():
                raise NEOStorageError('tpc_vote failed')
            voted_counter += 1

        # check that at least one storage node accepted
        if voted_counter == 0:
            raise NEOStorageError('tpc_vote failed')
        # Check if master connection is still alive.
        # This is just here to lower the probability of detecting a problem
        # in tpc_finish, as we should do our best to detect problems before
        # tpc_finish.
        self._getMasterConnection()

        return result

    @profiler_decorator
    def tpc_abort(self, transaction):
        """Abort current transaction."""
        if transaction is not self.local_var.txn:
            return

        tid = self.local_var.tid
        p = Packets.AbortTransaction(tid)
        getConnForNode = self.cp.getConnForNode
        # cancel the transaction on all those nodes
        for node in self.local_var.involved_nodes:
            conn = getConnForNode(node)
            if conn is None:
                continue
            try:
                conn.notify(p)
            except:
                logging.error('Exception in tpc_abort while notifying ' \
                    'storage node %r of abortion, ignoring.', conn, exc_info=1)

        # Just wait for responses to arrive. If any leads to an exception,
        # log it and continue: we *must* eat all answers to not disturb the
        # next transaction.
        queue = self.local_var.queue
        pending = self.dispatcher.pending
        _waitAnyMessage = self._waitAnyMessage
        while pending(queue):
            try:
                _waitAnyMessage()
            except:
                logging.error('Exception in tpc_abort while handling ' \
                    'pending answers, ignoring.', exc_info=1)

        self.local_var.clear()

    @profiler_decorator
    def tpc_finish(self, transaction, tryToResolveConflict, f=None):
        """Finish current transaction."""
        if self.local_var.txn is not transaction:
            raise StorageTransactionError('tpc_finish called for wrong '
                'transaction')
        if not self.local_var.txn_voted:
            self.tpc_vote(transaction, tryToResolveConflict)
        self._load_lock_acquire()
        try:
            tid = self.local_var.tid
            # Call function given by ZODB
            if f is not None:
                f(tid)

            # Call finish on master
            oid_list = self.local_var.data_list
            p = Packets.AskFinishTransaction(tid, oid_list)
            self._askPrimary(p)

            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            # Update cache
            self._cache_lock_acquire()
            try:
                mq_cache = self.mq_cache
                for oid, data in self.local_var.data_dict.iteritems():
                    if data == '':
                        if oid in mq_cache:
                            del mq_cache[oid]
                    else:
                        # Now serial is same as tid
                        mq_cache[oid] = tid, data
            finally:
                self._cache_lock_release()
            self.local_var.clear()
            return tid
        finally:
            self._load_lock_release()

    def undo(self, undone_tid, txn, tryToResolveConflict):
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, undone_tid)

        # First get transaction information from a storage node.
        cell_list = self._getCellListForTID(undone_tid, readable=True)
        shuffle(cell_list)
        cell_list.sort(key=self.cp.getCellSortKey)
        packet = Packets.AskTransactionInformation(undone_tid)
        getConnForCell = self.cp.getConnForCell
        for cell in cell_list:
            conn = getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.txn_info = 0
            self.local_var.txn_ext = 0
            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                continue
            except NEOStorageNotFoundError:
                # Tid not found, try with next node
                logging.warning('Transaction %s was not found on node %s',
                    dump(undone_tid), self.nm.getByAddress(conn.getAddress()))
                continue

            if isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')
        else:
            raise NEOStorageError('undo failed')

        tid = self.local_var.tid
        oid_list = self.local_var.txn_info['oids']

        # Regroup objects per partition, to ask a minimal set of storage nodes.
        partition_oid_dict = {}
        pt = self._getPartitionTable()
        for oid in oid_list:
            partition = pt.getPartition(oid)
            try:
                partition_oid_list = partition_oid_dict[partition]
            except KeyError:
                partition_oid_list = partition_oid_dict[partition] = []
            partition_oid_list.append(oid)

        # Ask storage nodes for the undo serial (the serial at which the
        # object's previous data is)
        getCellList = pt.getCellList
        getCellSortKey = self.cp.getCellSortKey
        queue = self.local_var.queue
        undo_object_tid_dict = self.local_var.undo_object_tid_dict = {}
        for partition, partition_oid_list in partition_oid_dict.iteritems():
            cell_list = getCellList(partition, readable=True)
            shuffle(cell_list)
            cell_list.sort(key=getCellSortKey)
            storage_conn = getConnForCell(cell_list[0])
            storage_conn.ask(Packets.AskObjectUndoSerial(tid, undone_tid,
                partition_oid_list), queue=queue)

        # Wait for all AnswerObjectUndoSerial. We might get OidNotFoundError,
        # meaning that objects in transaction's oid_list do not exist any
        # longer. This is the symptom of a pack, so forbid undoing transaction
        # when it happens, but still keep waiting for answers.
        failed = False
        while True:
            try:
                self.waitResponses()
            except NEOStorageNotFoundError:
                failed = True
            else:
                break
        if failed:
            raise UndoError('non-undoable transaction')

        # Send undo data to all storage nodes.
        for oid in oid_list:
            current_serial, undo_serial, is_current = undo_object_tid_dict[oid]
            if is_current:
                data = None
            else:
                # Serial being undone is not the latest version for this
                # object. This is an undo conflict, try to resolve it.
                try:
                    # Load the latest version we are supposed to see
                    data = self.loadSerial(oid, current_serial)
                    # Load the version we were undoing to
                    undo_data = self.loadSerial(oid, undo_serial)
                except NEOStorageNotFoundError:
                    raise UndoError('Object not found while resolving undo '
                        'conflict')
                # Resolve conflict
                try:
                    data = tryToResolveConflict(oid, current_serial,
                        undone_tid, undo_data, data)
                except ConflictError:
                    data = None
                if data is None:
                    raise UndoError('Some data were modified by a later ' \
                        'transaction', oid)
                undo_serial = None
            self._store(oid, current_serial, data, undo_serial)
        return tid, oid_list

    def _insertMetadata(self, txn_info, extension):
        for k, v in loads(extension).items():
            txn_info[k] = v

    def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        pt = self._getPartitionTable()
        storage_node_list = pt.getNodeList()

        self.local_var.node_tids = {}
        queue = self.local_var.queue
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue
            conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION), queue=queue)

        # Wait for answers from all storages.
        self.waitResponses()

        # Reorder tids
        ordered_tids = set()
        update = ordered_tids.update
        for tid_list in self.local_var.node_tids.itervalues():
            update(tid_list)
        ordered_tids = list(ordered_tids)
        ordered_tids.sort(reverse=True)
        logging.debug("UndoLog tids %s", [dump(x) for x in ordered_tids])
        # For each transaction, get info
        undo_info = []
        append = undo_info.append
        for tid in ordered_tids:
            cell_list = self._getCellListForTID(tid, readable=True)
            shuffle(cell_list)
            cell_list.sort(key=self.cp.getCellSortKey)
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is not None:
                    self.local_var.txn_info = 0
                    self.local_var.txn_ext = 0
                    try:
                        self._askStorage(conn,
                                Packets.AskTransactionInformation(tid))
                    except ConnectionClosed:
                        continue
                    if isinstance(self.local_var.txn_info, dict):
                        break

            if self.local_var.txn_info in (-1, 0):
                # TID not found at all
                raise NeoException, 'Data inconsistency detected: ' \
                                    'transaction info for TID %r could not ' \
                                    'be found' % (tid, )

            if filter is None or filter(self.local_var.txn_info):
                self.local_var.txn_info.pop('packed')
                if not with_oids:
                    self.local_var.txn_info.pop("oids")
                append(self.local_var.txn_info)
                self._insertMetadata(self.local_var.txn_info,
                        self.local_var.txn_ext)
                if len(undo_info) >= last - first:
                    break
        # Check that we return at least one element, otherwise call
        # again with an extended offset
        if len(undo_info) == 0 and not block:
            undo_info = self.__undoLog(first=first, last=last*5, filter=filter,
                    block=1, with_oids=with_oids)
        return undo_info

    def undoLog(self, first, last, filter=None, block=0):
        return self.__undoLog(first, last, filter, block)

    def transactionLog(self, first, last):
        return self.__undoLog(first, last, with_oids=True)

    def history(self, oid, version=None, size=1, filter=None, object_only=0):
        # Get history information for the object first
        cell_list = self._getCellListForOID(oid, readable=True)
        shuffle(cell_list)
        cell_list.sort(key=self.cp.getCellSortKey)
        for cell in cell_list:
            # FIXME: we keep overwriting self.local_var.history here, we
            # should aggregate it instead.
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.history = None
            try:
                self._askStorage(conn, Packets.AskObjectHistory(oid, 0, size))
            except ConnectionClosed:
                continue

            if self.local_var.history[0] != oid:
                # Got history for wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                                      '%r, got %r' % (
                                      oid, self.local_var.history[0]))

        if not isinstance(self.local_var.history, tuple):
            raise NEOStorageError('history failed')

        if self.local_var.history[1] == [] or \
            self.local_var.history[1][0][1] == 0:
            # KeyError expected if no history was found
            # XXX: this may require an error from the storages
            raise KeyError

        if object_only:
            # Used by getSerial
            return self.local_var.history

        # Now that we have object information, get txn information
        history_list = []
        for serial, size in self.local_var.history[1]:
            cell_list = self._getCellListForTID(serial, readable=True)
            shuffle(cell_list)
            cell_list.sort(key=self.cp.getCellSortKey)
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue

                # ask transaction information
                self.local_var.txn_info = None
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(serial))
                except ConnectionClosed:
                    continue
                except NEOStorageNotFoundError:
                    # TID not found
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            # create history dict
            self.local_var.txn_info.pop('id')
            self.local_var.txn_info.pop('oids')
            self.local_var.txn_info.pop('packed')
            self.local_var.txn_info['tid'] = serial
            self.local_var.txn_info['version'] = ''
            self.local_var.txn_info['size'] = size
            if filter is None or filter(self.local_var.txn_info):
                history_list.append(self.local_var.txn_info)
            self._insertMetadata(self.local_var.txn_info,
                    self.local_var.txn_ext)

        return history_list

    @profiler_decorator
    def importFrom(self, source, start, stop, tryToResolveConflict):
        serials = {}
        def updateLastSerial(oid, result):
            if result:
                if isinstance(result, str):
                    assert oid is not None
                    serials[oid] = result
                else:
                    for oid, serial in result:
                        assert isinstance(serial, str), serial
                        serials[oid] = serial
        transaction_iter = source.iterator(start, stop)
        for transaction in transaction_iter:
            self.tpc_begin(transaction, transaction.tid, transaction.status)
            for r in transaction:
                pre = serials.get(r.oid, None)
                # TODO: bypass conflict resolution, locks...
                result = self.store(r.oid, pre, r.data, r.version, transaction)
                updateLastSerial(r.oid, result)
            updateLastSerial(None, self.tpc_vote(transaction,
                        tryToResolveConflict))
            self.tpc_finish(transaction, tryToResolveConflict)
        transaction_iter.close()

    def iterator(self, start=None, stop=None):
        return Iterator(self, start, stop)

    def lastTransaction(self):
        # XXX: this doesn't consider transactions created by other clients,
        #  should ask the primary master
        return self.local_var.tid

    def abortVersion(self, src, transaction):
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        return '', []

    def commitVersion(self, src, dest, transaction):
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        return '', []

    def loadEx(self, oid, version):
        data, serial = self.load(oid=oid)
        return data, serial, ''

    def __del__(self):
        """Clear all connections."""
        # Due to a bug in ZODB, close is not always called when shutting
        # down Zope, so use __del__ to close connections
        for conn in self.em.getConnectionList():
            conn.close()
        # Stop polling thread
        self.poll_thread.stop()
    close = __del__

    def invalidationBarrier(self):
        self._askPrimary(Packets.AskBarrier())

    def sync(self):
        self._waitAnyMessage(False)

    def setNodeReady(self):
        self.local_var.node_ready = True

    def setNodeNotReady(self):
        self.local_var.node_ready = False

    def isNodeReady(self):
        return self.local_var.node_ready

    def setTID(self, value):
        self.local_var.tid = value

    def getTID(self):
        return self.local_var.tid

    def setTransactionFinished(self):
        self.local_var.txn_finished = True

    def isTransactionFinished(self):
        return self.local_var.txn_finished

    def setTransactionVoted(self):
        self.local_var.txn_voted = True

    def isTransactionVoted(self):
        return self.local_var.txn_voted

    def pack(self, t):
        tid = repr(TimeStamp(*time.gmtime(t)[:5] + (t % 60, )))
        if tid == ZERO_TID:
            raise NEOStorageError('Invalid pack time')
        self._askPrimary(Packets.AskPack(tid))