/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "ExtNDB.hpp"
#include "ConfigRetriever.hpp"
#include <NdbSleep.h>

#include <NdbApiSignal.hpp>

#include <signaldata/DictTabInfo.hpp>
#include <signaldata/GetTabInfo.hpp>
#include <signaldata/SumaImpl.hpp>
#include <AttributeHeader.hpp>
#include <rep/rep_version.hpp>
#include <ndb_limits.h>

/*****************************************************************************
 * Constructor / Destructor / Init
 *****************************************************************************/

/**
 * Create the extractor.
 *
 * Allocates the GREP sender and the node group info object, registers the
 * node group info with the given GCI container and resets the subscription
 * state (subscription id/key, first GCI, "data log started" flag).
 *
 * @param gciContainer  Container that receives log/meta records.  It is
 *                      dereferenced here, so it must not be NULL.
 * @param extAPI        Callback object notified about created subscription
 *                      ids.  REPABORT is invoked if it is NULL.
 */
ExtNDB::ExtNDB(GCIContainerPS * gciContainer, ExtAPI * extAPI)
{
  m_grepSender = new ExtSender();
  if (!m_grepSender) REPABORT("Could not allocate object");
  m_gciContainerPS = gciContainer;

  m_nodeGroupInfo = new NodeGroupInfo();
  m_gciContainerPS->setNodeGroupInfo(m_nodeGroupInfo);

  m_doneSetGrepSender = false;
  m_subId = 0;
  m_subKey = 0;
  m_firstGCI = 0;
  m_dataLogStarted = false;

  m_extAPI = extAPI;
  if (!m_extAPI) REPABORT("Could not allocate object");
}

/**
 * Release the objects allocated by the constructor.
 */
ExtNDB::~ExtNDB()
{
  delete m_grepSender;
  delete m_nodeGroupInfo;
}

/**
 * Log that a signal could not be sent.
 *
 * @param signal  The signal that failed to be sent (only its address is
 *                logged)
 * @param nodeId  Currently unused
 */
void
ExtNDB::signalErrorHandler(NdbApiSignal * signal, Uint32 nodeId)
{
  //const Uint32 gsn = signal->readSignalNumber();
  //const Uint32 len = signal->getLength();
  RLOG(("Send signal failed. Signal %p", signal));
}

/**
 * Start the signal executor thread, open a block in the transporter facade
 * and register all currently sendable DB nodes in m_nodeGroupInfo.
 *
 * @param connectString  Only used by the disabled (#if 0) configuration
 *                       code below
 * @return true on success, false if no connection to the DB nodes could be
 *         established
 */
bool
ExtNDB::init(const char * connectString)
{
  m_signalExecThread = NdbThread_Create(signalExecThread_C,
					(void **)this,
					32768,
					"ExtNDB_Service",
					NDB_THREAD_PRIO_LOW);

#if 0
  /**
   * I don't see that this does anything
   *
   * Jonas 13/2-04
   */
  ConfigRetriever cr; cr.setConnectString(connectString);

  ndb_mgm_configuration * config = cr.getConfig(NDB_VERSION, NODE_TYPE_REP);
  if (config == 0) {
    ndbout << "ExtNDB: Configuration error: ";
    const char* erString = cr.getErrorString();
    if (erString == 0) {
      erString = "No error specified!";
    }
    ndbout << erString << endl;
    return false;
  }
  NdbAutoPtr autoPtr(config);
  m_ownNodeId = r.getOwnNodeId();

  /**
   * Check which GREPs to connect to (in configuration)
   *
   * @note SYSTEM LIMITATION: Only connects to one GREP
   */
  Uint32 noOfConnections=0;
  NodeId grepNodeId=0;
  const Properties * connection;

  config->get("NoOfConnections", &noOfConnections);
  for (Uint32 i=0; i<noOfConnections; i++) {
    Uint32 nodeId1, nodeId2;
    config->get("Connection", i, &connection);
    connection->get("NodeId1", &nodeId1);
    connection->get("NodeId2", &nodeId2);
    if (!connection->contains("System1") &&
	!connection->contains("System2") &&
	(nodeId1 == m_ownNodeId || nodeId2 == m_ownNodeId)) {
      /**
       * Found connection
       */
      if (nodeId1 == m_ownNodeId) {
	grepNodeId = nodeId2;
      } else {
	grepNodeId = nodeId1;
      }
    }
  }
#endif

  m_transporterFacade = TransporterFacade::instance();

  assert(m_transporterFacade != 0);

  m_ownBlockNo = m_transporterFacade->open(this, execSignal, execNodeStatus);
  assert(m_ownBlockNo > 0);

  /**
   * The only other assignment of m_ownNodeId in this file lives in the
   * disabled block above, so take the id from the transporter facade.
   * This is the value the code previously asserted m_ownNodeId to be equal
   * to, hence it is a no-op if the member was already set correctly
   * elsewhere.
   */
  m_ownNodeId = m_transporterFacade->ownId();
  m_ownRef = numberToRef(m_ownBlockNo, m_ownNodeId);
  ndbout_c("EXTNDB blockno %d ownref %d ", m_ownBlockNo, m_ownRef);

  m_grepSender->setOwnRef(m_ownRef);
  m_grepSender->setTransporterFacade(m_transporterFacade);

  if(!m_grepSender->connected(50000)){
    ndbout_c("ExtNDB: Failed to connect to DB nodes!");
    ndbout_c("ExtNDB: Tried to create transporter as (node %d, block %d).",
	     m_ownNodeId, m_ownBlockNo);
    ndbout_c("ExtNDB: Check that DB nodes are started.");
    return false;
  }
  ndbout_c("Phase 3 (ExtNDB): Connection %d to NDB Cluster opened (Extractor)",
	   m_ownBlockNo);

  /**
   * Register every sendable DB node in its node group.  The first
   * connected node of a group becomes its primary node if the group has
   * none yet, and the GREP sender is pointed at it.
   */
  for (Uint32 i=1; i<MAX_NDB_NODES; i++) {
    if (m_transporterFacade->getIsDbNode(i) &&
	m_transporterFacade->getIsNodeSendable(i))
    {
      Uint32 nodeGrp = m_transporterFacade->getNodeGrp(i);
      m_nodeGroupInfo->addNodeToNodeGrp(i, true, nodeGrp);
      Uint32 nodeId = m_nodeGroupInfo->getFirstConnectedNode(nodeGrp);
      m_grepSender->setNodeId(nodeId);
      if(m_nodeGroupInfo->getPrimaryNode(nodeGrp) == 0) {
	m_nodeGroupInfo->setPrimaryNode(nodeGrp, nodeId);
      }
      m_doneSetGrepSender = true;
#if 0
      RLOG(("Added node %d to node group %d", i, nodeGrp));
#endif
    }
  }

  return true;
}

/*****************************************************************************
 * Signal Queue Executor
 *****************************************************************************/

/**
 * Pairs a global signal number with the ExtNDB member function that
 * handles it.  Instances are passed to the signal receive queue, which
 * uses check() to find the handler matching a queued signal.
 */
class SigMatch
{
public:
  int gsn;
  void (ExtNDB::* function)(NdbApiSignal *signal);

  SigMatch() { gsn = 0; function = NULL; }

  SigMatch(int _gsn, void (ExtNDB::* _function)(NdbApiSignal *signal)) {
    gsn = _gsn;
    function = _function;
  }

  /**
   * @return true if the signal carries this matcher's signal number
   */
  bool check(NdbApiSignal *signal) {
    return signal->readSignalNumber() == gsn;
  }
};

/**
 * C entry point of the signal executor thread.
 *
 * @param r  The ExtNDB object whose signalExecThreadRun() is executed
 */
extern "C"
void *signalExecThread_C(void *r)
{
  ExtNDB *grepps = (ExtNDB*)r;

  grepps->signalExecThreadRun();

  NdbThread_Exit(0);
  /* NOTREACHED */
  return 0;
}

/**
 * Main loop of the signal executor thread.
 *
 * Builds the table of handled signals and then loops forever: waits for a
 * matching signal on m_signalRecvQueue, runs its handler and deletes the
 * signal afterwards.  The signal is owned by this loop; handlers (and
 * sendSignalRep) must not keep or free it.
 */
void
ExtNDB::signalExecThreadRun()
{
  Vector<SigMatch> sl;

  /**
   * Signals to be executed
   */
  sl.push_back(SigMatch(GSN_SUB_GCP_COMPLETE_REP,
			&ExtNDB::execSUB_GCP_COMPLETE_REP));

  /**
   * Is also forwarded to SSCoord
   */
  sl.push_back(SigMatch(GSN_GREP_SUB_START_CONF,
			&ExtNDB::execGREP_SUB_START_CONF));
  sl.push_back(SigMatch(GSN_GREP_SUB_CREATE_CONF,
			&ExtNDB::execGREP_SUB_CREATE_CONF));
  sl.push_back(SigMatch(GSN_GREP_SUB_REMOVE_CONF,
			&ExtNDB::execGREP_SUB_REMOVE_CONF));

  /**
   * Signals to be forwarded
   */
  sl.push_back(SigMatch(GSN_GREP_CREATE_SUBID_CONF,
			&ExtNDB::execGREP_CREATE_SUBID_CONF));

  sl.push_back(SigMatch(GSN_GREP_SUB_SYNC_CONF, &ExtNDB::sendSignalRep));

  sl.push_back(SigMatch(GSN_GREP_SUB_REMOVE_REF, &ExtNDB::sendSignalRep));
  sl.push_back(SigMatch(GSN_GREP_SUB_SYNC_REF, &ExtNDB::sendSignalRep));
  sl.push_back(SigMatch(GSN_GREP_CREATE_SUBID_REF, &ExtNDB::sendSignalRep));

  sl.push_back(SigMatch(GSN_GREP_SUB_START_REF, &ExtNDB::sendSignalRep));
  sl.push_back(SigMatch(GSN_GREP_SUB_CREATE_REF, &ExtNDB::sendSignalRep));

  while(1) {
    SigMatch *handler = NULL;
    NdbApiSignal *signal = NULL;
    if(m_signalRecvQueue.waitFor(sl, handler, signal, DEFAULT_TIMEOUT)) {
#if 0
      RLOG(("Removed signal from queue (GSN: %d, QSize: %d)",
	    signal->readSignalNumber(), m_signalRecvQueue.size()));
#endif
      /* Guard against a missing handler before dereferencing it */
      if(handler != 0 && handler->function != 0) {
	(this->*handler->function)(signal);
	delete signal;
	signal = 0;
      } else {
	REPABORT("Illegal handler for signal");
      }
    }
  }
}

/**
 * Send a signal through m_repSender; failures are reported via
 * signalErrorHandler().
 *
 * The signal is not freed here, it remains owned by the caller.
 *
 * NOTE(review): m_repSender is not assigned anywhere in this file;
 * presumably it is set through the class interface before signals
 * arrive - confirm.
 */
void
ExtNDB::sendSignalRep(NdbApiSignal * s)
{
  if(m_repSender->sendSignal(s) == -1) {
    signalErrorHandler(s, 0);
  }
}

/**
 * Signal callback registered with the transporter facade in init().
 *
 * Control signals (CONF/REF and SUB_GCP_COMPLETE_REP) are copied into a
 * newly allocated NdbApiSignal and put on m_signalRecvQueue for the signal
 * executor thread.  SUB_TABLE_DATA and SUB_META_DATA are handled directly.
 * Any other signal number triggers REPABORT1.
 *
 * @param executorObj  The ExtNDB object passed to TransporterFacade::open()
 * @param signal       The received signal
 * @param ptr          Linear sections belonging to the signal
 */
void
ExtNDB::execSignal(void* executorObj, NdbApiSignal* signal,
		   class LinearSectionPtr ptr[3])
{
  ExtNDB * executor = (ExtNDB*)executorObj;

  const Uint32 gsn = signal->readSignalNumber();
  const Uint32 len = signal->getLength();

  switch(gsn){
  case GSN_SUB_GCP_COMPLETE_REP:
  case GSN_GREP_CREATE_SUBID_CONF:
  case GSN_GREP_SUB_CREATE_CONF:
  case GSN_GREP_SUB_START_CONF:
  case GSN_GREP_SUB_SYNC_CONF:
  case GSN_GREP_SUB_REMOVE_CONF:
  case GSN_GREP_CREATE_SUBID_REF:
  case GSN_GREP_SUB_CREATE_REF:
  case GSN_GREP_SUB_START_REF:
  case GSN_GREP_SUB_SYNC_REF:
  case GSN_GREP_SUB_REMOVE_REF:
  {
    /**
     * Only queued signals need a copy; it is deleted by
     * signalExecThreadRun() after the handler has run.
     */
    NdbApiSignal * s = new NdbApiSignal(executor->m_ownRef);
    s->set(0, SSREPBLOCKNO, gsn, len);
    memcpy(s->getDataPtrSend(), signal->getDataPtr(), 4 * len);
    executor->m_signalRecvQueue.receive(s);
    break;
  }
  case GSN_SUB_TABLE_DATA:
    executor->execSUB_TABLE_DATA(signal, ptr);
    break;
  case GSN_SUB_META_DATA:
    executor->execSUB_META_DATA(signal, ptr);
    break;
  default:
    REPABORT1("Illegal signal received in execSignal", gsn);
  }
#if 0
  ndbout_c("ExtNDB: Inserted signal into queue (GSN: %d, Len: %d)",
	   signal->readSignalNumber(), len);
#endif
}

/**
 * Node status callback registered with the transporter facade in init().
 *
 * - alive:                    register the node in its node group, elect it
 *                             primary if the group has no primary, and
 *                             point the GREP sender at the primary if no
 *                             sender node has been set yet.
 * - !alive && !nfCompleted:   mark the node disconnected and, if it was the
 *                             primary, switch to another connected node of
 *                             the group or send a disconnect report.
 * - !alive && nfCompleted:    nothing to do.
 *
 * @param obj          The ExtNDB object passed to TransporterFacade::open()
 * @param nodeId       Node whose status changed
 * @param alive        true if the node is connected
 * @param nfCompleted  true if node failure handling has completed
 */
void
ExtNDB::execNodeStatus(void* obj, Uint16 nodeId, bool alive, bool nfCompleted)
{
  ExtNDB * thisObj = (ExtNDB*)obj;

  RLOG(("Changed node status (Id %d, Alive %d, nfCompleted %d)",
	nodeId, alive, nfCompleted));

  if(alive) {
    /**
     *  Connected
     */
    Uint32 nodeGrp = thisObj->m_transporterFacade->getNodeGrp(nodeId);
    RLOG(("DB node %d of node group %d connected", nodeId, nodeGrp));

    thisObj->m_nodeGroupInfo->addNodeToNodeGrp(nodeId, true, nodeGrp);
    Uint32 firstNode = thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp);

    if(firstNode == 0) {
      /**
       * No primary yet: this node becomes primary.  Remember it locally
       * as well, otherwise the sender below would be given node id 0.
       */
      thisObj->m_nodeGroupInfo->setPrimaryNode(nodeGrp, nodeId);
      firstNode = nodeId;
    }

    if (!thisObj->m_doneSetGrepSender) {
      thisObj->m_grepSender->setNodeId(firstNode);
      thisObj->m_doneSetGrepSender = true;
    }

    RLOG(("Connect: First connected node in nodegroup: %d",
	  thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp)));

  } else if (!nfCompleted) {

    /**
     * Set node as "disconnected" in m_nodeGroupInfo until
     * node comes up again.
     */
    Uint32 nodeGrp = thisObj->m_transporterFacade->getNodeGrp(nodeId);
    RLOG(("DB node %d of node group %d disconnected",
	  nodeId, nodeGrp));
    thisObj->m_nodeGroupInfo->setConnectStatus(nodeId, false);

    /**
     * If the node that crashed was also the primary node, then we must
     * change primary node
     */
    if(nodeId == thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp)) {
      Uint32 node =
	thisObj->m_nodeGroupInfo->getFirstConnectedNode(nodeGrp);
      if(node > 0) {
	thisObj->m_grepSender->setNodeId(node);
	thisObj->m_nodeGroupInfo->setPrimaryNode(nodeGrp, node);
      }
      else {
	/**
	 * NOTE(review): the node group number is passed although the
	 * parameter of sendDisconnectRep is named nodeId - confirm
	 * which one the receiver expects.
	 */
	thisObj->sendDisconnectRep(nodeGrp);
      }
    }
    RLOG(("Disconnect: First connected node in nodegroup: %d",
	  thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp)));

  }
  /**
   * Remaining case (!alive && nfCompleted): node failure handling is
   * completed, no action needed here.
   */
}

/*****************************************************************************
 * Signal Receivers for LOG and SCAN
 *****************************************************************************/

/**
 * Receive datalog/datascan from GREP/SUMA
 *
 * Scan records are inserted once for every node of the sender's node
 * group; log records are inserted for the sending node only.
 */
void
ExtNDB::execSUB_TABLE_DATA(NdbApiSignal * signal, LinearSectionPtr ptr[3])
{
  SubTableData * const data = (SubTableData*)signal->getDataPtr();
  Uint32 tableId = data->tableId;
  Uint32 operation = data->operation;
  Uint32 gci = data->gci;
  Uint32 nodeId = refToNode(signal->theSendersBlockRef);

  if((SubTableData::LogType)data->logType == SubTableData::SCAN)
  {
    Uint32 nodeGrp = m_nodeGroupInfo->findNodeGroup(nodeId);

    NodeGroupInfo::iterator it(nodeGrp, m_nodeGroupInfo);
    for(NodeConnectInfo * nci=it.first(); it.exists(); nci=it.next()) {
      m_gciContainerPS->insertLogRecord(nci->nodeId, tableId,
					operation, ptr, gci);
    }
  } else {
    m_gciContainerPS->insertLogRecord(nodeId, tableId, operation, ptr, gci);
  }
}

/**
 * Receive metalog/metascan from GREP/SUMA
 *
 * The meta record is inserted once for every node of the sender's node
 * group.
 */
void
ExtNDB::execSUB_META_DATA(NdbApiSignal * signal, LinearSectionPtr ptr[3])
{
  Uint32 nodeId = refToNode(signal->theSendersBlockRef);
  SubMetaData * const data = (SubMetaData*)signal->getDataPtr();
  Uint32 tableId = data->tableId;
  Uint32 gci = data->gci;

  Uint32 nodeGrp = m_nodeGroupInfo->findNodeGroup(nodeId);

  NodeGroupInfo::iterator it(nodeGrp, m_nodeGroupInfo);
  for(NodeConnectInfo * nci=it.first(); it.exists(); nci=it.next()) {
    m_gciContainerPS->insertMetaRecord(nci->nodeId, tableId, ptr, gci);
    RLOG(("Received meta record in %d[%d]", nci->nodeId, gci));
  }
}

/*****************************************************************************
 * Signal Receivers (Signals that are actually just forwarded to SS REP)
 *****************************************************************************/

/**
 * Receive a created subscription id/key and report it to the ExtAPI
 * callback object.
 */
void
ExtNDB::execGREP_CREATE_SUBID_CONF(NdbApiSignal * signal)
{
  CreateSubscriptionIdConf const * conf =
    (CreateSubscriptionIdConf *)signal->getDataPtr();
  Uint32 subId = conf->subscriptionId;
  Uint32 subKey = conf->subscriptionKey;
  ndbout_c("GREP_CREATE_SUBID_CONF m_extAPI=%p\n", m_extAPI);
  m_extAPI->eventSubscriptionIdCreated(subId, subKey);
}

/*****************************************************************************
 * Signal Receivers
 *****************************************************************************/

/**
 * Receive information about completed GCI from GREP/SUMA
 *
 * GCI completed, i.e. no more unsent log records exist in SUMA.
 * When the first GCI of the subscription completes, a
 * GREP_SUB_START_CONF for the data log is sent exactly once.
 *
 * @todo use node id to identify buffers?
 */
void
ExtNDB::execSUB_GCP_COMPLETE_REP(NdbApiSignal * signal)
{
  SubGcpCompleteRep * const rep = (SubGcpCompleteRep*)signal->getDataPtr();
  const Uint32 gci = rep->gci;
  Uint32 nodeId = refToNode(rep->senderRef);

  RLOG(("Epoch %d completed at node %d", gci, nodeId));
  m_gciContainerPS->setCompleted(gci, nodeId);

  if(m_firstGCI == gci && !m_dataLogStarted) {
    sendGREP_SUB_START_CONF(signal, m_firstGCI);
    m_dataLogStarted = true;
  }
}

/**
 * Send info that scan is completed to SS REP
 *
 * The passed signal object is reused (overwritten) as the outgoing
 * GREP_SUB_START_CONF.
 *
 * @todo Use node id to identify buffers?
 */
void
ExtNDB::sendGREP_SUB_START_CONF(NdbApiSignal * signal, Uint32 gci)
{
  RLOG(("Datalog started (Epoch %d)", gci));
  GrepSubStartConf * conf = (GrepSubStartConf *)signal->getDataPtrSend();
  conf->firstGCI = gci;
  conf->subscriptionId = m_subId;
  conf->subscriptionKey = m_subKey;
  conf->part = SubscriptionData::TableData;
  signal->m_noOfSections = 0;
  signal->set(0, SSREPBLOCKNO, GSN_GREP_SUB_START_CONF,
	      GrepSubStartConf::SignalLength);
  sendSignalRep(signal);
}

/**
 * Scan is completed... says SUMA/GREP
 *
 * Records the first GCI.  For the table data part nothing is forwarded
 * here (that CONF is sent from execSUB_GCP_COMPLETE_REP); for the meta
 * data part the signal is forwarded to SS REP.
 *
 * @todo Use node id to identify buffers?
 */
void
ExtNDB::execGREP_SUB_START_CONF(NdbApiSignal * signal)
{
  GrepSubStartConf * const conf = (GrepSubStartConf *)signal->getDataPtr();
  Uint32 part = conf->part;
  //Uint32 nodeId = refToNode(conf->senderRef);
  m_firstGCI = conf->firstGCI;

  if (part == SubscriptionData::TableData) {
    RLOG(("Datalog started (Epoch %d)", m_firstGCI));
    return;
  }

  RLOG(("Metalog started (Epoch %d)", m_firstGCI));

  signal->set(0, SSREPBLOCKNO, GSN_GREP_SUB_START_CONF,
	      GrepSubStartConf::SignalLength);
  sendSignalRep(signal);
}

/**
 * Receive no of node groups that PS has and pass signal on to SS
 *
 * Also remembers the subscription id and key carried by the signal.
 */
void
ExtNDB::execGREP_SUB_CREATE_CONF(NdbApiSignal * signal)
{
  GrepSubCreateConf * conf = (GrepSubCreateConf *)signal->getDataPtrSend();
  m_subId = conf->subscriptionId;
  m_subKey = conf->subscriptionKey;

  conf->noOfNodeGroups = m_nodeGroupInfo->getNoOfNodeGroups();
  sendSignalRep(signal);
}

/**
 * Receive conf that subscription has been removed in GREP/SUMA
 *
 * Resets the GCI container and passes the signal on to TransPS
 */
void
ExtNDB::execGREP_SUB_REMOVE_CONF(NdbApiSignal * signal)
{
  m_gciContainerPS->reset();
  sendSignalRep(signal);
}

/**
 * If all PS nodes have disconnected, then remove all epochs
 * for this subscription.
 *
 * Sends a REP_DISCONNECT_REP carrying the given id together with the
 * current subscription id and key.
 *
 * @param nodeId  Value stored in the nodeId field of the report
 */
void
ExtNDB::sendDisconnectRep(Uint32 nodeId)
{
  NdbApiSignal * signal = new NdbApiSignal(m_ownRef);
  signal->set(0, SSREPBLOCKNO, GSN_REP_DISCONNECT_REP,
	      RepDisconnectRep::SignalLength);
  RepDisconnectRep * rep = (RepDisconnectRep*) signal->getDataPtrSend();
  rep->nodeId = nodeId;
  rep->subId = m_subId;
  rep->subKey = m_subKey;
  sendSignalRep(signal);

  /**
   * sendSignalRep does not take ownership (signalExecThreadRun deletes
   * signals after forwarding them through it), so free the signal here.
   */
  delete signal;
}