app.py 36.7 KB
Newer Older
Aurel's avatar
Aurel committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

17 18
import logging
import os
19
from thread import get_ident
20
from cPickle import dumps, loads
21
from zlib import compress, decompress
22
from Queue import Queue, Empty
23
from random import shuffle
Yoshinori Okuji's avatar
Yoshinori Okuji committed
24
from time import sleep
25 26

from neo.client.mq import MQ
27
from neo.node import NodeManager, MasterNode, StorageNode
28
from neo.connection import MTClientConnection
29
from neo import protocol
30
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
Aurel's avatar
Aurel committed
31 32
        INVALID_PTID, CLIENT_NODE_TYPE, UP_TO_DATE_STATE, INVALID_SERIAL, \
        DOWN_STATE, HIDDEN_STATE
33
from neo.client.handler import *
34
from neo.client.exception import NEOStorageError, NEOStorageConflictError, \
35
     NEOStorageNotFoundError, NEOStorageConnectionFailure
36
from neo.util import makeChecksum, dump
37
from neo.connector import getConnectorHandler
38 39 40
from neo.client.dispatcher import Dispatcher
from neo.client.poll import ThreadedPoll
from neo.event import EventManager
41
from neo.locking import RLock, Lock
42

Aurel's avatar
Aurel committed
43
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
Aurel's avatar
Aurel committed
44

45 46
class ConnectionPool(object):
    """This class manages a pool of connections to storage nodes.

    All mutations of the pool (connection_dict) happen while holding
    connection_lock, except during the actual connection handshake in
    _initNodeConnection, where the lock is deliberately released so
    other threads are not blocked on network I/O.
    """

    def __init__(self, app, max_pool_size = 25):
        # app: the client Application instance (provides em, dispatcher,
        # uuid, local_var, etc.).
        # max_pool_size: soft limit; exceeding it triggers _dropConnections.
        self.app = app
        self.max_pool_size = max_pool_size
        # Maps storage node UUID -> MTClientConnection.
        self.connection_dict = {}
        # Define a lock in order to create one connection to
        # a storage node at a time to avoid multiple connections
        # to the same node.
        l = RLock()
        self.connection_lock_acquire = l.acquire
        self.connection_lock_release = l.release

    def _initNodeConnection(self, cell):
        """Init a connection to a given storage node.

        Returns a connection with the storage handler installed, or None
        on any failure (no address, connector failure, identification
        refused, node not ready).
        """
        node = cell.getNode()
        addr = node.getServer()
        if addr is None:
            return None

        app = self.app

        # Loop until a connection is obtained.
        # NOTE(review): every path through the loop body returns, so the
        # loop never actually iterates a second time.
        while True:
            logging.info('trying to connect to %s - %s', node, node.getState())
            app.setNodeReady()
            handler = StorageBootstrapHandler(app, app.dispatcher)
            conn = MTClientConnection(app.em, handler, addr,
                                      connector_handler=app.connector_handler)
            conn.lock()
            try:
                if conn.getConnector() is None:
                    # This happens, if a connection could not be established.
                    logging.error('Connection to storage node %s failed', node)
                    return None

                # Identify ourselves to the storage node; the answer will be
                # routed back to this thread's queue by the dispatcher.
                p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
                            app.uuid, addr[0], addr[1], app.name)
                msg_id = conn.ask(p)
                app.dispatcher.register(conn, msg_id, app.local_var.queue)
            finally:
                conn.unlock()

            # Block until the identification answer (or failure) arrives.
            try:
                app._waitMessage(conn, msg_id, handler=handler)
            except NEOStorageError:
                logging.error('Connection to storage node %s failed', node)
                return None

            if app.isNodeReady():
                logging.info('connected to storage node %s', node)
                # Switch from the bootstrap handler to the normal one.
                conn.setHandler(self.app.storage_handler)
                return conn
            else:
                # Connection failed, notify primary master node
                logging.info('Storage node %s not ready', node)
                return None

    def _dropConnections(self):
        """Drop idle connections until the pool is back under its limit."""
        # Python 2 dict.items() returns a list, so deleting entries from
        # connection_dict while iterating is safe here.
        for node_uuid, conn in self.connection_dict.items():
            # Drop first connection which looks not used
            conn.lock()
            try:
                # "Unused" = no pending outgoing data and no answer still
                # expected by the dispatcher.
                if not conn.pending() and \
                        not self.app.dispatcher.registered(conn):
                    del self.connection_dict[conn.getUUID()]
                    conn.close()
                    logging.info('_dropConnections : connection to storage node %s:%d closed', 
                                 *(conn.getAddress()))
                    if len(self.connection_dict) <= self.max_pool_size:
                        break
            finally:
                conn.unlock()

    def _createNodeConnection(self, node):
        """Create a connection to a given storage node.

        Called with connection_lock held; the lock is temporarily released
        around the (slow) handshake and re-acquired afterwards.
        """
        if len(self.connection_dict) > self.max_pool_size:
            # must drop some unused connections
            self._dropConnections()

        # Release the pool lock while connecting so other threads can use
        # already-established connections meanwhile.
        self.connection_lock_release()
        try:
            conn = self._initNodeConnection(node)
        finally:
            self.connection_lock_acquire()

        if conn is None:
            return None

        # add node to node manager
        # NOTE(review): despite the name, callers pass partition-table cell
        # objects here (see getConnForNode's callers) — confirm the duck
        # type actually provides getServer()/getUUID() as used below.
        if self.app.nm.getNodeByServer(node.getServer()) is None:
            n = StorageNode(node.getServer())
            self.app.nm.add(n)
        self.connection_dict[node.getUUID()] = conn
        # Returned connection is locked, matching getConnForNode's contract.
        conn.lock()
        return conn

    def getConnForNode(self, node):
        """Return a locked connection object to a given node
        If no connection exists, create a new one.

        Returns None for nodes in DOWN or HIDDEN state or when the
        connection attempt fails.  The caller must unlock the returned
        connection when done.
        """
        if node.getNodeState() in (DOWN_STATE, HIDDEN_STATE):
            return None
        uuid = node.getUUID()
        self.connection_lock_acquire()
        try:
            try:
                conn = self.connection_dict[uuid]
                # Already connected to node
                conn.lock()
                return conn
            except KeyError:
                # Create new connection to node
                return self._createNodeConnection(node)
        finally:
            self.connection_lock_release()

    def removeConnection(self, node):
        """Explicitly remove connection when a node is broken."""
        self.connection_lock_acquire()
        try:
            try:
                del self.connection_dict[node.getUUID()]
            except KeyError:
                # Nothing pooled for this node; nothing to do.
                pass
        finally:
            self.connection_lock_release()
173

174

175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
class ThreadContext(object):
    """Per-thread attribute storage.

    Attribute reads and writes on an instance are transparently redirected
    to a dictionary keyed by the calling thread's identifier, so each
    thread sees its own 'tid', 'txn', 'queue', etc.
    """

    # Class-level map: thread ident -> per-thread data dict.  Because
    # __setattr__ is overridden below, this cannot be set on instances;
    # it is shared by all instances of the class.
    _threads_dict = {}

    def __getThreadData(self):
        # Fetch (lazily creating) the calling thread's data dict.
        id = get_ident()
        try:
            result = self._threads_dict[id]
        except KeyError:
            # First access from this thread: initialize its defaults.
            self.clear(id)
            result = self._threads_dict[id]
        return result

    def __getattr__(self, name):
        # Only called when normal lookup fails, so methods and class
        # attributes are still found the ordinary way.
        thread_data = self.__getThreadData()
        try:
            return thread_data[name]
        except KeyError:
            raise AttributeError, name
        
    def __setattr__(self, name, value):
        # Every instance attribute assignment lands in the calling
        # thread's dict, never in instance __dict__.
        thread_data = self.__getThreadData()
        thread_data[name] = value

    def clear(self, id=None):
        """Reset the given thread's data (default: the calling thread)."""
        if id is None:
            id = get_ident()
        # Other keys (e.g. 'asked_object', counters) are created on demand
        # via __setattr__; only the baseline state is listed here.
        self._threads_dict[id] = {
            'tid': None,
            'txn': None,
            'data_dict': {},
            'object_stored': 0,
            'txn_voted': False,
            'txn_finished': False,
            'queue': Queue(5),
        }


Aurel's avatar
Aurel committed
213
class Application(object):
214 215
    """The client node application."""

216
    def __init__(self, master_nodes, name, connector, **kw):
        """Create the client application.

        master_nodes: space-separated list of 'host:port' master addresses.
        name: cluster name sent during node identification.
        connector: name of the connector implementation to use.
        Extra keyword arguments are accepted and ignored.
        """
        logging.basicConfig(level = logging.DEBUG)
        logging.debug('master node address are %s' %(master_nodes,))
        em = EventManager()
        # Start polling thread
        self.poll_thread = ThreadedPoll(em)
        # Internal attributes common to all threads.
        self.name = name
        self.em = em
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        # Partition table; filled once connected to the primary master.
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.master_node_list = master_nodes.split(' ')
        # no self-assigned UUID, primary master will supply us one
        self.uuid = INVALID_UUID
        # Client-side object cache (most-recently-used queue).
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.ptid = INVALID_PTID
        self.storage_handler = StorageAnswersHandler(self, self.dispatcher)
        self.primary_handler = PrimaryAnswersHandler(self, self.dispatcher)
        self.notifications_handler = PrimaryNotificationsHandler(self, self.dispatcher)
        # Internal attributes distinct between threads.
        self.local_var = ThreadContext()
        # Lock definitions:
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock is used in order to not call multiple oid
        # generation at the same time
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        lock = Lock()
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        self._connecting_to_master_node_acquire = lock.acquire
        self._connecting_to_master_node_release = lock.release
        # _nm ensures exclusive access to the node manager
        lock = Lock()
        self._nm_acquire = lock.acquire
        self._nm_release = lock.release
266 267 268 269 270 271 272 273 274 275

    def _notifyDeadStorage(self, s_node):
        """ Notify a storage failure to the primary master """
        if s_node is None:
            # Unknown node; nothing to report.
            return
        s_uuid = s_node.getUUID()
        ip_address, port = s_node.getServer()
        m_conn = self._getMasterConnection()
        m_conn.lock()
        try:
            # NOTE(review): STORAGE_NODE_TYPE is not imported explicitly in
            # this module; presumably it comes in via the star import from
            # neo.client.handler — confirm.
            node_list = [(STORAGE_NODE_TYPE, ip_address, port, s_uuid, s_node.getState())]
            m_conn.notify(protocol.notifyNodeInformation(node_list))
        finally:
            m_conn.unlock()
280

281
    def _waitMessage(self, target_conn = None, msg_id = None, handler=None):
        """Wait for a message returned by the dispatcher in queues.

        With msg_id set, block until the answer to that message on
        target_conn has been dispatched.  With msg_id None, drain and
        dispatch whatever is already queued without blocking.
        Raises NEOStorageConnectionFailure if target_conn closes while
        waiting.
        """
        local_queue = self.local_var.queue
        if handler is None:
            handler = self.notifications_handler
        while 1:
            if msg_id is None:
                try:
                    # Non-blocking drain mode.
                    conn, packet = local_queue.get_nowait()
                except Empty:
                    break
            else:
                # Blocking wait for a specific answer.
                conn, packet = local_queue.get()
            # A None packet is a fake entry pushed by the dispatcher to
            # signal that the connection was closed.
            if packet is None:
                s_node = self.nm.getNodeByServer(conn.getAddress())
                self._notifyDeadStorage(s_node)
                # NOTE(review): if target_conn is None (drain mode) and a
                # dead-connection marker is queued, this line raises
                # AttributeError — confirm whether that path can occur.
                if conn.getUUID() == target_conn.getUUID():
                    raise NEOStorageConnectionFailure('connection closed')
                else:
                    continue
            handler.dispatch(conn, packet)
            # Stop once the awaited answer arrived; the 0x8000 bit marks
            # answer-type packets in the protocol's type codes.
            if target_conn is conn and msg_id == packet.getId() \
                    and packet.getType() & 0x8000:
                break
306

307 308 309 310
    def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
        """ Send a request to a storage node and process its answer.

        The caller must pass *conn* already locked (as returned by
        ConnectionPool.getConnForNode); this method unlocks it before
        blocking on the answer.
        """
        try:
            msg_id = conn.ask(packet, timeout, additional_timeout)
            self.dispatcher.register(conn, msg_id, self.local_var.queue)
        finally:
            # assume that the connection was already locked
            conn.unlock()
        # Block until the storage handler has processed the answer.
        self._waitMessage(conn, msg_id, self.storage_handler)

    def _askPrimary(self, packet, timeout=5, additional_timeout=30):
        """ Send a request to the primary master and process its answer.

        Unlike _askStorage, this acquires and releases the connection
        lock itself, so the caller holds no lock.
        """
        conn = self._getMasterConnection()
        conn.lock()
        try:
            msg_id = conn.ask(packet, timeout, additional_timeout)
            self.dispatcher.register(conn, msg_id, self.local_var.queue)
        finally:
            conn.unlock()
        # Block until the primary handler has processed the answer.
        self._waitMessage(conn, msg_id, self.primary_handler)

328 329
    def _getMasterConnection(self):
        """ Connect to the primary master node on demand.

        Returns the (cached) connection to the primary master, creating
        it on first use.  Thread-safe: only one thread may attempt the
        connection at a time.
        """
        # Acquire the lock to allow only one thread to connect to the
        # primary.  (The original bound acquire()'s boolean result to an
        # unused local named 'lock', shadowing the name used elsewhere
        # for real Lock objects; the binding is dropped here.)
        self._connecting_to_master_node_acquire(True)
        try:
            if self.master_conn is None:
                self.master_conn = self._connectToPrimaryMasterNode()
            return self.master_conn
        finally:
            self._connecting_to_master_node_release()

    def _getPartitionTable(self):
        """ Return the partition table manager, reconnect the PMN if needed """
        # This ensures the master connection is established and therefore
        # that the partition table (self.pt) is up to date.
        self._getMasterConnection()
        return self.pt

    def _getCellListForID(self, id, readable=False, writable=False):
        """ Return the cells available for the specified (O|T)ID """
        # Getting the partition table also (re)connects to the primary
        # master if needed, so the mapping below is always current.
        return self._getPartitionTable().getCellListForID(id, readable, writable)
350 351 352 353 354 355 356 357

    def _connectToPrimaryMasterNode(self):
        """Locate and connect to the primary master node.

        Cycles through the configured master addresses (and any primary
        announced by the cluster) until identified, ready, and holding an
        operational partition table.  Returns the established connection
        with the notifications handler installed.
        """
        logging.debug('connecting to primary master...')
        master_index = 0
        # Make application execute remaining message if any
        self._waitMessage()
        while True:
            self.setNodeReady()
            # primary_master_node is None (unknown) or -1 (last attempt
            # failed): fall back to the configured address list.
            if self.primary_master_node in (None, -1):
                # Try with master node defined in config
                try:
                    addr, port = self.master_node_list[master_index].split(':')
                except IndexError:
                    # Wrap around to the first configured master.
                    master_index = 0
                    addr, port = self.master_node_list[master_index].split(':')
                port = int(port)
            else:
                addr, port = self.primary_master_node.getServer()
            # Request Node Identification
            handler = PrimaryBootstrapHandler(self, self.dispatcher)
            conn = MTClientConnection(self.em, handler, (addr, port), 
                 connector_handler=self.connector_handler)
            self._nm_acquire()
            try:
                if self.nm.getNodeByServer((addr, port)) is None:
                    n = MasterNode(server = (addr, port))
                    self.nm.add(n)
            finally:
                self._nm_release()

            conn.lock()
            try:
                p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE, 
                        self.uuid, '0.0.0.0', 0, self.name)
                msg_id = conn.ask(p)
                self.dispatcher.register(conn, msg_id, self.local_var.queue)
            finally:
                conn.unlock()

            # Wait for answer
            # NOTE(review): while primary_master_node stays None this inner
            # loop spins without sleeping (the sleep is only reached when it
            # is not None and no break condition matched) — confirm intended.
            while 1:
                self._waitMessage(handler=handler)
                # Now check result
                if self.primary_master_node is not None:
                    if self.primary_master_node == -1:
                        # Connection failed, try with another master node
                        self.primary_master_node = None
                        master_index += 1
                        break
                    elif self.primary_master_node.getServer() != (addr, port):
                        # Master node changed, connect to new one
                        break
                    elif not self.isNodeReady():
                        # Wait a bit and reask again
                        break
                    elif self.pt is not None and self.pt.operational():
                        # Connected to primary master node
                        break
                    sleep(0.1)
            # Done only once identified (got a UUID) with a working
            # partition table; otherwise retry the whole handshake.
            if self.pt is not None and self.pt.operational() \
                    and self.uuid != INVALID_UUID:
                # Connected to primary master node and got all informations
                break
            sleep(1)

        logging.info("connected to primary master node %s" % self.primary_master_node)
        # Bootstrap done: switch to the ordinary notifications handler.
        conn.setHandler(PrimaryNotificationsHandler(self, self.dispatcher))
        return conn
        
419 420 421
    def registerDB(self, db, limit):
        """Register the ZODB database object (*limit* is accepted for the
        storage API but unused here)."""
        self._db = db

422 423 424
    def getDB(self):
        """Return the ZODB database object set by registerDB."""
        return self._db

425 426 427 428 429
    def new_oid(self):
        """Get a new OID.

        OIDs are requested from the primary master in batches of 100 and
        served from a local list; the lock serializes refills so only one
        thread asks the master at a time.  Raises NEOStorageError if the
        master returned no OIDs.
        """
        self._oid_lock_acquire()
        try:
            if not self.new_oid_list:
                # Get a new oid list from the master node.  We manage a
                # list of oids here to avoid asking the master for new
                # oids one by one.
                self._askPrimary(protocol.askNewOIDs(100))
                if not self.new_oid_list:
                    raise NEOStorageError('new_oid failed')
            return self.new_oid_list.pop()
        finally:
            self._oid_lock_release()

441

Aurel's avatar
Aurel committed
442 443 444 445
    def getSerial(self, oid):
        """Return the last serial of *oid*, from cache or history."""
        # Try in cache first
        self._cache_lock_acquire()
        try:
            if oid in self.mq_cache:
                # Cache entries are (serial, data) pairs.
                return self.mq_cache[oid][0]
        finally:
            self._cache_lock_release()
        # history returns serials, so use it
        # NOTE(review): self.history is not defined in this chunk;
        # presumably it returns (oid, [(serial, ...), ...]) — confirm.
        hist = self.history(oid, length = 1, object_only = 1)
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        return hist[1][0][0]

Aurel's avatar
Aurel committed
458

459
    def _load(self, oid, serial = INVALID_TID, tid = INVALID_TID, cache = 0):
        """Internal method which manages load, loadSerial and loadBefore.

        Tries the readable cells for *oid* in random order until one
        returns a consistent object.  local_var.asked_object acts as the
        per-attempt state flag: 0 = no answer yet, -1 = not found /
        invalid answer, otherwise the answered tuple.
        Returns (data, start_serial, end_serial); raises
        NEOStorageNotFoundError when the object cannot be obtained.
        """
        cell_list = self._getCellListForID(oid, readable=True)
        if len(cell_list) == 0:
            # No cells available, so why are we running ?
            logging.error('oid %s not found because no storage is available for it', dump(oid))
            raise NEOStorageNotFoundError()

        # Randomize to spread the read load over the replicas.
        shuffle(cell_list)
        self.local_var.asked_object = 0
        for cell in cell_list:
            logging.debug('trying to load %s from %s',
                          dump(oid), dump(cell.getUUID()))
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            try:
                # Sets local_var.asked_object via the storage handler.
                self._askStorage(conn, protocol.askObject(oid, serial, tid))
            except NEOStorageConnectionFailure:
                continue

            if self.local_var.asked_object == -1:
                # OID not found
                break

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                logging.error('got wrong oid %s instead of %s from node %s',
                              noid, dump(oid), cell.getServer())
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getServer(), dump(oid))
                self.local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if self.local_var.asked_object == 0:
            # We did not get any object from any storage node because of
            # connection errors.
            logging.warning('oid %s not found because of connection failure', dump(oid))
            raise NEOStorageNotFoundError()

        if self.local_var.asked_object == -1:
            # We did not get any object from any storage node.
            logging.info('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        # INVALID_SERIAL means this is the current revision.
        if end_serial == INVALID_SERIAL:
            end_serial = None
        return data, start_serial, end_serial
528

529

530 531 532
    def load(self, oid, version=None):
        """Load an object for a given oid.

        Returns (data, serial).  *version* is accepted for the storage
        API but unused.  _load_lock keeps loads atomic with respect to
        tpc_finish's cache update; _cache_lock is nested inside it.
        """
        # First try from cache
        self._load_lock_acquire()
        try:
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
                    # Cache entries are (serial, data); callers get
                    # (data, serial).
                    return self.mq_cache[oid][1], self.mq_cache[oid][0]
            finally:
                self._cache_lock_release()
            # Otherwise get it from a storage node (and cache it).
            return self._load(oid, cache=1)[:2]
        finally:
            self._load_lock_release()
Aurel's avatar
Aurel committed
546

547

548
    def loadSerial(self, oid, serial):
        """Load an object for a given oid and serial."""
        # The cache only holds up-to-date objects, so always go to the
        # storage nodes for a specific historical revision.
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        data, _, _ = self._load(oid, serial=serial)
        return data
Aurel's avatar
Aurel committed
553

554

555
    def loadBefore(self, oid, tid):
        """Load an object for a given oid before tid committed."""
        # The cache only holds up-to-date objects, so always go to the
        # storage nodes for a historical revision.
        if tid is None:
            tid = INVALID_TID
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        data, start, end = self._load(oid, tid=tid)
        if end is None:
            # No previous version
            return None
        return data, start, end
Aurel's avatar
Aurel committed
567

568

569 570 571
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction.

        Idempotent for the same *transaction* object.  When *tid* is not
        supplied (the normal case), a new one is requested from the
        primary master; *status* is accepted for the storage API.
        Raises NEOStorageError if no tid could be obtained.
        """
        # First get a transaction, only one is allowed at a time
        if self.local_var.txn is transaction:
            # We already began the same transaction
            return
        # Get a new transaction id if necessary
        if tid is None:
            self.local_var.tid = None
            # The answer handler stores the new tid into local_var.tid.
            self._askPrimary(protocol.askNewTID())
            if self.local_var.tid is None:
                raise NEOStorageError('tpc_begin failed')
        else:
            self.local_var.tid = tid
        self.local_var.txn = transaction
584

585

586 587
    def store(self, oid, serial, data, version, transaction):
        """Store object.

        Sends the (compressed, checksummed) object to every writable cell
        for *oid*.  Raises NEOStorageConflictError on a serial conflict,
        NEOStorageError if no storage accepted the object, and
        StorageTransactionError if *transaction* is not the current one.
        Returns the transaction id.
        """
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        if serial is None:
            serial = INVALID_SERIAL
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage node to use
        cell_list = self._getCellListForID(oid, writable=True)
        if len(cell_list) == 0:
            # FIXME must wait for cluster to be ready
            raise NEOStorageError
        # Store data on each node
        compressed_data = compress(data)
        checksum = makeChecksum(compressed_data)
        self.local_var.object_stored_counter = 0
        for cell in cell_list:
            #logging.info("storing object %s %s" %(cell.getServer(),cell.getState()))
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            # local_var.object_stored is set by the storage answer handler:
            # (-1, serial) signals a conflict, otherwise (oid, serial).
            self.local_var.object_stored = 0
            p = protocol.askStoreObject(oid, serial, 1,
                     checksum, compressed_data, self.local_var.tid)
            try:
                self._askStorage(conn, p)
            except NEOStorageConnectionFailure:
                continue

            # Check we don't get any conflict
            if self.local_var.object_stored[0] == -1:
                if self.local_var.data_dict.has_key(oid):
                    # One storage already accepted the object, is it normal ??
                    # Remove from dict and raise ConflictError; don't care
                    # about a previous node which already stored the data,
                    # as it would be resent if the conflict is resolved or
                    # the txn will be aborted.
                    del self.local_var.data_dict[oid]
                # NOTE(review): conflict_serial is stored on the Application
                # itself, not in local_var — shared between threads; confirm
                # this is intended.
                self.conflict_serial = self.local_var.object_stored[1]
                raise NEOStorageConflictError
            # Increase the counter so that we know whether at least one
            # node has stored the object.
            self.local_var.object_stored_counter += 1

        if self.local_var.object_stored_counter == 0:
            # no storage nodes were available
            raise NEOStorageError('tpc_store failed')

        # Store object in tmp cache
        noid, nserial = self.local_var.object_stored
        self.local_var.data_dict[oid] = data

        return self.local_var.tid
639

Aurel's avatar
Aurel committed
640

641 642
    def tpc_vote(self, transaction):
        """Store current transaction.

        Sends the transaction metadata to every writable cell for the
        current tid.  Raises StorageTransactionError for a foreign
        transaction and NEOStorageError when any contacted node rejects
        the vote or when no node accepted it.
        """
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        user = transaction.user
        desc = transaction.description
        ext = dumps(transaction._extension)
        oid_list = self.local_var.data_dict.keys()
        # Store data on each node.  Ensure the master connection (and so
        # the partition table) is up to date first; the original bound
        # the returned table to an unused local 'pt'.
        self._getPartitionTable()
        cell_list = self._getCellListForID(self.local_var.tid, writable=True)
        self.local_var.voted_counter = 0
        for cell in cell_list:
            logging.info("voting object %s %s" %(cell.getServer(), cell.getState()))
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            # txn_voted is flipped by the storage answer handler.
            self.local_var.txn_voted = False
            p = protocol.askStoreTransaction(self.local_var.tid, 
                    user, desc, ext, oid_list)
            try:
                self._askStorage(conn, p)
            except NEOStorageConnectionFailure:
                continue

            if not self.isTransactionVoted():
                raise NEOStorageError('tpc_vote failed')
            self.local_var.voted_counter += 1

        # check at least one storage node accepted
        if self.local_var.voted_counter == 0:
            raise NEOStorageError('tpc_vote failed')
674

675 676
    def tpc_abort(self, transaction):
        """Abort current transaction.

        Notifies every storage node that touched the transaction, then the
        primary master, and finally resets the per-thread state.  A no-op
        for a transaction that is not the current one.
        """
        if transaction is not self.local_var.txn:
            return

        cell_set = set()
        # select nodes where objects were stored
        for oid in self.local_var.data_dict.iterkeys():
            cell_set |= set(self._getCellListForID(oid, writable=True))
        # select nodes where transaction was stored
        cell_set |= set(self._getCellListForID(self.local_var.tid, writable=True))

        # cancel transaction on all those nodes
        for cell in cell_set:
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue
            try:
                # abortTransaction is a one-way notification; no answer
                # is awaited.
                conn.notify(protocol.abortTransaction(self.local_var.tid))
            finally:
                # getConnForNode returned the connection locked.
                conn.unlock()

        # Abort the transaction in the primary master node.
        conn = self._getMasterConnection()
        conn.lock()
        try:
            conn.notify(protocol.abortTransaction(self.local_var.tid))
        finally:
            conn.unlock()
        # Reset this thread's transaction state.
        self.local_var.clear()
705

706 707
    def tpc_finish(self, transaction, f=None):
        """Finish current transaction.

        Notify the primary master, update the local cache with the newly
        committed data, clear the per-thread transaction state and return
        the committed TID.  'f' is an optional callback given by ZODB,
        invoked with the TID before the master is asked to finish.
        """
        if self.local_var.txn is not transaction:
            return
        self._load_lock_acquire()
        try:
            # Call function given by ZODB
            if f is not None:
                f(self.local_var.tid)

            # Call finish on master
            oid_list = self.local_var.data_dict.keys()
            p = protocol.finishTransaction(oid_list, self.local_var.tid)
            self._askPrimary(p)

            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            # Update cache
            self._cache_lock_acquire()
            try:
                for oid in self.local_var.data_dict.iterkeys():
                    data = self.local_var.data_dict[oid]
                    # Now serial is same as tid
                    self.mq_cache[oid] = self.local_var.tid, data
            finally:
                self._cache_lock_release()
            # BUG FIX: save the tid before clearing the per-thread state.
            # Previously the method returned self.local_var.tid *after*
            # clear(), so callers could get a reset/stale value.
            tid = self.local_var.tid
            self.local_var.clear()
            return tid
        finally:
            self._load_lock_release()
737

Aurel's avatar
Aurel committed
738
    def undo(self, transaction_id, txn, wrapper):
        """Undo the transaction 'transaction_id' inside the current
        transaction 'txn'.

        Three phases:
          1. fetch the transaction's metadata from a storage node,
          2. load the pre-transaction revision of every touched object,
          3. re-store that old data under the current transaction.
        Returns (tid, oid_list) on success.
        NOTE(review): StorageTransactionError, UndoError and ConflictError
        are presumably imported from ZODB elsewhere in this file — confirm.
        """
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, transaction_id)

        # First get transaction information from a storage node.
        cell_list = self._getCellListForID(transaction_id, writable=True)
        # Shuffle so the load spreads over replicas instead of always
        # hitting the first cell.
        shuffle(cell_list)
        for cell in cell_list:
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            # Sentinel: 0 = no answer yet, -1 = TID unknown on that node,
            # dict = transaction metadata (set by the answer handler).
            self.local_var.txn_info = 0
            try:
                self._askStorage(conn, protocol.askTransactionInformation(transaction_id))
            except NEOStorageConnectionFailure:
                continue

            if self.local_var.txn_info == -1:
                # Tid not found, try with next node
                continue
            elif isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')

        # Still 0 (no node answered) or -1 (no node knows the TID).
        if self.local_var.txn_info in (-1, 0):
            raise NEOStorageError('undo failed')

        oid_list = self.local_var.txn_info['oids']
        # Second get object data from storage node using loadBefore
        data_dict = {}
        for oid in oid_list:
            try:
                result = self.loadBefore(oid, transaction_id)
            except NEOStorageNotFoundError:
                # no previous revision, can't undo (as in filestorage)
                raise UndoError("no previous record", oid)
            data, start, end = result
            # end must be TID we are going to undone otherwise it means
            # a later transaction modify the object
            if end != transaction_id:
                raise UndoError("non-undoable transaction", oid)
            data_dict[oid] = data

        # Third do transaction with old data
        oid_list = data_dict.keys()
        for oid in oid_list:
            data = data_dict[oid]
            try:
                self.store(oid, transaction_id, data, None, txn)
            # Python 2 idiom: the raised NEOStorageConflictError instance
            # carries the conflicting serial and is bound to 'serial'.
            except NEOStorageConflictError, serial:
                if serial <= self.local_var.tid:
                    # The conflict predates our transaction: give the
                    # wrapper (ZODB) a chance to resolve it.
                    new_data = wrapper.tryToResolveConflict(oid, self.local_var.tid,
                                                            serial, data)
                    if new_data is not None:
                        self.store(oid, self.local_var.tid, new_data, None, txn)
                        continue
                raise ConflictError(oid = oid, serials = (self.local_var.tid, serial),
                                    data = data)
        return self.local_var.tid, oid_list
799

800
    def undoLog(self, first, last, filter=None, block=0):
        """Return descriptions of committed transactions in the range
        [first, last), most recent first, for the ZODB undo log.

        'filter', if given, is a predicate on the transaction-info dict;
        'block' is an internal recursion guard for the retry-with-larger-
        window fallback at the end.
        """
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE_STATE and
        # FEEDING_STATE cells
        pt = self._getPartitionTable()
        storage_node_list = pt.getNodeList()

        self.local_var.node_tids = {}
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue

            try:
                p = protocol.askTIDs(first, last, INVALID_PARTITION)
                msg_id = conn.ask(p)
                self.dispatcher.register(conn, msg_id, self.local_var.queue)
            finally:
                conn.unlock()

        # Wait for answers from all storages.
        # FIXME this is a busy loop.
        while True:
            try:
                self._waitMessage(handler=self.storage_handler)
            except NEOStorageConnectionFailure:
                continue
            if len(self.local_var.node_tids) == len(storage_node_list):
                break

        # Reorder tids
        ordered_tids = []
        for tids in self.local_var.node_tids.values():
            ordered_tids.extend(tids)
        # XXX do we need a special cmp function here ?
        ordered_tids.sort(reverse=True)
        logging.info("UndoLog, tids %s", ordered_tids)
        # For each transaction, get info
        undo_info = []
        for tid in ordered_tids:
            cell_list = self._getCellListForID(tid, readable=True)
            shuffle(cell_list)
            for cell in cell_list:
                # BUG FIX: connect to the node of the *current cell*.
                # The code previously used 'storage_node', the stale loop
                # variable left over from the askTIDs loop above, so every
                # transaction-info request went to the same (last) node.
                conn = self.cp.getConnForNode(cell)
                if conn is None:
                    continue

                # Sentinel: 0 = no answer, -1 = TID unknown, dict = info.
                self.local_var.txn_info = 0
                try:
                    self._askStorage(conn, protocol.askTransactionInformation(tid))
                except NEOStorageConnectionFailure:
                    continue

                if self.local_var.txn_info == -1:
                    # TID not found, go on with next node
                    continue
                elif isinstance(self.local_var.txn_info, dict):
                    break

            if self.local_var.txn_info in (-1, 0):
                # TID not found at all
                continue

            # Filter result if needed
            if filter is not None:
                # Filter method return True if match
                if not filter(self.local_var.txn_info):
                    continue

            # Append to returned list
            self.local_var.txn_info.pop("oids")
            undo_info.append(self.local_var.txn_info)
            if len(undo_info) >= last - first:
                break
        # Check we return at least one element, otherwise call
        # again but extend offset
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter, block=1)
        return undo_info

884
    # FIXME: filter function isn't used 
    def history(self, oid, version=None, length=1, filter=None, object_only=0):
        """Return revision history for 'oid', as a list of dicts suitable
        for ZODB's history() API.

        With object_only true, return the raw (oid, [(serial, size), ...])
        answer without fetching per-transaction metadata (used by
        getSerial).
        """
        # Get history informations for object first
        cell_list = self._getCellListForID(oid, readable=True)
        shuffle(cell_list)

        for cell in cell_list:
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            self.local_var.history = None
            try:
                self._askStorage(conn, protocol.askObjectHistory(oid, 0, length))
            except NEOStorageConnectionFailure:
                continue

            if self.local_var.history == -1:
                # Not found, go on with next node
                continue
            if self.local_var.history[0] != oid:
                # Got history for wrong oid
                continue
            # BUG FIX: stop on the first valid answer.  Without this break
            # the loop kept querying the remaining cells, and a later node
            # answering -1 overwrote the good result, making the tuple
            # check below raise spuriously.
            break

        if not isinstance(self.local_var.history, tuple):
            raise NEOStorageError('history failed')
        if object_only:
            # Use by getSerial
            return self.local_var.history

        # Now that we have object informations, get txn informations
        history_list = []
        for serial, size in self.local_var.history[1]:
            # BUG FIX: the cell list for this serial was computed but never
            # assigned, so the stale list computed for the oid was reused.
            cell_list = self._getCellListForID(serial, readable=True)
            shuffle(cell_list)

            for cell in cell_list:
                conn = self.cp.getConnForNode(cell)
                if conn is None:
                    continue

                # ask transaction information
                self.local_var.txn_info = None
                try:
                    self._askStorage(conn, protocol.askTransactionInformation(serial))
                except NEOStorageConnectionFailure:
                    continue

                if self.local_var.txn_info == -1:
                    # TID not found
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            # create history dict
            self.local_var.txn_info.pop('id')
            self.local_var.txn_info.pop('oids')
            self.local_var.txn_info['tid'] = serial
            self.local_var.txn_info['version'] = None
            self.local_var.txn_info['size'] = size
            history_list.append(self.local_var.txn_info)

        return history_list
Aurel's avatar
Aurel committed
947

948 949 950 951 952 953
    def __del__(self):
        """Clear all connection."""
        # ZODB does not reliably call close() when Zope shuts down, so
        # the cleanup lives here and close is aliased to __del__ below.
        for connection in self.em.getConnectionList():
            connection.close()
        # Stop the polling thread as well.
        self.poll_thread.stop()
    close = __del__
957 958

    def sync(self):
        """Process any pending network messages for this thread."""
        self._waitMessage()
960

961 962 963 964 965 966 967 968 969
    def setNodeReady(self):
        """Flag this thread's node as ready."""
        self.local_var.node_ready = True

    def setNodeNotReady(self):
        """Flag this thread's node as not ready."""
        self.local_var.node_ready = False

    def isNodeReady(self):
        """Tell whether this thread's node is ready."""
        return self.local_var.node_ready

970
    def setTID(self, value):
        """Record the current transaction ID for this thread."""
        self.local_var.tid = value
972 973

    def getTID(self):
        """Return the current transaction ID for this thread."""
        return self.local_var.tid
975 976 977 978 979

    def getConflictSerial(self):
        """Return the serial that caused the last conflict."""
        # NOTE(review): unlike the neighbouring accessors this reads an
        # application-level attribute, not local_var — confirm intended.
        return self.conflict_serial

    def setTransactionFinished(self):
        """Mark the current transaction as finished."""
        self.local_var.txn_finished = True
981 982

    def isTransactionFinished(self):
        """Tell whether the current transaction is finished."""
        return self.local_var.txn_finished
984 985

    def setTransactionVoted(self):
        """Mark the current transaction as voted."""
        self.local_var.txn_voted = True
987 988

    def isTransactionVoted(self):
        """Tell whether the current transaction has been voted."""
        return self.local_var.txn_voted
990