app.py 39.5 KB
Newer Older
1
#
Aurel's avatar
Aurel committed
2
# Copyright (C) 2006-2009  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
from thread import get_ident
19
from cPickle import dumps
20
from zlib import compress, decompress
21
from Queue import Queue, Empty
22
from random import shuffle
Yoshinori Okuji's avatar
Yoshinori Okuji committed
23
from time import sleep
24

25 26
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError

27 28 29 30
from neo import setupLog
# Module-level side effect: setupLog is called as soon as this module is
# imported, with the 'CLIENT' name and verbose=True hard-coded.
setupLog('CLIENT', verbose=True)

from neo import logging
31
from neo import protocol
32
from neo.protocol import NodeTypes, Packets
33
from neo.event import EventManager
34
from neo.util import makeChecksum, dump
35 36
from neo.locking import RLock, Lock
from neo.connection import MTClientConnection
37
from neo.node import NodeManager
38
from neo.connector import getConnectorHandler
39 40 41 42
from neo.client.exception import NEOStorageError, NEOStorageConflictError
from neo.client.exception import NEOStorageNotFoundError
from neo.exception import NeoException
from neo.client.handlers import storage, master
43
from neo.dispatcher import Dispatcher
44
from neo.client.poll import ThreadedPoll
45 46
from neo.client.iterator import Iterator
from neo.client.mq import MQ
47
from neo.util import u64, parseMasterList
48

Aurel's avatar
Aurel committed
49

50
class ConnectionClosed(Exception):
    """Raised while waiting for an answer when the awaited connection closes."""
52

53

54 55
class ConnectionPool(object):
    """This class manages a pool of connections to storage nodes."""

    def __init__(self, app, max_pool_size = 25):
        self.app = app
        self.max_pool_size = max_pool_size
        # Maps node UUID (node.getUUID()) -> connection to that node.
        self.connection_dict = {}
        # Define a lock in order to create one connection to
        # a storage node at a time to avoid multiple connections
        # to the same node.
        lock = RLock()
        self.connection_lock_acquire = lock.acquire
        self.connection_lock_release = lock.release

    def _initNodeConnection(self, node):
        """Init a connection to a given storage node.

        Opens a connection, sends RequestIdentification and waits for the
        answer with the application's storage bootstrap handler.

        Returns the connection if app.isNodeReady() is true afterwards,
        None otherwise (no address, connection failure, connection closed
        while waiting, or node not ready).
        """
        addr = node.getAddress()
        if addr is None:
            return None

        app = self.app
        logging.debug('trying to connect to %s - %s', node, node.getState())
        app.setNodeReady()
        conn = MTClientConnection(app.em, app.storage_event_handler, addr,
                                  connector_handler=app.connector_handler,
                                  dispatcher=app.dispatcher)
        conn.lock()
        try:
            if conn.getConnector() is None:
                # This happens, if a connection could not be established.
                logging.error('Connection to storage node %s failed', node)
                return None

            p = Packets.RequestIdentification(NodeTypes.CLIENT,
                        app.uuid, None, app.name)
            msg_id = conn.ask(app.local_var.queue, p)
        finally:
            conn.unlock()

        try:
            app._waitMessage(conn, msg_id,
                    handler=app.storage_bootstrap_handler)
        except ConnectionClosed:
            logging.error('Connection to storage node %s failed', node)
            return None

        if app.isNodeReady():
            logging.info('connected to storage node %s', node)
            return conn
        logging.info('Storage node %s not ready', node)
        return None

    def _dropConnections(self):
        """Drop connections.

        Closes connections that are neither pending (conn.pending()) nor
        registered in the dispatcher, until the pool size is back to
        max_pool_size or less.
        """
        # Iterate over a snapshot: entries are deleted inside the loop.
        for node_uuid, conn in list(self.connection_dict.items()):
            # Drop first connection which looks not used
            conn.lock()
            try:
                if not conn.pending() and \
                        not self.app.dispatcher.registered(conn):
                    # Delete by the key the connection was stored under
                    # (node.getUUID()); conn.getUUID() need not match it.
                    del self.connection_dict[node_uuid]
                    conn.close()
                    logging.debug('_dropConnections : connection to storage ' \
                            'node %s:%d closed', *(conn.getAddress()))
                    if len(self.connection_dict) <= self.max_pool_size:
                        break
            finally:
                conn.unlock()

    def _createNodeConnection(self, node):
        """Create a connection to a given storage node.

        Must be called with the pool lock held. Returns the new connection
        locked, or None if it could not be established.
        """
        if len(self.connection_dict) > self.max_pool_size:
            # must drop some unused connections
            self._dropConnections()

        # The pool lock is released while connecting, because
        # _initNodeConnection blocks waiting for the identification answer.
        # NOTE(review): another thread may connect to the same node in the
        # meantime; the later connection then replaces the earlier entry.
        self.connection_lock_release()
        try:
            conn = self._initNodeConnection(node)
        finally:
            self.connection_lock_acquire()

        if conn is None:
            return None

        self.connection_dict[node.getUUID()] = conn
        conn.lock()
        return conn

    def getConnForCell(self, cell):
        """Return a locked connection to the node of cell (or None)."""
        return self.getConnForNode(cell.getNode())

    def getConnForNode(self, node):
        """Return a locked connection object to a given node
        If no connection exists, create a new one.

        Returns None if the node is not running or the connection fails.
        """
        if not node.isRunning():
            return None
        uuid = node.getUUID()
        self.connection_lock_acquire()
        try:
            try:
                conn = self.connection_dict[uuid]
            except KeyError:
                # Create new connection to node
                return self._createNodeConnection(node)
            # Already connected to node
            conn.lock()
            return conn
        finally:
            self.connection_lock_release()

    def removeConnection(self, node):
        """Explicitly remove connection when a node is broken."""
        self.connection_lock_acquire()
        try:
            self.connection_dict.pop(node.getUUID(), None)
        finally:
            self.connection_lock_release()
178

179

180 181 182 183
class ThreadContext(object):
    """Per-thread attribute storage.

    Attributes not found through normal lookup are read from, and every
    attribute assignment is written to, a dict selected by the current
    thread id (get_ident()), so each thread sees its own values.
    """

    # Fallback for instances created without __init__ (e.g. via __new__).
    # __init__ gives each instance its own dict so that two ThreadContext
    # objects used from the same thread do not share state.
    _threads_dict = {}

    def __init__(self):
        # Bypass __setattr__: this must be a real instance attribute, not a
        # per-thread value.
        object.__setattr__(self, '_threads_dict', {})
        self.clear()

    @staticmethod
    def _defaults():
        """Return a fresh dict holding the initial per-thread values.

        Used by both __init__ and clear() so that every thread, and every
        cleared context, exposes the same set of attributes.
        """
        return {
            'tid': None,
            'txn': None,
            'txn_voted': False,
            'txn_finished': False,
            'txn_info': 0,
            'history': None,
            'data_dict': {},
            'node_tids': {},
            'node_ready': False,
            'conflict_serial': 0,
            'asked_object': 0,
            'object_stored_counter': 0,
            'voted_counter': 0,
            'object_stored': 0,
            'queue': Queue(5),
        }

    def __getThreadData(self):
        """Return the current thread's dict, creating it on first use."""
        thread_id = get_ident()
        try:
            return self._threads_dict[thread_id]
        except KeyError:
            self.clear(thread_id)
            return self._threads_dict[thread_id]

    def __getattr__(self, name):
        thread_data = self.__getThreadData()
        try:
            return thread_data[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        self.__getThreadData()[name] = value

    def clear(self, thread_id=None):
        """Reset the values of thread_id (default: the current thread)."""
        if thread_id is None:
            thread_id = get_ident()
        self._threads_dict[thread_id] = self._defaults()


Aurel's avatar
Aurel committed
235
class Application(object):
236 237
    """The client node application."""

238
    def __init__(self, master_nodes, name, connector=None, **kw):
        """Set up the client application.

        master_nodes -- master addresses, parsed with parseMasterList; each
            one is registered in the node manager.
        name -- name sent in RequestIdentification packets.
        connector -- passed to getConnectorHandler.
        Extra keyword arguments are accepted and ignored.
        """
        # XXX: use a configuration entry
        event_manager = EventManager()
        # Start polling thread
        self.poll_thread = ThreadedPoll(event_manager)

        # Internal attributes shared by all threads
        self._db = None
        self.name = name
        self.em = event_manager
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = self.master_conn = None
        self.primary_master_node = self.trying_master_node = None

        # load master node list
        master_node_list = parseMasterList(master_nodes)
        self.master_node_list = master_node_list
        logging.debug('master nodes are %s', master_node_list)
        for address in master_node_list:
            self.nm.createMaster(address=address)

        # no self-assigned UUID, primary master will supply us one
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.last_oid = '\0' * 8

        dispatcher = self.dispatcher
        self.storage_event_handler = storage.StorageEventHandler(self,
                dispatcher)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(
                self, dispatcher)

        # Internal attribute distinct between threads
        self.local_var = ThreadContext()

        def newLockMethods():
            # One fresh Lock per pair of acquire/release methods.
            lock = Lock()
            return lock.acquire, lock.release

        # _load_lock is used to make loading and storing atomic
        self._load_lock_acquire, self._load_lock_release = newLockMethods()
        # _oid_lock is used in order to not call multiple oid
        # generation at the same time
        self._oid_lock_acquire, self._oid_lock_release = newLockMethods()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire, self._cache_lock_release = newLockMethods()
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        (self._connecting_to_master_node_acquire,
         self._connecting_to_master_node_release) = newLockMethods()
        # _nm ensures exclusive access to the node manager
        self._nm_acquire, self._nm_release = newLockMethods()
300

301
    def _waitMessage(self, target_conn = None, msg_id = None, handler=None):
302
        """Wait for a message returned by the dispatcher in queues."""
303
        local_queue = self.local_var.queue
304
        while 1:
305 306
            if msg_id is None:
                try:
307
                    conn, packet = local_queue.get_nowait()
308 309 310 311 312
                except Empty:
                    break
            else:
                conn, packet = local_queue.get()
            # check fake packet
Yoshinori Okuji's avatar
Yoshinori Okuji committed
313
            if packet is None:
314
                if conn.getUUID() == target_conn.getUUID():
315
                    raise ConnectionClosed
Yoshinori Okuji's avatar
Yoshinori Okuji committed
316 317
                else:
                    continue
318 319 320
            # Guess the handler to use based on the type of node on the
            # connection
            if handler is None:
321
                node = self.nm.getByAddress(conn.getAddress())
322 323 324 325
                if node is None:
                    raise ValueError, 'Expecting an answer from a node ' \
                        'which type is not known... Is this right ?'
                else:
326
                    if node.isStorage():
327
                        handler = self.storage_handler
328
                    elif node.isMaster():
329 330 331
                        handler = self.primary_handler
                    else:
                        raise ValueError, 'Unknown node type: %r' % (
332
                            node.__class__, )
333
            handler.dispatch(conn, packet)
334
            if target_conn is conn and msg_id == packet.getId():
335
                break
336

337 338 339
    def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
        """ Send a request to a storage node and process it's answer """
        try:
340 341
            msg_id = conn.ask(self.local_var.queue, packet, timeout,
                              additional_timeout)
342 343 344 345 346 347 348
        finally:
            # assume that the connection was already locked
            conn.unlock()
        self._waitMessage(conn, msg_id, self.storage_handler)

    def _askPrimary(self, packet, timeout=5, additional_timeout=30):
        """ Send a request to the primary master and process it's answer """
349
        conn = self._getMasterConnection()
350 351
        conn.lock()
        try:
352 353
            msg_id = conn.ask(self.local_var.queue, packet, timeout,
                              additional_timeout)
354 355 356 357
        finally:
            conn.unlock()
        self._waitMessage(conn, msg_id, self.primary_handler)

358 359
    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
360
        # acquire the lock to allow only one thread to connect to the primary
361
        self._connecting_to_master_node_acquire()
362
        try:
363
            if self.master_conn is None:
364
                self.new_oid_list = []
365
                self.master_conn = self._connectToPrimaryNode()
366 367 368 369
            return self.master_conn
        finally:
            self._connecting_to_master_node_release()

370
    def _getPartitionTable(self):
371
        """ Return the partition table manager, reconnect the PMN if needed """
372 373 374 375 376
        # this ensure the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

377 378
    def _getCellListForOID(self, oid, readable=False, writable=False):
        """ Return the cells available for the specified OID """
379
        pt = self._getPartitionTable()
380 381 382 383
        return pt.getCellListForOID(oid, readable, writable)

    def _getCellListForTID(self, tid, readable=False, writable=False):
        """ Return the cells available for the specified TID """
384
        pt = self._getPartitionTable()
385
        return pt.getCellListForTID(tid, readable, writable)
386

387
    def _connectToPrimaryNode(self):
        """Connect and identify to the primary master node.

        Loops until self.uuid is set and self.pt is an operational
        partition table:
          1. find the primary master: send AskPrimary to the last known
             primary if any, otherwise to each master of self.nm in turn
             (sleeping 1 s after each full pass over the list);
          2. send RequestIdentification until the connection gets a UUID,
             retrying every second while identification is refused;
          3. if this client already has a UUID, request node information
             and the partition table.
        If the connection is lost during step 2, start again at step 1.

        Returns the connection to the primary master.
        """
        logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        # Whole list tried: pause, then start over.
                        sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.em, self.notifications_handler,
                        addr=self.trying_master_node.getAddress(),
                        connector_handler=self.connector_handler,
                        dispatcher=self.dispatcher)
                # Query for primary master node
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        # This happens if a connection could not be established.
                        logging.error('Connection to master node %s failed',
                                      self.trying_master_node)
                        continue
                    msg_id = conn.ask(self.local_var.queue,
                            Packets.AskPrimary())
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected.
                # (primary_master_node is presumably updated by the bootstrap
                # handler from the AskPrimary answer.)
                connected = self.primary_master_node is not None and \
                        self.primary_master_node is self.trying_master_node

            logging.info('connected to a primary master node')
            # Identify to primary master and request initial data
            while conn.getUUID() is None:
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        logging.error('Connection to master node %s lost',
                                      self.trying_master_node)
                        self.primary_master_node = None
                        break
                    p = Packets.RequestIdentification(NodeTypes.CLIENT,
                            self.uuid, None, self.name)
                    msg_id = conn.ask(self.local_var.queue, p)
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    self.primary_master_node = None
                    break
                if conn.getUUID() is None:
                    # Node identification was refused by master.
                    sleep(1)
            if conn.getUUID() is None:
                # The connection was lost during identification: do not send
                # further requests on it, look for the primary master again.
                continue
            if self.uuid is not None:
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      Packets.AskNodeInformation())
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      Packets.AskPartitionTable([]))
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
            ready = self.uuid is not None and self.pt is not None \
                                 and self.pt.operational()
        logging.info("connected to primary master node %s",
                self.primary_master_node)
        return conn
482

483 484 485
    def registerDB(self, db, limit):
        self._db = db

486 487 488
    def getDB(self):
        return self._db

489 490 491 492 493
    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
494 495 496 497
                # Get new oid list from master node
                # we manage a list of oid here to prevent
                # from asking too many time new oid one by one
                # from master node
498
                self._askPrimary(Packets.AskNewOIDs(100))
499 500
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
501 502
            self.last_oid = self.new_oid_list.pop()
            return self.last_oid
503 504 505
        finally:
            self._oid_lock_release()

506 507 508
    def getStorageSize(self):
        """Return a rough storage size: the numeric value of the last OID used.

        This is inaccurate, but it is the only figure available here.
        """
        last_oid_value = u64(self.last_oid)
        return int(last_oid_value)
509

Aurel's avatar
Aurel committed
510 511 512 513
    def getSerial(self, oid):
        # Try in cache first
        self._cache_lock_acquire()
        try:
514 515
            if oid in self.mq_cache:
                return self.mq_cache[oid][0]
Aurel's avatar
Aurel committed
516 517 518
        finally:
            self._cache_lock_release()
        # history return serial, so use it
519
        hist = self.history(oid, length = 1, object_only = 1)
Aurel's avatar
Aurel committed
520 521
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
522 523 524 525
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        return hist[1][0][0]

Aurel's avatar
Aurel committed
526

527
    def _load(self, oid, serial=None, tid=None, cache=0):
        """Internal method which manages load, loadSerial and loadBefore.

        Asks the nodes holding a readable cell for oid, in random order,
        until one returns data with the expected oid and a valid checksum.

        serial, tid -- forwarded in the AskObject request.
        cache -- when true, (start_serial, data) is stored in self.mq_cache.

        Returns (data, start_serial, end_serial), with data None for an
        object stored as an empty string.
        Raises NEOStorageNotFoundError if no readable cell exists, if no node
        could be reached, or if no node returned valid data.
        """
        cell_list = self._getCellListForOID(oid, readable=True)
        if len(cell_list) == 0:
            # No cells available, so why are we running ?
            logging.error('oid %s not found because no storage is ' \
                    'available for it', dump(oid))
            raise NEOStorageNotFoundError()

        shuffle(cell_list)
        # asked_object is filled by the storage answer handler. 0 means no
        # answer was received and -1 means "not found". Otherwise it is the
        # unpacked tuple checked below.
        self.local_var.asked_object = 0
        for cell in cell_list:
            logging.debug('trying to load %s from %s',
                          dump(oid), dump(cell.getUUID()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            try:
                self._askStorage(conn, Packets.AskObject(oid, serial, tid))
            except ConnectionClosed:
                continue

            if self.local_var.asked_object == -1:
                # OID not found
                break

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                logging.error('got wrong oid %s instead of %s from node %s',
                              dump(noid), dump(oid), cell.getAddress())
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getAddress(), dump(oid))
                self.local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if self.local_var.asked_object == 0:
            # We didn't get any object from any storage node because of
            # connection errors
            logging.warning('oid %s not found because of connection failure',
                    dump(oid))
            raise NEOStorageNotFoundError()

        if self.local_var.asked_object == -1:
            # We didn't get any valid object from any storage node
            logging.info('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        if data == '':
            data = None
        return data, start_serial, end_serial
599

600

601 602 603
    def load(self, oid, version=None):
        """Load an object for a given oid: return (data, serial).

        The client cache is tried first. On a miss, the object is loaded
        from storage nodes and cached. version is not used. Calls are
        serialized by _load_lock.

        An object stored as an empty string is returned with data None,
        whether it comes from the cache or from a storage node.
        """
        self._load_lock_acquire()
        try:
            # First try from cache
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
                    serial, data = self.mq_cache[oid]
                    # _load() caches the raw '' but returns None for it, so
                    # normalize here for cached and uncached loads to agree.
                    if data == '':
                        data = None
                    return data, serial
            finally:
                self._cache_lock_release()
            # Otherwise get it from storage node
            return self._load(oid, cache=1)[:2]
        finally:
            self._load_lock_release()
Aurel's avatar
Aurel committed
618

619

620
    def loadSerial(self, oid, serial):
        """Return the data of oid at the given serial.

        The cache is bypassed because it only holds up-to-date objects.
        """
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        data, _start, _end = self._load(oid, serial=serial)
        return data
Aurel's avatar
Aurel committed
625

626

627
    def loadBefore(self, oid, tid):
        """Load an object for a given oid before tid committed.

        Returns (data, start, end), or None when the loaded revision has no
        end serial (no previous version). The cache is bypassed because it
        only holds up-to-date objects.
        """
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        data, start, end = self._load(oid, tid=tid)
        if end is not None:
            return data, start, end
        # No previous version
        return None
Aurel's avatar
Aurel committed
637

638

639 640 641
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
642
        if self.local_var.txn is transaction:
643
            # We already begin the same transaction
644
            return
645 646 647 648
        # ask the primary master to start a transaction, if no tid is supplied,
        # the master will supply us one. Otherwise the requested tid will be
        # used if possible.
        self.local_var.tid = None
649
        self._askPrimary(Packets.AskBeginTransaction(tid))
650 651
        if self.local_var.tid is None:
            raise NEOStorageError('tpc_begin failed')
652
        self.local_var.txn = transaction
653

654

655 656
    def store(self, oid, serial, data, version, transaction):
        """Store object.

        Sends the compressed object to every node holding a writable cell
        for oid, then records data in self.local_var.data_dict. version is
        not used.

        Returns the current transaction's TID.
        Raises StorageTransactionError if transaction is not the current
        one, NEOStorageConflictError on conflict (local_var.conflict_serial
        is set first), and NEOStorageError if no writable cell exists or no
        node stored the object.
        """
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage node to use
        cell_list = self._getCellListForOID(oid, writable=True)
        if len(cell_list) == 0:
            raise NEOStorageError('store failed: no writable cell for oid %s'
                    % dump(oid))
        if data is None:
            # this is a George Bailey object, stored as an empty string
            data = ''
        compressed_data = compress(data)
        checksum = makeChecksum(compressed_data)
        # Store data on each node
        self.local_var.object_stored_counter = 0
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.object_stored = 0
            p = Packets.AskStoreObject(oid, serial, 1,
                     checksum, compressed_data, self.local_var.tid)
            try:
                self._askStorage(conn, p)
            except ConnectionClosed:
                continue

            # Check we don't get any conflict. The answer handler presumably
            # sets object_stored to a pair whose first item is -1 on conflict
            # and whose second item is then the conflicting serial.
            if self.local_var.object_stored[0] == -1:
                if oid in self.local_var.data_dict:
                    # One storage already accept the object, is it normal ??
                    # remove from dict and raise ConflictError, don't care of
                    # previous node which already store data as it would be
                    # resent again if conflict is resolved or txn will be
                    # aborted
                    del self.local_var.data_dict[oid]
                self.local_var.conflict_serial = self.local_var.object_stored[1]
                raise NEOStorageConflictError
            # increase counter so that we know if a node has stored the object
            # or not
            self.local_var.object_stored_counter += 1

        if self.local_var.object_stored_counter == 0:
            # no storage nodes were available
            raise NEOStorageError('tpc_store failed')

        # Store object in tmp cache
        self.local_var.data_dict[oid] = data

        return self.local_var.tid
708

Aurel's avatar
Aurel committed
709

710 711
    def tpc_vote(self, transaction):
        """Store current transaction metadata on its storage nodes.

        Sends an AskStoreTransaction packet (tid, user, description, pickled
        extension, list of oids stored in this transaction) to every writable
        cell of the transaction id, and requires each reachable node to
        report the transaction as voted.

        Raises StorageTransactionError if ``transaction`` is not the one in
        progress, NEOStorageError if a reachable node did not vote, or if no
        storage node could be reached at all.
        """
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        user = transaction.user
        desc = transaction.description
        ext = dumps(transaction._extension)
        oid_list = self.local_var.data_dict.keys()
        # Store data on each node
        self._getPartitionTable() # XXX: establish connection if needed
        cell_list = self._getCellListForTID(self.local_var.tid, writable=True)
        self.local_var.voted_counter = 0
        for cell in cell_list:
            # Lazy %-args: formatting only happens if debug logging is on.
            logging.debug("voting object %s %s", cell.getAddress(),
                cell.getState())
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            # Reset before asking; the answer handler is expected to set it
            # through setTransactionVoted().
            self.local_var.txn_voted = False
            p = Packets.AskStoreTransaction(self.local_var.tid,
                    user, desc, ext, oid_list)
            try:
                self._askStorage(conn, p)
            except ConnectionClosed:
                # Node went away: try the remaining ones.
                continue

            if not self.isTransactionVoted():
                raise NEOStorageError('tpc_vote failed')
            self.local_var.voted_counter += 1

        # check at least one storage node accepted
        if self.local_var.voted_counter == 0:
            raise NEOStorageError('tpc_vote failed')
744

745 746
    def tpc_abort(self, transaction):
        """Abort current transaction on storage nodes and on the master.

        Does nothing when ``transaction`` is not the one in progress.
        Otherwise an AbortTransaction notification is sent to every writable
        cell that may hold data of this transaction, then to the primary
        master, and the local transaction context is cleared.
        """
        if transaction is not self.local_var.txn:
            return

        # Cells storing any of the transaction's objects, plus the cells
        # storing the transaction itself.
        cell_set = set()
        for oid in self.local_var.data_dict.iterkeys():
            oid_cells = self._getCellListForOID(oid, writable=True)
            cell_set |= set(oid_cells)
        tid_cells = self._getCellListForTID(self.local_var.tid,
            writable=True)
        cell_set |= set(tid_cells)

        # Cancel the transaction on every reachable node. The pooled
        # connection is unlocked here after notifying (presumably the pool
        # hands it out locked).
        for cell in cell_set:
            conn = self.cp.getConnForCell(cell)
            if conn is not None:
                try:
                    conn.notify(Packets.AbortTransaction(self.local_var.tid))
                finally:
                    conn.unlock()

        # Abort the transaction in the primary master node.
        master_conn = self._getMasterConnection()
        master_conn.lock()
        try:
            master_conn.notify(Packets.AbortTransaction(self.local_var.tid))
        finally:
            master_conn.unlock()
        self.local_var.clear()
776

777 778
    def tpc_finish(self, transaction, f=None):
        """Finish current transaction and return its tid.

        Returns None without doing anything when ``transaction`` is not the
        one in progress. Otherwise, under the load lock:
          - calls ``f(tid)`` when given (callback supplied by ZODB);
          - sends FinishTransaction to the primary master and raises
            NEOStorageError if it is not reported as finished;
          - stores every object of the transaction in the cache with the
            transaction id as serial;
          - clears the local transaction context.
        """
        if self.local_var.txn is not transaction:
            return
        self._load_lock_acquire()
        try:
            # Call function given by ZODB
            if f is not None:
                f(self.local_var.tid)

            # Call finish on master
            oid_list = self.local_var.data_dict.keys()
            p = Packets.FinishTransaction(oid_list, self.local_var.tid)
            self._askPrimary(p)

            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            # Update cache
            self._cache_lock_acquire()
            try:
                for oid, data in self.local_var.data_dict.iteritems():
                    # Now serial is same as tid
                    self.mq_cache[oid] = self.local_var.tid, data
            finally:
                self._cache_lock_release()
            # Read the tid before clear(): clear() presumably resets the
            # local context (tid included), and the caller must get the id
            # of the transaction that was just committed.
            tid = self.local_var.tid
            self.local_var.clear()
            return tid
        finally:
            self._load_lock_release()
808

Aurel's avatar
Aurel committed
809
    def undo(self, transaction_id, txn, wrapper):
        """Undo transaction ``transaction_id`` as part of transaction ``txn``.

        1. Fetch the information (notably the oid list) of the transaction
           to undo from one storage node.
        2. For each oid, load the revision preceding ``transaction_id``;
           the undo is refused (UndoError) if there is none or if a later
           transaction modified the object.
        3. Store that previous data in the current transaction, trying
           conflict resolution through ``wrapper`` when a store conflicts.

        Returns ``(current tid, list of re-stored oids)``.
        Raises StorageTransactionError if ``txn`` is not the transaction in
        progress, NEOStorageError if the transaction information cannot be
        obtained, UndoError, or ConflictError for unresolved conflicts.
        """
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, transaction_id)

        # First get transaction information from a storage node.
        cell_list = self._getCellListForTID(transaction_id, writable=True)
        shuffle(cell_list)
        # Reset before asking so an empty or unreachable cell list cannot
        # leave a stale answer from a previous request in txn_info.
        self.local_var.txn_info = 0
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.txn_info = 0
            try:
                self._askStorage(conn, Packets.AskTransactionInformation(
                    transaction_id))
            except ConnectionClosed:
                continue

            if self.local_var.txn_info == -1:
                # Tid not found, try with next node
                continue
            elif isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')

        if not isinstance(self.local_var.txn_info, dict):
            raise NEOStorageError('undo failed')

        oid_list = self.local_var.txn_info['oids']
        # Second get object data from storage node using loadBefore
        data_dict = {}
        for oid in oid_list:
            try:
                result = self.loadBefore(oid, transaction_id)
            except NEOStorageNotFoundError:
                # no previous revision, can't undo (as in filestorage)
                raise UndoError("no previous record", oid)
            data, start, end = result
            # end must be TID we are going to undone otherwise it means
            # a later transaction modify the object
            if end != transaction_id:
                raise UndoError("non-undoable transaction", oid)
            data_dict[oid] = data

        # Third do transaction with old data
        oid_list = data_dict.keys()
        for oid in oid_list:
            data = data_dict[oid]
            try:
                self.store(oid, transaction_id, data, None, txn)
            except NEOStorageConflictError:
                # store() records the conflicting serial in local_var before
                # raising; the exception itself carries no serial.
                serial = self.getConflictSerial()
                if serial <= self.local_var.tid:
                    new_data = wrapper.tryToResolveConflict(oid,
                            self.local_var.tid, serial, data)
                    if new_data is not None:
                        self.store(oid, self.local_var.tid, new_data, None, txn)
                        continue
                raise ConflictError(oid = oid, serials = (self.local_var.tid,
                    serial),
                                    data = data)
        return self.local_var.tid, oid_list
872

873
    def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
        """Collect transaction descriptions for undoLog()/transactionLog().

        Asks every storage node of the partition table for its TIDs in the
        ``first``..``last`` range (a negative ``last`` is turned into
        ``first - last``, as FileStorage does), merges them newest first,
        then fetches each transaction's information dict. Entries rejected
        by ``filter`` are skipped; the 'oids' key is removed unless
        ``with_oids`` is true. At most ``last - first`` entries are
        returned. If nothing is found and ``block`` is false, the search is
        retried once with ``last * 5``.

        Raises NeoException when a listed TID has no transaction
        information on any reachable node.
        """
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        pt = self._getPartitionTable()
        storage_node_list = pt.getNodeList()

        self.local_var.node_tids = {}
        # Only nodes actually asked will answer: waiting for every node of
        # the partition table would hang when one has no connection.
        queried = 0
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue

            try:
                conn.ask(self.local_var.queue, Packets.AskTIDs(first, last,
                    protocol.INVALID_PARTITION))
            finally:
                conn.unlock()
            queried += 1

        # Wait for answers from all queried storages.
        # NOTE(review): a node whose connection closes while waiting never
        # answers, so this loop may still not terminate -- confirm how the
        # storage handler reports that case.
        while len(self.local_var.node_tids) < queried:
            try:
                self._waitMessage(handler=self.storage_handler)
            except ConnectionClosed:
                continue

        # Merge the TIDs reported by all nodes, newest first.
        tid_set = set()
        for tid_list in self.local_var.node_tids.itervalues():
            tid_set.update(tid_list)
        # XXX do we need a special cmp function here ?
        ordered_tids = sorted(tid_set, reverse=True)
        logging.debug("UndoLog, tids %s", ordered_tids)
        # For each transaction, get info
        undo_info = []
        for tid in ordered_tids:
            cell_list = self._getCellListForTID(tid, readable=True)
            shuffle(cell_list)
            # Reset per TID so an unreachable cell list is reported as
            # missing instead of silently reusing the previous TID's info.
            self.local_var.txn_info = 0
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue
                self.local_var.txn_info = 0
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(tid))
                except ConnectionClosed:
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            txn_info = self.local_var.txn_info
            if not isinstance(txn_info, dict):
                # TID not found at all
                raise NeoException('Data inconsistency detected: '
                                   'transaction info for TID %r could not '
                                   'be found' % (tid, ))

            if filter is None or filter(txn_info):
                if not with_oids:
                    txn_info.pop("oids")
                undo_info.append(txn_info)
                if len(undo_info) >= last - first:
                    break
        # Check we return at least one element, otherwise call
        # again but extend offset. Recurse into __undoLog (not undoLog) so
        # that with_oids is preserved for transactionLog().
        if not undo_info and not block:
            undo_info = self.__undoLog(first, last * 5, filter, 1, with_oids)
        return undo_info

949
    def undoLog(self, first, last, filter=None, block=0):
        """Return undo descriptions for the ``first``..``last`` range.

        Entries do not contain the 'oids' key; see __undoLog for details.
        """
        return self.__undoLog(first, last, filter=filter, block=block)
951 952

    def transactionLog(self, first, last):
        """Return unfiltered transaction descriptions for ``first``..``last``.

        Unlike undoLog(), each entry keeps its 'oids' key.
        """
        return self.__undoLog(first, last, filter=None, block=0,
            with_oids=True)
954

955
    def history(self, oid, version=None, length=1, filter=None, object_only=0):
        """Return the revision history of ``oid``.

        First asks one readable storage node for the object history, a
        tuple ``(oid, [(serial, size), ...])`` requested with ``length``.
        If ``object_only`` is true that tuple is returned as-is (used by
        getSerial). Otherwise the transaction information of each serial is
        fetched from the nodes storing that transaction and turned into a
        dict with 'tid', 'version' (always None) and 'size' keys ('id' and
        'oids' removed); entries rejected by ``filter`` are skipped.
        ``version`` is ignored.

        Raises NEOStorageError if no node returns the history, if a node
        returns the history of another oid, or if a serial's transaction
        information cannot be found on any node.
        """
        # Get history informations for object first
        cell_list = self._getCellListForOID(oid, readable=True)
        shuffle(cell_list)

        # Reset first so an empty or unreachable cell list cannot leave a
        # stale answer from a previous call.
        self.local_var.history = None
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.history = None
            try:
                self._askStorage(conn, Packets.AskObjectHistory(oid, 0, length))
            except ConnectionClosed:
                continue

            history = self.local_var.history
            if history is None or history == -1:
                # Not found (or no answer), go on with next node
                continue
            if history[0] != oid:
                # Got history for wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                                      '%r, got %r' % (
                                      oid, history[0]))
            # Valid answer: stop here, otherwise a later node answering
            # "not found" would overwrite it.
            break

        history = self.local_var.history
        if not isinstance(history, tuple):
            raise NEOStorageError('history failed')
        if object_only:
            # Use by getSerial
            return history

        # Now that we have object informations, get txn informations
        history_list = []
        for serial, size in history[1]:
            # Ask the nodes storing this transaction, not the object's cells.
            cell_list = self._getCellListForTID(serial, readable=True)
            shuffle(cell_list)

            self.local_var.txn_info = None
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue

                # ask transaction information
                self.local_var.txn_info = None
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(serial))
                except ConnectionClosed:
                    continue

                if isinstance(self.local_var.txn_info, dict):
                    break
                # TID not found (-1) or no answer: try next node

            txn_info = self.local_var.txn_info
            if not isinstance(txn_info, dict):
                raise NEOStorageError('history failed: no transaction ' \
                                      'information for serial %r' % (
                                      serial, ))
            # create history dict
            txn_info.pop('id')
            txn_info.pop('oids')
            txn_info['tid'] = serial
            txn_info['version'] = None
            txn_info['size'] = size
            if filter is None or filter(txn_info):
                history_list.append(txn_info)

        return history_list
