Commit b079ca14 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Split election process into shorter methods to highlight logic.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1463 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 651e51b3
......@@ -124,6 +124,7 @@ class Application(object):
# Reelect a new primary master.
self.electPrimary(bootstrap = False)
def electPrimary(self, bootstrap = True):
"""Elect a primary master node.
......@@ -136,131 +137,164 @@ class Application(object):
self.unconnected_master_node_set = set()
self.negotiating_master_node_set = set()
self.listening_conn.setHandler(election.ServerElectionHandler(self))
client_handler = election.ClientElectionHandler(self)
em = self.em
nm = self.nm
for node in nm.getMasterList():
for node in self.nm.getMasterList():
# For now, believe that every node should be available,
# since down or broken nodes may be already repaired.
node.setRunning()
while 1:
t = 0
self.primary = None
self.primary_master_node = None
for node in nm.getMasterList():
if node.isRunning():
self.unconnected_master_node_set.add(node.getAddress())
# Wait at most 20 seconds at bootstrap. Otherwise, wait at most
# 10 seconds to avoid stopping the whole cluster for a long time.
# Note that even if not all master are up in the first 20 seconds
# this is not an issue because the first up will timeout and take
# the primary role.
if bootstrap:
expiration = 20
else:
expiration = 10
try:
while True:
try:
while 1:
current_time = time()
if current_time >= t + 1:
t = current_time
for node in nm.getMasterList():
if node.isTemporarilyDown() \
and node.getLastStateChange() + \
expiration < current_time:
logging.info('%s is down' % (node, ))
node.setDown()
self.unconnected_master_node_set.discard(
node.getAddress())
# Try to connect to master nodes.
if self.unconnected_master_node_set:
for addr in list(self.unconnected_master_node_set):
ClientConnection(em, client_handler, addr=addr,
connector_handler=self.connector_handler)
em.poll(1)
if len(self.unconnected_master_node_set) == 0 \
and len(self.negotiating_master_node_set) == 0:
break
# handle new connected masters
for node in self.nm.getMasterList():
if node.isRunning():
self.unconnected_master_node_set.add(node.getAddress())
# start the election process
self.primary = None
self.primary_master_node = None
self._doElection(bootstrap)
# Now there are three situations:
# - I am the primary master
# - I am secondary but don't know who is primary
# - I am secondary and know who is primary
if self.primary is None:
# I am the primary.
self.primary = True
logging.debug('I am the primary, sending an announcement')
for conn in em.getClientList():
conn.notify(Packets.AnnouncePrimary())
conn.abort()
t = time()
while em.getClientList():
em.poll(1)
if t + 10 < time():
for conn in em.getClientList():
conn.close()
break
self.primary = self.primary is None
if self.primary:
# i'm the primary, send the announcement
self._announcePrimary()
else:
# Wait for an announcement. If this is too long, probably
# the primary master is down.
t = time()
while self.primary_master_node is None:
em.poll(1)
if t + 10 < time():
raise ElectionFailure, 'no primary master elected'
# Now I need only a connection to the primary master node.
primary = self.primary_master_node
addr = primary.getAddress()
for conn in em.getServerList():
conn.close()
for conn in em.getClientList():
if conn.getAddress() != addr:
conn.close()
# But if there is no such connection, something wrong
# happened.
for conn in em.getClientList():
if conn.getAddress() == addr:
break
else:
raise ElectionFailure, 'no connection remains to ' \
'the primary'
# otherwise, wait for the primary announcement
self._waitForPrimaryAnnouncement()
break
except ElectionFailure, m:
# something goes wrong, clean then restart
self._electionFailed(m)
bootstrap = False
return
except ElectionFailure, m:
logging.error('election failed; %s' % m)
# Ask all connected nodes to reelect a single primary master.
for conn in em.getClientList():
conn.notify(Packets.ReelectPrimary())
conn.abort()
def _doElection(self, bootstrap):
"""
Start the election process:
- Try to connect to any known master node
- Wait a most for the timeout defined by bootstrap parameter
When done, the current process si defined either as primary or
secondary master node
"""
# Wait at most 20 seconds at bootstrap. Otherwise, wait at most
# 10 seconds to avoid stopping the whole cluster for a long time.
# Note that even if not all master are up in the first 20 seconds
# this is not an issue because the first up will timeout and take
# the primary role.
if bootstrap:
expiration = 20
else:
expiration = 10
client_handler = election.ClientElectionHandler(self)
t = 0
while True:
current_time = time()
if current_time >= t + 1:
t = current_time
for node in self.nm.getMasterList():
if node.isTemporarilyDown() \
and node.getLastStateChange() + \
expiration < current_time:
logging.info('%s is down' % (node, ))
node.setDown()
self.unconnected_master_node_set.discard(
node.getAddress())
# Try to connect to master nodes.
if self.unconnected_master_node_set:
for addr in list(self.unconnected_master_node_set):
ClientConnection(self.em, client_handler, addr=addr,
connector_handler=self.connector_handler)
self.em.poll(1)
if len(self.unconnected_master_node_set) == 0 \
and len(self.negotiating_master_node_set) == 0:
break
# Wait until the connections are closed.
self.primary = None
self.primary_master_node = None
t = time()
while em.getClientList():
try:
em.poll(1)
except ElectionFailure:
pass
if time() > t + 10:
# If too long, do not wait.
break
# Close all connections.
for conn in em.getClientList():
conn.close()
for conn in em.getServerList():
def _announcePrimary(self):
"""
Broadcast the announce that I'm the primary
"""
# I am the primary.
logging.debug('I am the primary, sending an announcement')
for conn in self.em.getClientList():
conn.notify(Packets.AnnouncePrimary())
conn.abort()
t = time()
while self.em.getClientList():
self.em.poll(1)
if t + 10 < time():
for conn in self.em.getClientList():
conn.close()
bootstrap = False
break
def _waitForPrimaryAnnouncement(self):
"""
For for the primary announcement as i'm not the primary.
If this is too long, raise ElectionFailure to restart the whole
election process.
"""
# Wait for an announcement. If this is too long, probably
# the primary master is down.
t = time()
while self.primary_master_node is None:
self.em.poll(1)
if t + 10 < time():
# election timeout
raise ElectionFailure("Election timeout")
# Now I need only a connection to the primary master node.
primary = self.primary_master_node
addr = primary.getAddress()
for conn in self.em.getServerList():
conn.close()
for conn in self.em.getClientList():
if conn.getAddress() != addr:
conn.close()
# But if there is no such connection, something wrong
# happened.
for conn in self.em.getClientList():
if conn.getAddress() == addr:
# primary master elected and connected
break
else:
raise ElectionFailure('No connection remains to the primary')
def _electionFailed(self, m):
"""
Ask other masters to reelect a primary after an election failure.
"""
logging.error('election failed: %s', (m, ))
# Ask all connected nodes to reelect a single primary master.
for conn in self.em.getClientList():
conn.notify(Packets.ReelectPrimary())
conn.abort()
# Wait until the connections are closed.
self.primary = None
self.primary_master_node = None
t = time()
while self.em.getClientList():
try:
self.em.poll(1)
except ElectionFailure:
pass
if time() > t + 10:
# If too long, do not wait.
break
# Close all connections.
for conn in self.em.getClientList():
conn.close()
for conn in self.em.getServerList():
conn.close()
def broadcastNodesInformation(self, node_list):
"""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment