/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <my_sys.h>

//#define HAVE_GLOBAL_REPLICATION

#include <Vector.hpp>
#ifdef  HAVE_GLOBAL_REPLICATION
#include "../rep/repapi/repapi.h"
#endif

#include <mgmapi.h>

class MgmtSrvr;

/** 
 *  @class CommandInterpreter
 *  @brief Reads command line in management client
 *
 *  This class has one public method which reads a command line 
 *  from a stream. It then interpret that commmand line and calls a suitable 
 *  method in the MgmtSrvr class which executes the command.
 *
 *  For command syntax, see the HELP command.
 */ 
class CommandInterpreter {
public:
  /**
   *   Constructor
   *   @param mgmtSrvr: Management server to use when executing commands
   */
  CommandInterpreter(const char *, int verbose);
  ~CommandInterpreter();
  
  /**
   *   Reads one line from the stream, parse the line to find 
   *   a command and then calls a suitable method which executes 
   *   the command.
   *
   *   @return true until quit/bye/exit has been typed
   */
  int execute(const char *_line, int _try_reconnect=-1, int *error= 0);

private:
  void printError();
  int execute_impl(const char *_line);

  /**
   *   Analyse the command line, after the first token.
   *
   *   @param  processId:           DB process id to send command to or -1 if
   *                                command will be sent to all DB processes.
   *   @param  allAfterFirstToken:  What the client gave after the 
   *                                first token on the command line
   */
  void analyseAfterFirstToken(int processId, char* allAfterFirstTokenCstr);

  /**
   *   Parse the block specification part of the LOG* commands,
   *   things after LOG*: [BLOCK = {ALL|<blockName>+}]
   *
   *   @param  allAfterLog: What the client gave after the second token 
   *                        (LOG*) on the command line
   *   @param  blocks, OUT: ALL or name of all the blocks
   *   @return: true if correct syntax, otherwise false
   */
  bool parseBlockSpecification(const char* allAfterLog, 
			       Vector<const char*>& blocks);
  
  /**
   *   A bunch of execute functions: Executes one of the commands
   *
   *   @param  processId:   DB process id to send command to
   *   @param  parameters:  What the client gave after the command name 
   *                        on the command line.
   *   For example if complete input from user is: "1 LOGLEVEL 22" then the
   *   parameters argument is the string with everything after LOGLEVEL, in
   *   this case "22". Each function is responsible to check the parameters
   *   argument.
   */
  void executeHelp(char* parameters);
  void executeShow(char* parameters);
  void executeConnect(char* parameters);
  void executePurge(char* parameters);
  int  executeShutdown(char* parameters);
  void executeRun(char* parameters);
  void executeInfo(char* parameters);
  void executeClusterLog(char* parameters);

public:
  void executeStop(int processId, const char* parameters, bool all);
  void executeEnterSingleUser(char* parameters);
  void executeExitSingleUser(char* parameters);
  void executeStart(int processId, const char* parameters, bool all);
  void executeRestart(int processId, const char* parameters, bool all);
  void executeLogLevel(int processId, const char* parameters, bool all);
  void executeError(int processId, const char* parameters, bool all);
  void executeLog(int processId, const char* parameters, bool all);
  void executeLogIn(int processId, const char* parameters, bool all);
  void executeLogOut(int processId, const char* parameters, bool all);
  void executeLogOff(int processId, const char* parameters, bool all);
  void executeTestOn(int processId, const char* parameters, bool all);
  void executeTestOff(int processId, const char* parameters, bool all);
  void executeSet(int processId, const char* parameters, bool all);
  void executeGetStat(int processId, const char* parameters, bool all);
  void executeStatus(int processId, const char* parameters, bool all);
  void executeEventReporting(int processId, const char* parameters, bool all);
  void executeDumpState(int processId, const char* parameters, bool all);
  int executeStartBackup(char * parameters);
  void executeAbortBackup(char * parameters);

  void executeRep(char* parameters);

  void executeCpc(char * parameters);

public:
  bool connect();
  bool disconnect();

  /**
   * A execute function definition
   */
public:
  typedef void (CommandInterpreter::* ExecuteFunction)(int processId, 
						       const char * param, 
						       bool all);
  
  struct CommandFunctionPair {
    const char * command;
    ExecuteFunction executeFunction;
  };
private:
  /**
   * 
   */
  void executeForAll(const char * cmd, 
		     ExecuteFunction fun,
		     const char * param);

  NdbMgmHandle m_mgmsrv;
  NdbMgmHandle m_mgmsrv2;
  bool m_connected;
  int m_verbose;
  int try_reconnect;
  int m_error;
#ifdef HAVE_GLOBAL_REPLICATION  
  NdbRepHandle m_repserver;
  const char *rep_host;
  bool rep_connected;
#endif
  struct NdbThread* m_event_thread;
};


/*
 * Facade object for CommandInterpreter
 */

#include "ndb_mgmclient.hpp"
#include "ndb_mgmclient.h"

Ndb_mgmclient::Ndb_mgmclient(const char *host,int verbose)
{
  m_cmd= new CommandInterpreter(host,verbose);
}
Ndb_mgmclient::~Ndb_mgmclient()
{
  delete m_cmd;
}
int Ndb_mgmclient::execute(const char *_line, int _try_reconnect, int *error)
{
  return m_cmd->execute(_line,_try_reconnect,error);
}
int
Ndb_mgmclient::disconnect()
{
  return m_cmd->disconnect();
}

extern "C" {
  Ndb_mgmclient_handle ndb_mgmclient_handle_create(const char *connect_string)
  {
    return (Ndb_mgmclient_handle) new Ndb_mgmclient(connect_string);
  }
  int ndb_mgmclient_execute(Ndb_mgmclient_handle h, int argc, char** argv)
  {
    return ((Ndb_mgmclient*)h)->execute(argc, argv, 1);
  }
  int ndb_mgmclient_handle_destroy(Ndb_mgmclient_handle h)
  {
    delete (Ndb_mgmclient*)h;
    return 0;
  }
}
/*
 * The CommandInterpreter
 */

#include <mgmapi.h>
#include <mgmapi_debug.h>
#include <version.h>
#include <NdbAutoPtr.hpp>
#include <NdbOut.hpp>
#include <NdbSleep.h>
#include <NdbMem.h>
#include <EventLogger.hpp>
#include <signaldata/SetLogLevelOrd.hpp>
#include <signaldata/GrepImpl.hpp>
#ifdef HAVE_GLOBAL_REPLICATION

#endif // HAVE_GLOBAL_REPLICATION
#include "MgmtErrorReporter.hpp"
#include <Parser.hpp>
#include <SocketServer.hpp>
#include <util/InputStream.hpp>
#include <util/OutputStream.hpp>

int Ndb_mgmclient::execute(int argc, char** argv, int _try_reconnect, int *error)
{
  if (argc <= 0)
    return 0;
  BaseString _line(argv[0]);
  for (int i= 1; i < argc; i++)
  {
    _line.appfmt(" %s", argv[i]);
  }
  return m_cmd->execute(_line.c_str(),_try_reconnect, error);
}

/*****************************************************************************
 * HELP
 *****************************************************************************/
static const char* helpText =
"---------------------------------------------------------------------------\n"
" NDB Cluster -- Management Client -- Help\n"
"---------------------------------------------------------------------------\n"
"HELP                                   Print help text\n"
"HELP SHOW                              Help for SHOW command\n"
#ifdef HAVE_GLOBAL_REPLICATION
"HELP REPLICATION                       Help for global replication\n"
#endif // HAVE_GLOBAL_REPLICATION
#ifdef VM_TRACE // DEBUG ONLY
"HELP DEBUG                             Help for debug compiled version\n"
#endif
"SHOW                                   Print information about cluster\n"
#if 0
"SHOW CONFIG                            Print configuration\n"
"SHOW PARAMETERS                        Print configuration parameters\n"
#endif
"START BACKUP [NOWAIT | WAIT STARTED | WAIT COMPLETED]\n"
"                                       Start backup (default WAIT COMPLETED)\n"
"ABORT BACKUP <backup id>               Abort backup\n"
"SHUTDOWN                               Shutdown all processes in cluster\n"
"CLUSTERLOG ON [<severity>] ...         Enable Cluster logging\n"
"CLUSTERLOG OFF [<severity>] ...        Disable Cluster logging\n"
"CLUSTERLOG TOGGLE [<severity>] ...     Toggle severity filter on/off\n"
"CLUSTERLOG INFO                        Print cluster log information\n"
"<id> START                             Start DB node (started with -n)\n"
"<id> RESTART [-n] [-i]                 Restart DB node\n"
"<id> STOP                              Stop DB node\n"
"ENTER SINGLE USER MODE <api-node>      Enter single user mode\n"
"EXIT SINGLE USER MODE                  Exit single user mode\n"
"<id> STATUS                            Print status\n"
"<id> CLUSTERLOG {<category>=<level>}+  Set log level for cluster log\n"
#ifdef HAVE_GLOBAL_REPLICATION
"REP CONNECT <host:port>                Connect to REP server on host:port\n"
#endif
"PURGE STALE SESSIONS                   Reset reserved nodeid's in the mgmt server\n"
"CONNECT [<connectstring>]              Connect to management server (reconnect if already connected)\n"
"QUIT                                   Quit management client\n"
;

static const char* helpTextShow =
"---------------------------------------------------------------------------\n"
" NDB Cluster -- Management Client -- Help for SHOW command\n"
"---------------------------------------------------------------------------\n"
"SHOW prints NDB Cluster information\n\n"
"SHOW               Print information about cluster\n" 
#if 0
"SHOW CONFIG        Print configuration (in initial config file format)\n" 
"SHOW PARAMETERS    Print information about configuration parameters\n\n"
#endif
;

#ifdef HAVE_GLOBAL_REPLICATION
static const char* helpTextRep =
"---------------------------------------------------------------------------\n"
" NDB Cluster -- Management Client -- Help for Global Replication\n"
"---------------------------------------------------------------------------\n"
"Commands should be executed on the standby NDB Cluster\n"
"These features are in an experimental release state.\n"
"\n"
"Simple Commands:\n"
"REP START              Start Global Replication\n" 
"REP START REQUESTOR    Start Global Replication Requestor\n" 
"REP STATUS             Show Global Replication status\n" 
"REP STOP               Stop Global Replication\n"
"REP STOP REQUESTOR     Stop Global Replication Requestor\n"
"\n" 
"Advanced Commands:\n"
"REP START <protocol>   Starts protocol\n"
"REP STOP <protocol>    Stops protocol\n"
"<protocol> = TRANSFER | APPLY | DELETE\n"
"\n"
#ifdef VM_TRACE // DEBUG ONLY
"Debugging commands:\n"
"REP DELETE             Removes epochs stored in primary and standy systems\n"
"REP DROP <tableid>     Drop a table in SS identified by table id\n"
"REP SLOWSTOP           Stop Replication (Tries to synchonize with primary)\n" 
"REP FASTSTOP           Stop Replication (Stops in consistent state)\n" 
"<component> = SUBSCRIPTION\n"
"              METALOG | METASCAN | DATALOG | DATASCAN\n"
"              REQUESTOR | TRANSFER | APPLY | DELETE\n"
#endif
;
#endif // HAVE_GLOBAL_REPLICATION

#ifdef VM_TRACE // DEBUG ONLY
static const char* helpTextDebug =
"---------------------------------------------------------------------------\n"
" NDB Cluster -- Management Client -- Help for Debugging (Internal use only)\n"
"---------------------------------------------------------------------------\n"
"SHOW PROPERTIES                       Print config properties object\n"
"<id> LOGLEVEL {<category>=<level>}+   Set log level\n"
#ifdef ERROR_INSERT
"<id> ERROR <errorNo>                  Inject error into NDB node\n"
#endif
"<id> LOG [BLOCK = {ALL|<block>+}]     Set logging on in & out signals\n"
"<id> LOGIN [BLOCK = {ALL|<block>+}]   Set logging on in signals\n"
"<id> LOGOUT [BLOCK = {ALL|<block>+}]  Set logging on out signals\n"
"<id> LOGOFF [BLOCK = {ALL|<block>+}]  Unset signal logging\n"
"<id> TESTON                           Start signal logging\n"
"<id> TESTOFF                          Stop signal logging\n"
"<id> SET <configParamName> <value>    Update configuration variable\n"
"<id> DUMP <arg>                       Dump system state to cluster.log\n"
"<id> GETSTAT                          Print statistics\n"
"\n"
"<id>       = ALL | Any database node id\n"
;
#endif

static bool
convert(const char* s, int& val) {
  
  if (s == NULL)
    return false;

  if (strlen(s) == 0)
    return false;

  errno = 0;
  char* p;
  long v = strtol(s, &p, 10);
  if (errno != 0)
    return false;

  if (p != &s[strlen(s)])
    return false;
  
  val = v;
  return true;
}

/*
 * Constructor
 */
CommandInterpreter::CommandInterpreter(const char *_host,int verbose) 
  : m_verbose(verbose)
{
  m_mgmsrv = ndb_mgm_create_handle();
  if(m_mgmsrv == NULL) {
    ndbout_c("Cannot create handle to management server.");
    exit(-1);
  }
  m_mgmsrv2 = ndb_mgm_create_handle();
  if(m_mgmsrv2 == NULL) {
    ndbout_c("Cannot create handle to management server.");
    exit(-1);
  }
  if (ndb_mgm_set_connectstring(m_mgmsrv, _host))
  {
    printError();
    exit(-1);
  }

  m_connected= false;
  m_event_thread= 0;
  try_reconnect = 0;
#ifdef HAVE_GLOBAL_REPLICATION
  rep_host = NULL;
  m_repserver = NULL;
  rep_connected = false;
#endif
}

/*
 * Destructor
 */
CommandInterpreter::~CommandInterpreter() 
{
  disconnect();
  ndb_mgm_destroy_handle(&m_mgmsrv);
  ndb_mgm_destroy_handle(&m_mgmsrv2);
}

static bool 
emptyString(const char* s) 
{
  if (s == NULL) {
    return true;
  }

  for (unsigned int i = 0; i < strlen(s); ++i) {
    if (! isspace(s[i])) {
      return false;
    }
  }

  return true;
}

void
CommandInterpreter::printError() 
{
  if (ndb_mgm_check_connection(m_mgmsrv))
  {
    m_connected= false;
    disconnect();
  }
  ndbout_c("* %5d: %s", 
	   ndb_mgm_get_latest_error(m_mgmsrv),
	   ndb_mgm_get_latest_error_msg(m_mgmsrv));
  ndbout_c("*        %s", ndb_mgm_get_latest_error_desc(m_mgmsrv));
}

//*****************************************************************************
//*****************************************************************************

static int do_event_thread;
static void*
event_thread_run(void* m)
{
  NdbMgmHandle handle= *(NdbMgmHandle*)m;

  my_thread_init();

  int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0 };
  int fd = ndb_mgm_listen_event(handle, filter);
  if (fd > 0)
  {
    do_event_thread= 1;
    char *tmp= 0;
    char buf[1024];
    SocketInputStream in(fd,10);
    do {
      if (tmp == 0) NdbSleep_MilliSleep(10);
      if((tmp = in.gets(buf, 1024)))
	ndbout << tmp;
    } while(do_event_thread);
  }
  else
  {
    do_event_thread= -1;
  }

  my_thread_end();
  NdbThread_Exit(0);
  return 0;
}

bool
CommandInterpreter::connect() 
{
  if(!m_connected)
  {
    if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1))
    {
      const char *host= ndb_mgm_get_connected_host(m_mgmsrv);
      unsigned port= ndb_mgm_get_connected_port(m_mgmsrv);
      if(!ndb_mgm_set_connectstring(m_mgmsrv2,
				    BaseString(host).appfmt(":%d",port).c_str())
	 &&
	 !ndb_mgm_connect(m_mgmsrv2, try_reconnect-1, 5, 1))
      {
	assert(m_event_thread == 0);
	assert(do_event_thread == 0);
	do_event_thread= 0;
	m_event_thread = NdbThread_Create(event_thread_run,
					  (void**)&m_mgmsrv2,
					  32768,
					  "CommandInterpreted_event_thread",
					  NDB_THREAD_PRIO_LOW);
	if (m_event_thread != 0)
	{
	  int iter= 1000; // try for 30 seconds
	  while(do_event_thread == 0 &&
		iter-- > 0)
	    NdbSleep_MilliSleep(30);
	}
	if (m_event_thread == 0 ||
	    do_event_thread == 0 ||
	    do_event_thread == -1)
	{
	  printf("Warning, event thread startup failed, degraded printouts as result\n");
	  do_event_thread= 0;
	}
      }
      else
      {
	printf("Warning, event connect failed, degraded printouts as result\n");
      }
      m_connected= true;
      if (m_verbose)
      {
	printf("Connected to Management Server at: %s:%d\n",
	       host, port);
      }
    }
  }
  return m_connected;
}

bool 
CommandInterpreter::disconnect() 
{
  if (m_event_thread) {
    void *res;
    do_event_thread= 0;
    NdbThread_WaitFor(m_event_thread, &res);
    NdbThread_Destroy(&m_event_thread);
    m_event_thread= 0;
    ndb_mgm_disconnect(m_mgmsrv2);
  }
  if (m_connected)
  {
    if (ndb_mgm_disconnect(m_mgmsrv) == -1) {
      ndbout_c("Could not disconnect from management server");
      printError();
    }
    m_connected= false;
  }
  return true;
}

//*****************************************************************************
//*****************************************************************************

int 
CommandInterpreter::execute(const char *_line, int _try_reconnect,
			    int *error) 
{
  if (_try_reconnect >= 0)
    try_reconnect=_try_reconnect;
  int result= execute_impl(_line);
  if (error)
    *error= m_error;
  return result;
}

static void
invalid_command(const char *cmd)
{
  ndbout << "Invalid command: " << cmd << endl;
  ndbout << "Type HELP for help." << endl << endl;
}

int 
CommandInterpreter::execute_impl(const char *_line) 
{
  DBUG_ENTER("CommandInterpreter::execute_impl");
  DBUG_PRINT("enter",("line=\"%s\"",_line));
  m_error= 0;

  char * line;
  if(_line == NULL) {
    DBUG_RETURN(false);
  }
  line = my_strdup(_line,MYF(MY_WME));
  My_auto_ptr<char> ptr(line);

  int do_continue;
  do {
    do_continue= 0;
    BaseString::trim(line," \t");
    if (line[0] == 0 ||
	line[0] == '#')
    {
      DBUG_RETURN(true);
    }
    // for mysql client compatability remove trailing ';'
    {
      unsigned last= strlen(line)-1;
      if (line[last] == ';')
      {
	line[last]= 0;
	do_continue= 1;
      }
    }
  } while (do_continue);
  // if there is anything in the line proceed
  char* firstToken = strtok(line, " ");
  char* allAfterFirstToken = strtok(NULL, "");
  
  if (strcasecmp(firstToken, "HELP") == 0 ||
      strcasecmp(firstToken, "?") == 0) {
    executeHelp(allAfterFirstToken);
    DBUG_RETURN(true);
  }
  else if (strcasecmp(firstToken, "CONNECT") == 0) {
    executeConnect(allAfterFirstToken);
    DBUG_RETURN(true);
  }
  else if (strcasecmp(firstToken, "SLEEP") == 0) {
    if (allAfterFirstToken)
      sleep(atoi(allAfterFirstToken));
    DBUG_RETURN(true);
  }
  else if((strcasecmp(firstToken, "QUIT") == 0 ||
	  strcasecmp(firstToken, "EXIT") == 0 ||
	  strcasecmp(firstToken, "BYE") == 0) && 
	  allAfterFirstToken == NULL){
    DBUG_RETURN(false);
  }

  if (!connect())
    DBUG_RETURN(true);

  if (strcasecmp(firstToken, "SHOW") == 0) {
    executeShow(allAfterFirstToken);
    DBUG_RETURN(true);
  }
  else if (strcasecmp(firstToken, "SHUTDOWN") == 0) {
    m_error= executeShutdown(allAfterFirstToken);
    DBUG_RETURN(true);
  }
  else if (strcasecmp(firstToken, "CLUSTERLOG") == 0){
    executeClusterLog(allAfterFirstToken);
    DBUG_RETURN(true);
  }
  else if(strcasecmp(firstToken, "START") == 0 &&
	  allAfterFirstToken != NULL &&
	  strncasecmp(allAfterFirstToken, "BACKUP", sizeof("BACKUP") - 1) == 0){
    m_error= executeStartBackup(allAfterFirstToken);
    DBUG_RETURN(true);
  }
  else if(strcasecmp(firstToken, "ABORT") == 0 &&
	  allAfterFirstToken != NULL &&
	  strncasecmp(allAfterFirstToken, "BACKUP", sizeof("BACKUP") - 1) == 0){
    executeAbortBackup(allAfterFirstToken);
    DBUG_RETURN(true);
  }
  else if (strcasecmp(firstToken, "PURGE") == 0) {
    executePurge(allAfterFirstToken);
    DBUG_RETURN(true);
  } 
#ifdef HAVE_GLOBAL_REPLICATION
  else if(strcasecmp(firstToken, "REPLICATION") == 0 ||
	  strcasecmp(firstToken, "REP") == 0) {
    executeRep(allAfterFirstToken);
    DBUG_RETURN(true);
  }
#endif // HAVE_GLOBAL_REPLICATION
  else if(strcasecmp(firstToken, "ENTER") == 0 &&
	  allAfterFirstToken != NULL &&
	  strncasecmp(allAfterFirstToken, "SINGLE USER MODE ", 
		  sizeof("SINGLE USER MODE") - 1) == 0){
    executeEnterSingleUser(allAfterFirstToken);
    DBUG_RETURN(true);
  }
  else if(strcasecmp(firstToken, "EXIT") == 0 &&
	  allAfterFirstToken != NULL &&
	  strncasecmp(allAfterFirstToken, "SINGLE USER MODE ", 
		  sizeof("SINGLE USER MODE") - 1) == 0){
    executeExitSingleUser(allAfterFirstToken);
    DBUG_RETURN(true);
  }
  else if (strcasecmp(firstToken, "ALL") == 0) {
    analyseAfterFirstToken(-1, allAfterFirstToken);
  } else {
    /**
     * First token should be a digit, node ID
     */
    int nodeId;

    if (! convert(firstToken, nodeId)) {
      invalid_command(_line);
      DBUG_RETURN(true);
    }

    if (nodeId <= 0) {
      ndbout << "Invalid node ID: " << firstToken << "." << endl;
      DBUG_RETURN(true);
    }
    
    analyseAfterFirstToken(nodeId, allAfterFirstToken);
    
  }
  DBUG_RETURN(true);
}


/**
 * List of commands used as second command argument
 */
static const CommandInterpreter::CommandFunctionPair commands[] = {
  { "START", &CommandInterpreter::executeStart }
  ,{ "RESTART", &CommandInterpreter::executeRestart }
  ,{ "STOP", &CommandInterpreter::executeStop }
  ,{ "STATUS", &CommandInterpreter::executeStatus }
  ,{ "LOGLEVEL", &CommandInterpreter::executeLogLevel }
  ,{ "CLUSTERLOG", &CommandInterpreter::executeEventReporting }
#ifdef ERROR_INSERT
  ,{ "ERROR", &CommandInterpreter::executeError }
#endif
  ,{ "LOG", &CommandInterpreter::executeLog }
  ,{ "LOGIN", &CommandInterpreter::executeLogIn }
  ,{ "LOGOUT", &CommandInterpreter::executeLogOut }
  ,{ "LOGOFF", &CommandInterpreter::executeLogOff }
  ,{ "TESTON", &CommandInterpreter::executeTestOn }
  ,{ "TESTOFF", &CommandInterpreter::executeTestOff }
  ,{ "SET", &CommandInterpreter::executeSet }
  ,{ "GETSTAT", &CommandInterpreter::executeGetStat }
  ,{ "DUMP", &CommandInterpreter::executeDumpState }
};


//*****************************************************************************
//*****************************************************************************
void
CommandInterpreter::analyseAfterFirstToken(int processId,
					   char* allAfterFirstToken) {
  
  if (emptyString(allAfterFirstToken)) {
    ndbout << "Expected a command after "
	   << ((processId == -1) ? "ALL." : "node ID.") << endl;
    return;
  }
  
  char* secondToken = strtok(allAfterFirstToken, " ");
  char* allAfterSecondToken = strtok(NULL, "\0");

  const int tmpSize = sizeof(commands)/sizeof(CommandFunctionPair);
  ExecuteFunction fun = 0;
  const char * command = 0;
  for(int i = 0; i<tmpSize; i++){
    if(strcasecmp(secondToken, commands[i].command) == 0){
      fun = commands[i].executeFunction;
      command = commands[i].command;
      break;
    }
  }
  
  if(fun == 0){
    invalid_command(secondToken);
    return;
  }
  
  if(processId == -1){
    executeForAll(command, fun, allAfterSecondToken);
  } else {
    (this->*fun)(processId, allAfterSecondToken, false);
  }
  ndbout << endl;
}

/**
 * Get next nodeid larger than the give node_id. node_id will be
 * set to the next node_id in the list. node_id should be set
 * to 0 (zero) on the first call.
 *
 * @param handle the NDB management handle
 * @param node_id last node_id retreived, 0 at first call
 * @param type type of node to look for
 * @return 1 if a node was found, 0 if no more node exist
 */
static 
int 
get_next_nodeid(struct ndb_mgm_cluster_state *cl,
		int *node_id,
		enum ndb_mgm_node_type type)
{
  int i;
  
  if(cl == NULL)
    return 0;
  
  i=0;
  while((i < cl->no_of_nodes)) {
    if((*node_id < cl->node_states[i].node_id) &&
       (cl->node_states[i].node_type == type)) {
      
      if(i >= cl->no_of_nodes)
	return 0;
      
      *node_id = cl->node_states[i].node_id;
      return 1;
    }
    i++;
  }
  
  return 0;
}

void
CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun, 
				  const char * allAfterSecondToken)
{
  int nodeId = 0;
  if(strcasecmp(cmd, "STOP") == 0) {
    ndbout_c("Executing STOP on all nodes.");
    (this->*fun)(nodeId, allAfterSecondToken, true);
  } else if(strcasecmp(cmd, "RESTART") == 0) {
    ndbout_c("Executing RESTART on all nodes.");
    ndbout_c("Starting shutdown. This may take a while. Please wait...");
    (this->*fun)(nodeId, allAfterSecondToken, true);
    ndbout_c("Trying to start all nodes of system.");
    ndbout_c("Use ALL STATUS to see the system start-up phases.");
  } else {
    struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv);
    if(cl == 0){
      ndbout_c("Unable get status from management server");
      printError();
      return;
    }
    NdbAutoPtr<char> ap1((char*)cl);
    while(get_next_nodeid(cl, &nodeId, NDB_MGM_NODE_TYPE_NDB))
      (this->*fun)(nodeId, allAfterSecondToken, true);
  }
}

//*****************************************************************************
//*****************************************************************************
bool 
CommandInterpreter::parseBlockSpecification(const char* allAfterLog,
					    Vector<const char*>& blocks) 
{
  // Parse: [BLOCK = {ALL|<blockName>+}]

  if (emptyString(allAfterLog)) {
    return true;
  }

  // Copy allAfterLog since strtok will modify it  
  char* newAllAfterLog = my_strdup(allAfterLog,MYF(MY_WME));
  My_auto_ptr<char> ap1(newAllAfterLog);
  char* firstTokenAfterLog = strtok(newAllAfterLog, " ");
  for (unsigned int i = 0; i < strlen(firstTokenAfterLog); ++i) {
    firstTokenAfterLog[i] = toupper(firstTokenAfterLog[i]);
  }
  
  if (strcasecmp(firstTokenAfterLog, "BLOCK") != 0) {
    ndbout << "Unexpected value: " << firstTokenAfterLog 
	   << ". Expected BLOCK." << endl;
    return false;
  }

  char* allAfterFirstToken = strtok(NULL, "\0");
  if (emptyString(allAfterFirstToken)) {
    ndbout << "Expected =." << endl;
    return false;
  }

  char* secondTokenAfterLog = strtok(allAfterFirstToken, " ");
  if (strcasecmp(secondTokenAfterLog, "=") != 0) {
    ndbout << "Unexpected value: " << secondTokenAfterLog 
	   << ". Expected =." << endl;
    return false;
  }

  char* blockName = strtok(NULL, " ");
  bool all = false;
  if (blockName != NULL && (strcasecmp(blockName, "ALL") == 0)) {
    all = true;
  }
  while (blockName != NULL) {
    blocks.push_back(strdup(blockName));
    blockName = strtok(NULL, " ");
  }

  if (blocks.size() == 0) {
    ndbout << "No block specified." << endl;
    return false;
  }
  if (blocks.size() > 1 && all) {
    // More than "ALL" specified
    ndbout << "Nothing expected after ALL." << endl;
    return false;
  }
  
  return true;
}



/*****************************************************************************
 * HELP
 *****************************************************************************/
void 
CommandInterpreter::executeHelp(char* parameters)
{
  if (emptyString(parameters)) {
    ndbout << helpText;

    ndbout << endl 
	   << "<severity> = " 
	   << "ALERT | CRITICAL | ERROR | WARNING | INFO | DEBUG"
	   << endl;

    ndbout << "<category> = ";
    for(int i = CFG_MIN_LOGLEVEL; i <= CFG_MAX_LOGLEVEL; i++){
      const char *str= ndb_mgm_get_event_category_string((ndb_mgm_event_category)i);
      if (str) {
	if (i != CFG_MIN_LOGLEVEL)
	  ndbout << " | ";
	ndbout << str;
      }
    }
    ndbout << endl;

    ndbout << "<level>    = " << "0 - 15" << endl;
    ndbout << "<id>       = " << "ALL | Any database node id" << endl;
    ndbout << endl;
  } else if (strcasecmp(parameters, "SHOW") == 0) {
    ndbout << helpTextShow;
#ifdef HAVE_GLOBAL_REPLICATION
  } else if (strcasecmp(parameters, "REPLICATION") == 0 ||
	     strcasecmp(parameters, "REP") == 0) {
    ndbout << helpTextRep;
#endif // HAVE_GLOBAL_REPLICATION
#ifdef VM_TRACE // DEBUG ONLY
  } else if (strcasecmp(parameters, "DEBUG") == 0) {
    ndbout << helpTextDebug;
#endif
  } else {
    invalid_command(parameters);
  }
}


/*****************************************************************************
 * SHUTDOWN
 *****************************************************************************/

int
CommandInterpreter::executeShutdown(char* parameters) 
{ 
  ndb_mgm_cluster_state *state = ndb_mgm_get_status(m_mgmsrv);
  if(state == NULL) {
    ndbout_c("Could not get status");
    printError();
    return 1;
  }
  NdbAutoPtr<char> ap1((char*)state);

  int result = 0;
  result = ndb_mgm_stop(m_mgmsrv, 0, 0);
  if (result < 0) {
    ndbout << "Shutdown off NDB Cluster storage node(s) failed." << endl;
    printError();
    return result;
  }

  ndbout << result << " NDB Cluster storage node(s) have shutdown." << endl;

  int mgm_id= 0;
  for(int i=0; i < state->no_of_nodes; i++) {
    if(state->node_states[i].node_type == NDB_MGM_NODE_TYPE_MGM &&
       state->node_states[i].version != 0){
      if (mgm_id == 0)
	mgm_id= state->node_states[i].node_id;
      else {
	ndbout << "Unable to locate management server, "
	       << "shutdown manually with <id> STOP"
	       << endl;
	return 1;
      }
    }
  }

  result = ndb_mgm_stop(m_mgmsrv, 1, &mgm_id);
  if (result <= 0) {
    ndbout << "Shutdown of NDB Cluster management server failed." << endl;
    printError();
    if (result == 0)
      return 1;
    return result;
  }

  m_connected= false;
  disconnect();
  ndbout << "NDB Cluster management server shutdown." << endl;
  return 0;
}

/*****************************************************************************
 * SHOW
 *****************************************************************************/


static
const char *status_string(ndb_mgm_node_status status)
{
  switch(status){
  case NDB_MGM_NODE_STATUS_NO_CONTACT:
    return "not connected";
  case NDB_MGM_NODE_STATUS_NOT_STARTED:
    return "not started";
  case NDB_MGM_NODE_STATUS_STARTING:
    return "starting";
  case NDB_MGM_NODE_STATUS_STARTED:
    return "started";
  case NDB_MGM_NODE_STATUS_SHUTTING_DOWN:
    return "shutting down";
  case NDB_MGM_NODE_STATUS_RESTARTING:
    return "restarting";
  case NDB_MGM_NODE_STATUS_SINGLEUSER:
    return "single user mode";
  default:
    return "unknown state";
  }
}

static void
print_nodes(ndb_mgm_cluster_state *state, ndb_mgm_configuration_iterator *it,
	    const char *proc_name, int no_proc, ndb_mgm_node_type type,
	    int master_id)
{ 
  int i;
  ndbout << "[" << proc_name
	 << "(" << ndb_mgm_get_node_type_string(type) << ")]\t"
	 << no_proc << " node(s)" << endl;
  for(i=0; i < state->no_of_nodes; i++) {
    struct ndb_mgm_node_state *node_state= &(state->node_states[i]);
    if(node_state->node_type == type) {
      int node_id= node_state->node_id;
      ndbout << "id=" << node_id;
      if(node_state->version != 0) {
	const char *hostname= node_state->connect_address;
	if (hostname == 0
	    || strlen(hostname) == 0
	    || strcasecmp(hostname,"0.0.0.0") == 0)
	  ndbout << " ";
	else
	  ndbout << "\t@" << hostname;
	ndbout << "  (Version: "
	       << getMajor(node_state->version) << "."
	       << getMinor(node_state->version) << "."
	       << getBuild(node_state->version);
	if (type == NDB_MGM_NODE_TYPE_NDB) {
	  if (node_state->node_status != NDB_MGM_NODE_STATUS_STARTED) {
	    ndbout << ", " << status_string(node_state->node_status);
	  }
	  if (node_state->node_group >= 0) {
	    ndbout << ", Nodegroup: " << node_state->node_group;
	    if (node_state->dynamic_id == master_id)
	      ndbout << ", Master";
	  }
	}
	ndbout << ")" << endl;
      } else {
	if(ndb_mgm_find(it, CFG_NODE_ID, node_id) != 0){
	  ndbout_c("Unable to find node with id: %d", node_id);
	  return;
	}
	const char *config_hostname= 0;
	ndb_mgm_get_string_parameter(it, CFG_NODE_HOST, &config_hostname);
	if (config_hostname == 0 || config_hostname[0] == 0)
	  config_hostname= "any host";
	ndbout << " (not connected, accepting connect from "
	       << config_hostname << ")" << endl;
      }
    }
  }
  ndbout << endl;
}

void
CommandInterpreter::executePurge(char* parameters) 
{ 
  int command_ok= 0;
  do {
    if (emptyString(parameters))
      break;
    char* firstToken = strtok(parameters, " ");
    char* nextToken = strtok(NULL, " \0");
    if (strcasecmp(firstToken,"STALE") == 0 &&
	nextToken &&
	strcasecmp(nextToken, "SESSIONS") == 0) {
      command_ok= 1;
      break;
    }
  } while(0);

  if (!command_ok) {
    ndbout_c("Unexpected command, expected: PURGE STALE SESSIONS");
    return;
  }

  int i;
  char *str;
  
  if (ndb_mgm_purge_stale_sessions(m_mgmsrv, &str)) {
    ndbout_c("Command failed");
    return;
  }
  if (str) {
    ndbout_c("Purged sessions with node id's: %s", str);
    free(str);
  }
  else
  {
    ndbout_c("No sessions purged");
  }
}

void
CommandInterpreter::executeShow(char* parameters) 
{ 
  int i;
  if (emptyString(parameters)) {
    ndb_mgm_cluster_state *state = ndb_mgm_get_status(m_mgmsrv);
    if(state == NULL) {
      ndbout_c("Could not get status");
      printError();
      return;
    }
    NdbAutoPtr<char> ap1((char*)state);

    ndb_mgm_configuration * conf = ndb_mgm_get_configuration(m_mgmsrv,0);
    if(conf == 0){
      ndbout_c("Could not get configuration");
      printError();
      return;
    }

    ndb_mgm_configuration_iterator * it;
    it = ndb_mgm_create_configuration_iterator((struct ndb_mgm_configuration *)conf, CFG_SECTION_NODE);

    if(it == 0){
      ndbout_c("Unable to create config iterator");
      return;
    }
    NdbAutoPtr<ndb_mgm_configuration_iterator> ptr(it);

    int
      master_id= 0,
      ndb_nodes= 0,
      api_nodes= 0,
      mgm_nodes= 0;

    for(i=0; i < state->no_of_nodes; i++) {
      if(state->node_states[i].node_type == NDB_MGM_NODE_TYPE_NDB &&
	 state->node_states[i].version != 0){
	master_id= state->node_states[i].dynamic_id;
	break;
      }
    }
    
    for(i=0; i < state->no_of_nodes; i++) {
      switch(state->node_states[i].node_type) {
      case NDB_MGM_NODE_TYPE_API:
	api_nodes++;
	break;
      case NDB_MGM_NODE_TYPE_NDB:
	if (state->node_states[i].dynamic_id < master_id)
	  master_id= state->node_states[i].dynamic_id;
	ndb_nodes++;
	break;
      case NDB_MGM_NODE_TYPE_MGM:
	mgm_nodes++;
	break;
      case NDB_MGM_NODE_TYPE_UNKNOWN:
        ndbout << "Error: Unknown Node Type" << endl;
        return;
      case NDB_MGM_NODE_TYPE_REP:
	abort();
      }
    }

    ndbout << "Cluster Configuration" << endl
	   << "---------------------" << endl;
    print_nodes(state, it, "ndbd",     ndb_nodes, NDB_MGM_NODE_TYPE_NDB, master_id);
    print_nodes(state, it, "ndb_mgmd", mgm_nodes, NDB_MGM_NODE_TYPE_MGM, 0);
    print_nodes(state, it, "mysqld",   api_nodes, NDB_MGM_NODE_TYPE_API, 0);
    //    ndbout << helpTextShow;
    return;
  } else if (strcasecmp(parameters, "PROPERTIES") == 0 ||
	     strcasecmp(parameters, "PROP") == 0) {
    ndbout << "SHOW PROPERTIES is not yet implemented." << endl;
    //  ndbout << "_mgmtSrvr.getConfig()->print();" << endl; /* XXX */
  } else if (strcasecmp(parameters, "CONFIGURATION") == 0 ||
	     strcasecmp(parameters, "CONFIG") == 0){
    ndbout << "SHOW CONFIGURATION is not yet implemented." << endl;
    //nbout << "_mgmtSrvr.getConfig()->printConfigFile();" << endl; /* XXX */
  } else if (strcasecmp(parameters, "PARAMETERS") == 0 ||
	     strcasecmp(parameters, "PARAMS") == 0 ||
	     strcasecmp(parameters, "PARAM") == 0) {
    ndbout << "SHOW PARAMETERS is not yet implemented." << endl;
    //    ndbout << "_mgmtSrvr.getConfig()->getConfigInfo()->print();" 
    //           << endl; /* XXX */
  } else {
    ndbout << "Invalid argument." << endl;
  }
}

void
CommandInterpreter::executeConnect(char* parameters) 
{
  disconnect();
  if (!emptyString(parameters)) {
    if (ndb_mgm_set_connectstring(m_mgmsrv,
				  BaseString(parameters).trim().c_str()))
    {
      printError();
      return;
    }
  }
  connect();
}

//*****************************************************************************
//*****************************************************************************
void 
CommandInterpreter::executeClusterLog(char* parameters) 
{
  DBUG_ENTER("CommandInterpreter::executeClusterLog");
  int i;
  if (emptyString(parameters))
  {
    ndbout << "Missing argument." << endl;
    DBUG_VOID_RETURN;
  }

  enum ndb_mgm_event_severity severity = NDB_MGM_EVENT_SEVERITY_ALL;
    
  char * tmpString = my_strdup(parameters,MYF(MY_WME));
  My_auto_ptr<char> ap1(tmpString);
  char * tmpPtr = 0;
  char * item = strtok_r(tmpString, " ", &tmpPtr);
  int enable;

  const unsigned int *enabled= ndb_mgm_get_logfilter(m_mgmsrv);
  if(enabled == NULL) {
    ndbout << "Couldn't get status" << endl;
    printError();
    DBUG_VOID_RETURN;
  }

  /********************
   * CLUSTERLOG INFO
   ********************/
  if (strcasecmp(item, "INFO") == 0) {
    DBUG_PRINT("info",("INFO"));
    if(enabled[0] == 0)
    {
      ndbout << "Cluster logging is disabled." << endl;
      DBUG_VOID_RETURN;
    }
#if 0 
    for(i = 0; i<7;i++)
      printf("enabled[%d] = %d\n", i, enabled[i]);
#endif
    ndbout << "Severities enabled: ";
    for(i = 1; i < (int)NDB_MGM_EVENT_SEVERITY_ALL; i++) {
      const char *str= ndb_mgm_get_event_severity_string((ndb_mgm_event_severity)i);
      if (str == 0)
      {
	DBUG_ASSERT(false);
	continue;
      }
      if(enabled[i])
	ndbout << BaseString(str).ndb_toupper() << " ";
    }
    ndbout << endl;
    DBUG_VOID_RETURN;

  } 
  else if (strcasecmp(item, "FILTER") == 0 ||
	   strcasecmp(item, "TOGGLE") == 0)
  {
    DBUG_PRINT("info",("TOGGLE"));
    enable= -1;
  } 
  else if (strcasecmp(item, "OFF") == 0) 
  {
    DBUG_PRINT("info",("OFF"));
    enable= 0;
  } else if (strcasecmp(item, "ON") == 0) {
    DBUG_PRINT("info",("ON"));
    enable= 1;
  } else {
    ndbout << "Invalid argument." << endl;
    DBUG_VOID_RETURN;
  }

  int res_enable;
  item = strtok_r(NULL, " ", &tmpPtr);
  if (item == NULL) {
    res_enable=
      ndb_mgm_set_clusterlog_severity_filter(m_mgmsrv,
					     NDB_MGM_EVENT_SEVERITY_ON,
					     enable, NULL);
    if (res_enable < 0)
    {
      ndbout << "Couldn't set filter" << endl;
      printError();
      DBUG_VOID_RETURN;
    }
    ndbout << "Cluster logging is " << (res_enable ? "enabled.":"disabled") << endl;
    DBUG_VOID_RETURN;
  }

  do {
    severity= NDB_MGM_ILLEGAL_EVENT_SEVERITY;
    if (strcasecmp(item, "ALL") == 0) {
      severity = NDB_MGM_EVENT_SEVERITY_ALL;	
    } else if (strcasecmp(item, "ALERT") == 0) {
      severity = NDB_MGM_EVENT_SEVERITY_ALERT;
    } else if (strcasecmp(item, "CRITICAL") == 0) { 
      severity = NDB_MGM_EVENT_SEVERITY_CRITICAL;
    } else if (strcasecmp(item, "ERROR") == 0) {
      severity = NDB_MGM_EVENT_SEVERITY_ERROR;
    } else if (strcasecmp(item, "WARNING") == 0) {
      severity = NDB_MGM_EVENT_SEVERITY_WARNING;
    } else if (strcasecmp(item, "INFO") == 0) {
      severity = NDB_MGM_EVENT_SEVERITY_INFO;
    } else if (strcasecmp(item, "DEBUG") == 0) {
      severity = NDB_MGM_EVENT_SEVERITY_DEBUG;
    } else if (strcasecmp(item, "OFF") == 0 ||
	       strcasecmp(item, "ON") == 0) {
      if (enable < 0) // only makes sense with toggle
	severity = NDB_MGM_EVENT_SEVERITY_ON;
    }
    if (severity == NDB_MGM_ILLEGAL_EVENT_SEVERITY) {
      ndbout << "Invalid severity level: " << item << endl;
      DBUG_VOID_RETURN;
    }

    res_enable= ndb_mgm_set_clusterlog_severity_filter(m_mgmsrv, severity,
						       enable, NULL);
    if (res_enable < 0)
    {
      ndbout << "Couldn't set filter" << endl;
      printError();
      DBUG_VOID_RETURN;
    }
    ndbout << BaseString(item).ndb_toupper().c_str() << " " << (res_enable ? "enabled":"disabled") << endl;

    item = strtok_r(NULL, " ", &tmpPtr);	
  } while(item != NULL);

  DBUG_VOID_RETURN;
} 

//*****************************************************************************
//*****************************************************************************

void
CommandInterpreter::executeStop(int processId, const char *, bool all) 
{
  int result = 0;
  if(all) {
    result = ndb_mgm_stop(m_mgmsrv, 0, 0);
  } else {
    result = ndb_mgm_stop(m_mgmsrv, 1, &processId);
  }
  if (result < 0) {
    ndbout << "Shutdown failed." << endl;
    printError();
  } else
    {
      if(all)
	ndbout << "NDB Cluster has shutdown." << endl;
      else
	ndbout << "Node " << processId << " has shutdown." << endl;
    }
}

void
CommandInterpreter::executeEnterSingleUser(char* parameters) 
{
  strtok(parameters, " ");
  struct ndb_mgm_reply reply;
  char* id = strtok(NULL, " ");
  id = strtok(NULL, " ");
  id = strtok(NULL, "\0");
  int nodeId = -1;
  if(id == 0 || sscanf(id, "%d", &nodeId) != 1){
    ndbout_c("Invalid arguments: expected <NodeId>");
    ndbout_c("Use SHOW to see what API nodes are configured");
    return;
  }
  int result = ndb_mgm_enter_single_user(m_mgmsrv, nodeId, &reply);
  
  if (result != 0) {
    ndbout_c("Entering single user mode for node %d failed", nodeId);
    printError();
  } else {
    ndbout_c("Entering single user mode");
    ndbout_c("Access will be granted for API node %d only.", nodeId);
    ndbout_c("Use ALL STATUS to see when single user mode has been entered.");
  }
}

void 
CommandInterpreter::executeExitSingleUser(char* parameters) 
{
  int result = ndb_mgm_exit_single_user(m_mgmsrv, 0);
  if (result != 0) {
    ndbout_c("Exiting single user mode failed.");
    printError();
  } else {
    ndbout_c("Exiting single user mode in progress.");
    ndbout_c("Use ALL STATUS to see when single user mode has been exited.");
  }
}

void
CommandInterpreter::executeStart(int processId, const char* parameters,
				 bool all) 
{
  int result;
  if(all) {
    result = ndb_mgm_start(m_mgmsrv, 0, 0);
  } else {
    result = ndb_mgm_start(m_mgmsrv, 1, &processId);
  }

  if (result <= 0) {
    ndbout << "Start failed." << endl;
    printError();
  } else
    {
      if(all)
	ndbout_c("NDB Cluster is being started.");
      else
	ndbout_c("Database node %d is being started.", processId);
    }
}

void
CommandInterpreter::executeRestart(int processId, const char* parameters,
				   bool all) 
{
  int result;
  int nostart = 0;
  int initialstart = 0;
  int abort = 0;

  if(parameters != 0 && strlen(parameters) != 0){
    char * tmpString = my_strdup(parameters,MYF(MY_WME));
    My_auto_ptr<char> ap1(tmpString);
    char * tmpPtr = 0;
    char * item = strtok_r(tmpString, " ", &tmpPtr);
    while(item != NULL){
      if(strcasecmp(item, "-N") == 0)
	nostart = 1;
      if(strcasecmp(item, "-I") == 0)
	initialstart = 1;
      if(strcasecmp(item, "-A") == 0)
	abort = 1;
      item = strtok_r(NULL, " ", &tmpPtr);
    }
  }

  if(all) {
    result = ndb_mgm_restart2(m_mgmsrv, 0, NULL, initialstart, nostart, abort);
  } else {
    int v[1];
    v[0] = processId;
    result = ndb_mgm_restart2(m_mgmsrv, 1, v, initialstart, nostart, abort);
  }
  
  if (result <= 0) {
    ndbout.println("Restart failed.", result);
    printError();
  } else
    {
      if(all)
	ndbout << "NDB Cluster is being restarted." << endl;
      else
	ndbout_c("Node %d is being restarted.", processId);
    }
}

void
CommandInterpreter::executeDumpState(int processId, const char* parameters,
				     bool all) 
{
  if(emptyString(parameters)){
    ndbout << "Expected argument" << endl;
    return;
  }

  Uint32 no = 0;
  int pars[25];
  
  char * tmpString = my_strdup(parameters,MYF(MY_WME));
  My_auto_ptr<char> ap1(tmpString);
  char * tmpPtr = 0;
  char * item = strtok_r(tmpString, " ", &tmpPtr);
  while(item != NULL){
    if (0x0 <= strtoll(item, NULL, 0) && strtoll(item, NULL, 0) <= 0xffffffff){
      pars[no] = strtoll(item, NULL, 0); 
    } else {
      ndbout << "Illegal value in argument to signal." << endl
	     << "(Value must be between 0 and 0xffffffff.)" 
	     << endl;
      return;
    }
    no++;
    item = strtok_r(NULL, " ", &tmpPtr);
  }
  ndbout << "Sending dump signal with data:" << endl;
  for (Uint32 i=0; i<no; i++) {
    ndbout.setHexFormat(1) << pars[i] << " ";
    if (!(i+1 & 0x3)) ndbout << endl;
  }
  
  struct ndb_mgm_reply reply;
  ndb_mgm_dump_state(m_mgmsrv, processId, pars, no, &reply);
}

void 
CommandInterpreter::executeStatus(int processId, 
				  const char* parameters, bool all) 
{
  if (! emptyString(parameters)) {
    ndbout_c("No parameters expected to this command.");
    return;
  }

  ndb_mgm_node_status status;
  Uint32 startPhase, version;
  bool system;
  
  struct ndb_mgm_cluster_state *cl;
  cl = ndb_mgm_get_status(m_mgmsrv);
  if(cl == NULL) {
    ndbout_c("Cannot get status of node %d.", processId);
    printError();
    return;
  }
  NdbAutoPtr<char> ap1((char*)cl);

  int i = 0;
  while((i < cl->no_of_nodes) && cl->node_states[i].node_id != processId)
    i++;
  if(cl->node_states[i].node_id != processId) {
    ndbout << processId << ": Node not found" << endl;
    return;
  }
  status = cl->node_states[i].node_status;
  startPhase = cl->node_states[i].start_phase;
  version = cl->node_states[i].version;

  ndbout << "Node " << processId << ": " << status_string(status);
  switch(status){
  case NDB_MGM_NODE_STATUS_STARTING:
    ndbout << " (Phase " << startPhase << ")";
    break;
  case NDB_MGM_NODE_STATUS_SHUTTING_DOWN:
    ndbout << " (Phase " << startPhase << ")";
    break;
  default:
    break;
  }
  if(status != NDB_MGM_NODE_STATUS_NO_CONTACT)
    ndbout_c(" (Version %d.%d.%d)", 
	     getMajor(version) ,
	     getMinor(version),
	     getBuild(version));
  else
    ndbout << endl;
}


//*****************************************************************************
//*****************************************************************************

void 
CommandInterpreter::executeLogLevel(int processId, const char* parameters, 
				    bool all) 
{
  (void) all;
  if (emptyString(parameters)) {
    ndbout << "Expected argument" << endl;
    return;
  } 
  BaseString tmp(parameters);
  Vector<BaseString> spec;
  tmp.split(spec, "=");
  if(spec.size() != 2){
    ndbout << "Invalid loglevel specification: " << parameters << endl;
    return;
  }

  spec[0].trim().ndb_toupper();
  int category = ndb_mgm_match_event_category(spec[0].c_str());
  if(category == NDB_MGM_ILLEGAL_EVENT_CATEGORY){
    category = atoi(spec[0].c_str());
    if(category < NDB_MGM_MIN_EVENT_CATEGORY ||
       category > NDB_MGM_MAX_EVENT_CATEGORY){
      ndbout << "Unknown category: \"" << spec[0].c_str() << "\"" << endl;
      return;
    }
  }
  
  int level = atoi(spec[1].c_str());
  if(level < 0 || level > 15){
    ndbout << "Invalid level: " << spec[1].c_str() << endl;
    return;
  }
  
  ndbout << "Executing LOGLEVEL on node " << processId << flush;

  struct ndb_mgm_reply reply;
  int result;
  result = ndb_mgm_set_loglevel_node(m_mgmsrv, 
				     processId,
				     (ndb_mgm_event_category)category,
				     level, 
				     &reply);
  
  if (result < 0) {
    ndbout_c(" failed.");
    printError();
  } else {
    ndbout_c(" OK!");
  }  
  
}

//*****************************************************************************
//*****************************************************************************
void CommandInterpreter::executeError(int processId, 
				      const char* parameters, bool /* all */) 
{
  if (emptyString(parameters)) {
    ndbout << "Missing error number." << endl;
    return;
  }

  // Copy parameters since strtok will modify it
  char* newpar = my_strdup(parameters,MYF(MY_WME)); 
  My_auto_ptr<char> ap1(newpar);
  char* firstParameter = strtok(newpar, " ");

  int errorNo;
  if (! convert(firstParameter, errorNo)) {
    ndbout << "Expected an integer." << endl;
    return;
  }

  char* allAfterFirstParameter = strtok(NULL, "\0");
  if (! emptyString(allAfterFirstParameter)) {
    ndbout << "Nothing expected after error number." << endl;
    return;
  }

  ndb_mgm_insert_error(m_mgmsrv, processId, errorNo, NULL);
}

//*****************************************************************************
//*****************************************************************************

