/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "ExtNDB.hpp"
#include "ConfigRetriever.hpp"
#include <NdbSleep.h>

#include <NdbApiSignal.hpp>

#include <signaldata/DictTabInfo.hpp>
#include <signaldata/GetTabInfo.hpp>
#include <signaldata/SumaImpl.hpp>
#include <AttributeHeader.hpp>
#include <rep/rep_version.hpp>
#include <ndb_limits.h>

/*****************************************************************************
 * Constructor / Destructor / Init
 *****************************************************************************/
ExtNDB::ExtNDB(GCIContainerPS * gciContainer, ExtAPI * extAPI)
{
  m_grepSender = new ExtSender();
  if (!m_grepSender) REPABORT("Could not allocate object");
  m_gciContainerPS = gciContainer;

  m_nodeGroupInfo = new NodeGroupInfo();
  m_gciContainerPS->setNodeGroupInfo(m_nodeGroupInfo);
  
  m_doneSetGrepSender = false;
  m_subId = 0;
  m_subKey = 0;
  m_firstGCI = 0;
  m_dataLogStarted = false;

  m_extAPI = extAPI;
  if (!m_extAPI) REPABORT("Could not allocate object");
}

ExtNDB::~ExtNDB()
{
  delete m_grepSender;
  delete m_nodeGroupInfo;
}    

void 
ExtNDB::signalErrorHandler(NdbApiSignal  * signal, Uint32 nodeId) 
{
  //const Uint32 gsn = signal->readSignalNumber();
  //const Uint32 len = signal->getLength();
  RLOG(("Send signal failed. Signal %p", signal));
}

bool
ExtNDB::init(const char * connectString) 
{
  m_signalExecThread = NdbThread_Create(signalExecThread_C,
					(void **)this,
					32768,
					"ExtNDB_Service",
					NDB_THREAD_PRIO_LOW);

#if 0
  /**
   * I don't see that this does anything
   *
   * Jonas 13/2-04
   */
  ConfigRetriever cr; cr.setConnectString(connectString);

  ndb_mgm_configuration * config = cr.getConfig(NDB_VERSION, NODE_TYPE_REP);
  if (config == 0) {
    ndbout << "ExtNDB: Configuration error: ";
    const char* erString = cr.getErrorString();
    if (erString == 0) {
      erString = "No error specified!";
    }
    ndbout << erString << endl;
    return false;
  }
  NdbAutoPtr autoPtr(config);
  m_ownNodeId = r.getOwnNodeId();
  
  /**
   * Check which GREPs to connect to (in configuration)
   * 
   * @note SYSTEM LIMITATION: Only connects to one GREP
   */
  Uint32 noOfConnections=0;
  NodeId grepNodeId=0;
  const Properties * connection;

  config->get("NoOfConnections", &noOfConnections);
  for (Uint32 i=0; i<noOfConnections; i++) {
    Uint32 nodeId1, nodeId2;
    config->get("Connection", i, &connection);
    connection->get("NodeId1", &nodeId1);
    connection->get("NodeId2", &nodeId2);
    if (!connection->contains("System1") &&
	!connection->contains("System2") &&
	(nodeId1 == m_ownNodeId || nodeId2 == m_ownNodeId)) {
      /**
       * Found connection 
       */
      if (nodeId1 == m_ownNodeId) {
	grepNodeId = nodeId2;
      } else {
	grepNodeId = nodeId1;
      }
    }
  }
#endif

  m_transporterFacade = TransporterFacade::instance();
  
  assert(m_transporterFacade != 0);
  
  m_ownBlockNo = m_transporterFacade->open(this, execSignal, execNodeStatus);
  assert(m_ownBlockNo > 0);
  m_ownRef = numberToRef(m_ownBlockNo, m_ownNodeId);
  ndbout_c("EXTNDB blockno %d ownref %d ", m_ownBlockNo, m_ownRef);
  assert(m_ownNodeId == m_transporterFacade->ownId());
  
  m_grepSender->setOwnRef(m_ownRef);
  m_grepSender->setTransporterFacade(m_transporterFacade);

  if(!m_grepSender->connected(50000)){
    ndbout_c("ExtNDB: Failed to connect to DB nodes!");
    ndbout_c("ExtNDB: Tried to create transporter as (node %d, block %d).",
	     m_ownNodeId, m_ownBlockNo);
    ndbout_c("ExtNDB: Check that DB nodes are started.");
    return false; 
  }
  ndbout_c("Phase 3 (ExtNDB): Connection %d to NDB Cluster opened (Extractor)",
	   m_ownBlockNo);
  
  for (Uint32 i=1; i<MAX_NDB_NODES; i++) {
    if (m_transporterFacade->getIsDbNode(i) && 
	m_transporterFacade->getIsNodeSendable(i)) 
      {
	Uint32 nodeGrp = m_transporterFacade->getNodeGrp(i);
	m_nodeGroupInfo->addNodeToNodeGrp(i, true, nodeGrp);
	Uint32 nodeId = m_nodeGroupInfo->getFirstConnectedNode(nodeGrp);
	m_grepSender->setNodeId(nodeId);
	if(m_nodeGroupInfo->getPrimaryNode(nodeGrp) == 0) {
	  m_nodeGroupInfo->setPrimaryNode(nodeGrp, nodeId);
	}
	m_doneSetGrepSender = true;
#if 0
	RLOG(("Added node %d to node group %d", i, nodeGrp));
#endif
    }
  }

  return true;
}

/*****************************************************************************
 * Signal Queue Executor
 *****************************************************************************/

class SigMatch 
{
public:
  int gsn;
  void (ExtNDB::* function)(NdbApiSignal *signal);

  SigMatch() { gsn = 0; function = NULL; };

  SigMatch(int _gsn, void (ExtNDB::* _function)(NdbApiSignal *signal)) {
    gsn = _gsn;
    function = _function;
  };

  bool check(NdbApiSignal *signal) {
    if(signal->readSignalNumber() == gsn)
      return true;
    return false;
  };
};

extern "C"
void *signalExecThread_C(void *r) 
{
  ExtNDB *grepps = (ExtNDB*)r;

  grepps->signalExecThreadRun();

  NdbThread_Exit(0);
  /* NOTREACHED */
  return 0;
}


void
ExtNDB::signalExecThreadRun() 
{
  Vector<SigMatch> sl;

  /**
   * Signals to be executed
   */
  sl.push_back(SigMatch(GSN_SUB_GCP_COMPLETE_REP, 
			&ExtNDB::execSUB_GCP_COMPLETE_REP));
  
  /**
   * Is also forwarded to SSCoord
   */
  sl.push_back(SigMatch(GSN_GREP_SUB_START_CONF,
			&ExtNDB::execGREP_SUB_START_CONF));
  sl.push_back(SigMatch(GSN_GREP_SUB_CREATE_CONF,
			&ExtNDB::execGREP_SUB_CREATE_CONF));
  sl.push_back(SigMatch(GSN_GREP_SUB_REMOVE_CONF, 
			&ExtNDB::execGREP_SUB_REMOVE_CONF));
  /**
   * Signals to be forwarded 
   */  
  sl.push_back(SigMatch(GSN_GREP_CREATE_SUBID_CONF, 
			&ExtNDB::execGREP_CREATE_SUBID_CONF));

  sl.push_back(SigMatch(GSN_GREP_SUB_SYNC_CONF, &ExtNDB::sendSignalRep));

  sl.push_back(SigMatch(GSN_GREP_SUB_REMOVE_REF, &ExtNDB::sendSignalRep));
  sl.push_back(SigMatch(GSN_GREP_SUB_SYNC_REF, &ExtNDB::sendSignalRep));
  sl.push_back(SigMatch(GSN_GREP_CREATE_SUBID_REF, &ExtNDB::sendSignalRep));

  sl.push_back(SigMatch(GSN_GREP_SUB_START_REF, &ExtNDB::sendSignalRep));
  sl.push_back(SigMatch(GSN_GREP_SUB_CREATE_REF, &ExtNDB::sendSignalRep));


  while(1) {
    SigMatch *handler = NULL;
    NdbApiSignal *signal = NULL;

    if(m_signalRecvQueue.waitFor(sl, handler, signal, DEFAULT_TIMEOUT)) {
#if 0
      RLOG(("Removed signal from queue (GSN: %d, QSize: %d)",
	    signal->readSignalNumber(), m_signalRecvQueue.size()));
#endif
      if(handler->function != 0) {
	(this->*handler->function)(signal);
	delete signal;  signal = 0;
      } else {
	REPABORT("Illegal handler for signal");
      }
    }
  }
}

