app.py 42.5 KB
Newer Older
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
from thread import get_ident
19
from cPickle import dumps, loads
20
from zlib import compress as real_compress, decompress
21
from neo.locking import Queue, Empty
22
from random import shuffle
23
import traceback
24
import time
25

26
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
27
from ZODB.ConflictResolution import ResolvedSerial
28

29 30 31 32
from neo import setupLog
setupLog('CLIENT', verbose=True)

from neo import logging
33
from neo.protocol import NodeTypes, Packets, INVALID_PARTITION
34
from neo.event import EventManager
35
from neo.util import makeChecksum as real_makeChecksum, dump
36
from neo.locking import Lock
37
from neo.connection import MTClientConnection
38
from neo.node import NodeManager
39
from neo.connector import getConnectorHandler
40
from neo.client.exception import NEOStorageError
41
from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
42 43
from neo.exception import NeoException
from neo.client.handlers import storage, master
44
from neo.dispatcher import Dispatcher
45
from neo.client.poll import ThreadedPoll
46 47
from neo.client.iterator import Iterator
from neo.client.mq import MQ
48
from neo.client.pool import ConnectionPool
49
from neo.util import u64, parseMasterList
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
from neo.profiling import profiler_decorator, PROFILING_ENABLED

# Bind the module-level ``compress`` and ``makeChecksum`` callables.
if not PROFILING_ENABLED:
    # Profiling is off: use the imported functions directly.
    makeChecksum = real_makeChecksum
    compress = real_compress
else:
    # Profiling is on: those functions need a "real" python function
    # wrapper before they can be decorated.
    def compress(data):
        return real_compress(data)
    compress = profiler_decorator(compress)

    def makeChecksum(data):
        return real_makeChecksum(data)
    makeChecksum = profiler_decorator(makeChecksum)
Aurel's avatar
Aurel committed
66

67 68
class ThreadContext(object):
    """Attribute namespace that is private to each calling thread.

    Attribute reads and writes on an instance are redirected to a dict
    selected by the identifier of the calling thread, so each thread sees
    its own values. A thread's dict is created, filled with default values,
    the first time that thread touches the instance.
    """

    def __init__(self):
        # Go through object.__setattr__: our own __setattr__ would store the
        # registry inside the calling thread's namespace instead.
        object.__setattr__(self, '_threads_dict', {})

    def __getThreadData(self):
        """Return the calling thread's namespace, creating it if missing."""
        ident = get_ident()
        registry = self._threads_dict
        if ident not in registry:
            self.clear(ident)
        return registry[ident]

    def __getattr__(self, name):
        # Only reached when regular attribute lookup failed.
        namespace = self.__getThreadData()
        if name in namespace:
            return namespace[name]
        raise AttributeError(name)

    def __setattr__(self, name, value):
        self.__getThreadData()[name] = value

    def clear(self, thread_id=None):
        """Reset a thread's namespace to its default values.

        thread_id
            Identifier of the thread to reset. Defaults to the calling
            thread.

        The queue already attached to that thread, if any, is kept;
        otherwise a new Queue(0) is created.
        """
        if thread_id is None:
            thread_id = get_ident()
        registry = self._threads_dict
        previous = registry.get(thread_id)
        if previous is not None:
            queue = previous['queue']
        else:
            queue = Queue(0)
        registry[thread_id] = dict(
            tid=None,
            txn=None,
            data_dict={},
            object_serial_dict={},
            object_stored_counter_dict={},
            conflict_serial_dict={},
            resolved_conflict_serial_dict={},
            object_stored=0,
            txn_voted=False,
            txn_finished=False,
            queue=queue,
            txn_info=0,
            history=None,
            node_tids={},
            node_ready=False,
            asked_object=0,
            undo_conflict_oid_list=[],
            undo_error_oid_list=[],
        )


Aurel's avatar
Aurel committed
122
class Application(object):
123 124
    """The client node application."""

125
    def __init__(self, master_nodes, name, connector=None, **kw):
        """Build the client application state.

        master_nodes
            Master node list specification, parsed with parseMasterList.
        name
            Stored as self.name.
        connector
            Passed to getConnectorHandler to pick the connector handler.
        kw
            Accepted but not used here.
        """
        # Event manager and the polling thread object built on top of it.
        event_manager = EventManager()
        self.em = event_manager
        self.poll_thread = ThreadedPoll(event_manager)

        # Attributes shared by every thread.
        self._db = None
        self.name = name
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None

        # Register every configured master node address.
        createMaster = self.nm.createMaster
        for master_address in parseMasterList(master_nodes):
            createMaster(address=master_address)

        # No self-assigned UUID: the primary master will supply one.
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.last_oid = '\0' * 8

        # Packet handlers, one instance of each, all bound to this app.
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)

        # Attributes whose values are distinct for each thread.
        self.local_var = ThreadContext()

        # One independent lock per purpose; only the bound acquire/release
        # methods are kept, as <prefix>_acquire and <prefix>_release:
        #   _load_lock: makes loading and storing atomic
        #   _oid_lock: avoids running several oid generations at once
        #   _cache_lock: used for the client cache
        #   _connecting_to_master_node: prevents simultaneous master node
        #     connection attempts
        #   _nm: ensures exclusive access to the node manager
        for prefix in ('_load_lock', '_oid_lock', '_cache_lock',
                       '_connecting_to_master_node', '_nm'):
            lock = Lock()
            setattr(self, prefix + '_acquire', lock.acquire)
            setattr(self, prefix + '_release', lock.release)
181

182
    @profiler_decorator
183 184 185 186 187 188 189 190 191 192 193
    def _handlePacket(self, conn, packet, handler=None):
        """
          conn
            The connection which received the packet (forwarded to handler).
          packet
            The packet to handle.
          handler
            The handler to use to handle packet.
            If not given, it will be guessed from connection's not type.
        """
        if handler is None:
194 195
            # Guess the handler to use based on the type of node on the
            # connection
196 197 198 199 200 201 202 203 204 205
            node = self.nm.getByAddress(conn.getAddress())
            if node is None:
                raise ValueError, 'Expecting an answer from a node ' \
                    'which type is not known... Is this right ?'
            if node.isStorage():
                handler = self.storage_handler
            elif node.isMaster():
                handler = self.primary_handler
            else:
                raise ValueError, 'Unknown node type: %r' % (node.__class__, )
206 207 208 209 210
        conn.lock()
        try:
            handler.dispatch(conn, packet)
        finally:
            conn.unlock()
211

212
    @profiler_decorator
213 214 215 216 217 218 219
    def _waitAnyMessage(self, block=True):
        """
          Handle packets queued for the calling thread, for as long as the
          dispatcher reports something pending for that queue.
          block
            If True (default), the first read on the queue blocks. Once a
            packet has been handled, later reads never block, and an empty
            queue ends the loop.
        """
        thread_queue = self.local_var.queue
        has_pending = self.dispatcher.pending
        fetch = thread_queue.get
        handle = self._handlePacket
        while has_pending(thread_queue):
            try:
                conn, packet = fetch(block)
            except Empty:
                break
            # A None packet means the connection was closed: skip it.
            if packet is not None:
                block = False
                try:
                    handle(conn, packet)
                except ConnectionClosed:
                    pass

238
    @profiler_decorator
239 240 241 242 243 244 245 246 247 248 249 250 251
    def _waitMessage(self, target_conn, msg_id, handler=None):
        """Wait for a message returned by the dispatcher in queues."""
        get = self.local_var.queue.get
        _handlePacket = self._handlePacket
        while True:
            conn, packet = get(True)
            if target_conn is conn:
                # check fake packet
                if packet is None:
                    raise ConnectionClosed
                if msg_id == packet.getId():
                    self._handlePacket(conn, packet, handler=handler)
                    break
252 253
            elif packet is not None:
                self._handlePacket(conn, packet)
254

255
    @profiler_decorator
256
    def _askStorage(self, conn, packet):
257
        """ Send a request to a storage node and process it's answer """
258
        msg_id = conn.ask(packet)
259 260
        self._waitMessage(conn, msg_id, self.storage_handler)

261
    @profiler_decorator
262
    def _askPrimary(self, packet):
263
        """ Send a request to the primary master and process it's answer """
264
        conn = self._getMasterConnection()
265
        msg_id = conn.ask(packet)
266 267
        self._waitMessage(conn, msg_id, self.primary_handler)

268
    @profiler_decorator
269 270
    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
271
        # acquire the lock to allow only one thread to connect to the primary
272 273 274 275
        result = self.master_conn
        if result is None:
            self._connecting_to_master_node_acquire()
            try:
276
                self.new_oid_list = []
277 278 279 280 281
                result = self._connectToPrimaryNode()
                self.master_conn = result
            finally:
                self._connecting_to_master_node_release()
        return result
282

283
    def _getPartitionTable(self):
284
        """ Return the partition table manager, reconnect the PMN if needed """
285 286 287 288 289
        # this ensure the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

290
    @profiler_decorator
291 292
    def _getCellListForOID(self, oid, readable=False, writable=False):
        """ Return the cells available for the specified OID """
293
        pt = self._getPartitionTable()
294 295 296 297
        return pt.getCellListForOID(oid, readable, writable)

    def _getCellListForTID(self, tid, readable=False, writable=False):
        """ Return the cells available for the specified TID """
298
        pt = self._getPartitionTable()
299
        return pt.getCellListForTID(tid, readable, writable)
300

301
    @profiler_decorator
302
    def _connectToPrimaryNode(self):
        """Connect and identify to the primary master node.

        Loops until this node has a UUID and an operational partition
        table, then returns the connection to the primary master.
        """
        logging.debug('connecting to primary master...')
        node_manager = self.nm
        while True:
            # Step 1: get a network connection to the primary master.
            position = 0
            while True:
                known_primary = self.primary_master_node
                if known_primary is None:
                    # No known primary: check master nodes one by one,
                    # sleeping one second before starting over.
                    candidates = node_manager.getMasterList()
                    try:
                        self.trying_master_node = candidates[position]
                    except IndexError:
                        time.sleep(1)
                        position = 0
                        self.trying_master_node = candidates[0]
                    position += 1
                else:
                    # A primary master node is known: pinpoint it.
                    self.trying_master_node = known_primary
                    self.primary_master_node = None
                # Connect to the selected master.
                conn = MTClientConnection(self.local_var, self.em,
                        self.notifications_handler,
                        addr=self.trying_master_node.getAddress(),
                        connector=self.connector_handler(),
                        dispatcher=self.dispatcher)
                if conn.getConnector() is None:
                    # This happens if a connection could not be established.
                    logging.error('Connection to master node %s failed',
                                  self.trying_master_node)
                    continue
                # Query for the primary master node. The ask itself is
                # deliberately left outside the try block.
                request_id = conn.ask(Packets.AskPrimary())
                try:
                    self._waitMessage(conn, request_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # Done once the node we just asked is the primary master.
                primary = self.primary_master_node
                if primary is not None and \
                        primary is self.trying_master_node:
                    break

            logging.info('connected to a primary master node')
            # Step 2: identify to the primary master.
            while conn.getUUID() is None:
                if conn.getConnector() is None:
                    logging.error('Connection to master node %s lost',
                                  self.trying_master_node)
                    self.primary_master_node = None
                    break
                identification = Packets.RequestIdentification(
                        NodeTypes.CLIENT, self.uuid, None, self.name)
                request_id = conn.ask(identification)
                try:
                    self._waitMessage(conn, request_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    self.primary_master_node = None
                    break
                if conn.getUUID() is None:
                    # Node identification was refused by master.
                    time.sleep(1)
            # Step 3: request initial data.
            # NOTE(review): this also runs after the identification loop
            # gave up on a lost connection, if self.uuid is already set --
            # confirm that asking on such a connection is intended.
            if self.uuid is not None:
                request_id = conn.ask(Packets.AskNodeInformation())
                self._waitMessage(conn, request_id,
                        handler=self.primary_bootstrap_handler)
                request_id = conn.ask(Packets.AskPartitionTable([]))
                self._waitMessage(conn, request_id,
                        handler=self.primary_bootstrap_handler)
            if self.uuid is not None and self.pt is not None \
                    and self.pt.operational():
                break
        logging.info("connected to primary master node %s" %
                self.primary_master_node)
        return conn
379

380 381 382
    def registerDB(self, db, limit):
        """Remember the database object given by the caller.

        db
            Object stored as self._db.
        limit
            Accepted but not used by this method.
        """
        self._db = db

383 384 385
    def getDB(self):
        """Return self._db (None until a value has been stored in it)."""
        return self._db

386
    @profiler_decorator
387 388 389 390 391
    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
392 393 394 395
                # Get new oid list from master node
                # we manage a list of oid here to prevent
                # from asking too many time new oid one by one
                # from master node
396
                self._askPrimary(Packets.AskNewOIDs(100))
397 398
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
399 400
            self.last_oid = self.new_oid_list.pop()
            return self.last_oid
401 402 403
        finally:
            self._oid_lock_release()

404 405 406
    def getStorageSize(self):
        """Return self.last_oid converted to an integer with u64."""
        # return the last OID used, this is inaccurate
        return int(u64(self.last_oid))
407

408
    @profiler_decorator
Aurel's avatar
Aurel committed
409 410 411 412
    def getSerial(self, oid):
        # Try in cache first
        self._cache_lock_acquire()
        try:
413 414
            if oid in self.mq_cache:
                return self.mq_cache[oid][0]
Aurel's avatar
Aurel committed
415 416 417
        finally:
            self._cache_lock_release()
        # history return serial, so use it
Grégory Wisniewski's avatar
Grégory Wisniewski committed
418
        hist = self.history(oid, size=1, object_only=1)
Aurel's avatar
Aurel committed
419 420
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
421 422 423 424
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        return hist[1][0][0]

Aurel's avatar
Aurel committed
425

426
    @profiler_decorator
427
    def _load(self, oid, serial=None, tid=None, cache=0):
        """Internal method which manages load, loadSerial and loadBefore.

        oid
            Identifier of the object to fetch.
        serial, tid
            Forwarded unchanged in the AskObject request.
        cache
            When true, the fetched revision is also put in the client cache.

        Returns a (data, start_serial, end_serial) tuple; data is None when
        the fetched data is the empty string.
        Raises NEOStorageNotFoundError when no readable cell exists for the
        oid, when no storage node could be asked, or when no node gave a
        valid answer.
        """
        cell_list = self._getCellListForOID(oid, readable=True)
        if len(cell_list) == 0:
            # No cells available, so why are we running ?
            logging.error('oid %s not found because no storage is ' \
                    'available for it', dump(oid))
            raise NEOStorageNotFoundError()

        # Try the cells in random order.
        shuffle(cell_list)
        local_var = self.local_var
        # asked_object is 0 while no answer was obtained, -1 when the object
        # was not found or the answer was rejected, and the answer tuple
        # otherwise (presumably filled in by the storage answer handler --
        # confirm).
        local_var.asked_object = 0
        for cell in cell_list:
            logging.debug('trying to load %s from %s',
                          dump(oid), dump(cell.getUUID()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            try:
                self._askStorage(conn, Packets.AskObject(oid, serial, tid))
            except ConnectionClosed:
                continue

            if local_var.asked_object == -1:
                # OID not found
                break

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                = local_var.asked_object
            if noid != oid:
                # Oops, try with next node. The received oid goes through
                # dump() like every other oid written to the log.
                logging.error('got wrong oid %s instead of %s from node %s',
                              dump(noid), dump(oid), cell.getAddress())
                local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getAddress(), dump(oid))
                local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if local_var.asked_object == 0:
            # We didn't get any object from any storage node because of
            # connection errors.
            logging.warning('oid %s not found because of connection failure',
                    dump(oid))
            raise NEOStorageNotFoundError()

        if local_var.asked_object == -1:
            # We didn't get any valid object from the storage nodes.
            logging.info('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        if data == '':
            data = None
        return data, start_serial, end_serial
499

500

501
    @profiler_decorator
502 503 504
    def load(self, oid, version=None):
        """Load an object for a given oid.

        oid
            Identifier of the object to load.
        version
            Accepted but not used by this method.

        Returns a (data, serial) pair, taken from the client cache when the
        oid is cached and from a storage node otherwise. data is None when
        the stored data is the empty string, on both paths.
        """
        self._load_lock_acquire()
        try:
            # First try from cache
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
                    serial, data = self.mq_cache[oid]
                    if data == '':
                        # _load() caches the data before turning the empty
                        # string into None; apply the same conversion here
                        # so a cache hit returns what the first load did.
                        data = None
                    return data, serial
            finally:
                self._cache_lock_release()
            # Otherwise get it from storage node
            return self._load(oid, cache=1)[:2]
        finally:
            self._load_lock_release()
Aurel's avatar
Aurel committed
519

520

521
    @profiler_decorator
522
    def loadSerial(self, oid, serial):
        """Load an object for a given oid and serial.

        Returns only the data part of what _load gives back. The cache is
        not consulted, as it only manages up-to-date objects.
        """
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        loaded = self._load(oid, serial=serial)
        return loaded[0]
Aurel's avatar
Aurel committed
527

528

529
    @profiler_decorator
530
    def loadBefore(self, oid, tid):
        """Load an object for a given oid before tid committed.

        Returns (data, start_serial, end_serial), or None when the end
        serial given by _load is None (no previous version). The cache is
        not consulted, as it only manages up-to-date objects.
        """
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        data, start_serial, end_serial = self._load(oid, tid=tid)
        if end_serial is not None:
            return data, start_serial, end_serial
        # No previous version
        return None
Aurel's avatar
Aurel committed
540

541

542
    @profiler_decorator
543 544 545
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
546
        if self.local_var.txn is transaction:
547
            # We already begin the same transaction
548
            return
549 550
        if self.local_var.txn is not None:
            raise NeoException, 'local_var is not clean in tpc_begin'
551 552 553 554
        # ask the primary master to start a transaction, if no tid is supplied,
        # the master will supply us one. Otherwise the requested tid will be
        # used if possible.
        self.local_var.tid = None
555
        self._askPrimary(Packets.AskBeginTransaction(tid))
556 557
        if self.local_var.tid is None:
            raise NEOStorageError('tpc_begin failed')
558
        self.local_var.txn = transaction
559

560

561
    @profiler_decorator
Vincent Pelletier's avatar
Vincent Pelletier committed
562
    def store(self, oid, serial, data, version, transaction):
        """Store object.

        The request is sent to every writable cell of the oid for which a
        connection is available. Answers are not waited for here: only the
        packets already queued are handled before returning.

        Raises StorageTransactionError when transaction is not the one
        bound to the calling thread, and NEOStorageError when the oid has
        no writable cell.
        """
        context = self.local_var
        if context.txn is not transaction:
            raise StorageTransactionError(self, transaction)
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage nodes to use.
        cell_list = self._getCellListForOID(oid, writable=True)
        if len(cell_list) == 0:
            raise NEOStorageError
        if data is None:
            # this is a George Bailey object, stored as an empty string
            data = ''
        # Send the compressed form unless it is larger than the original.
        payload = compress(data)
        compression = 1
        if len(payload) > len(data):
            payload = data
            compression = 0
        checksum = makeChecksum(payload)
        request = Packets.AskStoreObject(oid, serial, compression,
                 checksum, payload, context.tid)
        # Keep the uncompressed data and bookkeeping for this oid.
        context.data_dict[oid] = data
        context.object_stored_counter_dict[oid] = {}
        context.object_serial_dict[oid] = (serial, version)
        # Send the request to each node.
        getConnForCell = self.cp.getConnForCell
        for cell in cell_list:
            conn = getConnForCell(cell)
            if conn is not None:
                try:
                    conn.ask(request)
                except ConnectionClosed:
                    pass

        self._waitAnyMessage(False)
        return None
601

602
    @profiler_decorator
603 604 605 606 607 608 609
    def _handleConflicts(self, tryToResolveConflict):
        result = []
        append = result.append
        local_var = self.local_var
        # Check for conflicts
        data_dict = local_var.data_dict
        object_serial_dict = local_var.object_serial_dict
610 611
        conflict_serial_dict = local_var.conflict_serial_dict
        resolved_conflict_serial_dict = local_var.resolved_conflict_serial_dict
612 613 614 615 616 617 618 619
        for oid, conflict_serial_set in conflict_serial_dict.items():
            resolved_serial_set = resolved_conflict_serial_dict.setdefault(
                oid, set())
            conflict_serial = max(conflict_serial_set)
            if resolved_serial_set and conflict_serial <= max(resolved_serial_set):
                # A later serial has already been resolved, skip.
                resolved_serial_set.update(conflict_serial_dict.pop(oid))
                continue
620 621 622 623 624 625 626 627
            serial, version = object_serial_dict[oid]
            data = data_dict[oid]
            tid = local_var.tid
            resolved = False
            if conflict_serial <= tid:
                new_data = tryToResolveConflict(oid, conflict_serial, serial,
                    data)
                if new_data is not None:
628 629
                    logging.info('Conflict resolution succeed for %r:%r with %r',
                        dump(oid), dump(serial), dump(conflict_serial))
630
                    # Mark this conflict as resolved
631
                    resolved_serial_set.update(conflict_serial_dict.pop(oid))
632 633
                    # Try to store again
                    self.store(oid, conflict_serial, new_data, version,
Vincent Pelletier's avatar
Vincent Pelletier committed
634
                        local_var.txn)
635 636
                    append(oid)
                    resolved = True
637 638 639 640 641 642
                else:
                    logging.info('Conflict resolution failed for %r:%r with %r',
                        dump(oid), dump(serial), dump(conflict_serial))
            else:
                logging.info('Conflict reported for %r:%r with later ' \
                    'transaction %r , cannot resolve conflict.')
643 644 645 646 647 648
            if not resolved:
                # XXX: Is it really required to remove from data_dict ?
                del data_dict[oid]
                raise ConflictError(oid=oid,
                    serials=(tid, serial), data=data)
        return result
649

650 651 652 653 654 655 656 657 658 659
    @profiler_decorator
    def waitResponses(self):
        """Wait for all requests to be answered (or their connection to be
        dected as closed)"""
        queue = self.local_var.queue
        pending = self.dispatcher.pending
        _waitAnyMessage = self._waitAnyMessage
        while pending(queue):
            _waitAnyMessage()

660
    @profiler_decorator
661 662 663 664 665 666 667 668 669
    def waitStoreResponses(self, tryToResolveConflict):
        result = []
        append = result.append
        resolved_oid_set = set()
        update = resolved_oid_set.update
        local_var = self.local_var
        tid = local_var.tid
        _handleConflicts = self._handleConflicts
        while True:
670
            self.waitResponses()
671 672 673 674 675 676 677
            conflicts = _handleConflicts(tryToResolveConflict)
            if conflicts:
                update(conflicts)
            else:
                # No more conflict resolutions to do, no more pending store
                # requests
                break
678

Vincent Pelletier's avatar
Vincent Pelletier committed
679
        # Check for never-stored objects, and update result for all others
680
        for oid, store_dict in \
Vincent Pelletier's avatar
Vincent Pelletier committed
681
            local_var.object_stored_counter_dict.iteritems():
682
            if not store_dict:
Vincent Pelletier's avatar
Vincent Pelletier committed
683 684 685 686 687
                raise NEOStorageError('tpc_store failed')
            elif oid in resolved_oid_set:
                append((oid, ResolvedSerial))
            else:
                append((oid, tid))
688
        return result
Aurel's avatar
Aurel committed
689

690
    @profiler_decorator
691
    def tpc_vote(self, transaction, tryToResolveConflict):
        """Store current transaction.

        Waits for the pending store answers, then sends the transaction
        metadata to every writable cell of its tid for which a connection
        is available.

        Returns the list produced by waitStoreResponses.
        Raises StorageTransactionError when transaction is not the one
        bound to the calling thread, and NEOStorageError when a node did
        not vote the transaction or when no node could be asked.
        """
        context = self.local_var
        if context.txn is not transaction:
            raise StorageTransactionError(self, transaction)

        result = self.waitStoreResponses(tryToResolveConflict)

        tid = context.tid
        request = Packets.AskStoreTransaction(tid, transaction.user,
            transaction.description, dumps(transaction._extension),
            context.data_dict.keys())
        # Send the transaction to each node and count the accepting ones.
        accepted = 0
        for cell in self._getCellListForTID(tid, writable=True):
            logging.debug("voting object %s %s", cell.getAddress(),
                cell.getState())
            conn = self.cp.getConnForCell(cell)
            if conn is not None:
                context.txn_voted = False
                try:
                    self._askStorage(conn, request)
                except ConnectionClosed:
                    continue
                if not self.isTransactionVoted():
                    raise NEOStorageError('tpc_vote failed')
                accepted += 1

        # At least one storage node must have accepted.
        if accepted == 0:
            raise NEOStorageError('tpc_vote failed')
        # Check if master connection is still alive.
        # This is just here to lower the probability of detecting a problem
        # in tpc_finish, as we should do our best to detect problem before
        # tpc_finish.
        self._getMasterConnection()

        return result

733
    @profiler_decorator
734 735
    def tpc_abort(self, transaction):
        """Abort the current transaction.

        Does nothing when `transaction` is not the one recorded in
        `local_var.txn`. Otherwise an AbortTransaction packet is notified
        to every reachable writable cell of the transaction's TID and of
        the stored oids, then to the master connection; pending answers
        are consumed and `local_var` is cleared.
        """
        local_var = self.local_var
        if local_var.txn is not transaction:
            return

        tid = local_var.tid
        # Cells where the transaction itself was stored...
        target_cells = set(self._getCellListForTID(tid, writable=True))
        # ...plus cells where its objects were stored.
        for oid in local_var.data_dict:
            target_cells.update(self._getCellListForOID(oid, writable=True))

        abort_packet = Packets.AbortTransaction(tid)
        # Cancel the transaction on all those nodes.
        for cell in target_cells:
            conn = self.cp.getConnForCell(cell)
            if conn is not None:
                conn.notify(abort_packet)

        # Abort the transaction in the primary master node.
        self._getMasterConnection().notify(abort_packet)

        # Just wait for responses to arrive. If any leads to an exception,
        # log it and continue: we *must* eat all answers to not disturb the
        # next transaction.
        queue = local_var.queue
        has_pending = self.dispatcher.pending
        wait_any = self._waitAnyMessage
        while has_pending(queue):
            try:
                wait_any()
            except:
                logging.error('Exception in tpc_abort: %s',
                    traceback.format_exc())

        local_var.clear()
773

774
    @profiler_decorator
775 776
    def tpc_finish(self, transaction, f=None):
        """Finish the current transaction.

        Returns None without doing anything when `transaction` is not the
        one recorded in `local_var.txn`. Otherwise, under the load lock:
        calls `f(tid)` if `f` is given, asks the primary master to finish
        the transaction, updates the cache from the stored data, clears
        `local_var` and returns the transaction's TID.

        Raises NEOStorageError if the transaction is not flagged as
        finished once the master has been asked.
        """
        local_var = self.local_var
        if local_var.txn is not transaction:
            return
        self._load_lock_acquire()
        try:
            tid = local_var.tid
            # Call function given by ZODB
            if f is not None:
                f(tid)

            # Call finish on master
            finish_packet = Packets.AskFinishTransaction(tid,
                local_var.data_dict.keys())
            self._askPrimary(finish_packet)

            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            # Update cache
            self._cache_lock_acquire()
            try:
                cache = self.mq_cache
                for oid, data in local_var.data_dict.iteritems():
                    if data == '':
                        # Empty data: drop any cached entry for this oid.
                        if oid in cache:
                            del cache[oid]
                        continue
                    # Now serial is same as tid
                    cache[oid] = tid, data
            finally:
                self._cache_lock_release()
            local_var.clear()
            return tid
        finally:
            self._load_lock_release()
811

812
    def undo(self, undone_tid, txn, tryToResolveConflict):
        """Undo transaction `undone_tid` as part of the current transaction.

        The transaction information of `undone_tid` is first fetched from a
        readable storage cell. Every storage node is then asked to undo the
        transaction; oids reported in `local_var.undo_error_oid_list` go
        through `tryToResolveConflict` and the resolved data is stored.

        Returns a `(tid, oid_list)` tuple: the current transaction's TID
        and the oids listed in the undone transaction's information.

        Raises:
          StorageTransactionError: `txn` is not the current transaction.
          NEOStorageError: the transaction information could not be
            fetched from any storage.
          UndoError: the transaction is packed, an object could not be
            loaded while resolving a conflict, or a conflict could not be
            resolved.
          ConflictError: a storage reported an oid in
            `local_var.undo_conflict_oid_list`.
        """
        local_var = self.local_var
        if txn is not local_var.txn:
            raise StorageTransactionError(self, undone_tid)

        # First get transaction information from a storage node.
        cell_list = self._getCellListForTID(undone_tid, readable=True)
        assert len(cell_list), 'No cell found for transaction %s' % (
            dump(undone_tid), )
        shuffle(cell_list)
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            local_var.txn_info = 0
            local_var.txn_ext = 0
            try:
                self._askStorage(conn, Packets.AskTransactionInformation(
                    undone_tid))
            except ConnectionClosed:
                continue

            if local_var.txn_info == -1:
                # Tid not found, try with next node
                logging.warning('Transaction %s was not found on node %s',
                    dump(undone_tid), self.nm.getByAddress(conn.getAddress()))
                continue
            elif isinstance(local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')
        else:
            raise NEOStorageError('undo failed')

        if local_var.txn_info['packed']:
            # The exception used to be instantiated without being raised,
            # which let the undo of a packed transaction go on.
            raise UndoError('non-undoable transaction')

        tid = local_var.tid

        undo_conflict_oid_list = local_var.undo_conflict_oid_list = []
        undo_error_oid_list = local_var.undo_error_oid_list = []
        ask_undo_transaction = Packets.AskUndoTransaction(tid, undone_tid)
        getConnForNode = self.cp.getConnForNode
        for storage_node in self.nm.getStorageList():
            # NOTE(review): the connection is not checked against None
            # here, unlike other getConnForNode callers - confirm whether
            # an unreachable storage node can be listed.
            storage_conn = getConnForNode(storage_node)
            storage_conn.ask(ask_undo_transaction)
        # Wait for all AnswerUndoTransaction.
        self.waitResponses()

        # Don't do any handling for "live" conflicts, raise
        if undo_conflict_oid_list:
            raise ConflictError(oid=undo_conflict_oid_list[0], serials=(tid,
                undone_tid), data=None)

        def loadBefore(oid, tid):
            # Return the first two items of what _load gives for `oid` at
            # `tid`, turning a missing object into an UndoError.
            try:
                result = self._load(oid, tid=tid)
            except NEOStorageNotFoundError:
                raise UndoError("Object not found while resolving undo " \
                    "conflict")
            return result[:2]

        # Try to resolve undo conflicts
        for oid in undo_error_oid_list:
            # Load the latest version we are supposed to see
            data, data_tid = loadBefore(oid, tid)
            # Load the version we were undoing to
            undo_data, _ = loadBefore(oid, undone_tid)
            # Resolve conflict
            new_data = tryToResolveConflict(oid, data_tid, undone_tid,
                undo_data, data)
            if new_data is None:
                raise UndoError('Some data were modified by a later ' \
                    'transaction', oid)
            self.store(oid, data_tid, new_data, '', local_var.txn)

        oid_list = local_var.txn_info['oids']
        # Consistency checking: all oids of the transaction must have been
        # reported as undone
        data_dict = local_var.data_dict
        for oid in oid_list:
            assert oid in data_dict, repr(oid)
        return local_var.tid, oid_list
895

896 897 898 899
    def _insertMetadata(self, txn_info, extension):
        for k, v in loads(extension).items():
            txn_info[k] = v

900
    def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
        """Return a list of transaction information dicts.

        Every storage node of the partition table is asked for TIDs in the
        `first`..`last` range; the information of each returned TID is then
        fetched, most recent first.

        `last`, when negative, is replaced by `first - last`.
        `filter`, when given, is called with each transaction information
        dict; only those for which it returns a true value are kept.
        `block`: when false and nothing was collected, the search is run
        once more with `last` multiplied by 5.
        `with_oids`: when false, the "oids" key is removed from each entry.

        Raises NeoException when the information of a listed TID cannot be
        obtained from any cell.
        """
        local_var = self.local_var
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        pt = self._getPartitionTable()
        storage_node_list = pt.getNodeList()

        local_var.node_tids = {}
        asked_count = 0
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue
            conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION))
            asked_count += 1

        # Wait for answers from the storages that were actually asked:
        # counting skipped (unreachable) nodes too would wait forever.
        while len(local_var.node_tids) < asked_count:
            self._waitAnyMessage()

        # Reorder tids
        tid_set = set()
        for tid_list in local_var.node_tids.values():
            tid_set.update(tid_list)
        ordered_tids = sorted(tid_set, reverse=True)
        logging.debug("UndoLog, tids %s", ordered_tids)

        # For each transaction, get info
        undo_info = []
        for tid in ordered_tids:
            cell_list = self._getCellListForTID(tid, readable=True)
            shuffle(cell_list)
            # Do not let the previous TID's information leak into this
            # iteration when no cell can be reached.
            local_var.txn_info = 0
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue
                local_var.txn_info = 0
                local_var.txn_ext = 0
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(tid))
                except ConnectionClosed:
                    continue
                if isinstance(local_var.txn_info, dict):
                    break

            txn_info = local_var.txn_info
            if txn_info in (-1, 0):
                # TID not found at all
                raise NeoException('Data inconsistency detected: '
                                   'transaction info for TID %r could not '
                                   'be found' % (tid, ))

            if filter is None or filter(txn_info):
                txn_info.pop('packed')
                if not with_oids:
                    txn_info.pop("oids")
                undo_info.append(txn_info)
                self._insertMetadata(txn_info, local_var.txn_ext)
                if len(undo_info) >= last - first:
                    break

        # Check we return at least one element, otherwise call
        # again but extend offset
        if not undo_info and not block:
            undo_info = self.__undoLog(first=first, last=last*5, filter=filter,
                    block=1, with_oids=with_oids)
        return undo_info

971
    def undoLog(self, first, last, filter=None, block=0):
        """Return transaction information dicts without their "oids" key.

        All arguments are handed over to __undoLog, which is called with
        its default `with_oids` value.
        """
        return self.__undoLog(first, last, filter=filter, block=block)
973 974

    def transactionLog(self, first, last):
        """Return transaction information dicts, "oids" key included.

        Calls __undoLog with `with_oids=True` and neither filter nor block.
        """
        entries = self.__undoLog(first, last, with_oids=True)
        return entries
976

Grégory Wisniewski's avatar
Grégory Wisniewski committed
977
    def history(self, oid, version=None, size=1, filter=None, object_only=0):
        """Return the history of object `oid`.

        An AskObjectHistory(oid, 0, size) request is sent to readable cells
        of the object until one of them gives an answer. Unless
        `object_only` is true, the transaction information of each
        `(serial, size)` entry of that answer is then fetched from a
        readable cell of that transaction and turned into a history dict.

        `version` is accepted but not used.
        `filter`, when given, is called with each history dict; dicts for
        which it returns a false value are left out of the result.

        Returns the answer stored in `local_var.history` when `object_only`
        is true, otherwise a list of dicts.

        Raises KeyError when the answer holds no entry or when the size of
        its first entry is 0, and NEOStorageError when no usable answer
        could be obtained or when the answer is about another oid.
        """
        local_var = self.local_var
        # Get history informations for object first
        cell_list = self._getCellListForOID(oid, readable=True)
        shuffle(cell_list)

        # Never rely on the answer of a previous call if no cell can be
        # reached.
        local_var.history = None
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            local_var.history = None
            try:
                self._askStorage(conn, Packets.AskObjectHistory(oid, 0, size))
            except ConnectionClosed:
                continue

            if not isinstance(local_var.history, tuple):
                # Not found (-1) or no answer recorded, go on with next node
                continue
            if local_var.history[0] != oid:
                # Got history for wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                                      '%r, got %r' % (
                                      oid, local_var.history[0]))
            # A valid answer was received: stop here instead of asking the
            # remaining cells and overwriting it.
            break

        if not isinstance(local_var.history, tuple):
            raise NEOStorageError('history failed')

        if local_var.history[1] == [] or \
            local_var.history[1][0][1] == 0:
            # KeyError expected if no history was found
            # XXX: this may requires an error from the storages
            raise KeyError

        if object_only:
            # Use by getSerial
            return local_var.history

        # Now that we have object informations, get txn informations
        history_list = []
        for serial, object_size in local_var.history[1]:
            # Ask the cells holding this transaction; the returned list
            # used to be dropped, leaving the object's cells in use.
            cell_list = self._getCellListForTID(serial, readable=True)
            shuffle(cell_list)

            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue

                # ask transaction information
                local_var.txn_info = None
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(serial))
                except ConnectionClosed:
                    continue

                if local_var.txn_info == -1:
                    # TID not found
                    continue
                if isinstance(local_var.txn_info, dict):
                    break
            else:
                # No cell gave the transaction information: fail explicitly
                # rather than on an attribute lookup below.
                raise NEOStorageError('history failed')

            # create history dict
            txn_info = local_var.txn_info
            txn_info.pop('id')
            txn_info.pop('oids')
            txn_info.pop('packed')
            txn_info['tid'] = serial
            txn_info['version'] = ''
            txn_info['size'] = object_size
            if filter is None or filter(txn_info):
                history_list.append(txn_info)
            self._insertMetadata(txn_info, local_var.txn_ext)

        return history_list