void 
CommandInterpreter::executeLog(int processId,
			       const char* parameters, bool all) 
{
  struct ndb_mgm_reply reply;
  Vector<const char *> blocks;
  if (! parseBlockSpecification(parameters, blocks)) {
    return;
  }
  int len=1;
  Uint32 i;
  for(i=0; i<blocks.size(); i++) {
    len += strlen(blocks[i]) + 1;
  }
  char * blockNames = (char*)my_malloc(len,MYF(MY_WME));
  My_auto_ptr<char> ap1(blockNames);
  
  blockNames[0] = 0;
  for(i=0; i<blocks.size(); i++) {
    strcat(blockNames, blocks[i]);
    strcat(blockNames, "|");
  }
  
  int result = ndb_mgm_log_signals(m_mgmsrv,
				   processId, 
				   NDB_MGM_SIGNAL_LOG_MODE_INOUT, 
				   blockNames,
				   &reply);
  if (result != 0) {
    ndbout_c("Execute LOG on node %d failed.", processId);
    printError();
  }
}

//*****************************************************************************
//*****************************************************************************
void 
CommandInterpreter::executeLogIn(int /* processId */,
				 const char* parameters, bool /* all */) 
{
  ndbout << "Command LOGIN not implemented." << endl;
}

//*****************************************************************************
//*****************************************************************************
void 
CommandInterpreter::executeLogOut(int /*processId*/, 
				  const char* parameters, bool /*all*/) 
{
  ndbout << "Command LOGOUT not implemented." << endl;
}

//*****************************************************************************
//*****************************************************************************
void 
CommandInterpreter::executeLogOff(int /*processId*/,
				  const char* parameters, bool /*all*/) 
{
  ndbout << "Command LOGOFF not implemented." << endl;
}

//*****************************************************************************
//*****************************************************************************
void 
CommandInterpreter::executeTestOn(int processId,
				  const char* parameters, bool /*all*/) 
{
  if (! emptyString(parameters)) {
    ndbout << "No parameters expected to this command." << endl;
    return;
  }
  struct ndb_mgm_reply reply;
  int result = ndb_mgm_start_signallog(m_mgmsrv, processId, &reply);
  if (result != 0) {
    ndbout_c("Execute TESTON failed.");
    printError();
  }
}

//*****************************************************************************
//*****************************************************************************
void 
CommandInterpreter::executeTestOff(int processId,
				   const char* parameters, bool /*all*/) 
{
  if (! emptyString(parameters)) {
    ndbout << "No parameters expected to this command." << endl;
    return;
  }
  struct ndb_mgm_reply reply;
  int result = ndb_mgm_stop_signallog(m_mgmsrv, processId, &reply);
  if (result != 0) {
    ndbout_c("Execute TESTOFF failed.");
    printError();
  }
}


//*****************************************************************************
//*****************************************************************************
void 
CommandInterpreter::executeSet(int /*processId*/, 
			       const char* parameters, bool /*all*/) 
{
  if (emptyString(parameters)) {
    ndbout << "Missing parameter name." << endl;
    return;
  }
#if 0
  // Copy parameters since strtok will modify it
  char* newpar = my_strdup(parameters,MYF(MY_WME));
  My_auto_ptr<char> ap1(newpar);
  char* configParameterName = strtok(newpar, " ");

  char* allAfterParameterName = strtok(NULL, "\0");
  if (emptyString(allAfterParameterName)) {
    ndbout << "Missing parameter value." << endl;
    return;
  }

  char* value = strtok(allAfterParameterName, " ");

  char* allAfterValue = strtok(NULL, "\0");
  if (! emptyString(allAfterValue)) {
    ndbout << "Nothing expected after parameter value." << endl;
    return;
  }

  bool configBackupFileUpdated;
  bool configPrimaryFileUpdated;
  
  // TODO The handling of the primary and backup config files should be 
  // analysed further.
  // How it should be handled if only the backup is possible to write.

  int result = _mgmtSrvr.updateConfigParam(processId, configParameterName, 
					   value, configBackupFileUpdated, 
					   configPrimaryFileUpdated);
  if (result == 0) {
    if (configBackupFileUpdated && configPrimaryFileUpdated) {
      ndbout << "The configuration is updated." << endl;
    }
    else if (configBackupFileUpdated && !configPrimaryFileUpdated) {
      ndbout << "The configuration is updated but it was only possible " 
	     << "to update the backup configuration file, not the primary." 
	     << endl;
    }
    else {
      assert(false);
    }
  }
  else {
    ndbout << get_error_text(result) << endl;
    if (configBackupFileUpdated && configPrimaryFileUpdated) {
      ndbout << "The configuration files are however updated and "
	     << "the value will be used next time the process is restarted." 
	     << endl;
    }
    else if (configBackupFileUpdated && !configPrimaryFileUpdated) {
      ndbout << "It was only possible to update the backup "
	     << "configuration file, not the primary." << endl;
    }
    else if (!configBackupFileUpdated && !configPrimaryFileUpdated) {
      ndbout << "The configuration files are not updated." << endl;
    }
    else {
      // The primary is not tried to write if the write of backup file fails
      abort();
    }
  }
#endif
}

//*****************************************************************************
//*****************************************************************************
void CommandInterpreter::executeGetStat(int /*processId*/,
					const char* parameters, bool /*all*/) 
{
  if (! emptyString(parameters)) {
    ndbout << "No parameters expected to this command." << endl;
    return;
  }

#if 0
  MgmtSrvr::Statistics statistics;
  int result = _mgmtSrvr.getStatistics(processId, statistics);
  if (result != 0) {
    ndbout << get_error_text(result) << endl;
    return;
  }
#endif
  // Print statistic...
  /*
  ndbout << "Number of GETSTAT commands: " 
  << statistics._test1 << endl;
  */
}

//*****************************************************************************
//*****************************************************************************
				 
void 
CommandInterpreter::executeEventReporting(int processId,
					  const char* parameters, 
					  bool all) 
{
  if (emptyString(parameters)) {
    ndbout << "Expected argument" << endl;
    return;
  }
  BaseString tmp(parameters);
  Vector<BaseString> spec;
  tmp.split(spec, "=");
  if(spec.size() != 2){
    ndbout << "Invalid loglevel specification: " << parameters << endl;
    return;
  }

  spec[0].trim().ndb_toupper();
  int category = ndb_mgm_match_event_category(spec[0].c_str());
  if(category == NDB_MGM_ILLEGAL_EVENT_CATEGORY){
    if(!convert(spec[0].c_str(), category) ||
       category < NDB_MGM_MIN_EVENT_CATEGORY ||
       category > NDB_MGM_MAX_EVENT_CATEGORY){
      ndbout << "Unknown category: \"" << spec[0].c_str() << "\"" << endl;
      return;
    }
  }

  int level;
  if (!convert(spec[1].c_str(),level))
  {
    ndbout << "Invalid level: " << spec[1].c_str() << endl;
    return;
  }

  ndbout << "Executing CLUSTERLOG on node " << processId << flush;

  struct ndb_mgm_reply reply;
  int result;
  result = ndb_mgm_set_loglevel_clusterlog(m_mgmsrv, 
					   processId,
					   (ndb_mgm_event_category)category,
					   level, 
					   &reply);
  
  if (result != 0) {
    ndbout_c(" failed."); 
    printError();
  } else {
    ndbout_c(" OK!"); 
  }  
}

/*****************************************************************************
 * Backup
 *****************************************************************************/
int
CommandInterpreter::executeStartBackup(char* parameters)
{
  struct ndb_mgm_reply reply;
  unsigned int backupId;
#if 0
  int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0 };
  int fd = ndb_mgm_listen_event(m_mgmsrv, filter);
  if (fd < 0)
  {
    ndbout << "Initializing start of backup failed" << endl;
    printError();
    return fd;
  }
#endif
  Vector<BaseString> args;
  {
    BaseString(parameters).split(args);
    for (unsigned i= 0; i < args.size(); i++)
      if (args[i].length() == 0)
	args.erase(i--);
      else
	args[i].ndb_toupper();
  }
  int sz= args.size();

  int result;
  if (sz == 2 &&
      args[1] == "NOWAIT")
  {
    result = ndb_mgm_start_backup(m_mgmsrv, 0, &backupId, &reply);
  }
  else if (sz == 1 ||
	   (sz == 3 &&
	    args[1] == "WAIT" &&
	    args[2] == "COMPLETED"))
  {
    ndbout_c("Waiting for completed, this may take several minutes");
    result = ndb_mgm_start_backup(m_mgmsrv, 2, &backupId, &reply);
  }
  else if (sz == 3 &&
	   args[1] == "WAIT" &&
	   args[2] == "STARTED")
  {
    ndbout_c("Waiting for started, this may take several minutes");
    result = ndb_mgm_start_backup(m_mgmsrv, 1, &backupId, &reply);
  }
  else
  {
    invalid_command(parameters);
    return -1;
  }

  if (result != 0) {
    ndbout << "Start of backup failed" << endl;
    printError();
#if 0
    close(fd);
#endif
    return result;
  }
#if 0
  ndbout_c("Waiting for completed, this may take several minutes");
  char *tmp;
  char buf[1024];
  {
    SocketInputStream in(fd);
    int count = 0;
    do {
      tmp = in.gets(buf, 1024);
      if(tmp)
      {
	ndbout << tmp;
	unsigned int id;
	if(sscanf(tmp, "%*[^:]: Backup %d ", &id) == 1 && id == backupId){
	  count++;
	}
      }
    } while(count < 2);
  }

  SocketInputStream in(fd, 10);
  do {
    tmp = in.gets(buf, 1024);
    if(tmp && tmp[0] != 0)
    {
      ndbout << tmp;
    }
  } while(tmp && tmp[0] != 0);

  close(fd);
#endif  
  return 0;
}

void
CommandInterpreter::executeAbortBackup(char* parameters) 
{
  int bid = -1;
  struct ndb_mgm_reply reply;
  if (emptyString(parameters))
    goto executeAbortBackupError1;

  {
    strtok(parameters, " ");
    char* id = strtok(NULL, "\0");
    if(id == 0 || sscanf(id, "%d", &bid) != 1)
      goto executeAbortBackupError1;
  }
  {
    int result= ndb_mgm_abort_backup(m_mgmsrv, bid, &reply);
    if (result != 0) {
      ndbout << "Abort of backup " << bid << " failed" << endl;
      printError();
    } else {
      ndbout << "Abort of backup " << bid << " ordered" << endl;
    }
  }
  return;
 executeAbortBackupError1:
  ndbout << "Invalid arguments: expected <BackupId>" << endl;
  return;
}

#ifdef HAVE_GLOBAL_REPLICATION
/*****************************************************************************
 * Global Replication
 *
 * For information about the different commands, see
 * GrepReq::Request in file signaldata/grepImpl.cpp.
 *
 * Below are commands as of 2003-07-05 (may change!):
 * START = 0,            ///< Start Global Replication (all phases)
 * START_METALOG = 1,    ///< Start Global Replication (all phases)
 * START_METASCAN = 2,   ///< Start Global Replication (all phases)
 * START_DATALOG = 3,    ///< Start Global Replication (all phases)
 * START_DATASCAN = 4,   ///< Start Global Replication (all phases)
 * START_REQUESTOR = 5,  ///< Start Global Replication (all phases)
 * ABORT = 6,            ///< Immediate stop (removes subscription)
 * SLOW_STOP = 7,        ///< Stop after finishing applying current GCI epoch
 * FAST_STOP = 8,        ///< Stop after finishing applying all PS GCI epochs
 * START_TRANSFER = 9,   ///< Start SS-PS transfer
 * STOP_TRANSFER = 10,   ///< Stop SS-PS transfer
 * START_APPLY = 11,     ///< Start applying GCI epochs in SS
 * STOP_APPLY = 12,      ///< Stop applying GCI epochs in SS
 * STATUS = 13,           ///< Status
 * START_SUBSCR = 14,
 * REMOVE_BUFFERS = 15,
 * DROP_TABLE = 16

 *****************************************************************************/

void
CommandInterpreter::executeRep(char* parameters) 
{
  if (emptyString(parameters)) {
    ndbout << helpTextRep;
    return;
  }

  char * line = my_strdup(parameters,MYF(MY_WME));
  My_auto_ptr<char> ap1((char*)line);
  char * firstToken = strtok(line, " ");
  
  struct ndb_rep_reply  reply;
  unsigned int          repId;


  if (!strcasecmp(firstToken, "CONNECT")) {
    char * host = strtok(NULL, "\0");
    for (unsigned int i = 0; i < strlen(host); ++i) {
      host[i] = tolower(host[i]);
    }
    
    if(host == NULL)
    {
      ndbout_c("host:port must be specified.");
      return;
    }
    
    if(rep_connected) {
      if(m_repserver != NULL) {
	ndb_rep_disconnect(m_repserver);
	rep_connected = false;
      }       
    }
          
    if(m_repserver == NULL)
      m_repserver = ndb_rep_create_handle();
    if(ndb_rep_connect(m_repserver, host) < 0)
      ndbout_c("Failed to connect to %s", host); 
    else
      rep_connected=true;
    return;
    
    if(!rep_connected) {
      ndbout_c("Not connected to REP server");
    }
  }
    
  /********
   * START 
   ********/
  if (!strcasecmp(firstToken, "START")) {
    
    unsigned int          req;
    char *startType = strtok(NULL, "\0");
    
    if (startType == NULL) {                
      req = GrepReq::START;
    } else if (!strcasecmp(startType, "SUBSCRIPTION")) {  
      req = GrepReq::START_SUBSCR;
    } else if (!strcasecmp(startType, "METALOG")) { 
      req = GrepReq::START_METALOG;
    } else if (!strcasecmp(startType, "METASCAN")) {
      req = GrepReq::START_METASCAN;
    } else if (!strcasecmp(startType, "DATALOG")) {
      req = GrepReq::START_DATALOG;
    } else if (!strcasecmp(startType, "DATASCAN")) {
      req = GrepReq::START_DATASCAN;
    } else if (!strcasecmp(startType, "REQUESTOR")) {
      req = GrepReq::START_REQUESTOR;
    } else if (!strcasecmp(startType, "TRANSFER")) {
      req = GrepReq::START_TRANSFER;
    } else if (!strcasecmp(startType, "APPLY")) {
      req = GrepReq::START_APPLY;
    } else if (!strcasecmp(startType, "DELETE")) {
      req = GrepReq::START_DELETE;
    } else {
      ndbout_c("Illegal argument to command 'REPLICATION START'");
      return;
    }

    int result = ndb_rep_command(m_repserver, req, &repId, &reply);
    
    if (result != 0) {
      ndbout << "Start of Global Replication failed" << endl;
    } else {
      ndbout << "Start of Global Replication ordered" << endl;
    }
    return;
  }

  /********
   * STOP
   ********/
  if (!strcasecmp(firstToken, "STOP")) {    
    unsigned int          req;
    char *startType = strtok(NULL, " ");
    unsigned int epoch = 0;
    
    if (startType == NULL) {                 
      /**
       * Stop immediately
       */
      req = GrepReq::STOP;
    } else if (!strcasecmp(startType, "EPOCH")) {  
      char *strEpoch = strtok(NULL, "\0");
      if(strEpoch == NULL) {
	ndbout_c("Epoch expected!");
	return;
      }
      req = GrepReq::STOP;
      epoch=atoi(strEpoch);      
    } else if (!strcasecmp(startType, "SUBSCRIPTION")) {  
      req = GrepReq::STOP_SUBSCR;
    } else if (!strcasecmp(startType, "METALOG")) { 
      req = GrepReq::STOP_METALOG;
    } else if (!strcasecmp(startType, "METASCAN")) {
      req = GrepReq::STOP_METASCAN;
    } else if (!strcasecmp(startType, "DATALOG")) {
      req = GrepReq::STOP_DATALOG;
    } else if (!strcasecmp(startType, "DATASCAN")) {
      req = GrepReq::STOP_DATASCAN;
    } else if (!strcasecmp(startType, "REQUESTOR")) {
      req = GrepReq::STOP_REQUESTOR;
    } else if (!strcasecmp(startType, "TRANSFER")) {
      req = GrepReq::STOP_TRANSFER;
    } else if (!strcasecmp(startType, "APPLY")) {
      req = GrepReq::STOP_APPLY;
    } else if (!strcasecmp(startType, "DELETE")) {
      req = GrepReq::STOP_DELETE;
    } else {
      ndbout_c("Illegal argument to command 'REPLICATION STOP'");
      return;
    }
    int result = ndb_rep_command(m_repserver, req, &repId, &reply, epoch);
    
    if (result != 0) {
      ndbout << "Stop command failed" << endl;
    } else {
      ndbout << "Stop ordered" << endl;
    }
    return;
  }

  /*********
   * STATUS
   *********/
  if (!strcasecmp(firstToken, "STATUS")) {
    struct rep_state repstate;
    int result = 
      ndb_rep_get_status(m_repserver, &repId, &reply, &repstate);
    
    if (result != 0) {
      ndbout << "Status request of Global Replication failed" << endl;
    } else {
      ndbout << "Status request of Global Replication ordered" << endl;
      ndbout << "See printout at one of the DB nodes" << endl;
      ndbout << "(Better status report is under development.)" << endl;
      ndbout << " SubscriptionId " << repstate.subid 
	     << " SubscriptionKey " << repstate.subkey << endl;
    }
    return;
  }

  /*********
   * QUERY (see repapi.h for querable counters)
   *********/
  if (!strcasecmp(firstToken, "QUERY")) {
    char *query = strtok(NULL, "\0");
    int queryCounter=-1;
    if(query != NULL) {
      queryCounter = atoi(query);
    }
    struct rep_state repstate;
    unsigned repId = 0;
    int result = ndb_rep_query(m_repserver, (QueryCounter)queryCounter,
			       &repId, &reply, &repstate);
    
    if (result != 0) {
      ndbout << "Query repserver failed" << endl;
    } else {
      ndbout << "Query repserver sucessful" << endl;
      ndbout_c("repstate : QueryCounter %d, f=%d l=%d"
	       " nodegroups %d" , 
	       repstate.queryCounter,
	       repstate.first[0], repstate.last[0],
	       repstate.no_of_nodegroups );
    }
    return;
  }
}
#endif // HAVE_GLOBAL_REPLICATION

template class Vector<char const*>;