void
ExtNDB::sendSignalRep(NdbApiSignal * s)
{
  if(m_repSender->sendSignal(s) == -1)
  {
    signalErrorHandler(s, 0);
  }
}

void
ExtNDB::execSignal(void* executorObj, NdbApiSignal* signal, 
		   class LinearSectionPtr ptr[3])
{
  ExtNDB * executor = (ExtNDB*)executorObj;
     
  const Uint32 gsn = signal->readSignalNumber();
  const Uint32 len = signal->getLength();

  NdbApiSignal * s = new NdbApiSignal(executor->m_ownRef);
  switch(gsn){
  case GSN_SUB_GCP_COMPLETE_REP:
  case GSN_GREP_CREATE_SUBID_CONF:
  case GSN_GREP_SUB_CREATE_CONF:
  case GSN_GREP_SUB_START_CONF:
  case GSN_GREP_SUB_SYNC_CONF:
  case GSN_GREP_SUB_REMOVE_CONF:
  case GSN_GREP_CREATE_SUBID_REF:
  case GSN_GREP_SUB_CREATE_REF:
  case GSN_GREP_SUB_START_REF:
  case GSN_GREP_SUB_SYNC_REF:
  case GSN_GREP_SUB_REMOVE_REF:
    s->set(0, SSREPBLOCKNO, gsn, len);
    memcpy(s->getDataPtrSend(), signal->getDataPtr(), 4 * len);
    executor->m_signalRecvQueue.receive(s);    
    break;
  case GSN_SUB_TABLE_DATA:
    executor->execSUB_TABLE_DATA(signal, ptr);
    delete s;  s=0;
    break;
  case GSN_SUB_META_DATA:
    executor->execSUB_META_DATA(signal, ptr);
    delete s;  s=0;
    break;
  default:
    REPABORT1("Illegal signal received in execSignal", gsn);
  }
  s=0;
#if 0
  ndbout_c("ExtNDB: Inserted signal into queue (GSN: %d, Len: %d)",
	   signal->readSignalNumber(), len);
#endif
}

void 
ExtNDB::execNodeStatus(void* obj, Uint16 nodeId, bool alive, bool nfCompleted)
{
  ExtNDB * thisObj = (ExtNDB*)obj;

  RLOG(("Changed node status (Id %d, Alive %d, nfCompleted %d)",
	nodeId, alive, nfCompleted));
  
  if(alive) {
    /**
     *  Connected
     */
    Uint32 nodeGrp = thisObj->m_transporterFacade->getNodeGrp(nodeId);
    RLOG(("DB node %d of node group %d connected", nodeId, nodeGrp));
  
    thisObj->m_nodeGroupInfo->addNodeToNodeGrp(nodeId, true, nodeGrp);
    Uint32 firstNode = thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp);
      
    if(firstNode == 0)
      thisObj->m_nodeGroupInfo->setPrimaryNode(nodeGrp, nodeId);

    if (!thisObj->m_doneSetGrepSender) {
      thisObj->m_grepSender->setNodeId(firstNode);
      thisObj->m_doneSetGrepSender = true;
    }

    RLOG(("Connect: First connected node in nodegroup: %d", 
	  thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp)));

  } else if (!nfCompleted) {
    
    /**
     *  Set node as "disconnected" in m_nodeGroupInfo until 
     *  node comes up again.
     */
    Uint32 nodeGrp = thisObj->m_transporterFacade->getNodeGrp(nodeId);
    RLOG(("DB node %d of node group %d disconnected", 
	  nodeId, nodeGrp));
    thisObj->m_nodeGroupInfo->setConnectStatus(nodeId, false);
    /**
     * The node that crashed was also the primary node, the we must change
     * primary node 
     */
    if(nodeId == thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp)) {
      Uint32 node = thisObj->m_nodeGroupInfo->getFirstConnectedNode(nodeGrp);
      if(node > 0) {
	thisObj->m_grepSender->setNodeId(node);
	thisObj->m_nodeGroupInfo->setPrimaryNode(nodeGrp, node);
      }
      else {
	thisObj->sendDisconnectRep(nodeGrp);
      }
    }
    RLOG(("Disconnect: First connected node in nodegroup: %d", 
	  thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp)));

  } else if(nfCompleted) {
  } else {
    REPABORT("Function execNodeStatus with wrong parameters");
  }
}