Aurel's avatar
Aurel committed
1053

1054
    @profiler_decorator
1055
    def importFrom(self, source, start, stop, tryToResolveConflict):
        """Copy the transactions of `source` into this storage.

        `source.iterator(start, stop)` is walked; every transaction is
        replayed with tpc_begin, one store per record, tpc_vote and
        tpc_finish. The last serial known for each oid (from the results
        of store and tpc_vote) is given as previous serial to the next
        store of that oid.

        The source iterator is closed even when replaying fails.
        """
        serials = {}

        def updateLastSerial(oid, result):
            # `result` is either a single serial string for `oid`, or an
            # iterable of (oid, serial) pairs; false values are ignored.
            if not result:
                return
            if isinstance(result, str):
                assert oid is not None
                serials[oid] = result
                return
            for stored_oid, serial in result:
                assert isinstance(serial, str), serial
                serials[stored_oid] = serial

        transaction_iter = source.iterator(start, stop)
        try:
            for transaction in transaction_iter:
                self.tpc_begin(transaction, transaction.tid,
                    transaction.status)
                for r in transaction:
                    pre = serials.get(r.oid, None)
                    # TODO: bypass conflict resolution, locks...
                    result = self.store(r.oid, pre, r.data, r.version,
                        transaction)
                    updateLastSerial(r.oid, result)
                updateLastSerial(None, self.tpc_vote(transaction,
                            tryToResolveConflict))
                self.tpc_finish(transaction)
        finally:
            # Release the source iterator whatever happened above.
            transaction_iter.close()

