Commit 08cfd95a authored by Yoshinori Okuji's avatar Yoshinori Okuji

Implement a part of the verification step. Now the handler must be implemented.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@31 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 086b597d
...@@ -407,6 +407,79 @@ class Application(object): ...@@ -407,6 +407,79 @@ class Application(object):
continue continue
break break
def verifyTransaction(self, tid):
em = self.em
uuid_set = set()
# Determine to which nodes I should ask.
partition = tid % self.num_partitions
transaction_uuid_list = [cell.getUUID() for cell \
in self.pt.getCellList(partition, True)]
if len(transaction_uuid_list) == 0:
raise VerificationFailure
uuid_list.update(transaction_uuid_list)
# Gather OIDs.
self.asking_uuid_dict = {}
self.unfinished_oid_set = set()
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in transaction_uuid_list:
self.asking_uuid_dict[uuid] = False
p = Packet()
msg_id = conn.getNextId()
p.askOIDsByTID(msg_id, tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
break
else:
raise VerificationFailure
while 1:
em.poll(1)
if not self.pt.operational():
raise VerificationFailure
if False not in self.asking_uuid_dict.values():
break
if len(self.unfinished_oid_set) == 0:
# Not commitable.
return None
else:
# Verify that all objects are present.
for oid in self.unfinished_oid_set:
self.asking_uuid_dict.clear()
partition = oid % self.num_partitions
object_uuid_list = [cell.getUUID() for cell \
in self.pt.getCellList(partition, True)]
if len(object_uuid_list) == 0:
raise VerificationFailure
uuid_set.update(object_uuid_list)
self.object_present = True
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in object_uuid_list:
self.asking_uuid_dict[uuid] = False
p = Packet()
msg_id = conn.getNextId()
p.askObjectPresent(msg_id, oid, tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
while 1:
em.poll(1)
if not self.pt.operational():
raise VerificationFailure
if False not in self.asking_uuid_dict.values():
break
if not self.object_present:
# Not commitable.
return None
return uuid_set
def verifyData(self): def verifyData(self):
"""Verify the data in storage nodes and clean them up, if necessary.""" """Verify the data in storage nodes and clean them up, if necessary."""
logging.info('start to verify data') logging.info('start to verify data')
...@@ -414,13 +487,17 @@ class Application(object): ...@@ -414,13 +487,17 @@ class Application(object):
em = self.em em = self.em
nm = self.nm nm = self.nm
# First, send the current partition table to storage nodes, so that # FIXME this part has a potential problem that the write buffers can
# be very huge. Thus it would be better to flush the buffers from time
# to time _without_ reading packets.
# Send the current partition table to storage nodes, so that
# all nodes share the same view. # all nodes share the same view.
for conn in em.getConnectionList(): for conn in em.getConnectionList():
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is not None: if uuid is not None:
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if isinstance(node, StorageNode) and node.getState() == RUNNING_STATE: if isinstance(node, StorageNode):
# Split the packet if too huge. # Split the packet if too huge.
p = Packet() p = Packet()
row_list = [] row_list = []
...@@ -431,29 +508,78 @@ class Application(object): ...@@ -431,29 +508,78 @@ class Application(object):
conn.addPacket(p) conn.addPacket(p)
del row_list[:] del row_list[:]
if len(row_list) != 0: if len(row_list) != 0:
p.sendPartitionTable(self.lptid, row_list) p.sendPartitionTable(conn.getNextId(), self.lptid, row_list)
conn.addPacket(p) conn.addPacket(p)
# Secondly, tweak the partition table, if the distribution of storage nodes # Gather all unfinished transactions.
# is not uniform. self.asking_uuid_dict = {}
self.unfinished_tid_set = set()
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getNodeByUUID(uuid)
if isinstance(node, StorageNode):
self.asking_uuid_dict[uuid] = False
p = Packet()
msg_id = conn.getNextId()
p.askUnfinishedTransactions(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
while 1:
em.poll(1)
if not self.pt.operational():
raise VerificationFailure
if False not in self.asking_uuid_dict.values():
break
# Gather OIDs for each unfinished TID, and verify whether the transaction
# can be finished or must be aborted. This could be in parallel in theory,
# but not so easy. Thus do it one-by-one at the moment.
for tid in self.unfinished_tid_set:
uuid_set = self.verifyTransaction(tid)
if uuid_set is None:
# Make sure that no node has this transaction.
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getNodeByUUID(uuid)
if isinstance(node, StorageNode):
p = Packet()
p.deleteTransaction(conn.getNextId(), tid)
conn.addPacket(p)
else:
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in uuid_set:
p = Packet()
p.commitTransaction(conn.getNextId(), tid)
conn.addPacket(p)
# Secondly, gather all unfinished transactions. # If possible, send the packets now.
# FIXME em.poll(0)
# Thirdly, finish or abort unfinished transactions. # Tweak the partition table, if the distribution of storage nodes
# is not uniform.
cell_list = self.pt.tweak()
# FIXME # And, add unused nodes.
node_list = self.pt.getNodeList()
for node in nm.getStorageNodeList():
if node.getState() == RUNNING_STATE and node not in node_list:
cell_list.extend(self.pt.addNode(node))
# Forthly, verify some transactions to check for the health. # If anything changed, send the changes.
if cell_list:
app.broadcastPartitionChanges(self.getNextPartitionTableID(), cell_list)
# FIXME
def playPrimaryRole(self): def playPrimaryRole(self):
logging.info('play the primary role') logging.info('play the primary role')
# If I know any storage node, make sure that they are not in the running state, # If I know any storage node, make sure that they are not in the running state,
# because they are not connected at this stage. # because they are not connected at this stage.
for self.nm.getStorageNodeList(): for node in self.nm.getStorageNodeList():
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE) node.setState(TEMPORARILY_DOWN_STATE)
...@@ -465,6 +591,8 @@ class Application(object): ...@@ -465,6 +591,8 @@ class Application(object):
self.verifyData() self.verifyData()
except VerificationFailure: except VerificationFailure:
recovering = True recovering = True
# FIXME start a real operation
raise NotImplementedError raise NotImplementedError
def playSecondaryRole(self): def playSecondaryRole(self):
......
...@@ -42,6 +42,21 @@ class PartitionTable(object): ...@@ -42,6 +42,21 @@ class PartitionTable(object):
self.partition_list = [None] * self.np self.partition_list = [None] * self.np
self.count_dict.clear() self.count_dict.clear()
def getNodeList(self):
"""Return all used nodes."""
node_list = []
for node, count in self.count_dict.iteritems():
if count > 0:
node_list.append(node)
return node_list
def getCellList(self, offset, usable_only = False):
if usable_only:
return [cell for cell in self.partition_list[offset] \
if cell.getState() in (UP_TO_DATE_STATE, FEEDING_STATE)]
return self.partition_list[offset]
def make(self, node_list): def make(self, node_list):
"""Make a new partition table from scratch.""" """Make a new partition table from scratch."""
# First, filter the list of nodes. # First, filter the list of nodes.
...@@ -136,17 +151,75 @@ class PartitionTable(object): ...@@ -136,17 +151,75 @@ class PartitionTable(object):
if row is not None: if row is not None:
for cell in row: for cell in row:
if cell.getNode() == node: if cell.getNode() == node:
if cell.getState() != FEEDING_STATE:
# If this cell is not feeding, find another node to be added.
node = self.findLeastUsedNode([cell.getNode() for cell in row])
if node is not None:
row.append(Cell(node, OUT_OF_DATE_STATE))
cell_list.append((offset, node.getUUID(),
OUT_OF_DATE_STATE))
row.remove(cell) row.remove(cell)
cell_list.append((offset, uuid, DISCARDED_STATE)) cell_list.append((offset, uuid, DISCARDED_STATE))
node = self.findLeastUsedNode()
if node is not None:
row.append(Cell(node, OUT_OF_DATE_STATE))
cell_list.append((offset, node.getUUID(), OUT_OF_DATE_STATE))
break break
del self.count_dict[node] del self.count_dict[node]
return cell_list return cell_list
def addNode(self, node):
"""Add a node. Take it into account that it might not be really a new node.
The strategy is, if a row does not contain a good number of cells, add this
node to the row, unless the node is already present in the same row. Otherwise,
check if this node should replace another cell."""
cell_list = []
node_count = self.count_dict.get(node, 0)
for offset, row in enumerate(self.partition_list):
feeding_cell = None
max_count = 0
max_cell = None
num_cells = 0
skip = False
for cell in row:
if cell.getNode() == node:
skip = True
break
if cell.getState() == FEEDING_STATE:
feeding_cell = cell
else:
num_cells += 1
count = self.count_dict[cell.getNode()]
if count > max_count:
max_count = count
max_cell = cell
if skip:
continue
if num_cells < self.nr:
row.append(Cell(node, OUT_OF_DATE_STATE))
cell_list.append((offset, node.getUUID(), OUT_OF_DATE_STATE))
node_count += 1
else:
if max_count - node_count > 1:
if feeding_cell is not None \
or max_cell.getState() == OUT_OF_DATE_STATE:
# If there is a feeding cell already or it is out-of-date,
# just drop the node.
row.remove(max_cell)
cell_list.append((offset, max_cell.getUUID(), DISCARDED_STATE))
self.count_dict[max_cell.getNode()] -= 1
else:
# Otherwise, use it as a feeding cell for safety.
max_cell.setState(FEEDING_STATE)
cell_list.append((offset, max_cell.getUUID(), FEEDING_STATE))
# Don't count a feeding cell.
self.count_dict[max_cell.getNode()] -= 1
row.append(Cell(node, OUT_OF_DATE_STATE))
cell_list.append((offset, node.getUUID(), OUT_OF_DATE_STATE))
node_count += 1
self.count_dict[node] = node_count
return cell_list
def getRow(self, offset): def getRow(self, offset):
row = self.partition_list[offset] row = self.partition_list[offset]
if row is None: if row is None:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment