Commit fd22e3e6 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Many changes.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@26 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 03bcb91c
...@@ -91,9 +91,11 @@ class Connection(BaseConnection): ...@@ -91,9 +91,11 @@ class Connection(BaseConnection):
def getNextId(self): def getNextId(self):
next_id = self.cur_id next_id = self.cur_id
self.cur_id += 1 # Deal with an overflow.
if self.cur_id > 0xffff: if self.cur_id == 0xffffffff:
self.cur_id = 0 self.cur_id = 0
else:
self.cur_id += 1
return next_id return next_id
def close(self): def close(self):
......
...@@ -7,7 +7,7 @@ from time import time ...@@ -7,7 +7,7 @@ from time import time
from neo.config import ConfigurationManager from neo.config import ConfigurationManager
from neo.protocol import Packet, ProtocolError, \ from neo.protocol import Packet, ProtocolError, \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \ RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
INVALID_UUID, INVALID_OID, INVALID_TID INVALID_UUID, INVALID_OID, INVALID_TID, INVALID_PTID
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode from neo.node import NodeManager, MasterNode, StorageNode, ClientNode
from neo.event import EventManager from neo.event import EventManager
from neo.util import dump from neo.util import dump
...@@ -49,8 +49,12 @@ class Application(object): ...@@ -49,8 +49,12 @@ class Application(object):
break break
self.uuid = uuid self.uuid = uuid
# The last OID.
self.loid = INVALID_OID self.loid = INVALID_OID
# The last TID.
self.ltid = INVALID_TID self.ltid = INVALID_TID
# The last Partition Table ID.
self.lptid = INVALID_PTID
def run(self): def run(self):
"""Make sure that the status is sane and start a loop.""" """Make sure that the status is sane and start a loop."""
...@@ -113,8 +117,10 @@ class Application(object): ...@@ -113,8 +117,10 @@ class Application(object):
self.primary_master_node = None self.primary_master_node = None
for node in nm.getMasterNodeList(): for node in nm.getMasterNodeList():
if node.getState() in (RUNNING_STATE, TEMPORARILY_DOWN_STATE): self.unconnected_master_node_set.add(node.getServer())
self.unconnected_master_node_set.add(node.getServer()) # For now, believe that every node should be available,
# since down or broken nodes may be already repaired.
node.setState(RUNNING_STATE)
self.negotiating_master_node_set.clear() self.negotiating_master_node_set.clear()
try: try:
......
...@@ -35,7 +35,7 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -35,7 +35,7 @@ class ElectionEventHandler(MasterEventHandler):
addr = conn.getAddress() addr = conn.getAddress()
app.negotiating_master_node_set.discard(addr) app.negotiating_master_node_set.discard(addr)
node = app.nm.getNodeByServer(addr) node = app.nm.getNodeByServer(addr)
if node.getState() not in (DOWN_STATE, BROKEN_STATE): if node.getState() == RUNNING_STATE:
app.unconnected_master_node_set.add(addr) app.unconnected_master_node_set.add(addr)
node.setState(TEMPORARILY_DOWN_STATE) node.setState(TEMPORARILY_DOWN_STATE)
MasterEventHandler.connectionFailed(self, conn) MasterEventHandler.connectionFailed(self, conn)
......
...@@ -35,7 +35,8 @@ class Node(object): ...@@ -35,7 +35,8 @@ class Node(object):
self.manager.unregisterServer(self) self.manager.unregisterServer(self)
self.server = server self.server = server
self.manager.registerServer(self) if server is not None:
self.manager.registerServer(self)
def getServer(self): def getServer(self):
return self.server return self.server
...@@ -46,7 +47,8 @@ class Node(object): ...@@ -46,7 +47,8 @@ class Node(object):
self.manager.unregisterUUID(self) self.manager.unregisterUUID(self)
self.uuid = uuid self.uuid = uuid
self.manager.registerUUID(self) if uuid is not None:
self.manager.registerUUID(self)
def getUUID(self): def getUUID(self):
return self.uuid return self.uuid
......
...@@ -12,21 +12,69 @@ MIN_PACKET_SIZE = 10 ...@@ -12,21 +12,69 @@ MIN_PACKET_SIZE = 10
MAX_PACKET_SIZE = 0x100000 MAX_PACKET_SIZE = 0x100000
# Message types. # Message types.
# Error is a special type of message, because this can be sent against any other message,
# even if such a message does not expect a reply usually. Any -> Any.
ERROR = 0x8000 ERROR = 0x8000
REQUEST_NODE_IDENTIFICATION = 0x0001
ACCEPT_NODE_IDENTIFICATION = 0x8001 # Check if a peer is still alive. Any -> Any.
PING = 0x0002 PING = 0x0001
PONG = 0x8002
# Notify being alive. Any -> Any.
PONG = 0x8001
# Request a node identification. This must be the first packet for any connection.
# Any -> Any.
REQUEST_NODE_IDENTIFICATION = 0x0002
# Accept a node identification. This should be a reply to Request Node Identification.
# Any -> Any.
ACCEPT_NODE_IDENTIFICATION = 0x8002
# Ask a current primary master node. This must be the second message when connecting
# to a master node. Any -> M.
ASK_PRIMARY_MASTER = 0x0003 ASK_PRIMARY_MASTER = 0x0003
# Reply to Ask Primary Master. This message includes a list of known master nodes,
# to make sure that a peer has the same information. M -> Any.
ANSWER_PRIMARY_MASTER = 0x8003 ANSWER_PRIMARY_MASTER = 0x8003
# Announce a primary master node election. PM -> SM.
ANNOUNCE_PRIMARY_MASTER = 0x0004 ANNOUNCE_PRIMARY_MASTER = 0x0004
# Force a re-election of a primary master node. M -> M.
REELECT_PRIMARY_MASTER = 0x0005 REELECT_PRIMARY_MASTER = 0x0005
# Notify information about one or more nodes. Any -> PM, PM -> Any.
NOTIFY_NODE_INFORMATION = 0x0006 NOTIFY_NODE_INFORMATION = 0x0006
START_OPERATION = 0x0007
STOP_OPERATION = 0x0008 # Ask the last OID, the last TID and the last Partition Table ID that a storage node
ASK_FINISHING_TRANSACTIONS = 0x0009 # stores. Used to recover information. PM -> S.
ANSWER_FINISHING_TRANSACTIONS = 0x8009 ASK_LAST_IDS = 0x0007
FINISH_TRANSACTIONS = 0x000a
# Reply to Ask Last IDs. S -> PM.
ANSWER_LAST_IDS = 0x8007
# Ask rows in a partition table that a storage node stores. Used to recover
# information. PM -> S.
ASK_PARTITION_TABLE = 0x0008
# Answer rows in a partition table. S -> PM.
ANSWER_PARTITION_TABLE = 0x8008
# Send rows in a partition table to update other nodes. PM -> S, C.
SEND_PARTITION_TABLE = 0x0009
# Notify a subset of a partition table. This is used to notify changes. PM -> S, C.
NOTIFY_PARTITION_CHANGES = 0x000a
# Tell a storage nodes to start an operation. Until a storage node receives this
# message, it must not serve client nodes. PM -> S.
START_OPERATION = 0x000b
# Tell a storage node to stop an operation. Once a storage node receives this message,
# it must not serve client nodes. PM -> S.
STOP_OPERATION = 0x000c
# Error codes. # Error codes.
NOT_READY_CODE = 1 NOT_READY_CODE = 1
...@@ -47,17 +95,27 @@ VALID_NODE_TYPE_LIST = (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE) ...@@ -47,17 +95,27 @@ VALID_NODE_TYPE_LIST = (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE)
# Node states. # Node states.
RUNNING_STATE = 0 RUNNING_STATE = 0
TEMPORARILY_DOWN_STATE = 2 TEMPORARILY_DOWN_STATE = 1
DOWN_STATE = 3 DOWN_STATE = 2
BROKEN_STATE = 4 BROKEN_STATE = 3
VALID_NODE_STATE_LIST = (RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE) VALID_NODE_STATE_LIST = (RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE)
# Partition cell states.
UP_TO_DATE_STATE = 0
OUT_OF_DATE_STATE = 1
FEEDING_STATE = 2
DISCARDED_STATE = 3
VALID_CELL_STATE_LIST = (UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE,
DISCARDED_STATE)
# Other constants. # Other constants.
INVALID_UUID = '\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0' INVALID_UUID = '\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'
INVALID_TID = '\0\0\0\0\0\0\0\0' INVALID_TID = '\0\0\0\0\0\0\0\0'
INVALID_SERIAL = '\0\0\0\0\0\0\0\0' INVALID_SERIAL = '\0\0\0\0\0\0\0\0'
INVALID_OID = '\0\0\0\0\0\0\0\0' INVALID_OID = '\0\0\0\0\0\0\0\0'
INVALID_PTID = '\0\0\0\0\0\0\0\0'
class ProtocolError(Exception): pass class ProtocolError(Exception): pass
...@@ -73,9 +131,7 @@ class Packet(object): ...@@ -73,9 +131,7 @@ class Packet(object):
# logging.debug('parsing %s', dump(msg)) # logging.debug('parsing %s', dump(msg))
if len(msg) < MIN_PACKET_SIZE: if len(msg) < MIN_PACKET_SIZE:
return None return None
msg_id, msg_type, msg_len, reserved = unpack('!HHLH', msg[:10]) msg_id, msg_type, msg_len = unpack('!LHL', msg[:10])
if reserved != 0:
raise ProtocolError(cls(msg_id, msg_type), 'reserved is non-zero')
if msg_len > MAX_PACKET_SIZE: if msg_len > MAX_PACKET_SIZE:
raise ProtocolError(cls(msg_id, msg_type), raise ProtocolError(cls(msg_id, msg_type),
'message too big (%d)' % msg_len) 'message too big (%d)' % msg_len)
...@@ -103,7 +159,7 @@ class Packet(object): ...@@ -103,7 +159,7 @@ class Packet(object):
# Encoders. # Encoders.
def encode(self): def encode(self):
msg = pack('!HHLH', self._id, self._type, 10 + len(self._body), 0) + self._body msg = pack('!LHL', self._id, self._type, 10 + len(self._body)) + self._body
if len(msg) > MAX_PACKET_SIZE: if len(msg) > MAX_PACKET_SIZE:
raise ProtocolError(self, 'message too big (%d)' % len(msg)) raise ProtocolError(self, 'message too big (%d)' % len(msg))
return msg return msg
...@@ -191,6 +247,70 @@ class Packet(object): ...@@ -191,6 +247,70 @@ class Packet(object):
self._body = ''.join(body) self._body = ''.join(body)
return self return self
def askLastIDs(self, msg_id):
self._id = msg_id
self._type = ASK_LAST_IDS
self._body = ''
return self
def answerLastIDs(self, msg_id, loid, ltid, lptid):
self._id = msg_id
self._type = ANSWER_LAST_IDS
self._body = loid + ltid + lptid
return self
def askPartitionTable(self, msg_id, offset_list):
self._id = msg_id
self._type = ASK_PARTITION_TABLE
body = [pack('!L', len(offset_list))]
for offset in offset_list:
body.append(pack('!L', offset))
self._body = ''.join(body)
return self
def answerPartitionTable(self, msg_id, ptid, row_list):
self._id = msg_id
self._type = ANSWER_PARTITION_TABLE
body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
body.append(pack('!16sH', uuid, state))
self._body = ''.join(body)
return self
def sendPartitionTable(self, msg_id, ptid, offset_list, row_list):
self._id = msg_id
self._type = SEND_PARTITION_TABLE
body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
body.append(pack('!16sH', uuid, state))
self._body = ''.join(body)
return self
def notifyPartitionChanges(self, msg_id, ptid, cell_list):
self._id = msg_id
self._type = NOTIFY_PARTITION_CHANGES
body = [pack('!8sL', ptid, len(cell_list))]
for offset, uuid, state in cell_list:
body.append(pack('!L16sH', offset, uuid, state))
self._body = ''.join(body)
return self
def startOperation(self, msg_id):
self._id = msg_id
self._type = START_OPERATION
self._body = ''
return self
def stopOperation(self, msg_id):
self._id = msg_id
self._type = START_OPERATION
self._body = ''
return self
# Decoders. # Decoders.
def decode(self): def decode(self):
try: try:
...@@ -294,3 +414,86 @@ class Packet(object): ...@@ -294,3 +414,86 @@ class Packet(object):
raise ProtocolError(self, 'invalid answer node information') raise ProtocolError(self, 'invalid answer node information')
return node_list return node_list
decode_table[NOTIFY_NODE_INFORMATION] = _decodeNotifyNodeInformation decode_table[NOTIFY_NODE_INFORMATION] = _decodeNotifyNodeInformation
def _decodeAskLastIDs(self):
pass
decode_table[ASK_LAST_IDS] = _decodeAskLastIDs
def _decodeAnswerLastIDs(self):
try:
loid, ltid, lptid = unpack('!8s8s8s', self._body)
except:
raise ProtocolError(self, 'invalid answer last ids')
return loid, ltid, lptid
decode_table[ANSWER_LAST_IDS] = _decodeAnswerLastIDs
def _decodeAskPartitionTable(self):
try:
n = unpack('!L', self._body[:4])
offset_list = []
for i in xrange(n):
offset = unpack('!L', self._body[4+i*4:8+i*4])
offset_list.append(offset)
except:
raise ProtocolError(self, 'invalid ask partition table')
return offset_list
decode_table[ASK_PARTITION_TABLE] = _decodeAskPartitionTable
def _decodeAnswerPartitionTable(self):
try:
ptid, n = unpack('!8sL', self._body[:12])
index = 12
row_list = []
cell_list = []
for i in xrange(n):
offset, m = unpack('!LL', self._body[index:index+8])
index += 8
for j in xrange(m):
cell = unpack('!16sH', self._body[index:index+18])
index += 18
cell_list.append(cell)
row_list.append((offset, cell_list))
del cell_list[:]
except:
raise ProtocolError(self, 'invalid answer partition table')
return row_list
decode_table[ANSWER_PARTITION_TABLE] = _decodeAnswerPartitionTable
def _decodeSendPartitionTable(self):
try:
ptid, n = unpack('!8sL', self._body[:12])
index = 12
row_list = []
cell_list = []
for i in xrange(n):
offset, m = unpack('!LL', self._body[index:index+8])
index += 8
for j in xrange(m):
cell = unpack('!16sH', self._body[index:index+18])
index += 18
cell_list.append(cell)
row_list.append((offset, cell_list))
del cell_list[:]
except:
raise ProtocolError(self, 'invalid send partition table')
return row_list
decode_table[SEND_PARTITION_TABLE] = _decodeSendPartitionTable
def _decodeNotifyPartitionChanges(self):
try:
ptid, n = unpack('!8sL', self._body[:12])
for i in xrange(n):
cell = unpack('!L16sH', self._body[12+i*22:34+i*22])
cell_list.append(cell)
except:
raise ProtocolError(self, 'invalid notify partition changes')
return cell_list
decode_table[NOTIFY_PARTITION_CHANGES] = _decodeNotifyPartitionChanges
def _decodeStartOperation(self):
pass
decode_table[START_OPERATION] = _decodeStartOperation
def _decodeStopOperation(self):
pass
decode_table[START_OPERATION] = _decodeStopOperation
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment