# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import logging
from thread import get_ident
from cPickle import dumps
from zlib import compress, decompress
from Queue import Queue, Empty
from random import shuffle
from time import sleep

from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode, StorageNode
from neo.connection import MTClientConnection
from neo import protocol
from neo.client.handlers import storage, master
from neo.client.exception import NEOStorageError, NEOStorageConflictError, \
     NEOStorageNotFoundError
from neo.exception import NeoException
from neo.util import makeChecksum, dump
from neo.connector import getConnectorHandler
from neo.client.dispatcher import Dispatcher
from neo.client.poll import ThreadedPoll
from neo.event import EventManager
from neo.locking import RLock, Lock

from ZODB.POSException import UndoError, StorageTransactionError, ConflictError

class ConnectionClosed(Exception): 
    pass

class ConnectionPool(object):
    """This class manages a pool of connections to storage nodes."""

    def __init__(self, app, max_pool_size = 25):
        self.app = app
        self.max_pool_size = max_pool_size
        self.connection_dict = {}
        # Define a lock in order to create one connection to
        # a storage node at a time to avoid multiple connections
        # to the same node.
        l = RLock()
        self.connection_lock_acquire = l.acquire
        self.connection_lock_release = l.release

    def _initNodeConnection(self, node):
        """Initialize a connection to a given storage node."""
        addr = node.getServer()
        if addr is None:
            return None

        app = self.app

        # Loop until a connection is obtained.
        while True:
            logging.info('trying to connect to %s - %s', node, node.getState())
            app.setNodeReady()
            conn = MTClientConnection(app.em, app.storage_event_handler, addr,
                                      connector_handler=app.connector_handler,
                                      dispatcher=app.dispatcher)
            conn.lock()
            try:
                if conn.getConnector() is None:
                    # This happens if a connection could not be established.
                    logging.error('Connection to storage node %s failed', node)
                    return None

                p = protocol.requestNodeIdentification(protocol.CLIENT_NODE_TYPE,
                            app.uuid, None, app.name)
                msg_id = conn.ask(app.local_var.queue, p)
            finally:
                conn.unlock()

            try:
                app._waitMessage(conn, msg_id, handler=app.storage_bootstrap_handler)
            except ConnectionClosed:
                logging.error('Connection to storage node %s failed', node)
                return None

            if app.isNodeReady():
                logging.info('connected to storage node %s', node)
                return conn
            else:
                logging.info('Storage node %s not ready', node)
                return None

    def _dropConnections(self):
        """Drop connections."""
        for node_uuid, conn in self.connection_dict.items():
            # Drop the first connection which looks unused.
            conn.lock()
            try:
                if not conn.pending() and \
                        not self.app.dispatcher.registered(conn):
                    del self.connection_dict[conn.getUUID()]
                    conn.close()
                    logging.debug('_dropConnections : connection to storage node %s:%d closed',
                                 *(conn.getAddress()))
                    if len(self.connection_dict) <= self.max_pool_size:
                        break
            finally:
                conn.unlock()

    def _createNodeConnection(self, node):
        """Create a connection to a given storage node."""
        if len(self.connection_dict) > self.max_pool_size:
            # must drop some unused connections
            self._dropConnections()

        self.connection_lock_release()
        try:
            conn = self._initNodeConnection(node)
        finally:
            self.connection_lock_acquire()

        if conn is None:
            return None

        # add the node to the node manager
        if self.app.nm.getNodeByServer(node.getServer()) is None:
            n = StorageNode(node.getServer())
            self.app.nm.add(n)
        self.connection_dict[node.getUUID()] = conn
        conn.lock()
        return conn

    def getConnForCell(self, cell):
        return self.getConnForNode(cell.getNode())

    def getConnForNode(self, node):
        """Return a locked connection object to a given node.
        If no connection exists, create a new one."""
        if node.getState() != protocol.RUNNING_STATE:
            return None
        uuid = node.getUUID()
        self.connection_lock_acquire()
        try:
            try:
                conn = self.connection_dict[uuid]
                # Already connected to the node
                conn.lock()
                return conn
            except KeyError:
                # Create a new connection to the node
                return self._createNodeConnection(node)
        finally:
            self.connection_lock_release()
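
    # Note: connections returned by getConnForNode() and getConnForCell()
    # are already locked; the caller is expected to release them, either
    # explicitly with conn.unlock() in a finally clause or indirectly via
    # Application._askStorage().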

    def removeConnection(self, node):
        """Explicitly remove a connection when a node is broken."""
        self.connection_lock_acquire()
        try:
            try:
                del self.connection_dict[node.getUUID()]
            except KeyError:
                pass
        finally:
            self.connection_lock_release()


class ThreadContext(object):

    _threads_dict = {}

    def __getThreadData(self):
        id = get_ident()
        try:
            result = self._threads_dict[id]
        except KeyError:
            self.clear(id)
            result = self._threads_dict[id]
        return result

    def __getattr__(self, name):
        thread_data = self.__getThreadData()
        try:
            return thread_data[name]
        except KeyError:
            raise AttributeError, name

    def __setattr__(self, name, value):
        thread_data = self.__getThreadData()
        thread_data[name] = value

    def clear(self, id=None):
        if id is None:
            id = get_ident()
        self._threads_dict[id] = {
            'tid': None,
            'txn': None,
            'data_dict': {},
            'object_stored': 0,
            'txn_voted': False,
            'txn_finished': False,
            'queue': Queue(5),
        }
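
# A minimal usage sketch for ThreadContext (illustrative, not part of the
# API): each thread transparently gets its own copy of every attribute.
#
#   ctx = ThreadContext()
#   ctx.tid = some_tid      # some_tid is hypothetical; visible only to the
#                           # calling thread
#   ctx.queue               # per-thread slots are created on first access
#   ctx.clear()             # reset this thread's slots to their defaults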


class Application(object):
    """The client node application."""

    def __init__(self, master_nodes, name, connector, **kw):
        # XXX: use a configuration entry
        from neo import buildFormatString
        format = buildFormatString('CLIENT')
        logging.basicConfig(level=logging.DEBUG, format=format)
        em = EventManager()
        # Start the polling thread
        self.poll_thread = ThreadedPoll(em)
        # Internal attributes common to all threads
        self.name = name
        self.em = em
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None
        # XXX: this code duplicates neo.config.ConfigurationManager.getMasterNodeList
        logging.debug('master node addresses are %s', master_nodes)
        self.master_node_list = master_node_list = []
        for node in master_nodes.split():
            if not node:
                continue
            if ':' in node:
                ip_address, port = node.split(':')
                port = int(port)
            else:
                ip_address = node
                port = 10100 # XXX: default_master_port
            server = (ip_address, port)
            master_node_list.append(server)
            self.nm.add(MasterNode(server=server))
        # no self-assigned UUID, the primary master will supply us one
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.ptid = None
        self.storage_event_handler = storage.StorageEventHandler(self, self.dispatcher)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(
                self, self.dispatcher)
        # Internal attributes distinct between threads
        self.local_var = ThreadContext()
        # Lock definitions:
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock is used in order to not run multiple oid
        # generations at the same time
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        lock = Lock()
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        self._connecting_to_master_node_acquire = lock.acquire
        self._connecting_to_master_node_release = lock.release
        # _nm ensures exclusive access to the node manager
        lock = Lock()
        self._nm_acquire = lock.acquire
        self._nm_release = lock.release

    def notifyDeadNode(self, conn):
        """ Notify a storage failure to the primary master """
        s_node = self.nm.getNodeByServer(conn.getAddress())
        if s_node is None or s_node.getNodeType() != protocol.STORAGE_NODE_TYPE:
            return
        s_uuid = s_node.getUUID()
        m_conn = self._getMasterConnection()
        m_conn.lock()
        try:
            node_list = [(protocol.STORAGE_NODE_TYPE, s_node.getServer(), s_uuid, s_node.getState())]
            m_conn.notify(protocol.notifyNodeInformation(node_list))
        finally:
            m_conn.unlock()

    def _waitMessage(self, target_conn = None, msg_id = None, handler=None):
        """Wait for a message returned by the dispatcher in queues."""
        local_queue = self.local_var.queue
        while 1:
            if msg_id is None:
                try:
                    conn, packet = local_queue.get_nowait()
                except Empty:
                    break
            else:
                conn, packet = local_queue.get()
            # check for a fake packet, which means the connection was closed
            if packet is None:
                if conn.getUUID() == target_conn.getUUID():
                    raise ConnectionClosed
                else:
                    continue
            # Guess the handler to use based on the type of node on the
            # connection
            if handler is None:
                node = self.nm.getNodeByServer(conn.getAddress())
                if node is None:
                    raise ValueError, 'Expecting an answer from a node ' \
                        'whose type is not known... Is this right ?'
                else:
                    node_type = node.getType()
                    if node_type == protocol.STORAGE_NODE_TYPE:
                        handler = self.storage_handler
                    elif node_type == protocol.MASTER_NODE_TYPE:
                        handler = self.primary_handler
                    else:
                        raise ValueError, 'Unknown node type: %r' % (
                            node_type, )
            handler.dispatch(conn, packet)
            if target_conn is conn and msg_id == packet.getId():
                break

    def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
        """ Send a request to a storage node and process its answer """
        try:
            msg_id = conn.ask(self.local_var.queue, packet, timeout,
                              additional_timeout)
        finally:
            # assume that the connection was already locked
            conn.unlock()
        self._waitMessage(conn, msg_id, self.storage_handler)

    def _askPrimary(self, packet, timeout=5, additional_timeout=30):
        """ Send a request to the primary master and process its answer """
        conn = self._getMasterConnection()
        conn.lock()
        try:
            msg_id = conn.ask(self.local_var.queue, packet, timeout,
                              additional_timeout)
        finally:
            conn.unlock()
        self._waitMessage(conn, msg_id, self.primary_handler)
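
    # The request/answer pattern used throughout this class, as a sketch:
    #
    #   conn.lock()
    #   try:
    #       # ask() registers local_var.queue with the dispatcher and
    #       # returns the id of the answer to wait for
    #       msg_id = conn.ask(self.local_var.queue, packet)
    #   finally:
    #       conn.unlock()
    #   self._waitMessage(conn, msg_id, handler)  # block until the answer arrives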

    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
        # acquire the lock to allow only one thread to connect to the primary
        self._connecting_to_master_node_acquire()
        try:
            if self.master_conn is None:
                self.new_oid_list = []
                self.master_conn = self._connectToPrimaryMasterNode()
            return self.master_conn
        finally:
            self._connecting_to_master_node_release()

    def _getPartitionTable(self):
        """ Return the partition table manager, reconnecting to the PMN if needed """
        # this ensures the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

    def _getCellListForOID(self, oid, readable=False, writable=False):
        """ Return the cells available for the specified OID """
        pt = self._getPartitionTable()
        return pt.getCellListForOID(oid, readable, writable)

    def _getCellListForTID(self, tid, readable=False, writable=False):
        """ Return the cells available for the specified TID """
        pt = self._getPartitionTable()
        return pt.getCellListForTID(tid, readable, writable)
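
    # Master bootstrap sequence, as implemented below in
    # _connectToPrimaryMasterNode:
    #   1. poll the known master nodes with askPrimaryMaster until the
    #      primary answers,
    #   2. identify with requestNodeIdentification, which assigns our uuid,
    #   3. fetch the partition table and node information; the connection is
    #      ready once the partition table is operational.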

    def _connectToPrimaryMasterNode(self):
        logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        while not ready:
            # Get a network connection to the primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterNodeList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to the master
                conn = MTClientConnection(self.em, self.notifications_handler,
                                          addr=self.trying_master_node.getServer(),
                                          connector_handler=self.connector_handler,
                                          dispatcher=self.dispatcher)
                # Query for the primary master node
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        # This happens if a connection could not be established.
                        logging.error('Connection to master node %s failed',
                                      self.trying_master_node)
                        continue
                    msg_id = conn.ask(self.local_var.queue, protocol.askPrimaryMaster())
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                connected = self.primary_master_node is not None \
                            and self.primary_master_node is self.trying_master_node

            logging.info('connected to a primary master node')
            # Identify to the primary master and request initial data
            while conn.getUUID() is None:
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        logging.error('Connection to master node %s lost',
                                      self.trying_master_node)
                        self.primary_master_node = None
                        break
                    p = protocol.requestNodeIdentification(protocol.CLIENT_NODE_TYPE,
                            self.uuid, None, self.name)
                    msg_id = conn.ask(self.local_var.queue, p)
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    self.primary_master_node = None
                    break
                if conn.getUUID() is None:
                    # Node identification was refused by the master.
                    # Sleep a bit and retry.
                    # XXX: This should be replaced by:
                    # - queuing requestNodeIdentification at master side
                    # - sending the acceptance from master when it becomes
                    #   ready
                    # Thus removing the need to:
                    # - needlessly bother the primary master every 5 seconds
                    #   (...per client)
                    # - have a sleep in the code (yuck !)
                    sleep(5)
            if self.uuid is not None:
                # TODO: pipeline those 2 requests
                # This is currently impossible because _waitMessage can only
                # wait on one message at a time
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      protocol.askPartitionTable([]))
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      protocol.askNodeInformation())
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
            ready = self.uuid is not None and self.pt is not None \
                                 and self.pt.operational()
        logging.info("connected to primary master node %s" % self.primary_master_node)
        return conn

    def registerDB(self, db, limit):
        self._db = db

    def getDB(self):
        return self._db

    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
                # Get a new batch of OIDs from the master node.
                # We keep a list of OIDs here to avoid asking the
                # master for new OIDs one by one on every call.
                self._askPrimary(protocol.askNewOIDs(100))
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
            return self.new_oid_list.pop()
        finally:
            self._oid_lock_release()

    def getSerial(self, oid):
        # Try the cache first
        self._cache_lock_acquire()
        try:
            if oid in self.mq_cache:
                return self.mq_cache[oid][0]
        finally:
            self._cache_lock_release()
        # history returns the serial, so use it
        hist = self.history(oid, length = 1, object_only = 1)
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        return hist[1][0][0]

    def _load(self, oid, serial=None, tid=None, cache=0):
        """Internal method which manages load, loadSerial and loadBefore."""
        cell_list = self._getCellListForOID(oid, readable=True)
        if len(cell_list) == 0:
            # No cells available, so why are we running ?
            logging.error('oid %s not found because no storage is available for it', dump(oid))
            raise NEOStorageNotFoundError()

        shuffle(cell_list)
        self.local_var.asked_object = 0
        for cell in cell_list:
            logging.debug('trying to load %s from %s',
                          dump(oid), dump(cell.getUUID()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            try:
                self._askStorage(conn, protocol.askObject(oid, serial, tid))
            except ConnectionClosed:
                continue

            if self.local_var.asked_object == -1:
                # OID not found
                break

            # Check the data
            noid, start_serial, end_serial, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with the next node
                logging.error('got wrong oid %s instead of %s from node %s',
                              noid, dump(oid), cell.getServer())
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Wrong checksum, try with the next node
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getServer(), dump(oid))
                self.local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if self.local_var.asked_object == 0:
            # We could not get any object from any storage node because of
            # connection errors
            logging.warning('oid %s not found because of connection failure', dump(oid))
            raise NEOStorageNotFoundError()

        if self.local_var.asked_object == -1:
            # We could not get the object from any storage node
            logging.info('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        return data, start_serial, end_serial

    def load(self, oid, version=None):
        """Load an object for a given oid."""
        # First try from the cache
        self._load_lock_acquire()
        try:
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
                    serial, data = self.mq_cache[oid]
                    return data, serial
            finally:
                self._cache_lock_release()
            # Otherwise get it from a storage node
            return self._load(oid, cache=1)[:2]
        finally:
            self._load_lock_release()

    def loadSerial(self, oid, serial):
        """Load an object for a given oid and serial."""
        # Do not look in the cache, as it manages only up-to-date objects
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        return self._load(oid, serial=serial)[0]

    def loadBefore(self, oid, tid):
        """Load an object for a given oid before tid committed."""
        # Do not look in the cache, as it manages only up-to-date objects
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        data, start, end = self._load(oid, tid=tid)
        if end is None:
            # No previous version
            return None
        else:
            return data, start, end

    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
        if self.local_var.txn is transaction:
            # We have already begun the same transaction
            return
        # Get a new transaction id if necessary
        if tid is None:
            self.local_var.tid = None
            self._askPrimary(protocol.askNewTID())
            if self.local_var.tid is None:
                raise NEOStorageError('tpc_begin failed')
        else:
            self.local_var.tid = tid
        self.local_var.txn = transaction

    def store(self, oid, serial, data, version, transaction):
        """Store an object."""
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage nodes to use
        cell_list = self._getCellListForOID(oid, writable=True)
        if len(cell_list) == 0:
            # FIXME must wait for cluster to be ready
            raise NEOStorageError
        # Store the data on each node
        compressed_data = compress(data)
        checksum = makeChecksum(compressed_data)
        self.local_var.object_stored_counter = 0
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.object_stored = 0
            p = protocol.askStoreObject(oid, serial, 1,
                     checksum, compressed_data, self.local_var.tid)
            try:
                self._askStorage(conn, p)
            except ConnectionClosed:
                continue

            # Check that we did not get a conflict
            if self.local_var.object_stored[0] == -1:
                if self.local_var.data_dict.has_key(oid):
                    # One storage node already accepted the object, is it normal ??
                    # Remove it from the dict and raise ConflictError; don't care
                    # about the nodes which already stored the data, as it will be
                    # resent if the conflict is resolved or dropped if the txn is
                    # aborted.
                    del self.local_var.data_dict[oid]
                self.conflict_serial = self.local_var.object_stored[1]
                raise NEOStorageConflictError
            # increase the counter so that we know whether a node has stored
            # the object or not
            self.local_var.object_stored_counter += 1

        if self.local_var.object_stored_counter == 0:
            # no storage node was available
            raise NEOStorageError('tpc_store failed')

        # Store the object in the temporary cache
        self.local_var.data_dict[oid] = data

        return self.local_var.tid
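
    # Conflict handling sketch: when a storage node answers a store with a
    # conflict, store() records the conflicting serial (see
    # getConflictSerial()) and raises NEOStorageConflictError; the caller may
    # then resolve the conflict and call store() again, as undo() does with
    # wrapper.tryToResolveConflict().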

    def tpc_vote(self, transaction):
        """Store the current transaction."""
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        user = transaction.user
        desc = transaction.description
        ext = dumps(transaction._extension)
        oid_list = self.local_var.data_dict.keys()
        # Store data on each node
        pt = self._getPartitionTable()
        cell_list = self._getCellListForTID(self.local_var.tid, writable=True)
        self.local_var.voted_counter = 0
        for cell in cell_list:
            logging.info("voting object %s %s" % (cell.getServer(), cell.getState()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.txn_voted = False
            p = protocol.askStoreTransaction(self.local_var.tid,
                    user, desc, ext, oid_list)
            try:
                self._askStorage(conn, p)
            except ConnectionClosed:
                continue

            if not self.isTransactionVoted():
                raise NEOStorageError('tpc_vote failed')
            self.local_var.voted_counter += 1

        # check that at least one storage node accepted
        if self.local_var.voted_counter == 0:
            raise NEOStorageError('tpc_vote failed')

    def tpc_abort(self, transaction):
        """Abort the current transaction."""
        if transaction is not self.local_var.txn:
            return

        cell_set = set()
        # select nodes where objects were stored
        for oid in self.local_var.data_dict.iterkeys():
            cell_set |= set(self._getCellListForOID(oid, writable=True))
        # select nodes where the transaction was stored
        cell_set |= set(self._getCellListForTID(self.local_var.tid, writable=True))

        # cancel the transaction on all those nodes
        for cell in cell_set:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue
            try:
                conn.notify(protocol.abortTransaction(self.local_var.tid))
            finally:
                conn.unlock()

        # Abort the transaction in the primary master node.
        conn = self._getMasterConnection()
        conn.lock()
        try:
            conn.notify(protocol.abortTransaction(self.local_var.tid))
        finally:
            conn.unlock()
        self.local_var.clear()

    def tpc_finish(self, transaction, f=None):
        """Finish the current transaction."""
        if self.local_var.txn is not transaction:
            return
        self._load_lock_acquire()
        try:
            # Call the function given by ZODB
            if f is not None:
                f(self.local_var.tid)

            # Call finish on the master
            oid_list = self.local_var.data_dict.keys()
            p = protocol.finishTransaction(oid_list, self.local_var.tid)
            self._askPrimary(p)

            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            # Update the cache
            self._cache_lock_acquire()
            try:
                for oid in self.local_var.data_dict.iterkeys():
                    data = self.local_var.data_dict[oid]
                    # Now the serial is the same as the tid
                    self.mq_cache[oid] = self.local_var.tid, data
            finally:
                self._cache_lock_release()
            # Save the tid before clear() resets the per-thread state,
            # otherwise None would be returned
            tid = self.local_var.tid
            self.local_var.clear()
            return tid
        finally:
            self._load_lock_release()
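
    # Two-phase commit as driven by ZODB, for reference (a sketch; 'app'
    # stands for this Application instance):
    #
    #   app.tpc_begin(txn)                       # get a tid from the master
    #   app.store(oid, serial, data, None, txn)  # send data to storage nodes
    #   app.tpc_vote(txn)                        # store transaction metadata
    #   app.tpc_finish(txn)                      # ask the master to commit
    #   # or, if anything failed:
    #   app.tpc_abort(txn)                       # notify storages and master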

    def undo(self, transaction_id, txn, wrapper):
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, transaction_id)

        # First get transaction information from a storage node.
        cell_list = self._getCellListForTID(transaction_id, writable=True)
        shuffle(cell_list)
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.txn_info = 0
            try:
                self._askStorage(conn, protocol.askTransactionInformation(transaction_id))
            except ConnectionClosed:
                continue

            if self.local_var.txn_info == -1:
                # Tid not found, try with the next node
                continue
            elif isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')

        if self.local_var.txn_info in (-1, 0):
            raise NEOStorageError('undo failed')

        oid_list = self.local_var.txn_info['oids']
        # Second, get object data from a storage node using loadBefore
        data_dict = {}
        for oid in oid_list:
            try:
                result = self.loadBefore(oid, transaction_id)
            except NEOStorageNotFoundError:
                # no previous revision, can't undo (as in FileStorage)
                raise UndoError("no previous record", oid)
            data, start, end = result
            # end must be the TID we are about to undo, otherwise it means
            # a later transaction modified the object
            if end != transaction_id:
                raise UndoError("non-undoable transaction", oid)
            data_dict[oid] = data

        # Third, store the transaction with the old data
        oid_list = data_dict.keys()
        for oid in oid_list:
            data = data_dict[oid]
            try:
                self.store(oid, transaction_id, data, None, txn)
            except NEOStorageConflictError:
                # the conflicting serial was recorded by store()
                serial = self.getConflictSerial()
                if serial <= self.local_var.tid:
                    new_data = wrapper.tryToResolveConflict(oid, self.local_var.tid,
                                                            serial, data)
                    if new_data is not None:
                        self.store(oid, self.local_var.tid, new_data, None, txn)
                        continue
                raise ConflictError(oid = oid, serials = (self.local_var.tid, serial),
                                    data = data)
        return self.local_var.tid, oid_list

    def undoLog(self, first, last, filter=None, block=0):
        if last < 0:
            # See FileStorage.py for an explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE_STATE and
        # FEEDING_STATE cells
        pt = self._getPartitionTable()
        storage_node_list = pt.getNodeList()

        self.local_var.node_tids = {}
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue

            try:
                conn.ask(self.local_var.queue,
                         protocol.askTIDs(first, last, protocol.INVALID_PARTITION))
            finally:
                conn.unlock()

        # Wait for answers from all storages.
        # FIXME this is a busy loop.
        while len(self.local_var.node_tids) != len(storage_node_list):
            try:
                self._waitMessage(handler=self.storage_handler)
            except ConnectionClosed:
                continue

        # Reorder the tids
        ordered_tids = set()
        update = ordered_tids.update
        for tid_list in self.local_var.node_tids.itervalues():
            update(tid_list)
        ordered_tids = list(ordered_tids)
        # XXX do we need a special cmp function here ?
        ordered_tids.sort(reverse=True)
        logging.debug("UndoLog, tids %s", ordered_tids)
        # For each transaction, get its info
        undo_info = []
        append = undo_info.append
        for tid in ordered_tids:
            cell_list = self._getCellListForTID(tid, readable=True)
            shuffle(cell_list)
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is not None:
                    self.local_var.txn_info = 0
                    try:
                        self._askStorage(conn, protocol.askTransactionInformation(tid))
                    except ConnectionClosed:
                        continue
                    if isinstance(self.local_var.txn_info, dict):
                        break

            if self.local_var.txn_info in (-1, 0):
                # TID not found at all
                raise NeoException, 'Data inconsistency detected: ' \
                                    'transaction info for TID %r could not ' \
                                    'be found' % (tid, )

            if filter is None or filter(self.local_var.txn_info):
                self.local_var.txn_info.pop("oids")
                append(self.local_var.txn_info)
                if len(undo_info) >= last - first:
                    break
        # Check that we return at least one element, otherwise call
        # again with an extended offset
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter, block=1)
        return undo_info

    # FIXME: the filter function isn't used
    def history(self, oid, version=None, length=1, filter=None, object_only=0):
        # Get the history information for the object first
        cell_list = self._getCellListForOID(oid, readable=True)
        shuffle(cell_list)

        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.history = None
            try:
                self._askStorage(conn, protocol.askObjectHistory(oid, 0, length))
            except ConnectionClosed:
                continue

            if self.local_var.history == -1:
                # Not found, go on with the next node
                continue
            if self.local_var.history[0] != oid:
                # Got history for the wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                                      '%r, got %r' % (
                                      oid, self.local_var.history[0]))
            # History found for the right oid, stop asking other nodes
            break

        if not isinstance(self.local_var.history, tuple):
            raise NEOStorageError('history failed')
        if object_only:
            # Used by getSerial
            return self.local_var.history

        # Now that we have the object information, get the transaction
        # information
        history_list = []
        for serial, size in self.local_var.history[1]:
            cell_list = self._getCellListForTID(serial, readable=True)
            shuffle(cell_list)

            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue

                # ask for transaction information
                self.local_var.txn_info = None
                try:
                    self._askStorage(conn, protocol.askTransactionInformation(serial))
                except ConnectionClosed:
                    continue

                if self.local_var.txn_info == -1:
                    # TID not found
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            # create a history dict
            self.local_var.txn_info.pop('id')
            self.local_var.txn_info.pop('oids')
            self.local_var.txn_info['tid'] = serial
            self.local_var.txn_info['version'] = None
            self.local_var.txn_info['size'] = size
            history_list.append(self.local_var.txn_info)

        return history_list

    def __del__(self):
        """Close all connections."""
        # Due to a bug in ZODB, close is not always called when shutting
        # down Zope, so use __del__ to close connections
        for conn in self.em.getConnectionList():
            conn.lock()
            try:
                conn.close()
            finally:
                conn.release()
        # Stop the polling thread
        self.poll_thread.stop()
    close = __del__

    def sync(self):
        self._waitMessage()

    def setNodeReady(self):
        self.local_var.node_ready = True

    def setNodeNotReady(self):
        self.local_var.node_ready = False

    def isNodeReady(self):
        return self.local_var.node_ready

    def setTID(self, value):
        self.local_var.tid = value

    def getTID(self):
        return self.local_var.tid

    def getConflictSerial(self):
        return self.conflict_serial

    def setTransactionFinished(self):
        self.local_var.txn_finished = True

    def isTransactionFinished(self):
        return self.local_var.txn_finished

    def setTransactionVoted(self):
        self.local_var.txn_voted = True

    def isTransactionVoted(self):
        return self.local_var.txn_voted