Commit 7ee41db5 authored by Yoshinori Okuji's avatar Yoshinori Okuji

When a partition is replicated, it must benotified to a primary master node.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@211 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 041d2bca
...@@ -19,6 +19,12 @@ TODO ...@@ -19,6 +19,12 @@ TODO
- Replication. - Replication.
- History.
- Multiple undo.
- An addition of a new node to a partition table seeming a bit buggy
Requirements Requirements
- Python 2.4 or later - Python 2.4 or later
......
...@@ -31,7 +31,8 @@ class IdleEvent(object): ...@@ -31,7 +31,8 @@ class IdleEvent(object):
if t > self._critical_time: if t > self._critical_time:
conn.lock() conn.lock()
try: try:
logging.info('timeout with %s:%d', *(conn.getAddress())) logging.info('timeout for %r with %s:%d',
self._id, *(conn.getAddress()))
conn.getHandler().timeoutExpired(conn) conn.getHandler().timeoutExpired(conn)
conn.close() conn.close()
return True return True
...@@ -106,9 +107,10 @@ class SelectEventManager(object): ...@@ -106,9 +107,10 @@ class SelectEventManager(object):
t = time() t = time()
if t - self.prev_time >= 1: if t - self.prev_time >= 1:
self.prev_time = t self.prev_time = t
event_list.sort(key = lambda event: event.getTime()) event_list.sort(key = lambda event: event.getTime(),
reverse = True)
while event_list: while event_list:
event = event_list[0] event = event_list.pop()
if event(t): if event(t):
try: try:
event_list.remove(event) event_list.remove(event)
......
...@@ -81,3 +81,7 @@ class MasterEventHandler(EventHandler): ...@@ -81,3 +81,7 @@ class MasterEventHandler(EventHandler):
def handleAskUnfinishedTransactions(self, conn, packet): def handleAskUnfinishedTransactions(self, conn, packet):
logging.info('ignoring ask unfinished transactions') logging.info('ignoring ask unfinished transactions')
pass pass
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
logging.info('ignoring notify partition changes')
pass
...@@ -2,7 +2,8 @@ import logging ...@@ -2,7 +2,8 @@ import logging
from copy import copy from copy import copy
from neo.protocol import MASTER_NODE_TYPE, CLIENT_NODE_TYPE, \ from neo.protocol import MASTER_NODE_TYPE, CLIENT_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE
from neo.master.handler import MasterEventHandler from neo.master.handler import MasterEventHandler
from neo.protocol import Packet, INVALID_UUID from neo.protocol import Packet, INVALID_UUID
from neo.exception import OperationFailure, ElectionFailure from neo.exception import OperationFailure, ElectionFailure
...@@ -583,3 +584,43 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -583,3 +584,43 @@ class ServiceEventHandler(MasterEventHandler):
p.answerUnfinishedTransactions(packet.getId(), p.answerUnfinishedTransactions(packet.getId(),
app.finishing_transaction_dict.keys()) app.finishing_transaction_dict.keys())
conn.addPacket(p) conn.addPacket(p)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
# This should be sent when a cell becomes up-to-date because
# a replication has finished.
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node is None:
self.handleUnexpectedPacket(conn, packet)
return
new_cell_list = []
for cell in cell_list:
if cell[2] != UP_TO_DATE_STATE:
logging.warn('only up-to-date state should be sent')
continue
if uuid != cell[1]:
logging.warn('only a cell itself should send this packet')
continue
offset = cell[0]
app.pt.setCell(offset, node, UP_TO_DATE_STATE)
new_cell_list.append(cell)
# If the partition contains a feeding cell, drop it now.
for feeding_cell in app.pt.getCellList(offset):
if feeding_cell.getState() == FEEDING_STATE:
app.pt.removeCell(offset, feeding_cell.getNode())
new_cell_list.append((offset, feeding_cell.getUUID(),
DISCARDED_STATE))
break
if new_cell_list:
ptid = app.getNextPartitionTableID()
app.broadcastPartitionChanges(ptid, new_cell_list)
...@@ -111,7 +111,7 @@ class PartitionTable(object): ...@@ -111,7 +111,7 @@ class PartitionTable(object):
for cell in row: for cell in row:
if cell.getNode() == node: if cell.getNode() == node:
row.remove(cell) row.remove(cell)
if state != FEEDING_STATE: if cell.getState() != FEEDING_STATE:
self.count_dict[node] = self.count_dict.get(node, 0) - 1 self.count_dict[node] = self.count_dict.get(node, 0) - 1
break break
row.append(Cell(node, state)) row.append(Cell(node, state))
...@@ -219,10 +219,11 @@ class PartitionTable(object): ...@@ -219,10 +219,11 @@ class PartitionTable(object):
return cell_list return cell_list
def addNode(self, node): def addNode(self, node):
"""Add a node. Take it into account that it might not be really a new node. """Add a node. Take it into account that it might not be really a new
The strategy is, if a row does not contain a good number of cells, add this node. The strategy is, if a row does not contain a good number of
node to the row, unless the node is already present in the same row. Otherwise, cells, add this node to the row, unless the node is already present
check if this node should replace another cell.""" in the same row. Otherwise, check if this node should replace another
cell."""
cell_list = [] cell_list = []
node_count = self.count_dict.get(node, 0) node_count = self.count_dict.get(node, 0)
for offset, row in enumerate(self.partition_list): for offset, row in enumerate(self.partition_list):
...@@ -255,22 +256,26 @@ class PartitionTable(object): ...@@ -255,22 +256,26 @@ class PartitionTable(object):
if max_count - node_count > 1: if max_count - node_count > 1:
if feeding_cell is not None \ if feeding_cell is not None \
or max_cell.getState() == OUT_OF_DATE_STATE: or max_cell.getState() == OUT_OF_DATE_STATE:
# If there is a feeding cell already or it is out-of-date, # If there is a feeding cell already or it is
# just drop the node. # out-of-date, just drop the node.
row.remove(max_cell) row.remove(max_cell)
cell_list.append((offset, max_cell.getUUID(), DISCARDED_STATE)) cell_list.append((offset, max_cell.getUUID(),
DISCARDED_STATE))
self.count_dict[max_cell.getNode()] -= 1 self.count_dict[max_cell.getNode()] -= 1
else: else:
# Otherwise, use it as a feeding cell for safety. # Otherwise, use it as a feeding cell for safety.
max_cell.setState(FEEDING_STATE) max_cell.setState(FEEDING_STATE)
cell_list.append((offset, max_cell.getUUID(), FEEDING_STATE)) cell_list.append((offset, max_cell.getUUID(),
FEEDING_STATE))
# Don't count a feeding cell. # Don't count a feeding cell.
self.count_dict[max_cell.getNode()] -= 1 self.count_dict[max_cell.getNode()] -= 1
row.append(Cell(node, OUT_OF_DATE_STATE)) row.append(Cell(node, OUT_OF_DATE_STATE))
cell_list.append((offset, node.getUUID(), OUT_OF_DATE_STATE)) cell_list.append((offset, node.getUUID(),
OUT_OF_DATE_STATE))
node_count += 1 node_count += 1
self.count_dict[node] = node_count self.count_dict[node] = node_count
self.log()
return cell_list return cell_list
def getRow(self, offset): def getRow(self, offset):
...@@ -359,6 +364,7 @@ class PartitionTable(object): ...@@ -359,6 +364,7 @@ class PartitionTable(object):
# to reduce differences between frequently used nodes and rarely used # to reduce differences between frequently used nodes and rarely used
# nodes by replacing cells. # nodes by replacing cells.
self.log()
return changed_cell_list return changed_cell_list
def outdate(self): def outdate(self):
......
import logging import logging
from random import choice from random import choice
from neo.protocol import Packet, OUT_OF_DATE_STATE, STORAGE_NODE_TYPE, \ from neo.protocol import Packet, STORAGE_NODE_TYPE, \
UP_TO_DATE_STATE, OUT_OF_DATE_STATE, \
INVALID_OID, INVALID_TID, RUNNING_STATE INVALID_OID, INVALID_TID, RUNNING_STATE
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.storage.handler import StorageEventHandler from neo.storage.handler import StorageEventHandler
...@@ -62,6 +63,9 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -62,6 +63,9 @@ class ReplicationEventHandler(StorageEventHandler):
def handleAnswerTIDs(self, conn, packet, tid_list): def handleAnswerTIDs(self, conn, packet, tid_list):
app = self.app app = self.app
if app.replicator.current_connection is not conn:
return
if tid_list: if tid_list:
# If I have pending TIDs, check which TIDs I don't have, and # If I have pending TIDs, check which TIDs I don't have, and
# request the data. # request the data.
...@@ -72,7 +76,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -72,7 +76,7 @@ class ReplicationEventHandler(StorageEventHandler):
p = Packet() p = Packet()
p.askTransactionInformation(msg_id, tid) p.askTransactionInformation(msg_id, tid)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(timeout = 300) conn.expectMessage(msg_id, timeout = 300)
# And, ask more TIDs. # And, ask more TIDs.
app.replicator.tid_offset += 1000 app.replicator.tid_offset += 1000
...@@ -82,7 +86,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -82,7 +86,7 @@ class ReplicationEventHandler(StorageEventHandler):
p.askTIDs(msg_id, offset, offset + 1000, p.askTIDs(msg_id, offset, offset + 1000,
app.replicator.current_partition.getRID()) app.replicator.current_partition.getRID())
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(timeout = 300) conn.expectMessage(msg_id, timeout = 300)
else: else:
# If no more TID, a replication of transactions is finished. # If no more TID, a replication of transactions is finished.
# So start to replicate objects now. # So start to replicate objects now.
...@@ -91,17 +95,23 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -91,17 +95,23 @@ class ReplicationEventHandler(StorageEventHandler):
p.askOIDs(msg_id, 0, 1000, p.askOIDs(msg_id, 0, 1000,
app.replicator.current_partition.getRID()) app.replicator.current_partition.getRID())
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(timeout = 300) conn.expectMessage(msg_id, timeout = 300)
app.replicator.oid_offset = 0 app.replicator.oid_offset = 0
def handleAnswerTransactionInformation(self, conn, packet, tid, def handleAnswerTransactionInformation(self, conn, packet, tid,
user, desc, ext, oid_list): user, desc, ext, oid_list):
app = self.app app = self.app
if app.replicator.current_connection is not conn:
return
# Directly store the transaction. # Directly store the transaction.
app.dm.storeTransaction(tid, (), (oid_list, user, desc, ext), True) app.dm.storeTransaction(tid, (), (oid_list, user, desc, ext), True)
def handleAnswerOIDs(self, conn, packet, oid_list): def handleAnswerOIDs(self, conn, packet, oid_list):
app = self.app app = self.app
if app.replicator.current_connection is not conn:
return
if oid_list: if oid_list:
# Pick one up, and ask the history. # Pick one up, and ask the history.
oid = oid_list.pop() oid = oid_list.pop()
...@@ -109,7 +119,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -109,7 +119,7 @@ class ReplicationEventHandler(StorageEventHandler):
p = Packet() p = Packet()
p.askObjectHistory(msg_id, oid, 0, 1000) p.askObjectHistory(msg_id, oid, 0, 1000)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(timeout = 300) conn.expectMessage(msg_id, timeout = 300)
app.replicator.serial_offset = 0 app.replicator.serial_offset = 0
app.replicator.oid_list = oid_list app.replicator.oid_list = oid_list
else: else:
...@@ -119,6 +129,9 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -119,6 +129,9 @@ class ReplicationEventHandler(StorageEventHandler):
def handleAnswerObjectHistory(self, conn, packet, oid, history_list): def handleAnswerObjectHistory(self, conn, packet, oid, history_list):
app = self.app app = self.app
if app.replicator.current_connection is not conn:
return
if history_list: if history_list:
# Check if I have objects, request those which I don't have. # Check if I have objects, request those which I don't have.
serial_list = [t[0] for t in history_list] serial_list = [t[0] for t in history_list]
...@@ -129,7 +142,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -129,7 +142,7 @@ class ReplicationEventHandler(StorageEventHandler):
p = Packet() p = Packet()
p.askObject(msg_id, oid, serial, INVALID_TID) p.askObject(msg_id, oid, serial, INVALID_TID)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(timeout = 300) conn.expectMessage(msg_id, timeout = 300)
# And, ask more serials. # And, ask more serials.
app.replicator.serial_offset += 1000 app.replicator.serial_offset += 1000
...@@ -138,7 +151,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -138,7 +151,7 @@ class ReplicationEventHandler(StorageEventHandler):
p = Packet() p = Packet()
p.askObjectHistory(msg_id, oid, offset, offset + 1000) p.askObjectHistory(msg_id, oid, offset, offset + 1000)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(timeout = 300) conn.expectMessage(msg_id, timeout = 300)
else: else:
# This OID is finished. So advance to next. # This OID is finished. So advance to next.
oid_list = app.replicator.oid_list oid_list = app.replicator.oid_list
...@@ -149,7 +162,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -149,7 +162,7 @@ class ReplicationEventHandler(StorageEventHandler):
p = Packet() p = Packet()
p.askObjectHistory(msg_id, oid, 0, 1000) p.askObjectHistory(msg_id, oid, 0, 1000)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(timeout = 300) conn.expectMessage(msg_id, timeout = 300)
app.replicator.serial_offset = 0 app.replicator.serial_offset = 0
else: else:
# Otherwise, acquire more OIDs. # Otherwise, acquire more OIDs.
...@@ -160,11 +173,14 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -160,11 +173,14 @@ class ReplicationEventHandler(StorageEventHandler):
p.askOIDs(msg_id, offset, offset + 1000, p.askOIDs(msg_id, offset, offset + 1000,
app.replicator.current_partition.getRID()) app.replicator.current_partition.getRID())
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(timeout = 300) conn.expectMessage(msg_id, timeout = 300)
def answerObject(self, msg_id, oid, serial_start, serial_end, compression, def handleAnswerObject(self, conn, packet, oid, serial_start,
checksum, data): serial_end, compression, checksum, data):
app = self.app app = self.app
if app.replicator.current_connection is not conn:
return
# Directly store the transaction. # Directly store the transaction.
obj = (oid, compression, checksum, data) obj = (oid, compression, checksum, data)
app.dm.storeTransaction(serial_start, [obj], None, True) app.dm.storeTransaction(serial_start, [obj], None, True)
...@@ -319,10 +335,27 @@ class Replicator(object): ...@@ -319,10 +335,27 @@ class Replicator(object):
p = Packet() p = Packet()
p.askTIDs(msg_id, 0, 1000, self.current_partition.getRID()) p.askTIDs(msg_id, 0, 1000, self.current_partition.getRID())
self.current_connection.addPacket(p) self.current_connection.addPacket(p)
self.current_connection.expectMessage(timeout = 300) self.current_connection.expectMessage(msg_id, timeout = 300)
self.replication_done = False self.replication_done = False
def _finishReplication(self):
app = self.app
try:
self.partition_list.remove(self.current_partition)
# Notify to a primary master node that my cell is now up-to-date.
conn = self.primary_master_connection
p = Packet()
p.notifyPartitionChanges(conn.getNextId(),
app.lptid,
[(self.current_partition.getRID(),
app.uuid,
UP_TO_DATE_STATE)])
conn.addPacket(p)
except ValueError:
pass
self.current_partition = None
def act(self): def act(self):
# If the new partition list is not empty, I must ask a critical # If the new partition list is not empty, I must ask a critical
# TID to a primary master node. # TID to a primary master node.
...@@ -356,8 +389,4 @@ class Replicator(object): ...@@ -356,8 +389,4 @@ class Replicator(object):
else: else:
if self.replication_done: if self.replication_done:
logging.info('replication is done') logging.info('replication is done')
try: self._finishReplication()
self.partition_list.remove(self.current_partition)
except ValueError:
pass
self.current_partition = None
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment