app.py 34.7 KB
Newer Older
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
from thread import get_ident
19
from cPickle import dumps
20
from zlib import compress, decompress
21
from Queue import Queue, Empty
22
from random import shuffle
Yoshinori Okuji's avatar
Yoshinori Okuji committed
23
from time import sleep
24

25 26
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError

27 28 29 30
from neo import setupLog
setupLog('CLIENT', verbose=True)

from neo import logging
31
from neo import protocol
32
from neo.protocol import NodeTypes, Packets
33
from neo.event import EventManager
34
from neo.util import makeChecksum, dump
35
from neo.locking import Lock
36
from neo.connection import MTClientConnection
37
from neo.node import NodeManager
38
from neo.connector import getConnectorHandler
39
from neo.client.exception import NEOStorageError, NEOStorageConflictError
40
from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
41 42
from neo.exception import NeoException
from neo.client.handlers import storage, master
43
from neo.dispatcher import Dispatcher
44
from neo.client.poll import ThreadedPoll
45 46
from neo.client.iterator import Iterator
from neo.client.mq import MQ
47
from neo.client.pool import ConnectionPool
48
from neo.util import u64, parseMasterList
49

Aurel's avatar
Aurel committed
50

51 52 53 54
class ThreadContext(object):
    """Attribute namespace whose values are private to the calling thread.

    Attribute reads and writes on an instance are redirected to a dictionary
    selected by ``get_ident()``. That dictionary is created on first access
    (or reset) by clear(), which installs the default value of every
    attribute.

    The per-thread dictionaries are stored in the class attribute
    ``_threads_dict`` and are therefore shared by all instances.
    """

    _threads_dict = {}

    def __init__(self):
        # These assignments go through __setattr__, so they only touch the
        # state of the thread creating the instance. The same defaults are
        # installed by clear() for every other thread and after each reset.
        self.txn_info = 0
        self.history = None
        self.node_tids = {}
        self.node_ready = False
        self.conflict_serial = 0
        self.asked_object = 0
        self.object_stored_counter = 0
        self.voted_counter = 0

    def __getThreadData(self):
        """Return the calling thread's dictionary, creating it if missing."""
        thread_id = get_ident()
        try:
            result = self._threads_dict[thread_id]
        except KeyError:
            self.clear(thread_id)
            result = self._threads_dict[thread_id]
        return result

    def __getattr__(self, name):
        """Look name up in the calling thread's dictionary.

        Raises AttributeError when the thread has no such entry.
        """
        thread_data = self.__getThreadData()
        try:
            return thread_data[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        """Store value under name in the calling thread's dictionary."""
        thread_data = self.__getThreadData()
        thread_data[name] = value

    def clear(self, thread_id=None):
        """Reset the state of a thread to its defaults.

        thread_id -- identifier of the thread to reset; the calling thread
            when None.

        A new queue is created on every call.
        """
        if thread_id is None:
            thread_id = get_ident()
        self._threads_dict[thread_id] = {
            'tid': None,
            'txn': None,
            'data_dict': {},
            'object_stored': 0,
            'txn_voted': False,
            'txn_finished': False,
            'queue': Queue(5),
            # Defaults that used to be set by __init__ only: without them a
            # thread other than the creator, or any thread after clear(),
            # got AttributeError when reading these before writing them.
            'txn_info': 0,
            'history': None,
            'node_tids': {},
            'node_ready': False,
            'conflict_serial': 0,
            'asked_object': 0,
            'object_stored_counter': 0,
            'voted_counter': 0,
        }


Aurel's avatar
Aurel committed
99
class Application(object):
100 101
    """The client node application."""

102
    def __init__(self, master_nodes, name, connector=None, **kw):
        """Build the client application.

        master_nodes -- description of the master nodes, parsed with
            parseMasterList(); each address found is registered in the
            node manager as a master node
        name -- stored as self.name
        connector -- passed to getConnectorHandler()

        Additional keyword arguments are accepted and not used here.
        """
        # Start polling thread
        event_manager = EventManager()
        self.em = event_manager
        self.poll_thread = ThreadedPoll(event_manager)

        # Internal Attributes common to all thread
        self._db = None
        self.name = name
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        node_manager = NodeManager()
        self.nm = node_manager
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None

        # load master node list
        for master_address in parseMasterList(master_nodes):
            node_manager.createMaster(address=master_address)

        # no self-assigned UUID, primary master will supply us one
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.last_oid = '\0' * 8

        # Packet handlers
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)

        # Internal attribute distinct between thread
        self.local_var = ThreadContext()

        # Lock definition: each name below gets its own Lock, exposed as
        # <name>_acquire and <name>_release.
        #   _load_lock: used to make loading and storing atomic
        #   _oid_lock: used in order to not call multiple oid generation
        #       at the same time
        #   _cache_lock: used for the client cache
        #   _connecting_to_master_node: used to prevent simultaneous master
        #       node connection attempts
        #   _nm: ensure exclusive access to the node manager
        lock_names = (
            '_load_lock',
            '_oid_lock',
            '_cache_lock',
            '_connecting_to_master_node',
            '_nm',
        )
        for lock_name in lock_names:
            lock = Lock()
            setattr(self, lock_name + '_acquire', lock.acquire)
            setattr(self, lock_name + '_release', lock.release)
158

159
    def _waitMessage(self, target_conn=None, msg_id=None, handler=None):
        """Wait for a message returned by the dispatcher in queues.

        Packets are taken from the calling thread's queue and dispatched one
        by one, until the awaited answer has been processed.

        target_conn -- connection on which the answer is awaited
        msg_id -- id of the awaited answer; when None the queue is drained
            without blocking and the method returns once it is empty
        handler -- handler used to dispatch every packet; when None, a
            handler is chosen for each packet from the type of the node
            it was received from

        Raises ConnectionClosed when a fake packet (None) is received from
        the node behind target_conn, and ValueError when no handler can be
        chosen for a packet.
        """
        local_queue = self.local_var.queue
        while True:
            if msg_id is None:
                try:
                    conn, packet = local_queue.get_nowait()
                except Empty:
                    break
            else:
                conn, packet = local_queue.get()
            # check fake packet
            if packet is None:
                # Without a target connection nobody is waiting on this
                # node, so the notification is simply skipped (this used to
                # fail on None.getUUID()).
                if target_conn is not None and \
                        conn.getUUID() == target_conn.getUUID():
                    raise ConnectionClosed
                continue
            # Guess the handler to use based on the type of node on the
            # connection. The guess is made for each packet: a handler
            # picked for one node type must not be reused for a packet
            # coming from a node of another type.
            packet_handler = handler
            if packet_handler is None:
                node = self.nm.getByAddress(conn.getAddress())
                if node is None:
                    raise ValueError('Expecting an answer from a node '
                        'which type is not known... Is this right ?')
                if node.isStorage():
                    packet_handler = self.storage_handler
                elif node.isMaster():
                    packet_handler = self.primary_handler
                else:
                    raise ValueError('Unknown node type: %r' % (
                        node.__class__, ))
            packet_handler.dispatch(conn, packet)
            if target_conn is conn and msg_id == packet.getId():
                break
194

195 196 197
    def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
        """Send a request to a storage node and process its answer.

        conn must already be locked by the caller: it is unlocked here once
        the request has been submitted, whether or not that succeeded.
        """
        try:
            request_id = conn.ask(self.local_var.queue, packet, timeout,
                                  additional_timeout)
        finally:
            # the lock was taken by whoever handed us the connection
            conn.unlock()
        self._waitMessage(conn, request_id, handler=self.storage_handler)

    def _askPrimary(self, packet, timeout=5, additional_timeout=30):
        """Send a request to the primary master and process its answer.

        The master connection is obtained with _getMasterConnection() and
        is locked only while the request is being submitted.
        """
        master_conn = self._getMasterConnection()
        master_conn.lock()
        try:
            request_id = master_conn.ask(self.local_var.queue, packet,
                                         timeout, additional_timeout)
        finally:
            master_conn.unlock()
        self._waitMessage(master_conn, request_id,
                          handler=self.primary_handler)

216 217
    def _getMasterConnection(self):
        """Return the connection to the primary master, connecting on demand.

        When no connection is known, the list of pre-allocated OIDs is
        emptied and a new connection is established.
        """
        # only one thread at a time is allowed to connect to the primary
        self._connecting_to_master_node_acquire()
        try:
            if self.master_conn is not None:
                return self.master_conn
            self.new_oid_list = []
            self.master_conn = self._connectToPrimaryNode()
            return self.master_conn
        finally:
            self._connecting_to_master_node_release()

228
    def _getPartitionTable(self):
        """Return the partition table manager, reconnect the PMN if needed."""
        # Asking for the master connection first ensures that it is
        # established and that the partition table is up to date.
        self._getMasterConnection()
        partition_table = self.pt
        return partition_table

235 236
    def _getCellListForOID(self, oid, readable=False, writable=False):
        """Return the cells the partition table gives for the given OID."""
        return self._getPartitionTable().getCellListForOID(
            oid, readable, writable)

    def _getCellListForTID(self, tid, readable=False, writable=False):
        """Return the cells the partition table gives for the given TID."""
        return self._getPartitionTable().getCellListForTID(
            tid, readable, writable)
244

245
    def _connectToPrimaryNode(self):
        """Connect to the primary master node and return the connection.

        The outer loop is repeated until this node has a UUID and an
        operational partition table. Each round:

        1. tries master nodes until the node being tried is also the one
           recorded in self.primary_master_node after an AskPrimary
           exchange;
        2. sends RequestIdentification until the connection has a UUID,
           giving up on this round if the connection is lost;
        3. if self.uuid is set, asks for node information and for the
           partition table.

        All answers are processed with self.primary_bootstrap_handler.
        NOTE(review): self.primary_master_node, self.uuid and self.pt are
        never assigned a usable value here; presumably the bootstrap
        handler sets them while processing answers -- confirm.
        """
        logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    # Forget it, so that it is trusted again only if the
                    # exchange below records it anew.
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        # Every known master was tried: pause, then start
                        # over from the first one.
                        sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.em, self.notifications_handler,
                        addr=self.trying_master_node.getAddress(),
                        connector_handler=self.connector_handler,
                        dispatcher=self.dispatcher)
                # Query for primary master node
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        # This happens if a connection could not be established.
                        logging.error('Connection to master node %s failed',
                                      self.trying_master_node)
                        # The finally clause still unlocks the connection
                        # before the next candidate is tried.
                        continue
                    msg_id = conn.ask(self.local_var.queue,
                            Packets.AskPrimary())
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    # Connection lost while waiting: try another node.
                    continue
                # If we reached the primary master node, mark as connected
                connected = self.primary_master_node is not None and \
                        self.primary_master_node is self.trying_master_node

            logging.info('connected to a primary master node')
            # Identify to primary master and request initial data
            while conn.getUUID() is None:
                conn.lock()
                try:
                    if conn.getConnector() is None:
                        logging.error('Connection to master node %s lost',
                                      self.trying_master_node)
                        self.primary_master_node = None
                        break
                    p = Packets.RequestIdentification(NodeTypes.CLIENT,
                            self.uuid, None, self.name)
                    msg_id = conn.ask(self.local_var.queue, p)
                finally:
                    conn.unlock()
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    self.primary_master_node = None
                    break
                if conn.getUUID() is None:
                    # Node identification was refused by master.
                    sleep(1)
            # NOTE(review): this is also reached after one of the breaks
            # above; if self.uuid is already set at that point, the requests
            # below are sent on a connection that was just found to be
            # lost -- confirm this is intended.
            if self.uuid is not None:
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      Packets.AskNodeInformation())
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
                conn.lock()
                try:
                    msg_id = conn.ask(self.local_var.queue,
                                      Packets.AskPartitionTable([]))
                finally:
                    conn.unlock()
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
            ready = self.uuid is not None and self.pt is not None \
                                 and self.pt.operational()
        logging.info("connected to primary master node %s" %
                self.primary_master_node)
        return conn
340

341 342 343
    def registerDB(self, db, limit):
        """Remember db so that getDB() can return it; limit is not used."""
        self._db = db

344 345 346
    def getDB(self):
        """Return the object given to registerDB(), or None if none was."""
        return self._db

347 348 349 350 351
    def new_oid(self):
        """Return a new OID and remember it as the last one handed out.

        OIDs are requested from the primary master by batches of 100 so
        that the master is not asked for them one at a time.

        Raises NEOStorageError when no OID is available after asking.
        """
        self._oid_lock_acquire()
        try:
            available = len(self.new_oid_list)
            if available == 0:
                # The local reserve is exhausted: ask the master node for
                # a new batch.
                self._askPrimary(Packets.AskNewOIDs(100))
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
            oid = self.new_oid_list.pop()
            self.last_oid = oid
            return oid
        finally:
            self._oid_lock_release()

364 365 366
    def getStorageSize(self):
        """Return the last OID handed out, as an integer.

        This is only an inaccurate stand-in for a real size.
        """
        last_oid_value = u64(self.last_oid)
        return int(last_oid_value)
367

Aurel's avatar
Aurel committed
368 369 370 371
    def getSerial(self, oid):
        """Return the serial recorded for oid.

        The client cache is consulted first; on a miss the serial is taken
        from the history of the object.

        Raises NEOStorageNotFoundError when the history is empty and
        NEOStorageError when the history is about another object.
        """
        # Try in cache first
        self._cache_lock_acquire()
        try:
            if oid in self.mq_cache:
                cached_entry = self.mq_cache[oid]
                return cached_entry[0]
        finally:
            self._cache_lock_release()
        # history return serial, so use it
        history = self.history(oid, length = 1, object_only = 1)
        if not len(history):
            raise NEOStorageNotFoundError()
        if history[0] != oid:
            raise NEOStorageError('getSerial failed')
        revision_list = history[1]
        return revision_list[0][0]

Aurel's avatar
Aurel committed
384

385
    def _load(self, oid, serial=None, tid=None, cache=0):
        """Internal method which manage load, loadSerial and loadBefore.

        The readable cells of oid are tried in random order until one of
        them returns a record with the expected oid and a valid checksum.

        oid -- object to load
        serial, tid -- passed as is in the AskObject request
        cache -- when true, the loaded record is put in the client cache

        Return a (data, start_serial, end_serial) tuple, where data is None
        if the stored record is an empty string.

        Raises NEOStorageNotFoundError when no cell is available, when no
        storage node could be reached or when the object was not found.
        """
        cell_list = self._getCellListForOID(oid, readable=True)
        if not cell_list:
            # No cells available, so why are we running ?
            logging.error('oid %s not found because no storage is ' \
                    'available for it', dump(oid))
            raise NEOStorageNotFoundError()

        shuffle(cell_list)
        # 0 means "no answer yet", -1 "not found"; any other value is the
        # answer itself, presumably stored by the storage handler.
        self.local_var.asked_object = 0
        for cell in cell_list:
            logging.debug('trying to load %s from %s',
                          dump(oid), dump(cell.getUUID()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            try:
                self._askStorage(conn, Packets.AskObject(oid, serial, tid))
            except ConnectionClosed:
                continue

            if self.local_var.asked_object == -1:
                # OID not found
                break

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                # (the received oid is dumped like every other oid logged
                # here, instead of being written as raw bytes)
                logging.error('got wrong oid %s instead of %s from node %s',
                              dump(noid), dump(oid), cell.getAddress())
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getAddress(), dump(oid))
                self.local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if self.local_var.asked_object == 0:
            # We didn't got any object from all storage node because of
            # connection error
            logging.warning('oid %s not found because of connection failure',
                    dump(oid))
            raise NEOStorageNotFoundError()

        if self.local_var.asked_object == -1:
            # We didn't got any object from all storage node
            logging.info('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        # An empty record is reported as None; note that the cache entry
        # written above keeps the empty string.
        if data == '':
            data = None
        return data, start_serial, end_serial
457

458

459 460 461
    def load(self, oid, version=None):
        """Load an object for a given oid.

        Return a (data, serial) pair taken from the client cache when the
        object is cached, and from a storage node otherwise (the record is
        then put in the cache). version is not used.

        data is None for an object stored as an empty string, whether the
        answer comes from the cache or from a storage node.
        """
        # First try from cache
        self._load_lock_acquire()
        try:
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
                    serial, data = self.mq_cache[oid]
                    # The cache keeps empty records as '' whereas _load()
                    # reports them as None: give the same answer on a hit.
                    if data == '':
                        data = None
                    return data, serial
            finally:
                self._cache_lock_release()
            # Otherwise get it from storage node
            return self._load(oid, cache=1)[:2]
        finally:
            self._load_lock_release()
Aurel's avatar
Aurel committed
476

477

478
    def loadSerial(self, oid, serial):
        """Return the data of the object oid at the given serial."""
        # The cache is bypassed: it only manages up-to-date objects
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        record = self._load(oid, serial=serial)
        return record[0]
Aurel's avatar
Aurel committed
483

484

485
    def loadBefore(self, oid, tid):
        """Load the revision of oid that precedes the commit of tid.

        Return a (data, start, end) tuple, or None when the loaded record
        has no end serial.
        """
        # The cache is bypassed: it only manages up-to-date objects
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        record = self._load(oid, tid=tid)
        data, start, end = record
        if end is not None:
            return data, start, end
        # No previous version
        return None
Aurel's avatar
Aurel committed
495

496

497 498 499
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction.

        Nothing is done when transaction is already the current one of the
        calling thread. status is not used.

        Raises NEOStorageError when no TID is known once the primary master
        has been asked.
        """
        thread_state = self.local_var
        # Only one transaction is allowed at a time
        if thread_state.txn is transaction:
            # The very same transaction was already begun
            return
        # Ask the primary master to start a transaction. If no tid is
        # supplied, the master will supply us one. Otherwise the requested
        # tid will be used if possible.
        thread_state.tid = None
        begin_request = Packets.AskBeginTransaction(tid)
        self._askPrimary(begin_request)
        if thread_state.tid is None:
            raise NEOStorageError('tpc_begin failed')
        thread_state.txn = transaction
511

512

513 514
    def store(self, oid, serial, data, version, transaction):
        """Send an object to every writable cell of its partition.

        data -- object record, or None, which is stored as an empty string
        version -- not used

        Return the TID of the current transaction.

        Raises StorageTransactionError when transaction is not the current
        one, NEOStorageConflictError when a storage node reports a conflict
        (the conflicting serial is then available as conflict_serial in the
        thread state) and NEOStorageError when no cell exists or no storage
        node stored the object.
        """
        thread_state = self.local_var
        if transaction is not thread_state.txn:
            raise StorageTransactionError(self, transaction)
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage node to use
        cell_list = self._getCellListForOID(oid, writable=True)
        if not cell_list:
            raise NEOStorageError
        if data is None:
            # this is a George Bailey object, stored as an empty string
            data = ''
        compressed_data = compress(data)
        store_request = Packets.AskStoreObject(oid, serial, 1,
                 makeChecksum(compressed_data), compressed_data,
                 thread_state.tid)
        # Store data on each node
        thread_state.object_stored_counter = 0
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            thread_state.object_stored = 0
            try:
                self._askStorage(conn, store_request)
            except ConnectionClosed:
                continue

            # Check we don't get any conflict
            if thread_state.object_stored[0] == -1:
                # A storage may already have accepted the object, is it
                # normal ?? Drop it from the dict and report the conflict:
                # what previous nodes stored does not matter, as it will
                # either be sent again once the conflict is resolved, or
                # the transaction will be aborted.
                thread_state.data_dict.pop(oid, None)
                thread_state.conflict_serial = thread_state.object_stored[1]
                raise NEOStorageConflictError
            # count the nodes which stored the object
            thread_state.object_stored_counter += 1

        if thread_state.object_stored_counter == 0:
            # no storage nodes were available
            raise NEOStorageError('tpc_store failed')

        # Store object in tmp cache
        thread_state.data_dict[oid] = data

        return thread_state.tid
566

Aurel's avatar
Aurel committed
567

568 569
    def tpc_vote(self, transaction):
        """Send the transaction metadata to every writable cell of its TID.

        Raises StorageTransactionError when transaction is not the current
        one and NEOStorageError when a storage node did not vote or when no
        storage node could be reached.
        """
        thread_state = self.local_var
        if transaction is not thread_state.txn:
            raise StorageTransactionError(self, transaction)
        user = transaction.user
        desc = transaction.description
        ext = dumps(transaction._extension)
        oid_list = thread_state.data_dict.keys()
        # Store data on each node
        cell_list = self._getCellListForTID(thread_state.tid, writable=True)
        thread_state.voted_counter = 0
        for cell in cell_list:
            logging.debug("voting object %s %s" %(cell.getAddress(),
                cell.getState()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            thread_state.txn_voted = False
            vote_request = Packets.AskStoreTransaction(thread_state.tid,
                    user, desc, ext, oid_list)
            try:
                self._askStorage(conn, vote_request)
            except ConnectionClosed:
                continue

            if not self.isTransactionVoted():
                raise NEOStorageError('tpc_vote failed')
            thread_state.voted_counter += 1

        # at least one storage node must have accepted
        if thread_state.voted_counter == 0:
            raise NEOStorageError('tpc_vote failed')
        # Check if master connection is still alive.
        # This is just here to lower the probability of detecting a problem
        # in tpc_finish, as we should do our best to detect problem before
        # tpc_finish.
        self._getMasterConnection()
606

607 608
    def tpc_abort(self, transaction):
        """Abort the current transaction.

        Nothing is done when transaction is not the current one. Otherwise
        the abort is notified to the storage nodes involved and to the
        primary master, and the state of the calling thread is reset.
        """
        if transaction is not self.local_var.txn:
            return

        involved_cells = set()
        # nodes where objects were stored
        for oid in self.local_var.data_dict:
            involved_cells.update(self._getCellListForOID(oid, writable=True))
        # nodes where the transaction was stored
        involved_cells.update(self._getCellListForTID(self.local_var.tid,
            writable=True))

        # cancel the transaction on all those nodes
        for cell in involved_cells:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue
            try:
                conn.notify(Packets.AbortTransaction(self.local_var.tid))
            finally:
                conn.unlock()

        # Abort the transaction in the primary master node.
        master_conn = self._getMasterConnection()
        master_conn.lock()
        try:
            master_conn.notify(Packets.AbortTransaction(self.local_var.tid))
        finally:
            master_conn.unlock()
        self.local_var.clear()
638

639 640
    def tpc_finish(self, transaction, f=None):
        """Finish current transaction.

        transaction -- must be the current transaction of the calling
            thread, otherwise nothing is done and None is returned
        f -- optional callable, called with the TID before the primary
            master is asked to finish the transaction

        Once the master has finished the transaction, the stored objects
        are put in the client cache with the TID as serial and the state of
        the calling thread is reset.

        Return the TID of the finished transaction.

        Raises NEOStorageError when the transaction is not reported as
        finished.
        """
        if self.local_var.txn is not transaction:
            return
        self._load_lock_acquire()
        try:
            tid = self.local_var.tid
            # Call function given by ZODB
            if f is not None:
                f(tid)

            # Call finish on master
            oid_list = self.local_var.data_dict.keys()
            p = Packets.AskFinishTransaction(oid_list, tid)
            self._askPrimary(p)

            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            # Update cache
            self._cache_lock_acquire()
            try:
                for oid in self.local_var.data_dict:
                    data = self.local_var.data_dict[oid]
                    # Now serial is same as tid
                    self.mq_cache[oid] = tid, data
            finally:
                self._cache_lock_release()
            # clear() resets the TID of the thread state, hence the copy
            # taken above: reading it back afterwards always gave None.
            self.local_var.clear()
            return tid
        finally:
            self._load_lock_release()
670

Aurel's avatar
Aurel committed
671
    def undo(self, transaction_id, txn, wrapper):
        """Undo a transaction by storing back the previous object revisions.

        transaction_id -- TID of the transaction to undo
        txn -- current transaction, in which the old revisions are stored
        wrapper -- object whose tryToResolveConflict() is called when a
            store conflicts

        Return a (tid, oid_list) pair: the TID of the current transaction
        and the OIDs of the objects stored back.

        Raises StorageTransactionError when txn is not the current
        transaction, NEOStorageError when the transaction information
        cannot be obtained, UndoError when an object has no previous
        revision or was modified by a later transaction, and ConflictError
        when a conflict cannot be resolved.
        """
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, transaction_id)

        # First get transaction information from a storage node.
        cell_list = self._getCellListForTID(transaction_id, writable=True)
        shuffle(cell_list)
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.txn_info = 0
            try:
                self._askStorage(conn, Packets.AskTransactionInformation(
                    transaction_id))
            except ConnectionClosed:
                continue

            if self.local_var.txn_info == -1:
                # Tid not found, try with next node
                continue
            elif isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')

        if self.local_var.txn_info in (-1, 0):
            raise NEOStorageError('undo failed')

        oid_list = self.local_var.txn_info['oids']
        # Second get object data from storage node using loadBefore
        data_dict = {}
        for oid in oid_list:
            try:
                result = self.loadBefore(oid, transaction_id)
            except NEOStorageNotFoundError:
                # no previous revision, can't undo (as in filestorage)
                raise UndoError("no previous record", oid)
            if result is None:
                # loadBefore() returns None instead of a tuple when there
                # is no previous version: unpacking it would fail with a
                # TypeError.
                raise UndoError("no previous record", oid)
            data, start, end = result
            # end must be TID we are going to undone otherwise it means
            # a later transaction modify the object
            if end != transaction_id:
                raise UndoError("non-undoable transaction", oid)
            data_dict[oid] = data

        # Third do transaction with old data
        oid_list = data_dict.keys()
        for oid in oid_list:
            data = data_dict[oid]
            try:
                self.store(oid, transaction_id, data, None, txn)
            except NEOStorageConflictError:
                # store() raises the bare exception class: the serial of
                # the conflicting revision is what it recorded in the
                # thread state, not the exception object itself.
                serial = self.local_var.conflict_serial
                if serial <= self.local_var.tid:
                    new_data = wrapper.tryToResolveConflict(oid,
                            self.local_var.tid, serial, data)
                    if new_data is not None:
                        self.store(oid, self.local_var.tid, new_data, None, txn)
                        continue
                raise ConflictError(oid = oid, serials = (self.local_var.tid,
                    serial),
                                    data = data)
        return self.local_var.tid, oid_list
734

735
    def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
        """Build a list of transaction descriptions, most recent TID first.

        Every storage node of the partition table is asked for its TIDs in
        the `first`..`last` window, then transaction information is fetched
        for each TID.

        first, last -- window passed to the storage nodes; a negative
                       `last` is converted to `first - last`.
        filter      -- optional callable taking a transaction info dict;
                       only transactions for which it returns true are kept.
        block       -- when true, do not retry with a larger window if
                       nothing was found.
        with_oids   -- when false, the "oids" key is removed from each
                       returned dict.

        Returns a list of transaction info dicts.
        Raises NeoException if no storage node can provide the information
        of a listed TID.
        """
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        pt = self._getPartitionTable()
        storage_node_list = pt.getNodeList()

        self.local_var.node_tids = {}
        # Count only the nodes that were really asked: nodes without a
        # connection never answer, so waiting for one answer per known node
        # would never finish.
        asked_count = 0
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue

            try:
                conn.ask(self.local_var.queue, Packets.AskTIDs(first, last,
                    protocol.INVALID_PARTITION))
            finally:
                conn.unlock()
            asked_count += 1

        # Wait for answers from all queried storages.
        # NOTE(review): a queried node whose connection closes before it
        # answers still keeps this loop waiting -- confirm how
        # _waitMessage() reports that case.
        while len(self.local_var.node_tids) < asked_count:
            try:
                self._waitMessage(handler=self.storage_handler)
            except ConnectionClosed:
                continue

        # Merge the per-node answers and sort them, newest first
        ordered_tids = set()
        for tid_list in self.local_var.node_tids.itervalues():
            ordered_tids.update(tid_list)
        ordered_tids = list(ordered_tids)
        ordered_tids.sort(reverse=True)
        logging.debug("UndoLog, tids %s", ordered_tids)
        # For each transaction, get info
        undo_info = []
        for tid in ordered_tids:
            cell_list = self._getCellListForTID(tid, readable=True)
            shuffle(cell_list)
            # Reset for every TID so the previous transaction's dict is
            # never reused when no cell can be asked.
            self.local_var.txn_info = 0
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue
                self.local_var.txn_info = 0
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(tid))
                except ConnectionClosed:
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            if self.local_var.txn_info in (-1, 0):
                # TID not found at all
                raise NeoException('Data inconsistency detected: '
                                   'transaction info for TID %r could not '
                                   'be found' % (tid, ))

            if filter is None or filter(self.local_var.txn_info):
                if not with_oids:
                    self.local_var.txn_info.pop("oids")
                undo_info.append(self.local_var.txn_info)
                if len(undo_info) >= last - first:
                    break
        # Check we return at least one element, otherwise call
        # again but extend offset. Recurse into this method (not undoLog)
        # so that `with_oids` is kept for the retry.
        if len(undo_info) == 0 and not block:
            undo_info = self.__undoLog(first, last * 5, filter=filter,
                    block=1, with_oids=with_oids)
        return undo_info

810
    def undoLog(self, first, last, filter=None, block=0):
        """Return transaction descriptions without their "oids" entry.

        Thin wrapper around __undoLog() with `with_oids` left to its
        default (False).
        """
        return self.__undoLog(first, last, filter=filter, block=block)
812 813

    def transactionLog(self, first, last):
        """Return transaction descriptions including their "oids" entry.

        Thin wrapper around __undoLog() called with `with_oids=True` and
        no filter.
        """
        txn_log = self.__undoLog(first, last, with_oids=True)
        return txn_log
815

816
    def history(self, oid, version=None, length=1, filter=None, object_only=0):
        """Return the revision history of object `oid`.

        oid         -- object to look up.
        version     -- unused by this method.
        length      -- passed to the storage node in the history request.
        filter      -- optional callable taking a history dict; only entries
                       for which it returns true are returned.
        object_only -- when true, return the raw object history answer
                       (a tuple whose first item is the oid and second item
                       a sequence of (serial, size) pairs) without fetching
                       transaction information.

        Otherwise returns a list of transaction info dicts, one per
        revision, without the 'id' and 'oids' keys and with 'tid',
        'version' (always None) and 'size' set.

        Raises NEOStorageError if the history or the information of one of
        its transactions cannot be obtained, or if a storage node answers
        for another oid.
        """
        # Get history informations for object first
        cell_list = self._getCellListForOID(oid, readable=True)
        shuffle(cell_list)

        # Reset up front so that a previous answer is never reused when no
        # storage node can be asked at all.
        self.local_var.history = None
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.history = None
            try:
                self._askStorage(conn, Packets.AskObjectHistory(oid, 0, length))
            except ConnectionClosed:
                continue

            if self.local_var.history == -1:
                # Not found, go on with next node
                continue
            if not isinstance(self.local_var.history, tuple):
                # No usable answer was recorded, go on with next node
                continue
            if self.local_var.history[0] != oid:
                # Got history for wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                                      '%r, got %r' % (
                                      oid, self.local_var.history[0]))
            # Valid answer: stop here so that a later node cannot
            # overwrite it with a failure.
            break

        if not isinstance(self.local_var.history, tuple):
            raise NEOStorageError('history failed')
        if object_only:
            # Use by getSerial
            return self.local_var.history

        # Now that we have object informations, get txn informations
        history_list = []
        for serial, size in self.local_var.history[1]:
            # Ask the cells holding this transaction, not the object's ones
            cell_list = self._getCellListForTID(serial, readable=True)
            shuffle(cell_list)

            self.local_var.txn_info = None
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue

                # ask transaction information
                self.local_var.txn_info = None
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(serial))
                except ConnectionClosed:
                    continue

                if self.local_var.txn_info == -1:
                    # TID not found
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            if not isinstance(self.local_var.txn_info, dict):
                # No node could provide this transaction
                raise NEOStorageError('history failed: transaction info ' \
                                      'for TID %r could not be found' % (
                                      serial, ))

            # create history dict
            txn_info = self.local_var.txn_info
            txn_info.pop('id')
            txn_info.pop('oids')
            txn_info['tid'] = serial
            txn_info['version'] = None
            txn_info['size'] = size
            if filter is None or filter(txn_info):
                history_list.append(txn_info)

        return history_list
Aurel's avatar
Aurel committed
882

883 884 885
    def iterator(self, start=None, stop=None):
        """Return an Iterator built from this object, `start` and `stop`."""
        txn_iterator = Iterator(self, start, stop)
        return txn_iterator

886 887 888 889 890
    def __del__(self):
        """Close every connection of the event manager, then stop polling.

        Also exposed as close().
        """
        # ZODB does not always call close() when Zope shuts down (ZODB
        # bug), hence the same code is run on object deletion.
        for connection in self.em.getConnectionList():
            connection.close()
        # The polling thread is stopped last
        poll_thread = self.poll_thread
        poll_thread.stop()
    close = __del__
895 896

    def sync(self):
        """Call _waitMessage() once, without arguments."""
        wait_message = self._waitMessage
        wait_message()
898

899 900 901 902 903 904 905 906 907
    def setNodeReady(self):
        """Set the `node_ready` flag of `local_var` to True."""
        state = self.local_var
        state.node_ready = True

    def setNodeNotReady(self):
        """Set the `node_ready` flag of `local_var` to False."""
        state = self.local_var
        state.node_ready = False

    def isNodeReady(self):
        """Return the `node_ready` flag stored in `local_var`."""
        state = self.local_var
        return state.node_ready

908
    def setTID(self, value):
        """Store `value` as the `tid` attribute of `local_var`."""
        state = self.local_var
        state.tid = value
910 911

    def getTID(self):
        """Return the `tid` attribute stored in `local_var`."""
        state = self.local_var
        return state.tid
913 914

    def getConflictSerial(self):
        """Return the `conflict_serial` attribute stored in `local_var`."""
        state = self.local_var
        return state.conflict_serial
916 917

    def setTransactionFinished(self):
        """Set the `txn_finished` flag of `local_var` to True."""
        state = self.local_var
        state.txn_finished = True
919 920

    def isTransactionFinished(self):
        """Return the `txn_finished` flag stored in `local_var`."""
        state = self.local_var
        return state.txn_finished
922 923

    def setTransactionVoted(self):
        """Set the `txn_voted` flag of `local_var` to True."""
        state = self.local_var
        state.txn_voted = True
925 926

    def isTransactionVoted(self):
        """Return the `txn_voted` flag stored in `local_var`."""
        state = self.local_var
        return state.txn_voted
928