Commit a1539219 authored by Julien Muchembled

Do not change partition table when adding nodes and reimplement pt.tweak()

parent 07b1ce47
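
With this change, enabling pending storage nodes no longer assigns them any partition; rebalancing the partition table becomes a separate, explicit step. A minimal sketch of the resulting two-step admin flow, assuming `neoctl` is a connected NeoCTL instance (as in the functional tests below) and `uuid_list` holds the UUIDs of the pending storage nodes:

neoctl.enableStorageList(uuid_list)  # nodes switch to RUNNING; the
                                     # partition table is left untouched
neoctl.tweakPartitionTable()         # rebalance explicitly; passing a
                                     # UUID list here would instead
                                     # assign nothing to those nodes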
......@@ -78,6 +78,7 @@ class AdminEventHandler(EventHandler):
conn.answer(Packets.AnswerPrimary(master_node.getUUID()))
addPendingNodes = forward_ask(Packets.AddPendingNodes)
tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
setClusterState = forward_ask(Packets.SetClusterState)
checkReplicas = forward_ask(Packets.CheckReplicas)
......
......@@ -26,7 +26,7 @@ except ImportError:
pass
# The protocol version (major, minor).
PROTOCOL_VERSION = (11, 1)
PROTOCOL_VERSION = (12, 1)
# Size restrictions.
MIN_PACKET_SIZE = 10
......@@ -1150,6 +1150,16 @@ class AddPendingNodes(Packet):
_answer = Error
class TweakPartitionTable(Packet):
"""
Ask the primary to optimize the partition table. A -> PM.
"""
_fmt = PStruct('tweak_partition_table',
PFUUIDList,
)
_answer = Error
class NotifyNodeInformation(Packet):
"""
Notify information about one or more nodes. PM -> Any.
......@@ -1666,6 +1676,8 @@ class Packets(dict):
SetNodeState, ignore_when_closed=False)
AddPendingNodes = register(
AddPendingNodes, ignore_when_closed=False)
TweakPartitionTable = register(
TweakPartitionTable, ignore_when_closed=False)
AskNodeInformation, AnswerNodeInformation = register(
NodeInformation)
SetClusterState = register(
......
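For reference, the new packet carries nothing but a list of node UUIDs (PFUUIDList) and, like AddPendingNodes, is answered with an Error packet whose ACK code signals success. A sketch of the request from the admin side, assuming `conn` is an established connection to the primary master:

conn.ask(Packets.TweakPartitionTable(uuid_list))  # A -> PM; the ACK
                                                  # answer is checked in
                                                  # NeoCTL further below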
......@@ -130,37 +130,40 @@ class AdministrationHandler(MasterHandler):
uuids = ', '.join(map(uuid_str, uuid_list))
logging.debug('Add nodes %s', uuids)
app = self.app
nm = app.nm
em = app.em
pt = app.pt
cell_list = []
uuid_set = set()
if app.getClusterState() == ClusterStates.RUNNING:
state = app.getClusterState()
# XXX: Would it be safe to allow more states?
if state not in (ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP):
raise ProtocolError('Can not add nodes in %s state' % state)
# take all pending nodes
for node in nm.getStorageList():
if node.isPending():
uuid_set.add(node.getUUID())
# keep only selected nodes
if uuid_list:
uuid_set = uuid_set.intersection(set(uuid_list))
# nothing to do
if not uuid_set:
logging.warning('No nodes added')
conn.answer(Errors.Ack('No nodes added'))
return
uuids = ', '.join(map(uuid_str, uuid_set))
logging.info('Adding nodes %s', uuids)
# switch nodes to running state
node_list = map(nm.getByUUID, uuid_set)
node_list = list(app.pt.addNodeList(node
for node in app.nm.getStorageList()
if node.isPending() and node.getUUID() in uuid_list))
if node_list:
p = Packets.StartOperation()
for node in node_list:
new_cells = pt.addNode(node)
cell_list.extend(new_cells)
node.setRunning()
node.getConnection().notify(Packets.StartOperation())
node.notify(p)
app.broadcastNodesInformation(node_list)
# broadcast the new partition table
app.broadcastPartitionChanges(cell_list)
conn.answer(Errors.Ack('Nodes added: %s' % (uuids, )))
conn.answer(Errors.Ack('Nodes added: %s' %
', '.join(uuid_str(x.getUUID()) for x in node_list)))
else:
logging.warning('No node added')
conn.answer(Errors.Ack('No node added'))
def tweakPartitionTable(self, conn, uuid_list):
app = self.app
state = app.getClusterState()
# XXX: Would it be safe to allow more states?
if state not in (ClusterStates.RUNNING,
ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP):
raise ProtocolError('Can not tweak partition table in %s state'
% state)
app.broadcastPartitionChanges(app.pt.tweak(
map(app.nm.getByUUID, uuid_list)))
conn.answer(Errors.Ack(''))
def checkReplicas(self, conn, partition_dict, min_tid, max_tid):
app = self.app
......
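A note on the value that tweakPartitionTable() above hands to broadcastPartitionChanges(): as built by pt.tweak() below, it is a list of (partition offset, node UUID, new cell state) triples, for example (UUIDs hypothetical):

cell_changes = [
    (0, uuid1, CellStates.FEEDING),      # keep the old cell as a source
    (0, uuid2, CellStates.OUT_OF_DATE),  # while a new replica catches up
]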
......@@ -35,6 +35,16 @@ class Cell(neo.lib.pt.Cell):
neo.lib.pt.Cell = Cell
class MappedNode(object):
def __init__(self, node):
self.node = node
self.assigned = set()
def __getattr__(self, attr):
return getattr(self.node, attr)
class PartitionTable(neo.lib.pt.PartitionTable):
"""This class manages a partition table for the primary master node"""
......@@ -164,130 +174,101 @@ class PartitionTable(neo.lib.pt.PartitionTable):
return cell_list
def addNode(self, node):
"""Add a node. Take it into account that it might not be really a new
node. The strategy is, if a row does not contain a good number of
cells, add this node to the row, unless the node is already present
in the same row. Otherwise, check if this node should replace another
cell."""
cell_list = []
node_count = self.count_dict.get(node, 0)
for offset, row in enumerate(self.partition_list):
max_count = 0
max_cell = None
num_cells = 0
for cell in row:
if cell.getNode() is node:
break
if not cell.isFeeding():
num_cells += 1
count = self.count_dict[cell.getNode()]
if count > max_count:
max_count = count
max_cell = cell
else:
if self.nr < num_cells:
if node_count + 1 >= max_count:
continue
if max_cell.isReadable():
max_cell.setState(CellStates.FEEDING)
cell_list.append((offset, max_cell.getUUID(),
CellStates.FEEDING))
else:
row.remove(max_cell)
cell_list.append((offset, max_cell.getUUID(),
CellStates.DISCARDED))
self.count_dict[max_cell.getNode()] -= 1
row.append(Cell(node, CellStates.OUT_OF_DATE))
cell_list.append((offset, node.getUUID(),
CellStates.OUT_OF_DATE))
node_count += 1
self.count_dict[node] = node_count
self.log()
return cell_list
def addNodeList(self, node_list):
"""Add nodes"""
added_list = []
for node in node_list:
if node not in self.count_dict:
self.count_dict[node] = 0
added_list.append(node)
return added_list
def tweak(self):
"""Test if nodes are distributed uniformly. Otherwise, correct the
partition table."""
changed_cell_list = []
def tweak(self, drop_list=()):
"""Optimize partition table
This is done by computing a minimal diff between the current partition
table and what make() would do.
"""
assigned_dict = dict((x, {}) for x in self.count_dict)
readable_dict = dict((i, set()) for i in xrange(self.np))
for offset, row in enumerate(self.partition_list):
removed_cell_list = []
feeding_cell = None
out_of_date_cell_list = []
up_to_date_cell_list = []
for cell in row:
if cell.getNode().isBroken():
# Remove a broken cell.
removed_cell_list.append(cell)
elif cell.isFeeding():
if feeding_cell is None:
feeding_cell = cell
else:
# Remove an excessive feeding cell.
removed_cell_list.append(cell)
elif cell.isUpToDate():
up_to_date_cell_list.append(cell)
else:
out_of_date_cell_list.append(cell)
# If all cells are up-to-date, a feeding cell is not required.
if len(out_of_date_cell_list) == 0 and feeding_cell is not None:
removed_cell_list.append(feeding_cell)
ideal_num = self.nr + 1
while len(out_of_date_cell_list) + len(up_to_date_cell_list) > \
ideal_num:
# This row contains too many cells.
if len(up_to_date_cell_list) > 1:
# There are multiple up-to-date cells, so choose whatever
# used too much.
cell_list = out_of_date_cell_list + up_to_date_cell_list
else:
# Drop an out-of-date cell.
cell_list = out_of_date_cell_list
max_count = 0
chosen_cell = None
for cell in cell_list:
count = self.count_dict[cell.getNode()]
if max_count < count:
max_count = count
chosen_cell = cell
removed_cell_list.append(chosen_cell)
try:
out_of_date_cell_list.remove(chosen_cell)
except ValueError:
up_to_date_cell_list.remove(chosen_cell)
# Now remove cells really.
for cell in removed_cell_list:
row.remove(cell)
if not cell.isFeeding():
self.count_dict[cell.getNode()] -= 1
changed_cell_list.append((offset, cell.getUUID(),
CellStates.DISCARDED))
# Add cells, if a row contains less than the number of replicas.
for offset, row in enumerate(self.partition_list):
num_cells = 0
if cell.isReadable():
readable_dict[offset].add(cell)
assigned_dict[cell.getNode()][offset] = cell
pt = PartitionTable(self.np, self.nr)
drop_list = set(x for x in drop_list if x in assigned_dict)
node_set = set(MappedNode(x) for x in assigned_dict
if x not in drop_list)
pt.make(node_set)
for offset, row in enumerate(pt.partition_list):
for cell in row:
if cell.isReadable():
cell.getNode().assigned.add(offset)
def map_nodes():
node_list = []
for node, assigned in assigned_dict.iteritems():
if node in drop_list:
yield node, frozenset()
continue
readable = set(offset for offset, cell in assigned.iteritems()
if cell.isReadable())
# the criterion on UUID is purely cosmetic
node_list.append((len(readable), len(assigned),
-node.getUUID(), readable, node))
node_list.sort(reverse=1)
for _, _, _, readable, node in node_list:
assigned = assigned_dict[node]
mapped = min(node_set, key=lambda m: (
len(m.assigned.symmetric_difference(assigned)),
len(m.assigned ^ readable)))
node_set.remove(mapped)
yield node, mapped.assigned
assert not node_set
changed_list = []
uptodate_set = set()
remove_dict = dict((i, []) for i in xrange(self.np))
for node, mapped in map_nodes():
uuid = node.getUUID()
assigned = assigned_dict[node]
for offset, cell in assigned.iteritems():
if offset in mapped:
if cell.isReadable():
uptodate_set.add(offset)
readable_dict[offset].remove(cell)
if cell.isFeeding():
self.count_dict[node] += 1
state = CellStates.UP_TO_DATE
cell.setState(state)
changed_list.append((offset, uuid, state))
else:
if not cell.isFeeding():
num_cells += 1
while num_cells <= self.nr:
node = self.findLeastUsedNode([cell.getNode() for cell in row])
if node is None:
break
row.append(Cell(node, CellStates.OUT_OF_DATE))
changed_cell_list.append((offset, node.getUUID(),
CellStates.OUT_OF_DATE))
self.count_dict[node] -= 1
remove_dict[offset].append(cell)
for offset in mapped.difference(assigned):
self.count_dict[node] += 1
num_cells += 1
self.log()
return changed_cell_list
state = CellStates.OUT_OF_DATE
self.partition_list[offset].append(Cell(node, state))
changed_list.append((offset, uuid, state))
count_dict = self.count_dict.copy()
for offset, cell_list in remove_dict.iteritems():
if not cell_list:
continue
row = self.partition_list[offset]
feeding = None if offset in uptodate_set else min(
readable_dict[offset], key=lambda x: count_dict[x.getNode()])
for cell in cell_list:
if cell is feeding:
count_dict[cell.getNode()] += 1
if cell.isFeeding():
continue
state = CellStates.FEEDING
cell.setState(state)
else:
state = CellStates.DISCARDED
row.remove(cell)
changed_list.append((offset, cell.getUUID(), state))
return changed_list
def outdate(self, lost_node=None):
"""Outdate all non-working nodes
......
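The core of the new tweak() is the matching in map_nodes(): a throwaway partition table is filled with make(), and every existing node is then paired with the freshly computed assignment that differs least from what that node already holds, so the resulting list of cell changes stays minimal. An illustrative, self-contained sketch of that pairing step (function and variable names are hypothetical, not the actual code; it assumes at least as many target assignments as nodes):

def match_assignments(current, target):
    # current: {node: set of offsets currently assigned to it}
    # target: list of offset sets from a freshly built table
    remaining = list(target)
    mapping = {}
    # Like map_nodes(), serve the better-provisioned nodes first.
    for node, assigned in sorted(current.items(),
                                 key=lambda kv: -len(kv[1])):
        # Pick the target that minimizes the symmetric difference,
        # i.e. the number of cells that would have to change state.
        best = min(remaining,
                   key=lambda t: len(t.symmetric_difference(assigned)))
        remaining.remove(best)
        mapping[node] = best
    return mapping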
......@@ -93,7 +93,6 @@ class VerificationManager(BaseServiceHandler):
return state, self
def run(self):
self.app.changeClusterState(ClusterStates.VERIFYING)
while True:
try:
......@@ -102,14 +101,7 @@ class VerificationManager(BaseServiceHandler):
continue
break
# At this stage, all non-working nodes are out-of-date.
cell_list = self.app.pt.outdate()
# Tweak the partition table, if the distribution of storage nodes
# is not uniform.
cell_list.extend(self.app.pt.tweak())
# If anything changed, send the changes.
self.app.broadcastPartitionChanges(cell_list)
self.app.broadcastPartitionChanges(self.app.pt.outdate())
def verifyData(self):
"""Verify the data in storage nodes and clean them up, if necessary."""
......
......@@ -34,6 +34,7 @@ action_dict = {
'check': 'checkReplicas',
'start': 'startCluster',
'add': 'enableStorageList',
'tweak': 'tweakPartitionTable',
'drop': 'dropNode',
}
......@@ -169,6 +170,14 @@ class TerminalNeoCTL(object):
uuid_list = map(self.asNode, params)
return self.neoctl.enableStorageList(uuid_list)
def tweakPartitionTable(self, params):
"""
Optimize partition table.
No partition will be assigned to the specified storage nodes.
Parameters: [node [...]]
"""
return self.neoctl.tweakPartitionTable(map(self.asNode, params))
def dropNode(self, params):
"""
Set node into DOWN state.
......
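Given the 'tweak' entry added to action_dict above, the corresponding shell usage is presumably (address and node names hypothetical):

neoctl -a 127.0.0.1:9999 tweak        # rebalance across all running nodes
neoctl -a 127.0.0.1:9999 tweak S1 S2  # rebalance, assigning nothing to S1/S2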
......@@ -84,6 +84,12 @@ class NeoCTL(object):
raise RuntimeError(response)
return response[2]
def tweakPartitionTable(self, uuid_list=()):
response = self.__ask(Packets.TweakPartitionTable(uuid_list))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def setClusterState(self, state):
"""
Set cluster state.
......
......@@ -160,6 +160,7 @@ class StorageTests(NEOFunctionalTest):
# add it to the partition table
self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
self.neo.expectRunning(stopped[0])
self.neo.neoctl.tweakPartitionTable()
self.neo.expectAssignedCells(stopped[0], number=10)
self.neo.expectClusterRunning()
......@@ -298,6 +299,7 @@ class StorageTests(NEOFunctionalTest):
stopped[0].start()
self.neo.expectPending(stopped[0])
self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
self.neo.neoctl.tweakPartitionTable()
self.neo.expectRunning(stopped[0])
self.neo.expectClusterRunning()
self.neo.expectOudatedCells(number=0)
......@@ -356,6 +358,7 @@ class StorageTests(NEOFunctionalTest):
stopped[0].start()
self.neo.expectPending(stopped[0])
self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
self.neo.neoctl.tweakPartitionTable()
self.neo.expectRunning(stopped[0])
self.neo.expectClusterRunning()
self.neo.expectAssignedCells(started[0], 10)
......@@ -469,6 +472,7 @@ class StorageTests(NEOFunctionalTest):
stopped[0].start()
self.neo.expectPending(stopped[0])
self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
self.neo.neoctl.tweakPartitionTable()
self.neo.expectRunning(stopped[0])
self.neo.expectClusterRunning()
self.neo.expectAssignedCells(started[0], 10)
......
This diff is collapsed.
......@@ -184,13 +184,12 @@ class ReplicationTests(NEOThreadedTest):
another source.
Here are the different states of partitions over time:
pt: 0: U|U|U
pt: 0: UO|UO|UO
pt: 0: FOO|UO.|U.O # node 1 replicates from node 0
pt: 0: .OU|UO.|U.O # here node 0 lost partition 0
pt: 0: UO.|U.O|FOO
pt: 0: UU.|U.O|FOO
pt: 0: UU.|U.U|FOO # nodes 1 & 2 replicate from node 0
pt: 0: UU.|U.U|.OU # here node 0 lost partition 2
# and node 1 must switch to node 2
pt: 0: .OU|UO.|U.U
pt: 0: .OU|UU.|U.U
pt: 0: .UU|UU.|U.U
pt: 0: UU.|U.U|.UU
"""
def connected(orig, *args, **kw):
patch[0] = s1.filterConnection(s0)
......@@ -218,6 +217,7 @@ class ReplicationTests(NEOThreadedTest):
s2.start()
cluster.tic()
cluster.neoctl.enableStorageList([s1.uuid, s2.uuid])
cluster.neoctl.tweakPartitionTable()
offset, = [offset for offset, row in enumerate(
cluster.master.pt.partition_list)
for cell in row if cell.isFeeding()]
......