pt.py 13 KB
Newer Older
Aurel's avatar
Aurel committed
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
import neo
19

20 21 22 23
from neo.lib import protocol
from neo.lib.protocol import CellStates
from neo.lib.util import dump, u64
from neo.lib.locking import RLock
24

25 26 27 28
class PartitionTableException(Exception):
    """Common base class for errors raised by the partition table code
    (for instance when setCell is given a node in an invalid state)."""
29 30 31 32

class Cell(object):
    """One cell of a partition table: a node together with the state of the
    copy of a partition that this node holds."""

    def __init__(self, node, state = CellStates.UP_TO_DATE):
        self.node = node
        self.state = state

    def __repr__(self):
        fields = (dump(self.getUUID()), self.getAddress(), self.getState())
        return "<Cell(uuid=%s, address=%s, state=%s)>" % fields

    # -- cell state ---------------------------------------------------------

    def getState(self):
        return self.state

    def setState(self, state):
        self.state = state

    def isUpToDate(self):
        return self.state == CellStates.UP_TO_DATE

    def isOutOfDate(self):
        return self.state == CellStates.OUT_OF_DATE

    def isFeeding(self):
        return self.state == CellStates.FEEDING

    # -- accessors forwarded to the underlying node -------------------------

    def getNode(self):
        return self.node

    def getNodeState(self):
        """Shortcut for self.getNode().getState()."""
        return self.node.getState()

    def getUUID(self):
        return self.node.getUUID()

    def getAddress(self):
        return self.node.getAddress()
Aurel's avatar
Aurel committed
71

72

73 74 75 76
class PartitionTable(object):
    """This class manages a partition table.

    The table holds `np` rows, one per partition; each row is a list of Cell
    objects.  `count_dict` maps a node to the number of its non-FEEDING cells,
    and `num_filled_rows` is the number of rows holding at least one cell.
    """

    def __init__(self, num_partitions, num_replicas):
        self._id = None
        self.np = num_partitions
        self.nr = num_replicas
        self.num_filled_rows = 0
        # Note: don't use [[]] * num_partition construct, as it duplicates
        # instance *references*, so the outer list contains really just one
        # inner list instance.
        self.partition_list = [[] for _ in xrange(num_partitions)]
        self.count_dict = {}

    def getID(self):
        """Return the partition table ID, or None when none is set."""
        return self._id

    def getPartitions(self):
        """Return the number of partitions."""
        return self.np

    def getReplicas(self):
        """Return the number of replicas."""
        return self.nr

    def clear(self):
        """Forget an existing partition table."""
        self._id = None
        self.num_filled_rows = 0
        # Note: don't use [[]] * self.np construct, as it duplicates
        # instance *references*, so the outer list contains really just one
        # inner list instance.
        self.partition_list = [[] for _ in xrange(self.np)]
        self.count_dict.clear()

    def getAssignedPartitionList(self, uuid):
        """Return the offsets of the partitions in which the node identified
        by `uuid` holds a readable cell."""
        assigned_partitions = []
        for offset in xrange(self.np):
            for cell in self.getCellList(offset, readable=True):
                if cell.getUUID() == uuid:
                    assigned_partitions.append(offset)
                    break
        return assigned_partitions

    def hasOffset(self, offset):
        """Return whether the row at `offset` holds at least one cell.

        An offset past the end of the table gives False.
        """
        try:
            return len(self.partition_list[offset]) > 0
        except IndexError:
            return False

    def getNodeList(self):
        """Return the nodes holding at least one non-FEEDING cell."""
        return [node for node, count in self.count_dict.iteritems()
                if count > 0]

    def getCellList(self, offset, readable=False, writable=False):
        """Return the cells of partition `offset`, filtered by state.

        With `readable` or `writable`, DISCARDED cells are left out; with
        `readable`, OUT_OF_DATE cells are left out as well.
        """
        # allow all cell states
        state_set = set(CellStates.values())
        if readable or writable:
            # discarded cells can be neither read from nor written to
            state_set.remove(CellStates.DISCARDED)
        if readable:
            # out-of-date cells are writable but not readable
            state_set.remove(CellStates.OUT_OF_DATE)
        try:
            return [cell for cell in self.partition_list[offset]
                    if cell is not None and cell.getState() in state_set]
        except (TypeError, KeyError):
            # NOTE(review): list indexing never raises KeyError; an
            # out-of-range offset raises IndexError, which propagates.
            return []

    def getCellListForTID(self, tid, readable=False, writable=False):
        """Return the cells of the partition `tid` belongs to."""
        return self.getCellList(self.getPartition(tid), readable, writable)

    def getCellListForOID(self, oid, readable=False, writable=False):
        """Return the cells of the partition `oid` belongs to."""
        return self.getCellList(self.getPartition(oid), readable, writable)

    def getPartition(self, oid_or_tid):
        """Return the partition offset of an OID or TID: u64() of it modulo
        the number of partitions."""
        return u64(oid_or_tid) % self.getPartitions()

    def getOutdatedOffsetListFor(self, uuid):
        """Return the offsets where the node identified by `uuid` has an
        OUT_OF_DATE cell."""
        return [
            offset for offset in xrange(self.np)
            for c in self.partition_list[offset]
            if c.getUUID() == uuid and c.getState() == CellStates.OUT_OF_DATE
        ]

    def isAssigned(self, oid, uuid):
        """ Check if the oid is assigned to the given node """
        for cell in self.partition_list[self.getPartition(oid)]:
            if cell.getUUID() == uuid:
                return True
        return False

    def setCell(self, offset, node, state):
        """Set the state of `node`'s cell in partition `offset`.

        A DISCARDED state removes the cell instead (see removeCell).  An
        existing cell of the same node in that row is replaced.

        Returns the (offset, uuid, state) triple describing the change.
        Raises PartitionTableException if the node is broken or down.
        """
        if state == CellStates.DISCARDED:
            return self.removeCell(offset, node)
        if node.isBroken() or node.isDown():
            raise PartitionTableException('Invalid node state')

        self.count_dict.setdefault(node, 0)
        row = self.partition_list[offset]
        if len(row) == 0:
            # Create a new row.
            row = [Cell(node, state)]
            if state != CellStates.FEEDING:
                self.count_dict[node] += 1
            self.partition_list[offset] = row
            # Mirrored by removeCell when a row becomes empty again.
            self.num_filled_rows += 1
        else:
            # XXX this can be slow, but it is necessary to remove a duplicate,
            # if any.
            for cell in row:
                if cell.getNode() == node:
                    row.remove(cell)
                    if not cell.isFeeding():
                        self.count_dict[node] -= 1
                    break
            row.append(Cell(node, state))
            if state != CellStates.FEEDING:
                self.count_dict[node] += 1
        return (offset, node.getUUID(), state)

    def removeCell(self, offset, node):
        """Remove `node`'s cell from partition `offset`, if there is one.

        Returns the (offset, uuid, DISCARDED) triple describing the change,
        whether or not a cell was actually found.
        """
        row = self.partition_list[offset]
        assert row is not None
        for cell in row:
            if cell.getNode() == node:
                row.remove(cell)
                if not cell.isFeeding():
                    self.count_dict[node] -= 1
                if not row:
                    # setCell counts a row as filled when its first cell is
                    # added; undo that when the last cell goes away, or
                    # filled() (and so operational()) drifts.
                    self.num_filled_rows -= 1
                break
        return (offset, node.getUUID(), CellStates.DISCARDED)

    def load(self, ptid, row_list, nm):
        """
        Load the partition table with the specified PTID, discard all previous
        content.

        `row_list` is a sequence of (offset, [(uuid, state), ...]); every uuid
        must already be known to the node manager `nm`.
        Raises IndexError for an offset beyond the number of partitions.
        """
        self.clear()
        self._id = ptid
        for offset, row in row_list:
            if offset >= self.getPartitions():
                raise IndexError('partition offset %r out of range' % (offset, ))
            for uuid, state in row:
                node = nm.getByUUID(uuid)
                # the node must be known by the node manager
                assert node is not None
                self.setCell(offset, node, state)
        neo.lib.logging.debug('partition table loaded')
        self.log()

    def update(self, ptid, cell_list, nm):
        """
        Update the partition with the cell list supplied. Ignore those changes
        if the partition table ID is not greater than the current one.

        `cell_list` is a sequence of (offset, uuid, state); every uuid must
        already be known to the node manager `nm` (this is asserted).
        """
        if ptid <= self._id:
            neo.lib.logging.warning('ignoring older partition changes')
            return
        self._id = ptid
        for offset, uuid, state in cell_list:
            node = nm.getByUUID(uuid)
            assert node is not None, 'No node found for uuid %r' % (dump(uuid), )
            self.setCell(offset, node, state)
        neo.lib.logging.debug('partition table updated')
        self.log()

    def filled(self):
        """Return whether every partition has at least one cell."""
        return self.num_filled_rows == self.np

    def log(self):
        """Write the formatted table to the debug log, one line per entry."""
        for line in self._format():
            neo.lib.logging.debug(line)

    def format(self):
        """Return the formatted table as a single string."""
        return '\n'.join(self._format())

    def _format(self):
        """Help debugging partition table management.

        Output sample:
        DEBUG:root:pt: node 0: ad7ffe8ceef4468a0c776f3035c7a543, R
        DEBUG:root:pt: node 1: a68a01e8bf93e287bd505201c1405bc2, R
        DEBUG:root:pt: node 2: 67ae354b4ed240a0594d042cf5c01b28, R
        DEBUG:root:pt: node 3: df57d7298678996705cd0092d84580f4, R
        DEBUG:root:pt: 00000000: .UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.
        DEBUG:root:pt: 00000009: U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U

        Here, there are 4 nodes in RUNNING state.
        The first partition has 2 replicas in UP_TO_DATE state, on nodes 1 and
        2 (nodes 0 and 3 are displayed as unused for that partition by
        displaying a dot).
        The 8-digits number on the left represents the number of the first
        partition on the line (here, line length is 9 to keep the docstring
        width under 80 column).
        """
        result = []
        append = result.append
        # Only nodes with at least one non-FEEDING cell get a column.
        # NOTE(review): cells of nodes that only have FEEDING cells are
        # therefore not displayed.
        node_list = [k for k, v in self.count_dict.items() if v != 0]
        node_list.sort()
        node_dict = {}
        for i, node in enumerate(node_list):
            uuid = node.getUUID()
            node_dict[uuid] = i
            append('pt: node %d: %s, %s' % (i, dump(uuid),
                protocol.node_state_prefix_dict[node.getState()]))
        line = []
        max_line_len = 20 # XXX: hardcoded number of partitions per line
        cell_state_dict = protocol.cell_state_prefix_dict
        for offset, row in enumerate(self.partition_list):
            if len(line) == max_line_len:
                append('pt: %08d: %s' % (offset - max_line_len,
                              '|'.join(line)))
                line = []
            if row is None:
                line.append('X' * len(node_list))
            else:
                cell = []
                cell_dict = dict((node_dict.get(x.getUUID(), None), x)
                                 for x in row)
                for node in xrange(len(node_list)):
                    if node in cell_dict:
                        cell.append(cell_state_dict[cell_dict[node].getState()])
                    else:
                        cell.append('.')
                line.append(''.join(cell))
        if len(line):
            # `offset` is the last row index here, since `line` is non-empty.
            append('pt: %08d: %s' % (offset - len(line) + 1,
                          '|'.join(line)))
        return result

    def operational(self):
        """Return whether every partition is filled and has at least one
        UP_TO_DATE or FEEDING cell on a running node."""
        if not self.filled():
            return False
        for row in self.partition_list:
            for cell in row:
                if (cell.isUpToDate() or cell.isFeeding()) and \
                        cell.getNode().isRunning():
                    break
            else:
                return False
        return True

    def getRow(self, offset):
        """Return partition `offset` as a list of (uuid, state) pairs."""
        row = self.partition_list[offset]
        if row is None:
            return []
        return [(cell.getUUID(), cell.getState()) for cell in row]

    def getRowList(self):
        """Return (offset, getRow(offset)) for every partition."""
        getRow = self.getRow
        return [(x, getRow(x)) for x in xrange(self.np)]

    def getNodeMap(self):
        """Return a dict mapping each node that holds a cell (whatever its
        state) to the list of partition offsets where it does."""
        uuid_map = {}
        for index, row in enumerate(self.partition_list):
            for cell in row:
                uuid_map.setdefault(cell.getNode(), []).append(index)
        return uuid_map
335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360

def thread_safe(method):
    """Decorate a method so that it runs while holding its instance's lock.

    The instance must provide lock() and unlock().  unlock() is called even
    when the method raises.  functools.wraps copies the wrapped method's name
    and docstring onto the wrapper, so tracebacks and introspection still
    show the real method.
    """
    from functools import wraps

    @wraps(method)
    def wrapper(self, *args, **kwargs):
        self.lock()
        try:
            return method(self, *args, **kwargs)
        finally:
            self.unlock()
    return wrapper


class MTPartitionTable(PartitionTable):
    """Thread-safe flavour of PartitionTable.

    Only the methods used by the client are overridden: each one takes the
    table lock (through the thread_safe decorator) and then defers to the
    PartitionTable implementation.
    """

    def __init__(self, *args, **kwargs):
        # The lock is created before the base initialiser runs.
        self._lock = RLock()
        super(MTPartitionTable, self).__init__(*args, **kwargs)

    def lock(self):
        """Acquire the table lock."""
        self._lock.acquire()

    def unlock(self):
        """Release the table lock."""
        self._lock.release()

    @thread_safe
    def getCellListForTID(self, *args, **kwargs):
        return super(MTPartitionTable, self).getCellListForTID(*args, **kwargs)

    @thread_safe
    def getCellListForOID(self, *args, **kwargs):
        return super(MTPartitionTable, self).getCellListForOID(*args, **kwargs)

    @thread_safe
    def setCell(self, *args, **kwargs):
        return super(MTPartitionTable, self).setCell(*args, **kwargs)

    @thread_safe
    def clear(self, *args, **kwargs):
        return super(MTPartitionTable, self).clear(*args, **kwargs)

    @thread_safe
    def operational(self, *args, **kwargs):
        return super(MTPartitionTable, self).operational(*args, **kwargs)

    @thread_safe
    def getNodeList(self, *args, **kwargs):
        return super(MTPartitionTable, self).getNodeList(*args, **kwargs)

    @thread_safe
    def getNodeMap(self, *args, **kwargs):
        return super(MTPartitionTable, self).getNodeMap(*args, **kwargs)