app.py 39.9 KB
Newer Older
1
#
Aurel's avatar
Aurel committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

18
from thread import get_ident
19
from cPickle import dumps
20
from zlib import compress, decompress
21
from Queue import Queue, Empty
22
from random import shuffle
Yoshinori Okuji's avatar
Yoshinori Okuji committed
23
from time import sleep
24

25 26
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError

27 28 29 30
from neo import setupLog
setupLog('CLIENT', verbose=True)

from neo import logging
31
from neo import protocol
32
from neo.event import EventManager
33
from neo.util import makeChecksum, dump
34 35 36
from neo.locking import RLock, Lock
from neo.connection import MTClientConnection
from neo.node import NodeManager, MasterNode, StorageNode
37
from neo.connector import getConnectorHandler
38 39 40 41
from neo.client.exception import NEOStorageError, NEOStorageConflictError
from neo.client.exception import NEOStorageNotFoundError
from neo.exception import NeoException
from neo.client.handlers import storage, master
42
from neo.dispatcher import Dispatcher
43
from neo.client.poll import ThreadedPoll
44 45
from neo.client.iterator import Iterator
from neo.client.mq import MQ
46

Aurel's avatar
Aurel committed
47

48 49
class ConnectionClosed(Exception):
    """Raised when a connection is closed while an answer is still awaited."""
    pass
50

51

52 53
class ConnectionPool(object):
    """This class manages a pool of connections to storage nodes.

    Connections are created lazily, cached by storage node UUID in
    connection_dict, and handed back *locked* to the caller (who must
    unlock them when done).  When the pool grows past max_pool_size,
    idle connections are dropped.
    """

    def __init__(self, app, max_pool_size = 25):
        # app: the client Application instance that owns this pool.
        self.app = app
        # Soft limit on the number of cached connections; enforced by
        # _createNodeConnection via _dropConnections.
        self.max_pool_size = max_pool_size
        # Maps storage node UUID -> MTClientConnection.
        self.connection_dict = {}
        # Define a lock in order to create one connection to
        # a storage node at a time to avoid multiple connections
        # to the same node.
        l = RLock()
        self.connection_lock_acquire = l.acquire
        self.connection_lock_release = l.release

    def _initNodeConnection(self, node):
        """Init a connection to a given storage node.

        Return the identified connection, or None when the node has no
        known address, the connection could not be established, or the
        node reports itself as not ready.
        """
        addr = node.getServer()
        if addr is None:
            return None

        app = self.app

        # Loop until a connection is obtained.
        # NOTE(review): as written, every path inside the loop returns,
        # so the body actually runs at most once.
        while True:
            logging.debug('trying to connect to %s - %s', node, node.getState())
            app.setNodeReady()
            conn = MTClientConnection(app.em, app.storage_event_handler, addr,
                                      connector_handler=app.connector_handler,
                                      dispatcher=app.dispatcher)
            conn.lock()
            try:
                if conn.getConnector() is None:
                    # This happens, if a connection could not be established.
                    logging.error('Connection to storage node %s failed', node)
                    return None

                # Identify ourselves as a client node to the storage node.
                p = protocol.requestNodeIdentification(protocol.CLIENT_NODE_TYPE,
                            app.uuid, None, app.name)
                msg_id = conn.ask(app.local_var.queue, p)
            finally:
                conn.unlock()

            try:
                app._waitMessage(conn, msg_id, handler=app.storage_bootstrap_handler)
            except ConnectionClosed:
                logging.error('Connection to storage node %s failed', node)
                return None

            if app.isNodeReady():
                logging.info('connected to storage node %s', node)
                return conn
            else:
                logging.info('Storage node %s not ready', node)
                return None

    def _dropConnections(self):
        """Drop connections."""
        # Iterate over a snapshot (items() returns a copy in Python 2) so
        # entries can safely be deleted from connection_dict in the loop.
        for node_uuid, conn in self.connection_dict.items():
            # Drop first connection which looks not used
            conn.lock()
            try:
                # Only close connections that have nothing pending and no
                # answer still registered with the dispatcher.
                if not conn.pending() and \
                        not self.app.dispatcher.registered(conn):
                    del self.connection_dict[conn.getUUID()]
                    conn.close()
                    logging.debug('_dropConnections : connection to storage node %s:%d closed', 
                                 *(conn.getAddress()))
                    # Stop as soon as the pool is back under its limit.
                    if len(self.connection_dict) <= self.max_pool_size:
                        break
            finally:
                conn.unlock()

    def _createNodeConnection(self, node):
        """Create a connection to a given storage node."""
        if len(self.connection_dict) > self.max_pool_size:
            # must drop some unused connections
            self._dropConnections()

        # Release the pool lock while connecting, since establishing the
        # connection can block; it is re-acquired before returning because
        # the caller (getConnForNode) releases it in its finally clause.
        self.connection_lock_release()
        try:
            conn = self._initNodeConnection(node)
        finally:
            self.connection_lock_acquire()

        if conn is None:
            return None

        # Cache the connection and return it locked, as getConnForNode
        # promises to its callers.
        self.connection_dict[node.getUUID()] = conn
        conn.lock()
        return conn

    def getConnForCell(self, cell):
        # Convenience wrapper: resolve the cell to its storage node.
        return self.getConnForNode(cell.getNode())

    def getConnForNode(self, node):
        """Return a locked connection object to a given node
        If no connection exists, create a new one"""
        if node.getState() != protocol.RUNNING_STATE:
            return None
        uuid = node.getUUID()
        self.connection_lock_acquire()
        try:
            try:
                conn = self.connection_dict[uuid]
                # Already connected to node
                conn.lock()
                return conn
            except KeyError:
                # Create new connection to node
                return self._createNodeConnection(node)
        finally:
            self.connection_lock_release()

    def removeConnection(self, node):
        """Explicitly remove connection when a node is broken."""
        self.connection_lock_acquire()
        try:
            try:
                del self.connection_dict[node.getUUID()]
            except KeyError:
                pass
        finally:
            self.connection_lock_release()
