Commit 64826794 authored by Julien Muchembled's avatar Julien Muchembled

New neoctl command to flush the logs of all nodes in the cluster

parent d68e9053
...@@ -62,6 +62,11 @@ class AdminEventHandler(EventHandler): ...@@ -62,6 +62,11 @@ class AdminEventHandler(EventHandler):
master_node = self.app.master_node master_node = self.app.master_node
conn.answer(Packets.AnswerPrimary(master_node.getUUID())) conn.answer(Packets.AnswerPrimary(master_node.getUUID()))
@check_primary_master
def flushLog(self, conn):
self.app.master_conn.send(Packets.FlushLog())
super(AdminEventHandler, self).flushLog(conn)
askLastIDs = forward_ask(Packets.AskLastIDs) askLastIDs = forward_ask(Packets.AskLastIDs)
askLastTransaction = forward_ask(Packets.AskLastTransaction) askLastTransaction = forward_ask(Packets.AskLastTransaction)
addPendingNodes = forward_ask(Packets.AddPendingNodes) addPendingNodes = forward_ask(Packets.AddPendingNodes)
......
...@@ -201,6 +201,9 @@ class EventHandler(object): ...@@ -201,6 +201,9 @@ class EventHandler(object):
if not conn.client: if not conn.client:
conn.close() conn.close()
def flushLog(self, conn):
logging.flush()
# Error packet handlers. # Error packet handlers.
def error(self, conn, code, message, **kw): def error(self, conn, code, message, **kw):
......
...@@ -1630,6 +1630,13 @@ class Truncate(Packet): ...@@ -1630,6 +1630,13 @@ class Truncate(Packet):
_answer = Error _answer = Error
class FlushLog(Packet):
"""
Request all nodes to flush their logs.
:nodes: ctl -> A -> M -> *
"""
_next_code = 0 _next_code = 0
def register(request, ignore_when_closed=None): def register(request, ignore_when_closed=None):
...@@ -1805,6 +1812,8 @@ class Packets(dict): ...@@ -1805,6 +1812,8 @@ class Packets(dict):
AddObject) AddObject)
Truncate = register( Truncate = register(
Truncate) Truncate)
FlushLog = register(
FlushLog)
def Errors(): def Errors():
registry_dict = {} registry_dict = {}
......
...@@ -46,6 +46,13 @@ class AdministrationHandler(MasterHandler): ...@@ -46,6 +46,13 @@ class AdministrationHandler(MasterHandler):
if node is not None: if node is not None:
self.app.nm.remove(node) self.app.nm.remove(node)
def flushLog(self, conn):
p = Packets.FlushLog()
for node in self.app.nm.getConnectedList():
c = node.getConnection()
c is conn or c.send(p)
super(AdministrationHandler, self).flushLog(conn)
def setClusterState(self, conn, state): def setClusterState(self, conn, state):
app = self.app app = self.app
# check request # check request
......
...@@ -39,6 +39,7 @@ action_dict = { ...@@ -39,6 +39,7 @@ action_dict = {
'kill': 'killNode', 'kill': 'killNode',
'prune_orphan': 'pruneOrphan', 'prune_orphan': 'pruneOrphan',
'truncate': 'truncate', 'truncate': 'truncate',
'flush_log': 'flushLog',
} }
uuid_int = (lambda ns: lambda uuid: uuid_int = (lambda ns: lambda uuid:
...@@ -253,6 +254,15 @@ class TerminalNeoCTL(object): ...@@ -253,6 +254,15 @@ class TerminalNeoCTL(object):
partition_dict = dict.fromkeys(xrange(np), source) partition_dict = dict.fromkeys(xrange(np), source)
self.neoctl.checkReplicas(partition_dict, min_tid, max_tid) self.neoctl.checkReplicas(partition_dict, min_tid, max_tid)
def flushLog(self, params):
"""
Ask all nodes in the cluster to flush their logs.
If there are backup clusters, only their primary masters flush.
"""
assert not params
self.neoctl.flushLog()
class Application(object): class Application(object):
"""The storage node application.""" """The storage node application."""
......
...@@ -204,3 +204,9 @@ class NeoCTL(BaseApplication): ...@@ -204,3 +204,9 @@ class NeoCTL(BaseApplication):
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK: if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response) raise RuntimeError(response)
return response[2] return response[2]
def flushLog(self):
conn = self.__getConnection()
conn.send(Packets.FlushLog())
while conn.pending():
self.em.poll(1)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment