Commit 452c1ca0 authored by Vincent Pelletier's avatar Vincent Pelletier

Rewrite of neoctl to split it into 2 parts:

- reusable library (neoctl.py)
- console front-end (app.py)


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1015 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 80f94b31
...@@ -15,195 +15,177 @@ ...@@ -15,195 +15,177 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging from neo.neoctl.neoctl import NeoCTL
from neo.util import bin, dump
from neo.protocol import node_types, node_states
from neo.event import EventManager
from neo.connection import ClientConnection
from neo.neoctl.handler import CommandEventHandler
from neo.connector import getConnectorHandler
from neo.util import bin
from neo import protocol from neo import protocol
class ActionError(Exception): action_dict = {
pass 'print': {
'pt': 'getPartitionRowList',
'node': 'getNodeList',
'cluster': 'getClusterState',
},
'set': {
'node': 'setNodeState',
'cluster': 'setClusterState',
},
'start': {
'cluster': 'startCluster',
},
'add': 'enableStorageList',
'drop': 'dropNode',
}
class TerminalNeoCTL(object):
def __init__(self, ip, port, handler):
self.neoctl = NeoCTL(ip, port, handler)
# Utility methods (could be functions)
def asNodeState(self, value):
if not value.endswith('_NODE_STATE'):
value += '_NODE_STATE'
return protocol.node_states.getFromStr(value)
def asNodeType(self, value):
if not value.endswith('_NODE_TYPE'):
value += '_NODE_TYPE'
return protocol.node_types.getFromStr(value)
def asClusterState(self, value):
if not value.endswith('_CLUSTER_STATE'):
value += '_CLUSTER_STATE'
return protocol.cluster_states.getFromStr(value)
def asNode(self, value):
return bin(value)
def formatRowList(self, row_list):
return '\n'.join('%s | %s' % (offset,
''.join('%s - %s |' % (dump(uuid), state)
for (uuid, state) in cell_list))
for (offset, cell_list) in row_list)
def formatNodeList(self, node_list):
result = []
for node_type, address, uuid, state in node_list:
if address is None:
address = (None, None)
ip, port = address
result.append('%s - %s - %s:%s - %s' % (node_type, dump(uuid), ip,
port, state))
return '\n'.join(result)
def addAction(options): # Actual actions
""" def getPartitionRowList(self, params):
Change node state from "pending" to "running". """
Parameters: Get a list of partition rows, bounded by min & max and involving
node uuid given node.
UUID of node to add, or "all". Parameters: [min [max [node]]]
""" min: offset of the first row to fetch (starts at 0)
if len(options) == 1 and options[0] == 'all': max: offset of the last row to fetch (0 for no limit)
uuid_list = [] node: filters the list of nodes serving a line to this node
"""
params = params + [0, 0, None][len(params):]
min_offset, max_offset, node = params
min_offset = int(min_offset)
max_offset = int(max_offset)
if node is not None:
node = self.asNode(node)
ptid, row_list = self.neoctl.getPartitionRowList(min_offset=min_offset,
max_offset=max_offset,
node=node)
# TODO: return ptid
return self.formatRowList(row_list)
def getNodeList(self, params):
"""
Get a list of nodes, filtering with given type.
Parameters: [type]
type: type of node to display
"""
assert len(params) < 2
if len(params):
node_type = self.asNodeType(params[0])
else: else:
uuid_list = [bin(opt) for opt in options] node_type = None
return protocol.addPendingNodes(uuid_list) node_list = self.neoctl.getNodeList(node_type=node_type)
return self.formatNodeList(node_list)
def setClusterAction(options): def getClusterState(self, params):
""" """
Set cluster of given name to given state. Get cluster state.
Parameters:
cluster state
State to put the cluster in.
""" """
# XXX: why do we ask for a cluster name ? assert len(params) == 0
# We connect to only one cluster that we get from configuration file, return str(self.neoctl.getClusterState())
# anyway.
name, state = options
state = protocol.cluster_states.getFromStr(state)
if state is None:
raise ActionError('unknown cluster state')
return protocol.setClusterState(name, state)
def setNodeAction(options): def setNodeState(self, params):
""" """
Put given node into given state. Set node state, and allow (or not) updating partition table.
Parameters: Parameters: node state [update]
node uuid node: node to modify
UUID of target node state: state to put the node in
node state update: disallow (0, default) or allow (other integer) partition
Node state to set. table to be updated
change partition table (optional)
If given with a 1 value, allow partition table to be changed.
""" """
uuid = bin(options.pop(0)) assert len(params) in (2, 3)
state = options.pop(0) + '_STATE' node = self.asNode(params[0])
state = node_states.getFromStr(state) state = self.asNodeState(params[1])
if state is None: if len(params) == 3:
raise ActionError('unknown state type') update_partition_table = not(not(int(params[2])))
if len(options):
modify = int(options.pop(0))
else: else:
modify = 0 update_partition_table = False
return protocol.setNodeState(uuid, state, modify) self.neoctl.setNodeState(node, state,
update_partition_table=update_partition_table)
def printClusterAction(options): def setClusterState(self, params):
""" """
Print cluster state. Set cluster state.
Parameters: state
state: state to put the cluster in
""" """
return protocol.askClusterState() assert len(params) == 1
self.neoctl.setClusterState(self.asClusterState(params[0]))
def printNodeAction(options): def startCluster(self, params):
""" """
Print nodes of a given type. Starts cluster operation after a startup.
Parameters: Equivalent to:
node type set cluster VERIFYING
Print known nodes of given type.
""" """
node_type = options.pop(0) + '_NODE_TYPE' assert len(params) == 0
node_type = node_types.getFromStr(node_type) self.neoctl.startCluster()
if node_type is None:
raise ActionError('unknown node type')
return protocol.askNodeList(node_type)
def printPTAction(options): def enableStorageList(self, params):
""" """
Print the partition table. Enable cluster to make use of pending storages.
Parameters: Parameters: all
range node [node [...]]
all Prints the entire partition table. node: if "all", add all pending storage nodes.
1 10 Prints rows 1 to 10 of partition table. otherwise, the list of storage nodes to enable.
10 0 Prints rows from 10 to the end of partition table.
node uuid (optional)
If given, prints only the rows in which given node is used.
""" """
offset = options.pop(0) if len(params) == 1 and params[0] == 'all':
if offset == "all": node_list = self.neoctl.getNodeList(
min_offset = 0 node_type=protocol.STORAGE_NODE_TYPE)
max_offset = 0
else:
min_offset = int(offset)
max_offset = int(options.pop(0))
if len(options):
uuid = bin(options.pop(0))
else: else:
uuid = None node_list = [self.asNode(x) for x in params]
return protocol.askPartitionList(min_offset, max_offset, uuid) self.neoctl.enableStorageList(node_list)
def startCluster(options): def dropNode(self, params):
""" """
Allow it to leave the recovery stage and accept the current partition Set node into DOWN state.
table, or make an empty if nothing was found. Parameters: node
Parameter: Cluster name node: node the pu into DOWN state
Equivalent to:
set node state (node) DOWN
""" """
name = options.pop(0) assert len(params) == 1
return protocol.setClusterState(name, protocol.VERIFYING) self.neoctl.dropNode(self.asNode(params[0]))
def dropNode(options):
"""
Drop one or more storage node from the partition table. Its content
is definitely lost. Be carefull because currently there is no check
about the cluster operational status, so you can drop a node with
non-replicated data.
Parameter: UUID
"""
uuid = bin(options.pop(0))
return protocol.setNodeState(uuid, protocol.DOWN_STATE, 1)
action_dict = {
'print': {
'pt': printPTAction,
'node': printNodeAction,
'cluster': printClusterAction,
},
'set': {
'node': setNodeAction,
'cluster': setClusterAction,
},
'start': {
'cluster': startCluster,
},
'add': addAction,
'drop': dropNode,
}
class Application(object): class Application(object):
"""The storage node application.""" """The storage node application."""
conn = None
def __init__(self, ip, port, handler): def __init__(self, ip, port, handler):
self.neoctl = TerminalNeoCTL(ip, port, handler)
self.connector_handler = getConnectorHandler(handler)
self.server = (ip, port)
self.em = EventManager()
self.ptid = None
self.trying_admin_node = False
self.result = ''
def getConnection(self):
if self.conn is None:
handler = CommandEventHandler(self)
# connect to admin node
self.trying_admin_node = True
conn = None
while self.trying_admin_node:
self.em.poll(1)
if conn is None:
self.trying_admin_node = True
logging.debug('connecting to address %s:%d', *(self.server))
conn = ClientConnection(self.em, handler, \
addr = self.server,
connector_handler = self.connector_handler)
self.conn = conn
return self.conn
def doAction(self, packet):
conn = self.getConnection()
conn.ask(packet)
self.result = ""
while 1:
self.em.poll(1)
if len(self.result):
break
def __del__(self):
if self.conn is not None:
self.conn.close()
def execute(self, args): def execute(self, args):
"""Execute the command given.""" """Execute the command given."""
...@@ -218,29 +200,35 @@ class Application(object): ...@@ -218,29 +200,35 @@ class Application(object):
isinstance(current_action, dict): isinstance(current_action, dict):
current_action = current_action.get(args[level]) current_action = current_action.get(args[level])
level += 1 level += 1
if callable(current_action): if isinstance(current_action, basestring):
try: action = getattr(self.neoctl, current_action, None)
p = current_action(args[level:])
except ActionError, message:
self.result = message
else: else:
self.doAction(p) action = None
if action is None:
result = self.usage('unknown command')
if result is None:
result = 'Ok'
else: else:
self.result = usage('unknown command') result = action(args[level:])
return self.result return result
def _usage(action_dict, level=0): def _usage(self, action_dict, level=0):
result = [] result = []
append = result.append append = result.append
sub_level = level + 1 sub_level = level + 1
for name, action in action_dict.iteritems(): for name, action in action_dict.iteritems():
append('%s%s' % (' ' * level, name)) append('%s%s' % (' ' * level, name))
if isinstance(action, dict): if isinstance(action, dict):
append(_usage(action, level=sub_level)) append(self._usage(action, level=sub_level))
else: else:
docstring_line_list = getattr(action, '__doc__', real_action = getattr(self.neoctl, action, None)
'(no docstring)').split('\n') if real_action is None:
continue
docstring = getattr(real_action, '__doc__', None)
if docstring is None:
docstring = '(no docstring)'
docstring_line_list = docstring.split('\n')
# Strip empty lines at begining & end of line list # Strip empty lines at begining & end of line list
for end in (0, -1): for end in (0, -1):
while len(docstring_line_list) \ while len(docstring_line_list) \
...@@ -254,7 +242,7 @@ def _usage(action_dict, level=0): ...@@ -254,7 +242,7 @@ def _usage(action_dict, level=0):
for x in docstring_line_list]) for x in docstring_line_list])
return '\n'.join(result) return '\n'.join(result)
def usage(message): def usage(self, message):
output_list = [message, 'Available commands:', _usage(action_dict)] output_list = [message, 'Available commands:', self._usage(action_dict)]
return '\n'.join(output_list) return '\n'.join(output_list)
...@@ -16,71 +16,55 @@ ...@@ -16,71 +16,55 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo.handler import EventHandler from neo.handler import EventHandler
from neo.protocol import UnexpectedPacketError from neo import protocol
from neo.exception import OperationFailure
from neo.util import dump
class CommandEventHandler(EventHandler): class CommandEventHandler(EventHandler):
""" Base handler for command """ """ Base handler for command """
def connectionAccepted(self, conn, s, addr):
"""Called when a connection is accepted."""
raise UnexpectedPacketError
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
# connected to admin node # connected to admin node
self.app.trying_admin_node = False self.app.connected = True
EventHandler.connectionCompleted(self, conn) EventHandler.connectionCompleted(self, conn)
def connectionFailed(self, conn): def __disconnected(self):
EventHandler.connectionFailed(self, conn) app = self.app
raise OperationFailure, "impossible to connect to admin node %s:%d" % conn.getAddress() app.connected = False
app.connection = None
def timeoutExpired(self, conn): def __respond(self, response):
EventHandler.timeoutExpired(self, conn) self.app.response_queue.append(response)
raise OperationFailure, "connection to admin node %s:%d timeout" % conn.getAddress()
def connectionClosed(self, conn): def connectionClosed(self, conn):
if self.app.trying_admin_node: super(CommandEventHandler, self).connectionClosed(conn)
raise OperationFailure, "cannot connect to admin node %s:%d" % conn.getAddress() self.__disconnected()
EventHandler.connectionClosed(self, conn)
def connectionFailed(self, conn):
super(CommandEventHandler, self).connectionFailed(conn)
self.__disconnected()
def timeoutExpired(self, conn):
super(CommandEventHandler, self).timeoutExpired(conn)
self.__disconnected()
def peerBroken(self, conn): def peerBroken(self, conn):
EventHandler.peerBroken(self, conn) super(CommandEventHandler, self).peerBroken(conn)
raise OperationFailure, "connect to admin node %s:%d broken" % conn.getAddress() self.__disconnected()
def handleAnswerPartitionList(self, conn, packet, ptid, row_list): def handleAnswerPartitionList(self, conn, packet, ptid, row_list):
data = "" self.__respond((packet.getType(), ptid, row_list))
if len(row_list) == 0:
data = "No partition"
else:
for offset, cell_list in row_list:
data += "\n%s | " % offset
for uuid, state in cell_list:
data += "%s - %s |" % (dump(uuid), state)
self.app.result = data
def handleAnswerNodeList(self, conn, packet, node_list): def handleAnswerNodeList(self, conn, packet, node_list):
data = "" self.__respond((packet.getType(), node_list))
if len(node_list) == 0:
data = "No Node"
else:
for node_type, address, uuid, state in node_list:
if address is None:
address = (None, None)
ip, port = address
data += "\n%s - %s - %s:%s - %s" % (node_type, dump(uuid), ip, port, state)
self.app.result = data
def handleAnswerNodeState(self, conn, packet, uuid, state): def handleAnswerNodeState(self, conn, packet, uuid, state):
self.app.result = "Node %s set to state %s" % (dump(uuid), state) self.__respond((packet.getType(), uuid, state))
def handleAnswerClusterState(self, conn, packet, state): def handleAnswerClusterState(self, conn, packet, state):
self.app.result = "Cluster state : %s" % state self.__respond((packet.getType(), state))
def handleAnswerNewNodes(self, conn, packet, uuid_list): def handleAnswerNewNodes(self, conn, packet, uuid_list):
uuids = ', '.join([dump(uuid) for uuid in uuid_list]) self.__respond((packet.getType(), uuid_list))
self.app.result = 'New storage nodes : %s' % uuids
def handleNoError(self, conn, packet, msg): def handleNoError(self, conn, packet, msg):
self.app.result = msg self.__respond((packet.getType(), protocol.NO_ERROR_CODE, msg))
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo.connector import getConnectorHandler
from neo.connection import ClientConnection
from neo.event import EventManager
from neo.neoctl.handler import CommandEventHandler
from neo import protocol
class NeoCTL(object):
connection = None
connected = False
def __init__(self, ip, port, handler):
self.connector_handler = getConnectorHandler(handler)
self.server = (ip, port)
self.em = EventManager()
self.handler = CommandEventHandler(self)
self.response_queue = []
def __getConnection(self):
while not self.connected:
self.connection = ClientConnection(
self.em, self.handler, addr=self.server,
connector_handler=self.connector_handler)
while not self.connected and self.connection is not None:
self.em.poll(0)
return self.connection
def __ask(self, packet):
# TODO: make thread-safe
connection = self.__getConnection()
connection.ask(packet)
response_queue = self.response_queue
assert len(response_queue) == 0
while len(response_queue) == 0:
self.em.poll(0)
if not self.connected:
raise Exception, 'Connection closed'
return response_queue.pop()
def enableStorageList(self, node_list):
"""
Put all given storage nodes in "running" state.
"""
packet = protocol.addPendingNodes(node_list)
response = self.__ask(packet)
assert response[0] == protocol.ERROR
assert response[1] == protocol.NO_ERROR_CODE
def setClusterState(self, state):
"""
Set cluster state.
"""
packet = protocol.setClusterState(state)
response = self.__ask(packet)
assert response[0] == protocol.ANSWER_CLUSTER_STATE
assert state == response[1]
def setNodeState(self, node, state, update_partition_table=False):
"""
Set node state, and allow (or not) updating partition table.
"""
if update_partition_table:
update_partition_table = 1
else:
update_partition_table = 0
packet = protocol.setNodeState(node, state, update_partition_table)
response = self.__ask(packet)
assert response[0] == protocol.ANSWER_NODE_STATE
assert node == response[1]
assert state == response[2]
def getClusterState(self):
"""
Get cluster state.
"""
packet = protocol.askClusterState()
response = self.__ask(packet)
assert response[0] == protocol.ANSWER_CLUSTER_STATE
return response[1]
def getNodeList(self, node_type=None):
"""
Get a list of nodes, filtering with given type.
"""
packet = protocol.askNodeList(node_type)
response = self.__ask(packet)
assert response[0] == protocol.ANSWER_NODE_LIST
return response[1]
def getPartitionRowList(self, min_offset=0, max_offset=0, node=None):
"""
Get a list of partition rows, bounded by min & max and involving
given node.
"""
packet = protocol.askPartitionList(min_offset, max_offset, node)
response = self.__ask(packet)
assert response[0] == protocol.ANSWER_PARTITION_LIST
return (response[1], response[2])
def startCluster(self):
"""
Set cluster into "verifying" state.
"""
self.setClusterState(protocol.VERIFYING)
def dropNode(self, node):
"""
Set node into "down" state and remove it from partition table.
"""
self.setNodeState(node, protocol.DOWN_STATE, update_partition_table=1)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment