Commit 18e037c4 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Not tested enough, but the primary election is implemented.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype2@18 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent bb0e59a7
...@@ -130,7 +130,6 @@ class Connection: ...@@ -130,7 +130,6 @@ class Connection:
for event in self.event_dict.itervalues(): for event in self.event_dict.itervalues():
self.cm.removeIdleEvent(event) self.cm.removeIdleEvent(event)
self.event_dict.clear() self.event_dict.clear()
self.connectionClosed()
def abort(self): def abort(self):
"""Abort dealing with this connection.""" """Abort dealing with this connection."""
...@@ -194,7 +193,7 @@ class Connection: ...@@ -194,7 +193,7 @@ class Connection:
pass pass
self.packetReceived(packet) self.packetReceived(packet)
msg = msg[len(packet)] msg = msg[len(packet):]
if msg: if msg:
self.read_buf = [msg] self.read_buf = [msg]
...@@ -211,12 +210,18 @@ class Connection: ...@@ -211,12 +210,18 @@ class Connection:
r = s.recv(4096) r = s.recv(4096)
if not r: if not r:
logging.error('cannot read') logging.error('cannot read')
self.connectionClosed()
self.close() self.close()
else: else:
self.read_buf.append(r) self.read_buf.append(r)
except socket.error, m: except socket.error, m:
if m[0] == errno.EAGAIN: if m[0] == errno.EAGAIN:
return [] pass
elif m[0] == errno.ECONNRESET:
logging.error('cannot read')
self.connectionClosed()
self.close()
else:
raise raise
def send(self): def send(self):
...@@ -231,6 +236,7 @@ class Connection: ...@@ -231,6 +236,7 @@ class Connection:
r = s.send(msg) r = s.send(msg)
if not r: if not r:
logging.error('cannot write') logging.error('cannot write')
self.connectionClosed()
self.close() self.close()
elif r == len(msg): elif r == len(msg):
del self.write_buf[:] del self.write_buf[:]
...@@ -309,9 +315,10 @@ class Connection: ...@@ -309,9 +315,10 @@ class Connection:
def packetMalformed(self, packet, error_message): def packetMalformed(self, packet, error_message):
"""Called when a packet is malformed.""" """Called when a packet is malformed."""
self.peerBroken() logging.info('malformed packet: %s', error_message)
self.addPacket(Packet().protocolError(packet.getId(), error_message)) self.addPacket(Packet().protocolError(packet.getId(), error_message))
self.abort() self.abort()
self.peerBroken()
class ConnectionManager: class ConnectionManager:
"""This class manages connections and sockets.""" """This class manages connections and sockets."""
...@@ -337,6 +344,9 @@ class ConnectionManager: ...@@ -337,6 +344,9 @@ class ConnectionManager:
self.listening_socket = s self.listening_socket = s
self.reader_set.add(s) self.reader_set.add(s)
def getConnectionList(self):
return self.connection_dict.values()
def register(self, conn): def register(self, conn):
self.connection_dict[conn.getSocket()] = conn self.connection_dict[conn.getSocket()] = conn
...@@ -355,9 +365,11 @@ class ConnectionManager: ...@@ -355,9 +365,11 @@ class ConnectionManager:
for s in rlist: for s in rlist:
if s == self.listening_socket: if s == self.listening_socket:
try: try:
conn, addr = s.accept() new_s, addr = s.accept()
logging.info('accepted a connection from %s:%d', addr[0], addr[1]) logging.info('accepted a connection from %s:%d', addr[0], addr[1])
self.register(self.klass(self, conn, addr)) conn = self.klass(self, new_s, addr)
self.register(conn)
conn.connectionAccepted()
except socket.error, m: except socket.error, m:
if m[0] == errno.EAGAIN: if m[0] == errno.EAGAIN:
continue continue
...@@ -377,7 +389,7 @@ class ConnectionManager: ...@@ -377,7 +389,7 @@ class ConnectionManager:
t = time() t = time()
if t - self.prev_time >= 1: if t - self.prev_time >= 1:
self.prev_time = t self.prev_time = t
event_list.sort(key = lambda event: event.getTimeout()) event_list.sort(key = lambda event: event.getTime())
for event in tuple(event_list): for event in tuple(event_list):
if event(t): if event(t):
event_list.pop(0) event_list.pop(0)
...@@ -397,7 +409,7 @@ class ConnectionManager: ...@@ -397,7 +409,7 @@ class ConnectionManager:
self.reader_set.add(s) self.reader_set.add(s)
def removeReader(self, s): def removeReader(self, s):
self.read_set.discard(s) self.reader_set.discard(s)
def addWriter(self, s): def addWriter(self, s):
self.writer_set.add(s) self.writer_set.add(s)
......
import logging import logging
import MySQLdb import MySQLdb
import os import os
from socket import inet_aton
from time import time
from connection import ConnectionManager from connection import ConnectionManager
from connection import Connection as BaseConnection from connection import Connection as BaseConnection
from database import DatabaseManager from database import DatabaseManager
from config import ConfigurationManager from config import ConfigurationManager
from protocol import Packet, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE from protocol import Packet, ProtocolError, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
INVALID_UUID, INVALID_TID, INVALID_OID, \
PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \
INTERNAL_ERROR_CODE, \
ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
PING, PONG, ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, \
ANNOUNCE_PRIMARY_MASTER, REELECT_PRIMARY_MASTER
from node import NodeManager, MasterNode, StorageNode, ClientNode, \ from node import NodeManager, MasterNode, StorageNode, ClientNode, \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE
from util import dump
class ElectionFailure(Exception): pass
class PrimaryFailure(Exception): pass
class Connection(BaseConnection): class Connection(BaseConnection):
"""This class provides a master-specific connection.""" """This class provides a master-specific connection."""
_uuid = None
def setUUID(self, uuid):
self._uuid = uuid
def getUUID(self):
return self._uuid
# Feed callbacks to the master node. # Feed callbacks to the master node.
def connectionFailed(self): def connectionFailed(self):
self.cm.app.connectionFailed(self) self.cm.app.connectionFailed(self)
...@@ -42,7 +64,7 @@ class Connection(BaseConnection): ...@@ -42,7 +64,7 @@ class Connection(BaseConnection):
self.cm.app.peerBroken(self) self.cm.app.peerBroken(self)
BaseConnection.peerBroken(self) BaseConnection.peerBroken(self)
class Application: class Application(object):
"""The master node application.""" """The master node application."""
def __init__(self, file, section): def __init__(self, file, section):
...@@ -73,6 +95,14 @@ class Application: ...@@ -73,6 +95,14 @@ class Application:
self.cm = ConnectionManager(app = self, connection_klass = Connection) self.cm = ConnectionManager(app = self, connection_klass = Connection)
self.nm = NodeManager() self.nm = NodeManager()
self.primary = None
self.primary_master_node = None
# Co-operative threads. Simulated by generators.
self.thread_dict = {}
self.server_thread_method = None
self.event = None
def initializeDatabase(self): def initializeDatabase(self):
"""Initialize a database by recreating all the tables. """Initialize a database by recreating all the tables.
...@@ -87,18 +117,25 @@ class Application: ...@@ -87,18 +117,25 @@ class Application:
q("""CREATE TABLE loid ( q("""CREATE TABLE loid (
oid BINARY(8) NOT NULL oid BINARY(8) NOT NULL
) ENGINE = InnoDB COMMENT = 'Last Object ID'""") ) ENGINE = InnoDB COMMENT = 'Last Object ID'""")
q("""INSERT loid VALUES ('%s')""" % e('\0\0\0\0\0\0\0\0')) q("""INSERT loid VALUES ('%s')""" % e(INVALID_OID))
q("""CREATE TABLE ltid ( q("""CREATE TABLE ltid (
tid BINARY(8) NOT NULL tid BINARY(8) NOT NULL
) ENGINE = InnoDB COMMENT = 'Last Transaction ID'""") ) ENGINE = InnoDB COMMENT = 'Last Transaction ID'""")
q("""INSERT ltid VALUES ('%s')""" % e('\0\0\0\0\0\0\0\0')) q("""INSERT ltid VALUES ('%s')""" % e(INVALID_TID))
q("""CREATE TABLE self ( q("""CREATE TABLE self (
uuid BINARY(16) NOT NULL uuid BINARY(16) NOT NULL
) ENGINE = InnoDB COMMENT = 'UUID'""") ) ENGINE = InnoDB COMMENT = 'UUID'""")
# XXX Generate an UUID for self. For now, just use a random string. # XXX Generate an UUID for self. For now, just use a random string.
q("""INSERT self VALUES ('%s')""" % e(os.urandom(16))) # Avoid an invalid UUID.
while 1:
uuid = os.urandom(16)
if uuid != INVALID_UUID:
break
q("""INSERT self VALUES ('%s')""" % e(uuid))
q("""CREATE TABLE stn ( q("""CREATE TABLE stn (
nid INT UNSIGNED NOT NULL UNIQUE, nid INT UNSIGNED NOT NULL UNIQUE,
...@@ -120,19 +157,19 @@ class Application: ...@@ -120,19 +157,19 @@ class Application:
if len(result) != 1: if len(result) != 1:
raise RuntimeError, 'the table loid has %d rows' % len(result) raise RuntimeError, 'the table loid has %d rows' % len(result)
self.loid = result[0][0] self.loid = result[0][0]
logging.info('the last OID is %r' % self.loid) logging.info('the last OID is %r' % dump(self.loid))
result = q("""SELECT tid FROM ltid""") result = q("""SELECT tid FROM ltid""")
if len(result) != 1: if len(result) != 1:
raise RuntimeError, 'the table ltid has %d rows' % len(result) raise RuntimeError, 'the table ltid has %d rows' % len(result)
self.ltid = result[0][0] self.ltid = result[0][0]
logging.info('the last TID is %r' % self.ltid) logging.info('the last TID is %r' % dump(self.ltid))
result = q("""SELECT uuid FROM self""") result = q("""SELECT uuid FROM self""")
if len(result) != 1: if len(result) != 1:
raise RuntimeError, 'the table self has %d rows' % len(result) raise RuntimeError, 'the table self has %d rows' % len(result)
self.uuid = result[0][0] self.uuid = result[0][0]
logging.info('the UUID is %r' % self.uuid) logging.info('the UUID is %r' % dump(self.uuid))
# FIXME load storage and partition information here. # FIXME load storage and partition information here.
...@@ -166,84 +203,392 @@ class Application: ...@@ -166,84 +203,392 @@ class Application:
# Start the election of a primary master node. # Start the election of a primary master node.
self.electPrimary() self.electPrimary()
# Start a normal operation.
while 1:
try:
if self.primary:
self.playPrimaryRole()
else:
self.playSecondaryRole()
except (ElectionFailure, PrimaryFailure):
self.electPrimary(bootstrap = False)
CONNECTION_FAILED = 'connection failed'
def connectionFailed(self, conn): def connectionFailed(self, conn):
# The connection failed, so I must attempt to retry. addr = (conn.ip_address, conn.port)
self.unconnected_master_node_list.append((conn.ip_address, conn.port)) t = self.thread_dict[addr]
self.event = (self.CONNECTION_FAILED, conn)
try:
t.next()
except StopIteration:
del self.thread_dict[addr]
CONNECTION_COMPLETED = 'connection completed'
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
self.connecting_master_node_list.remove((conn.ip_address, conn.port)) addr = (conn.ip_address, conn.port)
p = Packet() t = self.thread_dict[addr]
msg_id = conn.getNextId() self.event = (self.CONNECTION_COMPLETED, conn)
p.requestNodeIdentification(msg_id, MASTER_NODE_TYPE, self.uuid, try:
self.ip_address, self.port, self.name) t.next()
conn.addPacket(p) except StopIteration:
conn.expectMessage(msg_id) del self.thread_dict[addr]
CONNECTION_CLOSED = 'connection closed'
def connectionClosed(self, conn): def connectionClosed(self, conn):
pass addr = (conn.ip_address, conn.port)
t = self.thread_dict[addr]
self.event = (self.CONNECTION_CLOSED, conn)
try:
t.next()
except StopIteration:
del self.thread_dict[addr]
CONNECTION_ACCEPTED = 'connection accepted'
def connectionAccepted(self, conn): def connectionAccepted(self, conn):
pass addr = (conn.ip_address, conn.port)
logging.debug('making a server thread for %s:%d', conn.ip_address, conn.port)
t = self.server_thread_method()
self.thread_dict[addr] = t
self.event = (self.CONNECTION_ACCEPTED, conn)
try:
t.next()
except StopIteration:
del self.thread_dict[addr]
TIMEOUT_EXPIRED = 'timeout expired'
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
pass addr = (conn.ip_address, conn.port)
t = self.thread_dict[addr]
self.event = (self.TIMEOUT_EXPIRED, conn)
try:
t.next()
except StopIteration:
del self.thread_dict[addr]
PEER_BROKEN = 'peer broken'
def peerBroken(self, conn): def peerBroken(self, conn):
pass addr = (conn.ip_address, conn.port)
t = self.thread_dict[addr]
self.event = (self.PEER_BROKEN, conn)
try:
t.next()
except StopIteration:
del self.thread_dict[addr]
PACKET_RECEIVED = 'packet received'
def packetReceived(self, conn, packet): def packetReceived(self, conn, packet):
addr = (conn.ip_address, conn.port)
t = self.thread_dict[addr]
self.event = (self.PACKET_RECEIVED, conn, packet)
try: try:
# If the packet is an error, deal with it as much as possible here. t.next()
except StopIteration:
del self.thread_dict[addr]
def electPrimaryClientIterator(self):
"""Handle events for a client connection."""
# The first event. This must be a connection failure or connection completion.
# Keep the Connection object and the server address only at this time,
# because they never change in this context.
method, conn = self.event[0], self.event[1]
logging.debug('method is %r, conn is %s:%d', method, conn.ip_address, conn.port)
serv = (conn.ip_address, conn.port)
node = self.nm.getNodeByServer(*serv)
if node is None:
raise RuntimeError, 'attempted to connect to an unknown node'
if not isinstance(node, MasterNode):
raise RuntimeError, 'should not happen'
if method is self.CONNECTION_FAILED:
self.negotiating_master_node_set.discard(serv)
self.unconnected_master_node_set.add(serv)
if node.getState() not in (DOWN_STATE, BROKEN_STATE):
node.setState(TEMPORARILY_DOWN_STATE)
return
elif method is self.CONNECTION_COMPLETED:
self.negotiating_master_node_set.add(serv)
# Request a node idenfitication.
p = Packet()
msg_id = conn.getNextId()
p.requestNodeIdentification(msg_id, MASTER_NODE_TYPE, self.uuid,
self.ip_address, self.port, self.name)
conn.addPacket(p)
conn.expectMessage(msg_id)
else:
raise RuntimeError, 'unexpected event %r' % (method,)
while 1:
# Wait for next event.
yield None
method = self.event[0]
logging.debug('method is %r, conn is %s:%d', method, conn.ip_address, conn.port)
if method in (self.CONNECTION_CLOSED, self.TIMEOUT_EXPIRED):
self.negotiating_master_node_set.discard(serv)
self.unconnected_master_node_set.add(serv)
if node.getState() not in (DOWN_STATE, BROKEN_STATE):
node.setState(TEMPORARILY_DOWN_STATE)
return
elif method is self.PEER_BROKEN:
self.negotiating_master_node_set.discard(serv)
# For now, do not use BROKEN_STATE, because the effect is unknown
# when a node was buggy and restarted immediately.
node.setState(DOWN_STATE)
return
elif method is self.PACKET_RECEIVED:
if node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE)
packet = self.event[2]
t = packet.getType() t = packet.getType()
try:
if t == ERROR: if t == ERROR:
code, msg = packet.decode() code, msg = packet.decode()
if code in (PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE): if code in (PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE,
BROKEN_NODE_DISALLOWED_CODE):
# In those cases, it is better to assume that I am unusable. # In those cases, it is better to assume that I am unusable.
logging.critical(msg) logging.critical(msg)
raise RuntimeError, msg raise RuntimeError, msg
elif t == PING: else:
self.addPacket(Packet().pong(packet.getId())) # Otherwise, the peer has an error.
logging.error('an error happened at the peer %s:%d',
conn.ip_address, conn.port)
node.setState(DOWN_STATE)
self.negotiating_master_node_set.discard(serv)
conn.close()
return return
elif t == PING:
logging.info('got a keep-alive message from %s:%d; overloaded?',
conn.ip_address, conn.port)
conn.addPacket(Packet().pong(packet.getId()))
elif t == PONG: elif t == PONG:
pass
elif t == ACCEPT_NODE_IDENTIFICATION:
node_type, uuid, ip_address, port = packet.decode()
if node_type != MASTER_NODE_TYPE:
# Why? Isn't this a master node?
self.nm.remove(node)
self.negotiating_master_node_set.discard(serv)
conn.close()
return return
if self.ready: conn.setUUID(uuid)
self.cm.app.packetReceived(self, packet) node.setUUID(uuid)
# Ask a primary master.
msg_id = conn.getNextId()
p = Packet()
conn.addPacket(p.askPrimaryMaster(msg_id, self.ltid, self.loid))
conn.expectMessage(msg_id)
elif t == ANSWER_PRIMARY_MASTER:
ltid, loid, primary_uuid, known_master_list = packet.decode()
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
if self.ip_address == ip_address and self.port == port:
# This is self.
continue
else: else:
if self.from_self: n = self.nm.getNodeByServer(ip_address, port)
if t == ACCEPT_NODE_IDENTIFICATION: if n is None:
node_type, uuid, ip_address, port = packet.decode() n = MasterNode(ip_address, port)
self.node_type = node_type self.nm.add(n)
self.uuid = node_type self.unconnected_master_node_set.add((ip_address, port))
self.ip_address = ip_address if uuid != INVALID_UUID:
self.port = port n.setUUID(uuid)
self.ready = True elif uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
# The primary master is defined.
if self.primary_master_node is not None \
and self.primary_master_node.getUUID() != primary_uuid:
# There are multiple primary master nodes. This is
# dangerous.
raise ElectionFailure, 'multiple primary master nodes'
primary_node = self.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
pass
else:
if node.getUUID() == primary_uuid:
if self.ltid <= ltid and self.loid <= loid:
# This one is good.
self.primary = False
self.primary_master_node = node
else:
# Not nice. I am newer. If the primary master is
# already serving, the situation is catastrophic.
# In this case, it will shutdown the cluster.
# Otherwise, I can be a new primary master, so
# continue this job.
pass
else:
# I will continue this, until I find the primary
# master.
pass
else:
if self.ltid < ltid or self.loid < loid \
or inet_aton(self.ip_address) > inet_aton(ip_address) \
or self.port > port:
# I lost.
self.primary = False
else:
# I won.
pass
self.negotiating_master_node_set.discard(serv)
else: else:
raise ProtocolError(packet, 'unexpected packet 0x%x' % t) raise ProtocolError(packet, 'unexpected packet 0x%x' % t)
except ProtocolError, m:
logging.debug('protocol problem %s', m[1])
conn.addPacket(Packet().protocolError(m[0].getId(), m[1]))
conn.abort()
self.negotiating_master_node_set.discard(serv)
self.unconnected_master_node_set.add(serv)
else: else:
if t == REQUEST_NODE_IDENTIFICATION: raise RuntimeError, 'unexpected event %r' % (method,)
def electPrimaryServerIterator(self):
"""Handle events for a server connection."""
# The first event. This must be a connection acception.
method, conn = self.event[0], self.event[1]
logging.debug('method is %r, conn is %s:%d', method, conn.ip_address, conn.port)
serv = (conn.ip_address, conn.port)
node = None
if method is self.CONNECTION_ACCEPTED:
# Nothing to do at the moment. The timeout handling is done in
# the connection itself.
pass
else:
raise RuntimeError, 'unexpected event %r' % (method,)
while 1:
# Wait for next event.
yield None
method = self.event[0]
logging.debug('method is %r, conn is %s:%d', method, conn.ip_address, conn.port)
if method in (self.CONNECTION_CLOSED, self.TIMEOUT_EXPIRED):
return
elif method is self.PEER_BROKEN:
if node is not None:
if isinstance(node, MasterNode):
node.setState(DOWN_STATE)
elif isinstance(node, (ClientNode, StorageNode)):
node.setState(BROKEN_STATE)
return
elif method is self.PACKET_RECEIVED:
if node is not None and node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE)
packet = self.event[2]
t = packet.getType()
try:
if t == ERROR:
code, msg = packet.decode()
if code in (PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE,
BROKEN_NODE_DISALLOWED_CODE):
# In those cases, it is better to assume that I am unusable.
logging.critical(msg)
raise RuntimeError, msg
else:
# Otherwise, the peer has an error.
logging.error('an error happened at the peer %s:%d',
conn.ip_address, conn.port)
if node is not None:
node.setState(BROKEN_STATE)
conn.close()
return
elif t == PING:
logging.info('got a keep-alive message from %s:%d; overloaded?',
conn.ip_address, conn.port)
conn.addPacket(Packet().pong(packet.getId()))
elif t == PONG:
pass
elif t == REQUEST_NODE_IDENTIFICATION:
node_type, uuid, ip_address, port, name = packet.decode() node_type, uuid, ip_address, port, name = packet.decode()
self.node_type = node_type if node_type != MASTER_NODE_TYPE:
self.uuid = uuid logging.info('reject a connection from a non-master')
self.ip_address = ip_address conn.addPacket(Packet().notReady(packet.getId(),
self.port = port 'retry later'))
self.name = name conn.abort()
if self.cm.app.verifyNodeIdentification(self): continue
self.cm.app.acceptNodeIdentification(self) if name != self.name:
self.ready = True logging.info('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid cluster name'))
conn.abort()
continue
node = self.nm.getNodeByServer(ip_address, port)
if node is None:
node = MasterNode(ip_address, port, uuid)
self.nm.add(node)
self.unconnected_master_node_set.add((ip_address, port))
else: else:
self.addPacket(Packet().protocolError(packet.getId(), # Trust the UUID sent by the peer.
'invalid identification')) if node.getUUID() != uuid:
self.abort() node.setUUID(uuid)
conn.setUUID(uuid)
p = Packet()
p.acceptNodeIdentification(packet.getId(), MASTER_NODE_TYPE,
self.uuid, self.ip_address, self.port)
conn.addPacket(p)
conn.expectMessage()
elif t == ASK_PRIMARY_MASTER:
if node is None:
raise ProtocolError(packet, 'not identified')
ltid, loid = packet.decode()
p = Packet()
if self.primary:
uuid = self.uuid
elif self.primary_master_node is not None:
uuid = self.primary_master_node.getUUID()
else: else:
raise ProtocolError(packet, 'unexpected packet 0x%x' % t) uuid = INVALID_UUID
known_master_list = []
for n in self.nm.getMasterNodeList():
info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info)
p.answerPrimaryMaster(packet.getId(), self.ltid, self.loid,
uuid, known_master_list)
conn.addPacket(p)
if self.ready: if self.primary and (self.ltid < ltid or self.loid < loid):
self.connectionReady() # I am not really primary... So restart the election.
raise ElectionFailure, 'not a primary master any longer'
elif t == ANNOUNCE_PRIMARY_MASTER:
if node is None:
raise ProtocolError(packet, 'not identified')
if self.primary:
# I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises'
self.primary = False
self.primary_master_node = node
elif t == REELECT_PRIMARY_MASTER:
raise ElectionFailure, 'reelection requested'
else:
raise ProtocolError(packet, 'unexpected packet 0x%x' % t)
except ProtocolError, m: except ProtocolError, m:
self.packetMalformed(*m) logging.debug('protocol problem %s', m[1])
conn.addPacket(Packet().protocolError(m[0].getId(), m[1]))
conn.abort()
else:
raise RuntimeError, 'unexpected event %r' % (method,)
def electPrimary(self): def electPrimary(self, bootstrap = True):
"""Elect a primary master node. """Elect a primary master node.
The difficulty is that a master node must accept connections from The difficulty is that a master node must accept connections from
...@@ -252,14 +597,148 @@ class Application: ...@@ -252,14 +597,148 @@ class Application:
to self as well as master nodes.""" to self as well as master nodes."""
logging.info('begin the election of a primary master') logging.info('begin the election of a primary master')
self.unconnected_master_node_list = list(self.master_node_list) self.server_thread_method = self.electPrimaryServerIterator
self.connecting_master_node_list = []
self.unconnected_master_node_set = set()
self.negotiating_master_node_set = set()
cm = self.cm cm = self.cm
nm = self.nm
while 1: while 1:
for node in list(self.unconnected_master_node_list): t = 0
self.unconnected_master_node_list.remove(node) self.primary = None
self.connecting_master_node_list.append(node) self.primary_master_node = None
for node in nm.getMasterNodeList():
if node.getState() in (RUNNING_STATE, TEMPORARILY_DOWN_STATE):
self.unconnected_master_node_set.add(node.getServer())
self.negotiating_master_node_set.clear()
try:
while 1:
cm.poll(1)
current_time = time()
if current_time >= t + 1:
t = current_time
# Expire temporarily down nodes. For now, assume that a node
# which is down for 60 seconds is really down, if this is a
# bootstrap. 60 seconds may sound too long, but this is reasonable
# when rebooting many cluster machines. Otherwise, wait for only
# 10 seconds, because stopping the whole cluster for a long time
# is a bad idea.
if bootstrap:
expiration = 60
else:
expiration = 10
for node in nm.getMasterNodeList():
if node.getState() == TEMPORARILY_DOWN_STATE \
and node.getLastStateChange() + expiration < current_time:
logging.info('%s:%d is down' % node.getServer())
node.setState(DOWN_STATE)
self.unconnected_master_node_set.discard(node.getServer())
# Try to connect to master nodes.
if self.unconnected_master_node_set:
for node in list(self.unconnected_master_node_set):
self.unconnected_master_node_set.remove(node)
self.negotiating_master_node_set.add(node)
client = self.electPrimaryClientIterator()
self.thread_dict[node] = client
cm.connect(*node) cm.connect(*node)
if len(self.unconnected_master_node_set) == 0 \
and len(self.negotiating_master_node_set) == 0:
break
# Now there are three situations:
# - I am the primary master
# - I am secondary but don't know who is primary
# - I am secondary and know who is primary
if self.primary is None:
# I am the primary.
self.primary = True
for conn in cm.getConnectionList():
if conn.from_self:
p = Packet().announcePrimaryMaster(conn.getNextId())
conn.addPacket(p)
conn.abort()
closed = False
t = time()
while not closed:
cm.poll(1) cm.poll(1)
closed = True
for conn in cm.getConnectionList():
if conn.from_self:
closed = False
if t + 10 < time():
for conn in cm.getConnectionList():
if conn.from_self:
del self.thread_dict[(conn.ip_address, conn.port)]
conn.close()
closed = True
else:
# Wait for an announcement. If this is too long, probably
# the primary master is down.
t = time()
while self.primary_master_node is None:
cm.poll(1)
if t + 10 < time():
raise ElectionFailure, 'no primary master elected'
# Now I need only a connection to the primary master node.
primary = self.primary_master_node
for conn in cm.getConnectionList():
if not conn.from_self \
or primary.getServer() != (conn.ip_address, conn.port):
del self.thread_dict[(conn.ip_address, conn.port)]
conn.close()
# But if there is no such connection, something wrong happened.
for conn in cm.getConnectionList():
if conn.from_self \
and primary.getServer() == (conn.ip_address, conn.port):
break
else:
raise ElectionFailure, 'no connection remains to the primary'
return
except ElectionFailure:
# Ask all connected nodes to reelect a single primary master.
for conn in cm.getConnectionList():
if conn.from_self:
conn.addPacket(Packet().reelectPrimaryMaster(conn.getNextId()))
conn.abort()
# Wait until the connections are closed.
self.primary = None
self.primary_master_node = None
closed = False
t = time()
while not closed:
try:
cm.poll(1)
except ElectionFailure:
pass
closed = True
for conn in cm.getConnectionList():
if conn.from_self:
# Still not closed.
closed = Falsed
break
if time() > t + 10:
# If too long, do not wait.
break
# Close all connections.
for conn in cm.getConnectionList():
conn.close()
self.thread_dict.clear()
bootstrap = False
def playPrimaryRole(self):
logging.info('play the primary role')
raise NotImplementedError
def playSecondaryRole(self):
logging.info('play the secondary role')
raise NotImplementedError
[DEFAULTS] [DEFAULT]
#master_nodes: master_nodes: 127.0.0.1:10010 127.0.0.1:10011
#replicas: 1 #replicas: 1
#partitions: 1009 #partitions: 1009
#name: main #name: main
[master] [master1]
#database: test database: master1
#user: test user: neo
#password: #password:
#server: 127.0.0.1:10010 server: 127.0.0.1:10010
[master2]
database: master2
user: neo
server: 127.0.0.1:10011
from time import time
RUNNING_STATE = 0 RUNNING_STATE = 0
TEMPORARILY_DOWN_STATE = 2 TEMPORARILY_DOWN_STATE = 2
DOWN_STATE = 3 DOWN_STATE = 3
...@@ -12,14 +14,26 @@ class Node(object): ...@@ -12,14 +14,26 @@ class Node(object):
self.port = port self.port = port
self.uuid = uuid self.uuid = uuid
self.manager = None self.manager = None
self.last_state_change = time()
def setManager(self, manager):
self.manager = manager
def getLastStateChange(self):
return self.last_state_change
def getState(self): def getState(self):
return self.state return self.state
def setState(self, new_state): def setState(self, new_state):
if self.state != new_state:
self.state = new_state self.state = new_state
self.last_state_change = time()
def setServer(self, ip_address, port): def setServer(self, ip_address, port):
if self.ip_address is not None:
self.manager.unregisterServer(self)
self.ip_address = ip_address self.ip_address = ip_address
self.port = port self.port = port
self.manager.registerServer(self) self.manager.registerServer(self)
...@@ -28,6 +42,9 @@ class Node(object): ...@@ -28,6 +42,9 @@ class Node(object):
return self.ip_address, self.port return self.ip_address, self.port
def setUUID(self, uuid): def setUUID(self, uuid):
if self.uuid is not None:
self.manager.unregisterUUID(self)
self.uuid = uuid self.uuid = uuid
self.manager.registerUUID(self) self.manager.registerUUID(self)
...@@ -55,6 +72,7 @@ class NodeManager(object): ...@@ -55,6 +72,7 @@ class NodeManager(object):
self.uuid_dict = {} self.uuid_dict = {}
def add(self, node): def add(self, node):
node.setManager(self)
self.node_list.append(node) self.node_list.append(node)
if node.getServer()[0] is not None: if node.getServer()[0] is not None:
self.registerServer(node) self.registerServer(node)
......
from struct import pack, unpack from struct import pack, unpack
from socket import inet_ntoa, inet_aton from socket import inet_ntoa, inet_aton
import logging
from util import dump
# The protocol version (major, minor). # The protocol version (major, minor).
PROTOCOL_VERSION = (4, 0) PROTOCOL_VERSION = (4, 0)
...@@ -9,7 +12,7 @@ MIN_PACKET_SIZE = 10 ...@@ -9,7 +12,7 @@ MIN_PACKET_SIZE = 10
MAX_PACKET_SIZE = 0x100000 MAX_PACKET_SIZE = 0x100000
# Message types. # Message types.
ERROR = 0x8000 ERROR = 0x0000
REQUEST_NODE_IDENTIFICATION = 0x0001 REQUEST_NODE_IDENTIFICATION = 0x0001
ACCEPT_NODE_IDENTIFICATION = 0x8001 ACCEPT_NODE_IDENTIFICATION = 0x8001
PING = 0x0002 PING = 0x0002
...@@ -17,6 +20,7 @@ PONG = 0x8002 ...@@ -17,6 +20,7 @@ PONG = 0x8002
ASK_PRIMARY_MASTER = 0x0003 ASK_PRIMARY_MASTER = 0x0003
ANSWER_PRIMARY_MASTER = 0x8003 ANSWER_PRIMARY_MASTER = 0x8003
ANNOUNCE_PRIMARY_MASTER = 0x0004 ANNOUNCE_PRIMARY_MASTER = 0x0004
REELECT_PRIMARY_MASTER = 0x0005
# Error codes. # Error codes.
NOT_READY_CODE = 1 NOT_READY_CODE = 1
...@@ -35,6 +39,12 @@ CLIENT_NODE_TYPE = 3 ...@@ -35,6 +39,12 @@ CLIENT_NODE_TYPE = 3
VALID_NODE_TYPE_LIST = (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE) VALID_NODE_TYPE_LIST = (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE)
# Other constants.
INVALID_UUID = '\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'
INVALID_TID = '\0\0\0\0\0\0\0\0'
INVALID_SERIAL = '\0\0\0\0\0\0\0\0'
INVALID_OID = '\0\0\0\0\0\0\0\0'
class ProtocolError(Exception): pass class ProtocolError(Exception): pass
class Packet: class Packet:
...@@ -46,6 +56,7 @@ class Packet: ...@@ -46,6 +56,7 @@ class Packet:
@classmethod @classmethod
def parse(cls, msg): def parse(cls, msg):
# logging.debug('parsing %s', dump(msg))
if len(msg) < MIN_PACKET_SIZE: if len(msg) < MIN_PACKET_SIZE:
return None return None
msg_id, msg_type, msg_len, reserved = unpack('!HHLH', msg[:10]) msg_id, msg_type, msg_len, reserved = unpack('!HHLH', msg[:10])
...@@ -60,7 +71,7 @@ class Packet: ...@@ -60,7 +71,7 @@ class Packet:
if len(msg) < msg_len: if len(msg) < msg_len:
# Not enough. # Not enough.
return None return None
return cls(msg_id, msg_type, msg_len, msg[10:msg_len]) return cls(msg_id, msg_type, msg[10:msg_len])
def __init__(self, msg_id = None, msg_type = None, body = None): def __init__(self, msg_id = None, msg_type = None, body = None):
self._id = msg_id self._id = msg_id
...@@ -74,7 +85,7 @@ class Packet: ...@@ -74,7 +85,7 @@ class Packet:
return self._type return self._type
def __len__(self): def __len__(self):
return len(self._body) return 10 + len(self._body)
# Encoders. # Encoders.
def encode(self): def encode(self):
...@@ -89,33 +100,68 @@ class Packet: ...@@ -89,33 +100,68 @@ class Packet:
self._id = msg_id self._id = msg_id
self._type = ERROR self._type = ERROR
self._body = pack('!HL', error_code, len(error_message)) + error_message self._body = pack('!HL', error_code, len(error_message)) + error_message
return self
def protocolError(self, msg_id, error_message): def protocolError(self, msg_id, error_message):
self.error(msg_id, PROTOCOL_ERROR_CODE, 'protocol error: ' + error_message) return self.error(msg_id, PROTOCOL_ERROR_CODE, 'protocol error: ' + error_message)
def internalError(self, msg_id, error_message): def internalError(self, msg_id, error_message):
self.error(msg_id, INTERNAL_ERROR_CODE, 'internal error: ' + error_message) return self.error(msg_id, INTERNAL_ERROR_CODE, 'internal error: ' + error_message)
def notReady(self, msg_id, error_message):
return self.error(msg_id, NOT_READY, 'not ready: ' + error_message)
def ping(self, msg_id): def ping(self, msg_id):
self._id = msg_id self._id = msg_id
self._type = PING self._type = PING
self._body = '' self._body = ''
return self
def pong(self, msg_id): def pong(self, msg_id):
self._id = msg_id self._id = msg_id
self._type = PONG self._type = PONG
self._body = '' self._body = ''
return self
def requestNodeIdentification(self, msg_id, node_type, uuid, ip_address, port, name): def requestNodeIdentification(self, msg_id, node_type, uuid, ip_address, port, name):
self._id = msg_id self._id = msg_id
self._type = REQUEST_NODE_IDENTIFICATION self._type = REQUEST_NODE_IDENTIFICATION
self._body = pack('!LLH16s4sHL', protocol_version[0], protocol_version[1], self._body = pack('!LLH16s4sHL', PROTOCOL_VERSION[0], PROTOCOL_VERSION[1],
node_type, uuid, inet_aton(ip_address), port, len(name)) + name node_type, uuid, inet_aton(ip_address), port, len(name)) + name
return self
def acceptNodeIdentification(self, msg_id, node_type, uuid, ip_address, port): def acceptNodeIdentification(self, msg_id, node_type, uuid, ip_address, port):
self._id = msg_id self._id = msg_id
self._type = ACCEPT_NODE_IDENTIFICATION self._type = ACCEPT_NODE_IDENTIFICATION
self._body = pack('!H16s4sH', node_type, uuid, inet_aton(ip_address), port) self._body = pack('!H16s4sH', node_type, uuid, inet_aton(ip_address), port)
return self
def askPrimaryMaster(self, msg_id, ltid, loid):
self._id = msg_id
self._type = ASK_PRIMARY_MASTER
self._body = ltid + loid
return self
def answerPrimaryMaster(self, msg_id, ltid, loid, primary_uuid, known_master_list):
self._id = msg_id
self._type = ANSWER_PRIMARY_MASTER
body = [ltid, loid, primary_uuid, pack('!L', len(known_master_list))]
for master in known_master_list:
body.append(pack('!4sH16s', inet_aton(master[0]), master[1], master[2]))
self._body = ''.join(body)
return self
def announcePrimaryMaster(self, msg_id):
self._id = msg_id
self._type = ANNOUNCE_PRIMARY_MASTER
self._body = ''
return self
def reelectPrimaryMaster(self, msg_id):
self._id = msg_id
self._type = REELECT_PRIMARY_MASTER
self._body = ''
return self
# Decoders. # Decoders.
def decode(self): def decode(self):
...@@ -160,7 +206,7 @@ class Packet: ...@@ -160,7 +206,7 @@ class Packet:
raise ProtocolError(self, 'invalid name size') raise ProtocolError(self, 'invalid name size')
if node_type not in VALID_NODE_TYPE_LIST: if node_type not in VALID_NODE_TYPE_LIST:
raise ProtocolError(self, 'invalid node type %d' % node_type) raise ProtocolError(self, 'invalid node type %d' % node_type)
if (major, minor) != protocol_version: if (major, minor) != PROTOCOL_VERSION:
raise ProtocolError(self, 'protocol version mismatch') raise ProtocolError(self, 'protocol version mismatch')
return node_type, uuid, ip_address, port, name return node_type, uuid, ip_address, port, name
decode_table[REQUEST_NODE_IDENTIFICATION] = _decodeRequestNodeIdentification decode_table[REQUEST_NODE_IDENTIFICATION] = _decodeRequestNodeIdentification
...@@ -176,3 +222,31 @@ class Packet: ...@@ -176,3 +222,31 @@ class Packet:
return node_type, uuid, ip_address, port return node_type, uuid, ip_address, port
decode_table[ACCEPT_NODE_IDENTIFICATION] = _decodeAcceptNodeIdentification decode_table[ACCEPT_NODE_IDENTIFICATION] = _decodeAcceptNodeIdentification
def _decodeAskPrimaryMaster(self):
try:
ltid, loid = unpack('!8s8s', self._body)
except:
raise ProtocolError(self, 'invalid ask primary master')
return ltid, loid
decode_table[ASK_PRIMARY_MASTER] = _decodeAskPrimaryMaster
def _decodeAnswerPrimaryMaster(self):
try:
ltid, loid, primary_uuid, n = unpack('!8s8s16sL', self._body[:36])
known_master_list = []
for i in xrange(n):
ip_address, port, uuid = unpack('!4sH16s', self._body[36+i*22:58+i*22])
ip_address = inet_ntoa(ip_address)
known_master_list.append((ip_address, port, uuid))
except:
raise ProtocolError(self, 'invalid answer primary master')
return ltid, loid, primary_uuid, known_master_list
decode_table[ANSWER_PRIMARY_MASTER] = _decodeAnswerPrimaryMaster
def _decodeAnnouncePrimaryMaster(self):
pass
decode_table[ANNOUNCE_PRIMARY_MASTER] = _decodeAnnouncePrimaryMaster
def _decodeReelectPrimaryMaster(self):
pass
decode_table[REELECT_PRIMARY_MASTER] = _decodeReelectPrimaryMaster
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment