Commit fb998338 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Add more protocols

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@35 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent ef07cc94
......@@ -7,3 +7,7 @@ TODO
- Handling connection timeouts.
- Handling write timeouts.
- IdleEvent for a certain message type as well as a message ID
- Flushing write buffers only without reading packets
......@@ -2,3 +2,4 @@ class NeoException(Exception): pass
class ElectionFailure(NeoException): pass
class PrimaryFailure(NeoException): pass
class VerificationFailure(NeoException): pass
class OperationFailure(NeoException): pass
......@@ -10,7 +10,9 @@ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICAT
ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \
ASK_UNFINISHED_TRANSACTIONS, ANSWER_UNFINISHED_TRANSACTIONS, \
ASK_OIDS_BY_TID, ANSWER_OIDS_BY_TID, ASK_OBJECT_PRESENT, ANSWER_OBJECT_PRESENT, \
DELETE_TRANSACTION, COMMIT_TRANSACTION, \
DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_NEW_TID, ANSWER_NEW_TID, \
FINISH_TRANSACTION, NOTIFY_TRANSACTION_FINISHED, LOCK_INFORMATION, \
NOTIFY_INFORMATION_LOCKED, INVALIDATE_OBJECTS, UNLOCK_INFORMATION, \
NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \
PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \
INTERNAL_ERROR_CODE
......@@ -176,6 +178,30 @@ class EventHandler(object):
def handleCommitTransaction(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleAskNewTID(self, conn, packet):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerNewTID(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleFinishTransaction(self, conn, packet, oid_list, tid):
self.handleUnexpectedPacket(conn, packet)
def handleNotifyTransactionFinished(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleLockInformation(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleNotifyInformationLocked(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleInvalidateObjects(self, conn, packet, oid_list):
self.handleUnexpectedPacket(conn, packet)
def handleUnlockInformation(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
# Error packet handlers.
......@@ -226,6 +252,14 @@ class EventHandler(object):
d[ANSWER_OBJECT_PRESENT] = self.handleAnswerObjectPresent
d[DELETE_TRANSACTION] = self.handleDeleteTransaction
d[COMMIT_TRANSACTION] = self.handleCommitTransaction
d[ASK_NEW_TID] = self.handleAskNewTID
d[ANSWER_NEW_TID] = self.handleAnswerNewTID
d[FINISH_TRANSACTION] = self.handleFinishTransaction
d[NOTIFY_TRANSACTION_FINISHED] = self.handleNotifyTransactionFinished
d[LOCK_INFORMATION] = self.handleLockInformation
d[NOTIFY_INFORMATION_LOCKED] = self.handleNotifyInformationLocked
d[INVALIDATE_OBJECTS] = self.handleInvalidateObjects
d[UNLOCK_INFORMATION] = self.handleUnlockInformation
self.packet_dispatch_table = d
......
......@@ -12,7 +12,8 @@ from neo.node import NodeManager, MasterNode, StorageNode, ClientNode
from neo.event import EventManager
from neo.util import dump
from neo.connection import ListeningConnection, ClientConnection, ServerConnection
from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure
from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure, \
OperationFailure
from neo.master.election import ElectionEventHandler
from neo.master.recovery import RecoveryEventHandler
from neo.master.verification import VerificationEventHandler
......@@ -596,6 +597,47 @@ class Application(object):
if cell_list:
app.broadcastPartitionChanges(self.getNextPartitionTableID(), cell_list)
def provideService(self):
"""This is the normal mode for a primary master node. Handle transactions
and stop the service only if a catastrophy happens or the user commits
a shutdown."""
logging.info('provide service')
handler = ServiceEventHandler()
em = self.em
nm = self.nm
# Make sure that every connection has the service event handler.
for conn in em.getConnectionList():
conn.setHandler(handler)
# Now storage nodes should know that the cluster is operational.
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getNodeByUUID(uuid)
if isinstance(node, StorageNode):
conn.addPacket(Packet().startOperation(conn.getNextId()))
# Now everything is passive.
while 1:
try:
em.poll(1)
except OperationFailure:
# If not operational, send Stop Operation packets to storage nodes
# and client nodes. Abort connections to client nodes.
logging.critical('No longer operational, so stopping the service')
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getNodeByUUID(uuid)
if isinstance(node, (StorageNode, ClientNode)):
conn.addPacket(Packet().stopOperation(conn.getNextId()))
if isinstance(node, ClientNode):
conn.abort()
# Then, go back, and restart.
return
def playPrimaryRole(self):
logging.info('play the primary role')
......@@ -606,17 +648,17 @@ class Application(object):
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
recovering = True
while recovering:
self.recoverStatus()
recovering = False
try:
self.verifyData()
except VerificationFailure:
recovering = True
# FIXME start a real operation
raise NotImplementedError
while 1:
recovering = True
while recovering:
self.recoverStatus()
recovering = False
try:
self.verifyData()
except VerificationFailure:
recovering = True
self.provideService()
def playSecondaryRole(self):
logging.info('play the secondary role')
......
......@@ -100,6 +100,30 @@ DELETE_TRANSACTION = 0x0010
# Commit a transaction. PM -> S.
COMMIT_TRANSACTION = 0x0011
# Ask a new transaction ID. C -> PM.
ASK_NEW_TID = 0x0012
# Answer a new transaction ID. PM -> C.
ANSWER_NEW_TID = 0x8012
# Finish a transaction. C -> PM.
FINISH_TRANSACTION = 0x0013
# Notify a transaction finished. PM -> C.
NOTIFY_TRANSACTION_FINISHED = 0x8013
# Lock information on a transaction. PM -> S.
LOCK_INFORMATION = 0x0014
# Notify information on a transaction locked. S -> PM.
NOTIFY_INFORMATION_LOCKED = 0x8014
# Invalidate objects. PM -> C.
INVALIDATE_OBJECTS = 0x0015
# Unlock information on a transaction. PM -> S.
UNLOCK_INFORMATION = 0x0016
# Error codes.
NOT_READY_CODE = 1
......@@ -388,6 +412,59 @@ class Packet(object):
self._body = tid
return self
def askNewTID(self, msg_id):
self._id = msg_id
self._type = ASK_NEW_TID
self._body = ''
return self
def answerNewTID(self, msg_id, tid):
self._id = msg_id
self._type = ANSWER_NEW_TID
self._body = tid
return self
def finishTransaction(self, msg_id, oid_list, tid):
self._id = msg_id
self._type = FINISH_TRANSACTION
body = [pack('!8sL', tid, len(oid_list))]
body.extend(oid_list)
self._body = ''.join(body)
return self
def notifyTransactionFinished(self, msg_id, tid):
self._id = msg_id
self._type = NOTIFY_TRANSACTION_FINISHED
self._body = tid
return self
def lockInformation(self, msg_id, tid):
self._id = msg_id
self._type = LOCK_INFORMATION
self._body = tid
return self
def notifyInformationLocked(self, msg_id, tid):
self._id = msg_id
self._type = NOTIFY_INFORMATION_LOCKED
self._body = tid
return self
def invalidateObjects(self, msg_id, oid_list):
self._id = msg_id
self._type = INVALIDATE_OBJECTS
body = [pack('!L', len(oid_list))]
body.extend(oid_list)
self._body = ''.join(body)
return self
def unlockInformation(self, msg_id, tid):
self._id = msg_id
self._type = UNLOCK_INFORMATION
self._body = tid
return self
# Decoders.
def decode(self):
try:
......@@ -642,3 +719,72 @@ class Packet(object):
raise ProtocolError(self, 'invalid commit transaction')
return tid
decode_table[COMMIT_TRANSACTION] = _decodeCommitTransaction
def _decodeAskNewTID(self):
pass
decode_table[ASK_NEW_TID] = _decodeAskNewTID
def _decodeAnswerNewTID(self):
try:
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid answer new tid')
return tid
decode_table[ANSWER_NEW_TID] = _decodeAnswerNewTID
def _decodeFinishTransaction(self):
try:
tid, n = unpack('!8sL', self._body[:12])
oid_list = []
for i in xrange(n):
oid = unpack('8s', self._body[12+i*8:20+i*8])
oid_list.append(oid)
except:
raise ProtocolError(self, 'invalid finish transaction')
return oid_list, tid
decode_table[FINISH_TRANSACTION] = _decodeFinishTransaction
def _decodeNotifyTransactionFinished(self):
try:
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid notify transactin finished')
return tid
decode_table[NOTIFY_TRANSACTION_FINISHED] = _decodeNotifyTransactionFinished
def _decodeLockInformation(self):
try:
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid lock information')
return tid
decode_table[LOCK_INFORMATION] = _decodeLockInformation
def _decodeNotifyInformationLocked(self):
try:
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid notify information locked')
return tid
decode_table[NOTIFY_INFORMATION_LOCKED] = _decodeNotifyInformationLocked
def _decodeInvalidateObjects(self):
try:
n = unpack('!L', self._body[:4])
oid_list = []
for i in xrange(n):
oid = unpack('8s', self._body[4+i*8:12+i*8])
oid_list.append(oid)
except:
raise ProtocolError(self, 'invalid finish transaction')
return oid_list
decode_table[INVALIDATE_OBJECTS] = _decodeInvalidateObjects
def _decodeUnlockInformation(self):
try:
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid unlock information')
return tid
decode_table[UNLOCK_INFORMATION] = _decodeUnlockInformation
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment