iterator.py 5.44 KB
Newer Older
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 18

from ZODB import BaseStorage
19 20
from zope.interface import implements
import ZODB.interfaces
Grégory Wisniewski's avatar
Grégory Wisniewski committed
21
from neo import util
22
from neo.client.exception import NEOStorageCreationUndoneError
23
from neo.client.exception import NEOStorageNotFoundError
24 25 26 27

class Record(BaseStorage.DataRecord):
    """ TBaseStorageransaction record yielded by the Transaction object """

28
    def __init__(self, oid, tid, data, prev):
29
        BaseStorage.DataRecord.__init__(self, oid, tid, data, prev)
30 31 32 33 34 35 36 37 38 39 40

    def __str__(self):
        oid = util.u64(self.oid)
        tid = util.u64(self.tid)
        args = (oid, tid, len(self.data), self.data_txn)
        return 'Record %s:%s: %s (%s)' % args


class Transaction(BaseStorage.TransactionRecord):
    """ Transaction object yielded by the NEO iterator """

Vincent Pelletier's avatar
Vincent Pelletier committed
41 42
    def __init__(self, app, tid, status, user, desc, ext, oid_list,
            prev_serial_dict):
43 44
        BaseStorage.TransactionRecord.__init__( self, tid, status, user, desc,
            ext)
45 46
        self.app = app
        self.oid_list = oid_list
47
        self.oid_index = 0
48
        self.history = []
Vincent Pelletier's avatar
Vincent Pelletier committed
49
        self.prev_serial_dict = prev_serial_dict
50 51 52 53 54 55 56

    def __iter__(self):
        return self

    def next(self):
        """ Iterate over the transaction records """
        app = self.app
57 58
        oid_list = self.oid_list
        oid_index = self.oid_index
59 60 61 62 63
        oid_len = len(oid_list)
        # load an object
        while oid_index < oid_len:
            oid = oid_list[oid_index]
            try:
Vincent Pelletier's avatar
Vincent Pelletier committed
64
                data, _, next_tid = app.load(None, oid, serial=self.tid)
65 66 67 68 69 70 71 72 73 74 75
            except NEOStorageCreationUndoneError:
                data = next_tid = None
            except NEOStorageNotFoundError:
                # Transactions are not updated after a pack, so their object
                # will not be found in the database. Skip them.
                oid_list.pop(oid_index)
                oid_len -= 1
                continue
            oid_index += 1
            break
        else:
76
            # no more records for this transaction
77
            self.oid_index = 0
78
            raise StopIteration
79
        self.oid_index = oid_index
80
        record = Record(oid, self.tid, data,
Vincent Pelletier's avatar
Vincent Pelletier committed
81 82 83 84 85
            self.prev_serial_dict.get(oid))
        if next_tid is None:
            self.prev_serial_dict.pop(oid, None)
        else:
            self.prev_serial_dict[oid] = self.tid
86 87 88 89 90 91 92 93 94 95 96 97 98 99
        return record

    def __str__(self):
        tid = util.u64(self.tid)
        args = (tid, self.user, self.status)
        return 'Transaction #%s: %s %s' % args


class Iterator(object):
    """ An iterator for the NEO storage """

    def __init__(self, app, start, stop):
        self.app = app
        self.txn_list = []
100
        self._stop = stop
101 102 103 104
        # next index to load from storage nodes
        self._next = 0
        # index of current iteration
        self._index = 0
105
        self._closed = False
Vincent Pelletier's avatar
Vincent Pelletier committed
106 107 108
        # OID -> previous TID mapping
        # TODO: prune old entries while walking ?
        self._prev_serial_dict = {}
109 110
        if start is not None:
            self.txn_list = self._skip(start)
111 112 113 114

    def __iter__(self):
        return self

115 116 117 118 119 120
    def __getitem__(self, index):
        """ Simple index-based iterator """
        if index != self._index:
            raise IndexError, index
        return self.next()

121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
    def _read(self):
        """ Request more transactions """
        chunk = self.app.transactionLog(self._next, self._next + 100)
        if not chunk:
            # nothing more
            raise StopIteration
        self._next += len(chunk)
        return chunk

    def _skip(self, start):
        """ Skip transactions until 'start' is reached """
        chunk = self._read()
        while chunk[0]['id'] < start:
            chunk = self._read()
        if chunk[-1]['id'] < start:
            for index, txn in enumerate(reversed(chunk)):
                if txn['id'] >= start:
                    break
            # keep only greater transactions
            chunk = chunk[:-index]
        return chunk

143 144
    def next(self):
        """ Return an iterator for the next transaction"""
145 146
        if self._closed:
            raise IOError, 'iterator closed'
147
        if not self.txn_list:
148
            self.txn_list = self._read()
149
        txn = self.txn_list.pop()
150
        self._index += 1
151
        tid = txn['id']
152 153 154 155
        stop = self._stop
        if stop is not None and stop < tid:
            # stop reached
            raise StopIteration
156 157 158
        user = txn['user_name']
        desc = txn['description']
        oid_list = txn['oids']
159
        extension = txn['ext']
Vincent Pelletier's avatar
Vincent Pelletier committed
160 161
        txn = Transaction(self.app, tid, ' ', user, desc, extension, oid_list,
            self._prev_serial_dict)
162 163 164 165 166
        return txn

    def __str__(self):
        return 'NEO transactions iteratpr'

167
    def close(self):
168
        self._closed = True