175

176

177 178 179 180
class ThreadContext(object):
    """Per-thread attribute storage.

    Every attribute read and write on an instance is transparently
    redirected to a dictionary private to the calling thread, keyed on
    the thread identifier.  Note that the backing storage is a class
    attribute, so it is shared by all instances.
    """

    # Maps thread id -> that thread's attribute dictionary.
    _threads_dict = {}

    def __init__(self):
        # Each setattr() below goes through __setattr__ and therefore
        # lands in the calling thread's private dictionary.
        for attr, default in (
                ('tid', None),
                ('txn', None),
                ('txn_voted', False),
                ('txn_finished', False),
                ('txn_info', 0),
                ('history', None),
                ('data_dict', {}),
                ('node_tids', {}),
                ('node_ready', False),
                ('conflict_serial', 0),
                ('asked_object', 0),
                ('object_stored_counter', 0),
                ('voted_counter', 0),
                ('object_stored', 0),
                ('queue', Queue(5)),
            ):
            setattr(self, attr, default)

    def __getThreadData(self):
        """Return the calling thread's dictionary, creating it on first use."""
        key = get_ident()
        if key not in self._threads_dict:
            self.clear(key)
        return self._threads_dict[key]

    def __getattr__(self, name):
        # Only invoked when normal lookup fails, i.e. for every
        # non-class attribute.
        data = self.__getThreadData()
        if name in data:
            return data[name]
        raise AttributeError(name)

    def __setattr__(self, name, value):
        # Unconditionally store in the calling thread's dictionary.
        self.__getThreadData()[name] = value

    def clear(self, thread_id=None):
        """Reset the context of the given thread (default: calling thread)."""
        if thread_id is None:
            thread_id = get_ident()
        self._threads_dict[thread_id] = {
            'tid': None,
            'txn': None,
            'data_dict': {},
            'object_stored': 0,
            'txn_voted': False,
            'txn_finished': False,
            'queue': Queue(5),
        }


Aurel's avatar
Aurel committed
232
class Application(object):
233 234
    """The client node application."""

235
    def __init__(self, master_nodes, name, connector=None, **kw):
        """Create the client application.

        master_nodes -- whitespace-separated list of "ip[:port]" master
                        node addresses (port defaults to 10100)
        name         -- cluster name announced during identification
        connector    -- optional connector handler name
        """
        # XXX: use a configuration entry
        em = EventManager()
        # Start polling thread
        self.poll_thread = ThreadedPoll(em)
        # Internal Attributes common to all thread
        self._db = None
        self.name = name
        self.em = em
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None
        # XXX: this code duplicates neo.config.ConfigurationManager.getMasterNodeList
        logging.debug('master node address are %s' % (master_nodes,))
        self.master_node_list = master_node_list = []
        for node in master_nodes.split():
            if not node:
                continue
            if ':' in node:
                ip_address, port = node.split(':')
                port = int(port)
            else:
                ip_address = node
                port = 10100 # XXX: default_master_port
            server = (ip_address, port)
            master_node_list.append(server)
            self.nm.add(MasterNode(server=server))
        # no self-assigned UUID, primary master will supply us one
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.last_oid = '\0' * 8
        self.storage_event_handler = storage.StorageEventHandler(self, self.dispatcher)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(
                self, self.dispatcher)
        # Internal attribute distinct between thread
        self.local_var = ThreadContext()
        # Lock definition :
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock is used in order to not call multiple oid
        # generation at the same time
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        lock = Lock()
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        self._connecting_to_master_node_acquire = lock.acquire
        self._connecting_to_master_node_release = lock.release
        # _nm ensures exclusive access to the node manager
        lock = Lock()
        self._nm_acquire = lock.acquire
        self._nm_release = lock.release
304

305
    def _waitMessage(self, target_conn = None, msg_id = None, handler=None):
        """Wait for a message returned by the dispatcher in queues.

        target_conn -- the connection the expected answer comes from
        msg_id      -- id of the request whose answer is awaited; when
                       None, only already-queued packets are processed
                       and the call never blocks
        handler     -- handler used to dispatch packets; when None it is
                       guessed from the type of the answering node

        Raises ConnectionClosed if target_conn is closed before the
        expected answer arrives.
        """
        local_queue = self.local_var.queue
        while 1:
            if msg_id is None:
                # Non-blocking drain of whatever is already queued.
                try:
                    conn, packet = local_queue.get_nowait()
                except Empty:
                    break
            else:
                conn, packet = local_queue.get()
            # A None packet is the marker queued when a connection closes.
            if packet is None:
                if conn.getUUID() == target_conn.getUUID():
                    raise ConnectionClosed
                else:
                    continue
            # Guess the handler to use based on the type of node on the
            # connection.  Note: once guessed, the same handler is reused
            # for the remaining packets of this call.
            if handler is None:
                node = self.nm.getNodeByServer(conn.getAddress())
                if node is None:
                    raise ValueError('Expecting an answer from a node '
                        'which type is not known... Is this right ?')
                elif node.isStorage():
                    handler = self.storage_handler
                elif node.isMaster():
                    handler = self.primary_handler
                else:
                    # BUG FIX: this previously referenced an undefined
                    # name 'node_type', raising NameError instead of the
                    # intended ValueError.
                    raise ValueError('Unknown node type: %r' % (node, ))
            handler.dispatch(conn, packet)
            if target_conn is conn and msg_id == packet.getId():
                break
340

341 342 343
    def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
        """Send a request to a storage node and process its answer.

        The connection is expected to be locked by the caller; it is
        unlocked here in every case, even when sending fails.
        """
        queue = self.local_var.queue
        try:
            msg_id = conn.ask(queue, packet, timeout, additional_timeout)
        finally:
            conn.unlock()
        self._waitMessage(conn, msg_id, self.storage_handler)

    def _askPrimary(self, packet, timeout=5, additional_timeout=30):
        """Send a request to the primary master and process its answer.

        Unlike _askStorage, the connection is locked here, not by the
        caller.
        """
        master_conn = self._getMasterConnection()
        master_conn.lock()
        try:
            msg_id = master_conn.ask(self.local_var.queue, packet,
                                     timeout, additional_timeout)
        finally:
            master_conn.unlock()
        self._waitMessage(master_conn, msg_id, self.primary_handler)

362 363
    def _getMasterConnection(self):
        """Return the connection to the primary master, connecting on demand.

        The master connection is shared by all threads; the lock below
        ensures only one thread performs the (blocking) connection.
        """
        # FIX: the lock's acquire() return value used to be bound to a
        # misleading unused local named 'lock'; just acquire it.
        self._connecting_to_master_node_acquire()
        try:
            if self.master_conn is None:
                # Reset the OID pool: cached OIDs may come from a stale
                # master session.
                self.new_oid_list = []
                self.master_conn = self._connectToPrimaryMasterNode()
            return self.master_conn
        finally:
            self._connecting_to_master_node_release()

374
    def _getPartitionTable(self):
        """ Return the partition table manager, reconnect the PMN if needed """
        # this ensures the master connection is established and the partition
        # table is up to date (self.pt is filled by the bootstrap handlers
        # during _connectToPrimaryMasterNode).
        self._getMasterConnection()
        return self.pt

381 382
    def _getCellListForOID(self, oid, readable=False, writable=False):
        """Return the cells available for the specified OID."""
        # Fetching the partition table also guarantees a live master
        # connection.
        return self._getPartitionTable().getCellListForOID(
            oid, readable, writable)

    def _getCellListForTID(self, tid, readable=False, writable=False):
        """Return the cells available for the specified TID."""
        # Fetching the partition table also guarantees a live master
        # connection.
        return self._getPartitionTable().getCellListForTID(
            tid, readable, writable)
390 391 392

    def _connectToPrimaryMasterNode(self):
        """Connect and identify to the primary master node.

        Loops until a primary master is found, this client is identified
        (self.uuid set by the bootstrap handler) and an operational
        partition table has been received.  Returns the established
        connection.
        """
        logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterNodeList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        # All candidates tried: pause, then start over.
                        sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.em, self.notifications_handler,
                                          addr=self.trying_master_node.getServer(),
                                          connector_handler=self.connector_handler,
                                          dispatcher=self.dispatcher)
                # Query for primary master node
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        # This happens, if a connection could not be established.
                        logging.error('Connection to master node %s failed',
                                      self.trying_master_node)
                        continue
                    msg_id = conn.ask(self.local_var.queue, protocol.askPrimaryMaster())
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                # (primary_master_node is set by the bootstrap handler).
                connected = self.primary_master_node is not None \
                            and self.primary_master_node is self.trying_master_node

            logging.info('connected to a primary master node')
            # Identify to primary master and request initial data
            while conn.getUUID() is None:
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        logging.error('Connection to master node %s lost',
                                      self.trying_master_node)
                        self.primary_master_node = None
                        break
                    p = protocol.requestNodeIdentification(protocol.CLIENT_NODE_TYPE,
                            self.uuid, None, self.name)
                    msg_id = conn.ask(self.local_var.queue, p)
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    self.primary_master_node = None
                    break
                if conn.getUUID() is None:
                    # Node identification was refused by master.
                    # Sleep a bit an retry.
                    # XXX: This should be replaced by:
                    # - queuing requestNodeIdentification at master side
                    # - sending the acceptance from master when it becomes
                    #   ready
                    # Thus removing the need to:
                    # - needlessly bother the primary master every 5 seconds
                    #   (...per client)
                    # - have a sleep in the code (yuck !)
                    sleep(1)
            if self.uuid is not None:
                # Identified: fetch the node list and the partition table.
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      protocol.askNodeInformation())
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      protocol.askPartitionTable([]))
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
            ready = self.uuid is not None and self.pt is not None \
                                 and self.pt.operational()
        logging.info("connected to primary master node %s" % self.primary_master_node)
        return conn
        
490 491 492
    def registerDB(self, db, limit):
        # ZODB registration hook; 'limit' (cache size) is currently ignored.
        self._db = db

493 494 495
    def getDB(self):
        # Return the ZODB.DB instance set by registerDB, or None.
        return self._db

496 497 498 499 500
    def new_oid(self):
        """Allocate and return a fresh OID.

        OIDs are requested from the primary master in batches of 100 and
        served from a local pool, so most calls do not hit the network.
        """
        self._oid_lock_acquire()
        try:
            if not self.new_oid_list:
                # Pool exhausted: refill it from the master node.  Asking
                # for a batch avoids one round-trip per OID.
                self._askPrimary(protocol.askNewOIDs(100))
                if not self.new_oid_list:
                    raise NEOStorageError('new_oid failed')
            oid = self.new_oid_list.pop()
            self.last_oid = oid
            return oid
        finally:
            self._oid_lock_release()

513 514 515 516
    def getStorageSize(self):
        # return the last OID used, this is inaccurate: it reports the
        # integer value of the last OID seen, not an actual byte count.
        from neo.util import u64
        return int(u64(self.last_oid))
517

Aurel's avatar
Aurel committed
518 519 520 521
    def getSerial(self, oid):
        # Return the last committed serial of an object.
        # Try in cache first
        self._cache_lock_acquire()
        try:
            if oid in self.mq_cache:
                # Cache entries are (serial, data) pairs.
                return self.mq_cache[oid][0]
        finally:
            self._cache_lock_release()
        # history return serial, so use it
        # NOTE(review): hist is assumed to be shaped like
        # (oid, [(serial, ...), ...]) -- confirm against history().
        hist = self.history(oid, length = 1, object_only = 1)
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        return hist[1][0][0]

Aurel's avatar
Aurel committed
534

535
    def _load(self, oid, serial=None, tid=None, cache=0):
        """Internal method which manages load, loadSerial and loadBefore.

        Asks each readable cell for the oid, in random order, until one
        returns a valid object.  local_var.asked_object acts as the
        result sentinel: 0 = no answer yet (connection failures only),
        -1 = object not found or invalid, otherwise the answer tuple.
        Returns (data, start_serial, end_serial); raises
        NEOStorageNotFoundError when the object cannot be obtained.
        """
        cell_list = self._getCellListForOID(oid, readable=True)
        if len(cell_list) == 0:
            # No cells available, so why are we running ?
            logging.error('oid %s not found because no storage is available for it', dump(oid))
            raise NEOStorageNotFoundError()

        # Randomize the cell order to spread the read load.
        shuffle(cell_list)
        self.local_var.asked_object = 0
        for cell in cell_list:
            logging.debug('trying to load %s from %s',
                          dump(oid), dump(cell.getUUID()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            try:
                self._askStorage(conn, protocol.askObject(oid, serial, tid))
            except ConnectionClosed:
                continue

            if self.local_var.asked_object == -1:
                # OID not found
                break

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                logging.error('got wrong oid %s instead of %s from node %s',
                              noid, dump(oid), cell.getServer())
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getServer(), dump(oid))
                self.local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if self.local_var.asked_object == 0:
            # We didn't get any object from any storage node because of
            # connection errors.
            logging.warning('oid %s not found because of connection failure', dump(oid))
            raise NEOStorageNotFoundError()

        if self.local_var.asked_object == -1:
            # The storage nodes agree the object does not exist.
            logging.info('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        return data, start_serial, end_serial
602

603

604 605 606
    def load(self, oid, version=None):
        """Return (data, serial) for the given oid, preferring the cache."""
        self._load_lock_acquire()
        try:
            # Fast path: serve the object straight from the client cache.
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
                    cached_serial, cached_data = self.mq_cache[oid]
                    return cached_data, cached_serial
            finally:
                self._cache_lock_release()
            # Cache miss: fetch from a storage node (cache=1 populates
            # the cache for next time).
            data, start_serial, _end_serial = self._load(oid, cache=1)
            return data, start_serial
        finally:
            self._load_lock_release()
Aurel's avatar
Aurel committed
621

622

623
    def loadSerial(self, oid, serial):
        """Return the data of oid exactly as committed at the given serial.

        The cache is bypassed: it only holds the up-to-date revision.
        """
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        data, _start, _end = self._load(oid, serial=serial)
        return data
Aurel's avatar
Aurel committed
628

629

630
    def loadBefore(self, oid, tid):
        """Return (data, start, end) for the revision of oid committed
        before tid, or None when no such previous revision exists.

        The cache is bypassed: it only holds the up-to-date revision.
        """
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        data, start_serial, end_serial = self._load(oid, tid=tid)
        if end_serial is not None:
            return data, start_serial, end_serial
        # No end serial means there is no previous version.
        return None
Aurel's avatar
Aurel committed
640

641

642 643 644
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
        if self.local_var.txn is transaction:
            # We already began this very transaction: nothing to do.
            return
        # ask the primary master to start a transaction, if no tid is supplied,
        # the master will supply us one. Otherwise the requested tid will be
        # used if possible.
        self.local_var.tid = None
        self._askPrimary(protocol.askBeginTransaction(tid))
        # the answer handler is expected to fill local_var.tid
        if self.local_var.tid is None:
            raise NEOStorageError('tpc_begin failed')
        self.local_var.txn = transaction
656

657

658 659
    def store(self, oid, serial, data, version, transaction):
        """Store object.

        Sends the compressed data to every writable cell for the oid.
        Raises NEOStorageConflictError on a serial conflict (the
        conflicting serial is left in local_var.conflict_serial) and
        NEOStorageError when no storage node could store the object.
        Returns the current transaction id.
        """
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage node to use
        cell_list = self._getCellListForOID(oid, writable=True)
        if len(cell_list) == 0:
            raise NEOStorageError
        # Store data on each node
        compressed_data = compress(data)
        checksum = makeChecksum(compressed_data)
        self.local_var.object_stored_counter = 0
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.object_stored = 0
            p = protocol.askStoreObject(oid, serial, 1,
                     checksum, compressed_data, self.local_var.tid)
            try:
                self._askStorage(conn, p)
            except ConnectionClosed:
                continue

            # Check we don't get any conflict
            if self.local_var.object_stored[0] == -1:
                # FIX: dict.has_key() is deprecated (and removed in
                # Python 3); use the 'in' operator instead.
                if oid in self.local_var.data_dict:
                    # One storage already accepted the object, is it normal ??
                    # remove from dict and raise ConflictError, don't care of
                    # previous node which already store data as it would be resent
                    # again if conflict is resolved or txn will be aborted
                    del self.local_var.data_dict[oid]
                self.local_var.conflict_serial = self.local_var.object_stored[1]
                raise NEOStorageConflictError
            # increase counter so that we know if a node has stored the object or not
            self.local_var.object_stored_counter += 1

        if self.local_var.object_stored_counter == 0:
            # no storage nodes were available
            raise NEOStorageError('tpc_store failed')

        # Store object in tmp cache
        self.local_var.data_dict[oid] = data

        return self.local_var.tid
706

Aurel's avatar
Aurel committed
707

708 709
    def tpc_vote(self, transaction):
        """Ask every writable storage cell to vote the current transaction.

        Raises StorageTransactionError for a foreign transaction, and
        NEOStorageError when a reached node refuses the vote or when no
        node could be reached at all.
        """
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        user = transaction.user
        desc = transaction.description
        ext = dumps(transaction._extension)
        oid_list = self.local_var.data_dict.keys()
        # NOTE(review): return value unused, but the call is kept for its
        # possible side effect of (lazily) loading the partition table.
        pt = self._getPartitionTable()
        cell_list = self._getCellListForTID(self.local_var.tid, writable=True)
        self.local_var.voted_counter = 0
        for cell in cell_list:
            logging.debug("voting object %s %s" % (cell.getServer(), cell.getState()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue
            self.local_var.txn_voted = False
            packet = protocol.askStoreTransaction(self.local_var.tid,
                    user, desc, ext, oid_list)
            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                continue
            if not self.isTransactionVoted():
                raise NEOStorageError('tpc_vote failed')
            self.local_var.voted_counter += 1
        # At least one storage node must have accepted the vote.
        if self.local_var.voted_counter == 0:
            raise NEOStorageError('tpc_vote failed')
    def tpc_abort(self, transaction):
        """Abort the current transaction on storage nodes and the master."""
        if transaction is not self.local_var.txn:
            return
        # Collect every writable cell that may hold either an object of
        # this transaction or the transaction record itself.
        cell_set = set()
        for oid in self.local_var.data_dict.iterkeys():
            cell_set |= set(self._getCellListForOID(oid, writable=True))
        cell_set |= set(self._getCellListForTID(self.local_var.tid, writable=True))
        # Cancel the transaction on all those storage nodes.
        for cell in cell_set:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue
            try:
                conn.notify(protocol.abortTransaction(self.local_var.tid))
            finally:
                conn.unlock()
        # Then abort the transaction in the primary master node.
        conn = self._getMasterConnection()
        conn.lock()
        try:
            conn.notify(protocol.abortTransaction(self.local_var.tid))
        finally:
            conn.unlock()
        self.local_var.clear()
    def tpc_finish(self, transaction, f=None):
        """Finish the current transaction.

        f, when given, is the callback supplied by ZODB; it is invoked
        with the TID before the master is asked to finish.  Returns the
        TID of the finished transaction, or None when *transaction* is
        not the current one.  Raises NEOStorageError when the master did
        not confirm the finish.
        """
        if self.local_var.txn is not transaction:
            return
        self._load_lock_acquire()
        try:
            # Capture the TID now: local_var.clear() below resets the
            # thread-local state, and the previous code read
            # local_var.tid *after* clearing it, returning a bogus value.
            tid = self.local_var.tid
            # Call function given by ZODB.
            if f is not None:
                f(tid)
            # Ask the primary master to finish the transaction.
            oid_list = self.local_var.data_dict.keys()
            p = protocol.finishTransaction(oid_list, tid)
            self._askPrimary(p)
            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')
            # Update the cache: each stored object's serial is now the TID.
            self._cache_lock_acquire()
            try:
                for oid in self.local_var.data_dict.iterkeys():
                    data = self.local_var.data_dict[oid]
                    self.mq_cache[oid] = tid, data
            finally:
                self._cache_lock_release()
            self.local_var.clear()
            return tid
        finally:
            self._load_lock_release()
    def undo(self, transaction_id, txn, wrapper):
806
        if txn is not self.local_var.txn:
807
            raise StorageTransactionError(self, transaction_id)
808

809
        # First get transaction information from a storage node.
810
        cell_list = self._getCellListForTID(transaction_id, writable=True)
811 812
        shuffle(cell_list)
        for cell in cell_list:
813
            conn = self.cp.getConnForCell(cell)
814 815
            if conn is None:
                continue
816

817
            self.local_var.txn_info = 0
818 819
            try:
                self._askStorage(conn, protocol.askTransactionInformation(transaction_id))
820
            except ConnectionClosed:
821
                continue
822

823 824 825
            if self.local_var.txn_info == -1:
                # Tid not found, try with next node
                continue
826
            elif isinstance(self.local_var.txn_info, dict):
827 828 829 830
                break
            else:
                raise NEOStorageError('undo failed')

831
        if self.local_var.txn_info in (-1, 0):
832 833 834
            raise NEOStorageError('undo failed')

        oid_list = self.local_var.txn_info['oids']
Aurel's avatar
Aurel committed
835 836 837
        # Second get object data from storage node using loadBefore
        data_dict = {}
        for oid in oid_list:
838 839 840 841 842
            try:
                result = self.loadBefore(oid, transaction_id)
            except NEOStorageNotFoundError:
                # no previous revision, can't undo (as in filestorage)
                raise UndoError("no previous record", oid)
843
            data, start, end = result
Aurel's avatar
Aurel committed
844 845 846
            # end must be TID we are going to undone otherwise it means
            # a later transaction modify the object
            if end != transaction_id:
847
                raise UndoError("non-undoable transaction", oid)
Aurel's avatar
Aurel committed
848
            data_dict[oid] = data
Aurel's avatar
Aurel committed
849

850
        # Third do transaction with old data
851 852
        oid_list = data_dict.keys()
        for oid in oid_list:
Aurel's avatar
Aurel committed
853
            data = data_dict[oid]
Aurel's avatar
Aurel committed
854
            try:
855
                self.store(oid, transaction_id, data, None, txn)
Aurel's avatar
Aurel committed
856
            except NEOStorageConflictError, serial:
857 858
                if serial <= self.local_var.tid:
                    new_data = wrapper.tryToResolveConflict(oid, self.local_var.tid,
859
                                                            serial, data)
Aurel's avatar
Aurel committed
860
                    if new_data is not None:
861
                        self.store(oid, self.local_var.tid, new_data, None, txn)
Aurel's avatar
Aurel committed
862
                        continue
863
                raise ConflictError(oid = oid, serials = (self.local_var.tid, serial),
864
                                    data = data)
865
        return self.local_var.tid, oid_list
866

867
    def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
        """Return up to last-first transaction descriptions, newest first.

        filter, when given, is applied to each txn_info dict; with_oids
        keeps the 'oids' entry in the returned dicts.  When nothing
        matches and block is false, the lookup is retried once with a
        five-fold window.
        """
        if last < 0:
            # See FileStorage.py for explanation.
            last = first - last
        # Ask every storage node for its TIDs; each node answers only for
        # its UP_TO_DATE_STATE and FEEDING_STATE cells.
        pt = self._getPartitionTable()
        storage_node_list = pt.getNodeList()
        self.local_var.node_tids = {}
        queried_node_count = 0
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue
            try:
                conn.ask(self.local_var.queue,
                         protocol.askTIDs(first, last, protocol.INVALID_PARTITION))
            finally:
                conn.unlock()
            queried_node_count += 1
        # Wait for an answer from every node actually asked.  Waiting on
        # len(storage_node_list), as before, hangs forever whenever a
        # node was unreachable above (conn is None): such a node never
        # contributes an entry to node_tids.
        while len(self.local_var.node_tids) != queried_node_count:
            try:
                self._waitMessage(handler=self.storage_handler)
            except ConnectionClosed:
                continue
        # Merge and order the TIDs, newest first.
        ordered_tids = set()
        update = ordered_tids.update
        for tid_list in self.local_var.node_tids.itervalues():
            update(tid_list)
        ordered_tids = list(ordered_tids)
        # XXX do we need a special cmp function here ?
        ordered_tids.sort(reverse=True)
        logging.debug("UndoLog, tids %s", ordered_tids)
        # For each transaction, fetch its information from a random
        # readable cell.
        undo_info = []
        append = undo_info.append
        for tid in ordered_tids:
            cell_list = self._getCellListForTID(tid, readable=True)
            shuffle(cell_list)
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is not None:
                    self.local_var.txn_info = 0
                    try:
                        self._askStorage(conn, protocol.askTransactionInformation(tid))
                    except ConnectionClosed:
                        continue
                    if isinstance(self.local_var.txn_info, dict):
                        break
            if self.local_var.txn_info in (-1, 0):
                # TID not found at all.
                raise NeoException('Data inconsistency detected: '
                                   'transaction info for TID %r could not '
                                   'be found' % (tid, ))
            if filter is None or filter(self.local_var.txn_info):
                if not with_oids:
                    self.local_var.txn_info.pop("oids")
                append(self.local_var.txn_info)
                if len(undo_info) >= last - first:
                    break
        # Return at least one element when possible: retry once with a
        # five-fold window before giving up.
        # NOTE(review): the retry goes through undoLog(), so with_oids is
        # lost on that path — confirm this is intended for transactionLog.
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter, block=1)
        return undo_info
    def undoLog(self, first, last, filter=None, block=0):
        """Return undo information for [first, last), as expected by ZODB."""
        return self.__undoLog(first, last, filter, block)
    def transactionLog(self, first, last):
        """Return transaction descriptions, each including its oids."""
        return self.__undoLog(first, last, with_oids=True)
    def history(self, oid, version=None, length=1, filter=None, object_only=0):
        """Return revision history information for oid, as ZODB expects.

        With object_only true, return the raw (oid, [(serial, size), ...])
        answer (used by getSerial) instead of the list of per-revision
        dicts.  Raises NEOStorageError when no consistent history could
        be obtained.
        """
        # First ask any readable cell for the object's revision list.
        cell_list = self._getCellListForOID(oid, readable=True)
        shuffle(cell_list)

        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.history = None
            try:
                self._askStorage(conn, protocol.askObjectHistory(oid, 0, length))
            except ConnectionClosed:
                continue

            if self.local_var.history == -1:
                # Not found, go on with next node.
                continue
            if self.local_var.history[0] != oid:
                # Got history for wrong oid.
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                                      '%r, got %r' % (
                                      oid, self.local_var.history[0]))
            # NOTE(review): no break on success — every readable cell is
            # queried and the last answer wins; kept as-is, confirm intent.

        if not isinstance(self.local_var.history, tuple):
            raise NEOStorageError('history failed')
        if object_only:
            # Used by getSerial.
            return self.local_var.history

        # Now that we have the object information, get txn information.
        history_list = []
        for serial, size in self.local_var.history[1]:
            # BUG FIX: this result used to be discarded, so the stale
            # cell list from the oid lookup above was reused for every
            # transaction-information lookup.
            cell_list = self._getCellListForTID(serial, readable=True)
            shuffle(cell_list)

            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue

                # Ask transaction information.
                self.local_var.txn_info = None
                try:
                    self._askStorage(conn, protocol.askTransactionInformation(serial))
                except ConnectionClosed:
                    continue

                if self.local_var.txn_info == -1:
                    # TID not found.
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            # Create the history dict for this revision.
            self.local_var.txn_info.pop('id')
            self.local_var.txn_info.pop('oids')
            self.local_var.txn_info['tid'] = serial
            self.local_var.txn_info['version'] = None
            self.local_var.txn_info['size'] = size
            if filter is None or filter(self.local_var.txn_info):
                history_list.append(self.local_var.txn_info)

        return history_list
    def iterator(self, start=None, stop=None):
        """Return a transaction iterator over the [start, stop] TID range."""
        return Iterator(self, start, stop)
    def __del__(self):
        """Close every connection and stop the polling thread.

        Due to a bug in ZODB, close() is not always called when shutting
        down Zope, so the same cleanup also runs from the destructor.
        """
        for connection in self.em.getConnectionList():
            connection.close()
        # Stop polling thread.
        self.poll_thread.stop()
    close = __del__
    def sync(self):
        """Process any pending network message."""
        self._waitMessage()
    def setNodeReady(self):
        """Record, for this thread, that the node answered it is ready."""
        self.local_var.node_ready = True

    def setNodeNotReady(self):
        """Record, for this thread, that the node is not ready."""
        self.local_var.node_ready = False

    def isNodeReady(self):
        """Return this thread's last recorded node readiness."""
        return self.local_var.node_ready
    def setTID(self, value):
        """Set this thread's current transaction ID."""
        self.local_var.tid = value

    def getTID(self):
        """Return this thread's current transaction ID."""
        return self.local_var.tid

    def getConflictSerial(self):
        """Return the serial recorded by the last store conflict."""
        return self.local_var.conflict_serial

    def setTransactionFinished(self):
        """Mark this thread's transaction as finished by the master."""
        self.local_var.txn_finished = True

    def isTransactionFinished(self):
        """Tell whether this thread's transaction was finished."""
        return self.local_var.txn_finished

    def setTransactionVoted(self):
        """Mark this thread's transaction as voted by a storage node."""
        self.local_var.txn_voted = True

    def isTransactionVoted(self):
        """Tell whether this thread's transaction was voted."""
        return self.local_var.txn_voted