1079 1080 1081
    def iterator(self, start=None, stop=None):
        """Return an Iterator built from this object, `start` and `stop`."""
        txn_iterator = Iterator(self, start, stop)
        return txn_iterator

1082 1083 1084 1085 1086
    def lastTransaction(self):
        """Return the TID currently held in `local_var.tid`."""
        # XXX: this doesn't consider transactions created by other clients,
        #  should ask the primary master
        local_var = self.local_var
        return local_var.tid

1087 1088 1089 1090 1091 1092 1093 1094 1095 1096
    def abortVersion(self, src, transaction):
        """Return `('', [])` for the current transaction.

        `src` is not used. Raises StorageTransactionError if `transaction`
        is not the one recorded in `local_var.txn`.
        """
        if self.local_var.txn is transaction:
            return '', []
        raise StorageTransactionError(self, transaction)

    def commitVersion(self, src, dest, transaction):
        """Return `('', [])` for the current transaction.

        `src` and `dest` are not used. Raises StorageTransactionError if
        `transaction` is not the one recorded in `local_var.txn`.
        """
        if self.local_var.txn is transaction:
            return '', []
        raise StorageTransactionError(self, transaction)

Grégory Wisniewski's avatar
Grégory Wisniewski committed
1097 1098 1099 1100
    def loadEx(self, oid, version):
        """Return `(data, serial, '')` for `oid`.

        The data and serial come from `self.load(oid=oid)`; `version` is
        not used and the third item is always the empty string.
        """
        payload, serial = self.load(oid=oid)
        return (payload, serial, '')

1101 1102 1103 1104 1105
    def __del__(self):
        """Close every connection and stop the polling thread."""
        # Due to bug in ZODB, close is not always called when shutting
        # down zope, so use __del__ to close connections
        connection_list = self.em.getConnectionList()
        for connection in connection_list:
            connection.close()
        # Stop polling thread
        poll_thread = self.poll_thread
        poll_thread.stop()
    # close() is the very same function as __del__.
    close = __del__
1110 1111

    def sync(self):
        """Call `_waitAnyMessage` with False as its only argument."""
        wait_any = self._waitAnyMessage
        wait_any(False)
1113

1114 1115 1116 1117 1118 1119 1120 1121 1122
    def setNodeReady(self):
        """Set `local_var.node_ready` to True."""
        local_var = self.local_var
        local_var.node_ready = True

    def setNodeNotReady(self):
        """Set `local_var.node_ready` to False."""
        local_var = self.local_var
        local_var.node_ready = False

    def isNodeReady(self):
        """Return the value of `local_var.node_ready`."""
        local_var = self.local_var
        return local_var.node_ready

1123
    def setTID(self, value):
        """Store `value` in `local_var.tid`."""
        local_var = self.local_var
        local_var.tid = value
1125 1126

    def getTID(self):
        """Return the value of `local_var.tid`."""
        local_var = self.local_var
        return local_var.tid
1128 1129

    def setTransactionFinished(self):
        """Set `local_var.txn_finished` to True."""
        local_var = self.local_var
        local_var.txn_finished = True
1131 1132

    def isTransactionFinished(self):
        """Return the value of `local_var.txn_finished`."""
        local_var = self.local_var
        return local_var.txn_finished
1134 1135

    def setTransactionVoted(self):
        """Set `local_var.txn_voted` to True."""
        local_var = self.local_var
        local_var.txn_voted = True
1137 1138

    def isTransactionVoted(self):
        """Return the value of `local_var.txn_voted`."""
        local_var = self.local_var
        return local_var.txn_voted
1140