replicator.py 10.8 KB
Newer Older
Aurel's avatar
Aurel committed
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
from neo import logging
19 20
from random import choice

Grégory Wisniewski's avatar
Grégory Wisniewski committed
21
from neo.storage.handlers import replication
22
from neo.protocol import NodeTypes, NodeStates, CellStates, Packets, ZERO_TID
23
from neo.connection import ClientConnection
24
from neo.util import dump
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39

class Partition(object):
    """State of one partition that has to be replicated.

    Holds the partition identifier (RID) and its critical TID. The
    critical TID stays None until setCriticalTID() is called.
    """

    def __init__(self, rid):
        self.tid = None
        self.rid = rid

    def getRID(self):
        """Return the identifier given at construction time."""
        return self.rid

    def getCriticalTID(self):
        """Return the critical TID, or None if it was never set."""
        return self.tid

    def setCriticalTID(self, tid):
        """Store the critical TID; a None value is stored as ZERO_TID."""
        if tid is not None:
            self.tid = tid
        else:
            self.tid = ZERO_TID

    def safe(self, min_pending_tid):
        """Tell whether this partition can be replicated now.

        False while no critical TID is known. Otherwise True when there is
        no pending TID at all (min_pending_tid is None) or when the critical
        TID is strictly lower than the smallest pending TID.
        """
        critical_tid = self.tid
        if critical_tid is None:
            return False
        if min_pending_tid is None:
            return True
        return critical_tid < min_pending_tid
48 49 50

class Replicator(object):
    """This class handles replications of objects and transactions.
51

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
    Assumptions:

        - Client nodes recognize partition changes reasonably quickly.

        - When an out of date partition is added, next transaction ID
          is given after the change is notified and serialized.

    Procedures:

        - Get the last TID right after a partition is added. This TID
          is called a "critical TID", because this and TIDs before this
          may not be present in this storage node yet. After a critical
          TID, all transactions must exist in this storage node.

        - Check if a primary master node still has pending transactions
          before and at a critical TID. If so, I must wait for them to be
          committed or aborted.

        - In order to copy data, first get the list of TIDs. This is done
          part by part, because the list can be very huge. When getting
          a part of the list, I verify if they are in my database, and
          ask data only for non-existing TIDs. This is performed until
          the check reaches a critical TID.

        - Next, get the list of OIDs. And, for each OID, ask the history,
          namely, a list of serials. This is also done part by part, and
          I ask only non-existing data. """

Vincent Pelletier's avatar
Vincent Pelletier committed
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
    # new_partition_dict
    #   outdated partitions for which no critical tid was asked to primary
    #   master yet
    # critical_tid_dict
    #   outdated partitions for which a critical tid was asked to primary
    #   master, but not answered so far
    #   XXX: could probably be made a list/set rather than a dict
    # partition_dict
    #   outdated partitions (with or without a critical tid - if without, it
    #   was asked to primary master)
    # current_partition
    #   partition being currently synchronised
    #   XXX: accessed (r) directly by ReplicationHandler
    # current_connection
    #   connection to a storage node we are replicating from
    #   XXX: accessed (r) directly by ReplicationHandler
    # waiting_for_unfinished_tids
    #   unfinished_tid_list has been asked to primary master node, but it
    #   didn't answer yet.
    # unfinished_tid_list
    #   The list of unfinished TIDs known by master node.
    # oid_list
    #   List of OIDs to replicate. Doesn't contain the currently-replicated
    #   object.
    #   XXX: not defined here
    #   XXX: accessed (r/w) directly by ReplicationHandler
    # next_oid
    #   Next OID to ask when oid_list is empty.
    #   XXX: not defined here
    #   XXX: accessed (r/w) directly by ReplicationHandler
    # replication_done
    #   False if we know there is something to replicate.
    #   True when current_partition is replicated, or we don't know yet if
    #   there is something to replicate
    #   XXX: accessed (w) directly by ReplicationHandler

116 117
    def __init__(self, app):
        self.app = app
118 119 120 121 122 123 124

    def populate(self):
        """
        Populate partitions to replicate. Must be called when partition
        table is the one accepted by primary master.
        Implies a reset.
        """
125
        self.new_partition_dict = self._getOutdatedPartitionList()
126
        self.critical_tid_dict = {}
127
        self.partition_dict = {}
128
        self.reset()
129 130 131 132 133 134 135 136 137 138 139

    def reset(self):
        """Reset attributes to restart replicating."""
        self.current_partition = None
        self.current_connection = None
        self.waiting_for_unfinished_tids = False
        self.unfinished_tid_list = None
        self.replication_done = True

    def _getOutdatedPartitionList(self):
        """Return the partitions for which this node's cell is out of date.

        Despite the name, the result is a dict mapping each partition
        offset to a fresh Partition object.
        """
        app = self.app
        outdated = {}
        for offset in xrange(app.pt.getPartitions()):
            for uuid, state in app.pt.getRow(offset):
                # Skip cells of other nodes and cells in any other state.
                if uuid != app.uuid or state != CellStates.OUT_OF_DATE:
                    continue
                outdated[offset] = Partition(offset)
        return outdated
146 147 148

    def pending(self):
        """Return whether there is any pending partition."""
149
        return len(self.partition_dict) or len(self.new_partition_dict)
150

151
    def setCriticalTID(self, uuid, tid):
        """This is a callback from MasterOperationHandler.

        Give `tid` as critical TID to every partition that was registered
        under `uuid` in critical_tid_dict, and drop that entry. When no
        entry exists for `uuid`, this is only logged.
        """
        try:
            awaiting = self.critical_tid_dict.pop(uuid)
        except KeyError:
            logging.debug("setCriticalTID raised KeyError for %s" %
                    (dump(uuid), ))
            return

        rid_list = [str(p.getRID()) for p in awaiting]
        logging.debug('setting critical TID %s to %s', dump(tid),
                         ', '.join(rid_list))
        for partition in awaiting:
            partition.setCriticalTID(tid)

    def _askCriticalTID(self):
        """Ask the last IDs to the master and mark new partitions as asked.

        The partitions of new_partition_dict are recorded in
        critical_tid_dict under the UUID of the master connection, merged
        into partition_dict, and new_partition_dict is emptied.
        """
        master_conn = self.app.master_conn
        master_conn.ask(Packets.AskLastIDs())
        master_uuid = master_conn.getUUID()
        asked = self.new_partition_dict
        # NOTE(review): an entry already stored for the same UUID (request
        # still unanswered) is replaced here, so its partitions would no
        # longer be reachable from setCriticalTID() - to be confirmed.
        self.critical_tid_dict[master_uuid] = asked.values()
        self.partition_dict.update(asked)
        self.new_partition_dict = {}
171 172

    def setUnfinishedTIDList(self, tid_list):
173
        """This is a callback from MasterOperationHandler."""
174 175
        logging.debug('setting unfinished TIDs %s',
                      ','.join([dump(tid) for tid in tid_list]))
176 177 178 179
        self.waiting_for_unfinished_tids = False
        self.unfinished_tid_list = tid_list

    def _askUnfinishedTIDs(self):
        """Ask the unfinished transactions on the master connection.

        waiting_for_unfinished_tids is raised once the request is sent;
        setUnfinishedTIDList() clears it.
        """
        request = Packets.AskUnfinishedTransactions()
        self.app.master_conn.ask(request)
        self.waiting_for_unfinished_tids = True

    def _startReplication(self):
        """Start replicating current_partition from another storage node.

        The source is chosen at random among the readable cells of the
        partition whose node is RUNNING. An existing connection is kept when
        it already targets the chosen address; otherwise it is closed and a
        new one is opened, on which identification is requested. The first
        TIDs are then asked and replication_done is set to False.

        When no source is usable (no running node, or no address known for
        the chosen one), current_partition is set back to None and nothing
        is started. Leaving it set while replication_done is still True
        would make act() treat the partition as fully replicated.
        """
        # Choose a storage node for the source.
        app = self.app
        cell_list = app.pt.getCellList(self.current_partition.getRID(),
                                       readable=True)
        node_list = [cell.getNode() for cell in cell_list
                        if cell.getNodeState() == NodeStates.RUNNING]
        try:
            node = choice(node_list)
        except IndexError:
            # Not operational.
            logging.error('not operational', exc_info = 1)
            self.current_partition = None
            return

        addr = node.getAddress()
        if addr is None:
            logging.error("no address known for the selected node %s" %
                    (dump(node.getUUID()), ))
            # Give up on this attempt the same way as above, so that the
            # partition is selected again later instead of being finished.
            self.current_partition = None
            return

        # Drop a connection that points to a different source.
        if self.current_connection is not None:
            if self.current_connection.getAddress() != addr:
                self.current_connection.close()
                self.current_connection = None

        if self.current_connection is None:
            handler = replication.ReplicationHandler(app)
            self.current_connection = ClientConnection(app.em, handler,
                   addr=addr, connector=app.connector_handler())
            p = Packets.RequestIdentification(NodeTypes.STORAGE,
                    app.uuid, app.server, app.name)
            self.current_connection.ask(p)

        # Ask the first batch of TIDs, starting from the very first one.
        p = Packets.AskTIDsFrom(ZERO_TID, 1000,
            self.current_partition.getRID())
        self.current_connection.ask(p, timeout=300)

        self.replication_done = False

223 224
    def _finishReplication(self):
        app = self.app
225
        # TODO: remove try..except: pass
226
        try:
227
            self.partition_dict.pop(self.current_partition.getRID())
228
            # Notify to a primary master node that my cell is now up-to-date.
229
            conn = self.app.master_conn
230 231
            offset = self.current_partition.getRID()
            conn.notify(Packets.NotifyReplicationDone(offset))
232
        except KeyError:
233 234
            pass
        self.current_partition = None
235 236 237
        if not self.pending():
            self.current_connection.close()
            self.current_connection = None
238

239 240 241
    def act(self):
        # If the new partition list is not empty, I must ask a critical
        # TID to a primary master node.
242
        if self.new_partition_dict:
243
            self._askCriticalTID()
244

245
        if self.current_partition is not None:
246
            if self.replication_done:
247
                # finish a replication
248
                logging.info('replication is done for %s' %
249
                        (self.current_partition.getRID(), ))
250
                self._finishReplication()
251 252 253 254 255 256 257 258 259 260 261 262 263 264
            return

        if self.waiting_for_unfinished_tids:
            # Still waiting.
            logging.debug('waiting for unfinished tids')
            return

        if self.unfinished_tid_list is None:
            # Ask pending transactions.
            logging.debug('asking unfinished tids')
            self._askUnfinishedTIDs()
            return

        # Try to select something.
265 266 267 268
        if len(self.unfinished_tid_list):
            min_unfinished_tid = min(self.unfinished_tid_list)
        else:
            min_unfinished_tid = None
269
        for partition in self.partition_dict.values():
270
            if partition.safe(min_unfinished_tid):
271 272 273 274
                self.current_partition = partition
                break
        else:
            # Not yet.
275
            self.unfinished_tid_list = None
276 277 278 279
            logging.debug('not ready yet')
            return

        self._startReplication()
280 281

    def removePartition(self, rid):
282
        """This is a callback from MasterOperationHandler."""
283 284
        self.partition_dict.pop(rid, None)
        self.new_partition_dict.pop(rid, None)
285 286

    def addPartition(self, rid):
287
        """This is a callback from MasterOperationHandler."""
288 289 290
        if not self.partition_dict.has_key(rid) \
                and not self.new_partition_dict.has_key(rid):
            self.new_partition_dict[rid] = Partition(rid)
291