Aurel's avatar
Aurel committed
1021

1022 1023 1024
    def iterator(self, start=None, stop=None):
        """Return an ``Iterator`` bound to this storage for ``start``..``stop``.

        (Bounds are presumably transaction ids, as in ZODB storage
        iteration -- see the Iterator class.)
        """
        txn_iterator = Iterator(self, start, stop)
        return txn_iterator

1025 1026 1027 1028 1029
    def __del__(self):
        """Close every connection of the event manager and stop polling.

        Also bound as ``close``: ZODB does not always call close() when
        shutting down Zope, so connections are released from __del__ too.
        """
        connection_list = self.em.getConnectionList()
        for connection in connection_list:
            connection.close()
        # The polling thread is stopped only after all connections closed.
        self.poll_thread.stop()
    close = __del__
1034 1035

    def sync(self):
        """Delegate to ``_waitMessage()`` with its default arguments."""
        wait_message = self._waitMessage
        wait_message()
1037

1038 1039 1040 1041 1042 1043 1044 1045 1046
    def setNodeReady(self):
        """Set ``node_ready`` to True on ``self.local_var``."""
        context = self.local_var
        context.node_ready = True

    def setNodeNotReady(self):
        """Set ``node_ready`` to False on ``self.local_var``."""
        context = self.local_var
        context.node_ready = False

    def isNodeReady(self):
        """Return the ``node_ready`` flag stored on ``self.local_var``."""
        context = self.local_var
        return context.node_ready

1047
    def setTID(self, value):
        """Record ``value`` as the current transaction id on ``local_var``."""
        context = self.local_var
        context.tid = value
1049 1050

    def getTID(self):
        """Return the current transaction id stored on ``local_var``."""
        context = self.local_var
        return context.tid
1052 1053

    def getConflictSerial(self):
        """Return the serial recorded on ``local_var`` by the last conflict."""
        context = self.local_var
        return context.conflict_serial
1055 1056

    def setTransactionFinished(self):
        """Set ``txn_finished`` to True on ``self.local_var``."""
        context = self.local_var
        context.txn_finished = True
1058 1059

    def isTransactionFinished(self):
        """Return the ``txn_finished`` flag stored on ``self.local_var``."""
        context = self.local_var
        return context.txn_finished
1061 1062

    def setTransactionVoted(self):
        """Set ``txn_voted`` to True on ``self.local_var``."""
        context = self.local_var
        context.txn_voted = True
1064 1065

    def isTransactionVoted(self):
        """Return the ``txn_voted`` flag stored on ``self.local_var``."""
        context = self.local_var
        return context.txn_voted
1067