/*****************************************************************************
 * Signal Receivers for LOG and SCAN
 *****************************************************************************/

/**
 * Receive datalog/datascan from GREP/SUMA
 */
void
ExtNDB::execSUB_TABLE_DATA(NdbApiSignal * signal, LinearSectionPtr ptr[3])
{
  SubTableData * const data = (SubTableData*)signal->getDataPtr();
  Uint32 tableId            = data->tableId;
  Uint32 operation          = data->operation;
  Uint32 gci                = data->gci;
  Uint32 nodeId             = refToNode(signal->theSendersBlockRef);

  if((SubTableData::LogType)data->logType == SubTableData::SCAN) 
  {
    Uint32 nodeGrp =  m_nodeGroupInfo->findNodeGroup(nodeId);

    NodeGroupInfo::iterator * it;  
    it = new NodeGroupInfo::iterator(nodeGrp, m_nodeGroupInfo);
    for(NodeConnectInfo * nci=it->first(); it->exists();nci=it->next()) {
      m_gciContainerPS->insertLogRecord(nci->nodeId, tableId, 
					operation, ptr, gci);	
    }
    delete it;  it = 0;
  } else {
    m_gciContainerPS->insertLogRecord(nodeId, tableId, operation, ptr, gci);   
  }
}

/**
 * Receive metalog/metascan from GREP/SUMA
 */
void
ExtNDB::execSUB_META_DATA(NdbApiSignal * signal, LinearSectionPtr ptr[3]) 
{
  Uint32 nodeId = refToNode(signal->theSendersBlockRef);
  SubMetaData * const data = (SubMetaData*)signal->getDataPtr();
  Uint32 tableId           = data->tableId;
  Uint32 gci               = data->gci;

  Uint32 nodeGrp = m_nodeGroupInfo->findNodeGroup(nodeId);

  NodeGroupInfo::iterator * it;  
  it = new NodeGroupInfo::iterator(nodeGrp, m_nodeGroupInfo);
  for(NodeConnectInfo * nci=it->first(); it->exists();nci=it->next()) {
    m_gciContainerPS->insertMetaRecord(nci->nodeId, tableId, ptr, gci);
    RLOG(("Received meta record in %d[%d]", nci->nodeId, gci));
  }

  delete it;  it = 0;    
}


/*****************************************************************************
 * Signal Receivers (Signals that are actually just forwarded to SS REP)
 *****************************************************************************/

void 
ExtNDB::execGREP_CREATE_SUBID_CONF(NdbApiSignal * signal) 
{
  CreateSubscriptionIdConf const * conf = 
    (CreateSubscriptionIdConf *)signal->getDataPtr();
  Uint32 subId  = conf->subscriptionId;
  Uint32 subKey = conf->subscriptionKey;
  ndbout_c("GREP_CREATE_SUBID_CONF m_extAPI=%p\n", m_extAPI);
  m_extAPI->eventSubscriptionIdCreated(subId, subKey);
}

/*****************************************************************************
 * Signal Receivers 
 *****************************************************************************/

/**
 * Receive information about completed GCI from GREP/SUMA
 *
 * GCI completed, i.e. no more unsent log records exists in SUMA
 * @todo use node id to identify buffers?
 */
void
ExtNDB::execSUB_GCP_COMPLETE_REP(NdbApiSignal * signal) 
{
  SubGcpCompleteRep * const rep = (SubGcpCompleteRep*)signal->getDataPtr();
  const Uint32 gci              = rep->gci;
  Uint32 nodeId                 = refToNode(rep->senderRef);

  RLOG(("Epoch %d completed at node %d", gci, nodeId));
  m_gciContainerPS->setCompleted(gci, nodeId);

  if(m_firstGCI == gci && !m_dataLogStarted) {
    sendGREP_SUB_START_CONF(signal, m_firstGCI);
    m_dataLogStarted = true;
  }
}

/**
 * Send info that scan is competed to SS REP
 *
 * @todo  Use node id to identify buffers?
 */
void 
ExtNDB::sendGREP_SUB_START_CONF(NdbApiSignal * signal, Uint32 gci)
{
  RLOG(("Datalog started (Epoch %d)", gci));
  GrepSubStartConf * conf = (GrepSubStartConf *)signal->getDataPtrSend();  
  conf->firstGCI                = gci;
  conf->subscriptionId          = m_subId;
  conf->subscriptionKey         = m_subKey;
  conf->part                    = SubscriptionData::TableData;
  signal->m_noOfSections = 0;
  signal->set(0, SSREPBLOCKNO, GSN_GREP_SUB_START_CONF,
	      GrepSubStartConf::SignalLength);  
  sendSignalRep(signal);
}

/**
 * Scan is completed... says SUMA/GREP
 *
 * @todo  Use node id to identify buffers?
 */
void 
ExtNDB::execGREP_SUB_START_CONF(NdbApiSignal * signal)
{
  GrepSubStartConf * const conf = (GrepSubStartConf *)signal->getDataPtr();  
  Uint32 part                   = conf->part;
  //Uint32 nodeId                 = refToNode(conf->senderRef);
  m_firstGCI                    = conf->firstGCI;

  if (part == SubscriptionData::TableData) {
    RLOG(("Datalog started (Epoch %d)", m_firstGCI));
    return;
  } 
  RLOG(("Metalog started (Epoch %d)", m_firstGCI));

  signal->set(0, SSREPBLOCKNO, GSN_GREP_SUB_START_CONF,
	      GrepSubStartConf::SignalLength);  
  sendSignalRep(signal);
}

/**
 * Receive no of node groups that PS has and pass signal on to SS
 */
void 
ExtNDB::execGREP_SUB_CREATE_CONF(NdbApiSignal * signal) 
{
  GrepSubCreateConf * conf = (GrepSubCreateConf *)signal->getDataPtrSend();  
  m_subId                  = conf->subscriptionId;
  m_subKey                 = conf->subscriptionKey;

  conf->noOfNodeGroups  = m_nodeGroupInfo->getNoOfNodeGroups();
  sendSignalRep(signal);
}

/**
 * Receive conf that subscription has been remove in GREP/SUMA
 *
 * Pass signal on to TransPS
 */
void 
ExtNDB::execGREP_SUB_REMOVE_CONF(NdbApiSignal * signal) 
{  
  m_gciContainerPS->reset();
  sendSignalRep(signal);
}

/**
 * If all PS nodes has disconnected, then remove all epochs 
 * for this subscription.
 */
void
ExtNDB::sendDisconnectRep(Uint32 nodeId) 
{
  NdbApiSignal * signal = new NdbApiSignal(m_ownRef);
  signal->set(0, SSREPBLOCKNO, GSN_REP_DISCONNECT_REP,
	      RepDisconnectRep::SignalLength);
  RepDisconnectRep * rep = (RepDisconnectRep*) signal->getDataPtrSend();
  rep->nodeId = nodeId;
  rep->subId  = m_subId;
  rep->subKey = m_subKey;
  sendSignalRep(signal);
}