Commit a0475132 authored by unknown's avatar unknown

added class Ndb_cluster_connection to prepare for making order of starting...

added class Ndb_cluster_connection to prepare for making order of starting mysqld and ndbd irrelevant


ndb/src/mgmsrv/Makefile.am:
  changed so that datadir is "." by default so that behaviour is the same as before
parent 9ed92fea
......@@ -16,6 +16,7 @@ ndbapi/NdbError.hpp \
ndbapi/NdbEventOperation.hpp \
ndbapi/NdbIndexOperation.hpp \
ndbapi/NdbOperation.hpp \
ndbapi/ndb_cluster_connection.hpp \
ndbapi/NdbBlob.hpp \
ndbapi/NdbPool.hpp \
ndbapi/NdbRecAttr.hpp \
......
......@@ -860,6 +860,7 @@
#include <ndb_types.h>
#include <ndbapi_limits.h>
#include <ndb_cluster_connection.hpp>
#include <NdbError.hpp>
#include <NdbDictionary.hpp>
......@@ -992,6 +993,8 @@ public:
* deprecated.
*/
Ndb(const char* aCatalogName = "", const char* aSchemaName = "def");
Ndb(Ndb_cluster_connection *ndb_cluster_connection,
const char* aCatalogName = "", const char* aSchemaName = "def");
~Ndb();
......@@ -1081,7 +1084,10 @@ public:
* @return 0: Ndb is ready and timeout has not occurred.<br>
* -1: Timeout has expired
*/
int waitUntilReady(int timeout = 60);
void connected(Uint32 block_reference);
/** @} *********************************************************************/
......@@ -1447,6 +1453,9 @@ public:
****************************************************************************/
private:
void setup(Ndb_cluster_connection *ndb_cluster_connection,
const char* aCatalogName, const char* aSchemaName);
NdbConnection* startTransactionLocal(Uint32 aPrio, Uint32 aFragmentId);
// Connect the connection object to the Database.
......@@ -1585,6 +1594,7 @@ private:
* These are the private variables in this class.
*****************************************************************************/
NdbObjectIdMap* theNdbObjectIdMap;
Ndb_cluster_connection *m_ndb_cluster_connection;
NdbConnection** thePreparedTransactionsArray;
NdbConnection** theSentTransactionsArray;
......@@ -1703,7 +1713,7 @@ private:
static void executeMessage(void*, NdbApiSignal *,
struct LinearSectionPtr ptr[3]);
static void statusMessage(void*, Uint16, bool, bool);
static void statusMessage(void*, Uint32, bool, bool);
#ifdef VM_TRACE
void printState(const char* fmt, ...);
#endif
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef CLUSTER_CONNECTION_HPP
#define CLUSTER_CONNECTION_HPP
class TransporterFacade;
class ConfigRetriever;
class Ndb_cluster_connection {
public:
Ndb_cluster_connection(const char * connect_string = 0);
~Ndb_cluster_connection();
int connect();
private:
char *m_connect_string;
TransporterFacade *m_facade;
ConfigRetriever *m_config_retriever;
};
#endif
MYSQLDATAdir = $(localstatedir)
MYSQLSHAREdir = $(pkgdatadir)
MYSQLBASEdir= $(prefix)
MYSQLCLUSTERdir= $(prefix)/mysql-cluster
#MYSQLCLUSTERdir= $(prefix)/mysql-cluster
MYSQLCLUSTERdir= .
ndbbin_PROGRAMS = ndb_mgmd
......
......@@ -629,13 +629,16 @@ MgmtSrvr::start()
if (!check_start())
return false;
}
theFacade = TransporterFacade::start_instance
(_ownNodeId,(ndb_mgm_configuration*)_config->m_configValues);
theFacade= TransporterFacade::theFacadeInstance = new TransporterFacade();
if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
DEBUG("MgmtSrvr.cpp: theFacade == 0.");
return false;
}
if ( theFacade->start_instance
(_ownNodeId, (ndb_mgm_configuration*)_config->m_configValues) < 0) {
DEBUG("MgmtSrvr.cpp: TransporterFacade::start_instance < 0.");
return false;
}
MGM_REQUIRE(_blockNumber == 1);
......@@ -2295,7 +2298,7 @@ MgmtSrvr::signalReceivedNotification(void* mgmtSrvr,
//****************************************************************************
//****************************************************************************
void
MgmtSrvr::nodeStatusNotification(void* mgmSrv, NodeId nodeId,
MgmtSrvr::nodeStatusNotification(void* mgmSrv, Uint32 nodeId,
bool alive, bool nfComplete)
{
if(!(!alive && nfComplete))
......
......@@ -699,7 +699,7 @@ private:
* shall receive the notification.
* @param processId: Id of the dead process.
*/
static void nodeStatusNotification(void* mgmSrv, NodeId nodeId,
static void nodeStatusNotification(void* mgmSrv, Uint32 nodeId,
bool alive, bool nfCompleted);
/**
......
......@@ -34,6 +34,7 @@ libndbapi_la_SOURCES = \
NdbDictionary.cpp \
NdbDictionaryImpl.cpp \
DictCache.cpp \
ndb_cluster_connection.cpp \
NdbBlob.cpp
INCLUDES_LOC = -I$(top_srcdir)/ndb/src/mgmapi
......
......@@ -207,9 +207,11 @@ Remark: Disconnect all connections to the database.
void
Ndb::doDisconnect()
{
DBUG_ENTER("Ndb::doDisconnect");
NdbConnection* tNdbCon;
CHECK_STATUS_MACRO_VOID;
DBUG_PRINT("info", ("theNoOfDBnodes=%d", theNoOfDBnodes));
Uint32 tNoOfDbNodes = theNoOfDBnodes;
UintR i;
for (i = 0; i < tNoOfDbNodes; i++) {
......@@ -227,6 +229,7 @@ Ndb::doDisconnect()
tNdbCon = tNdbCon->theNext;
releaseConnectToNdb(tmpNdbCon);
}//while
DBUG_VOID_RETURN;
}//Ndb::disconnect()
/*****************************************************************************
......@@ -239,6 +242,7 @@ Remark: Waits until a node has status != 0
int
Ndb::waitUntilReady(int timeout)
{
DBUG_ENTER("Ndb::waitUntilReady");
int secondsCounter = 0;
int milliCounter = 0;
int noChecksSinceFirstAliveFound = 0;
......@@ -246,7 +250,7 @@ Ndb::waitUntilReady(int timeout)
if (theInitState != Initialised) {
// Ndb::init is not called
theError.code = 4256;
return -1;
DBUG_RETURN(-1);
}
do {
......@@ -265,13 +269,13 @@ Ndb::waitUntilReady(int timeout)
tp->unlock_mutex();
if (foundAliveNode == theNoOfDBnodes) {
return 0;
DBUG_RETURN(0);
}//if
if (foundAliveNode > 0) {
noChecksSinceFirstAliveFound++;
}//if
if (noChecksSinceFirstAliveFound > 30) {
return 0;
DBUG_RETURN(0);
}//if
NdbSleep_MilliSleep(100);
milliCounter += 100;
......@@ -281,9 +285,9 @@ Ndb::waitUntilReady(int timeout)
}//if
} while ( secondsCounter < timeout );
if (noChecksSinceFirstAliveFound > 0) {
return 0;
DBUG_RETURN(0);
}//if
return -1;
DBUG_RETURN(-1);
}
/*****************************************************************************
......@@ -1060,6 +1064,9 @@ Ndb::StartTransactionNodeSelectionData::init(Uint32 noOfNodes,
* This algorithm should be implemented in Dbdih
*/
{
if (fragment2PrimaryNodeMap != 0)
abort();
fragment2PrimaryNodeMap = new Uint32[noOfFragments];
Uint32 i;
for(i = 0; i<noOfNodes; i++){
......
......@@ -14,19 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*****************************************************************************
Name: NdbConnection.C
Include:
Link:
Author: UABMNST Mona Natterkvist UAB/B/UL
Date: 970829
Version: 0.1
Description: Interface between TIS and NDB
Documentation:
Adjust: 971022 UABMNST First version.
*****************************************************************************/
#include <ndb_global.h>
#include <NdbOut.hpp>
#include <NdbConnection.hpp>
#include <NdbOperation.hpp>
......@@ -104,7 +92,9 @@ Remark: Deletes the connection object.
*****************************************************************************/
NdbConnection::~NdbConnection()
{
DBUG_ENTER("NdbConnection::~NdbConnection");
theNdb->theNdbObjectIdMap->unmap(theId, this);
DBUG_VOID_RETURN;
}//NdbConnection::~NdbConnection()
/*****************************************************************************
......
......@@ -605,6 +605,7 @@ NdbDictionaryImpl::~NdbDictionaryImpl()
}
}
#if 0
void
initDict(NdbDictionary::Dictionary & d)
{
......@@ -624,6 +625,7 @@ NdbDictionaryImpl::setTransporter(class TransporterFacade * tf)
return false;
}
#endif
bool
NdbDictionaryImpl::setTransporter(class Ndb* ndb,
......@@ -643,6 +645,7 @@ NdbDictionaryImpl::getIndexTable(NdbIndexImpl * index,
return getTable(m_ndb.externalizeTableName(internalName));
}
#if 0
bool
NdbDictInterface::setTransporter(class TransporterFacade * tf)
{
......@@ -666,11 +669,11 @@ NdbDictInterface::setTransporter(class TransporterFacade * tf)
return true;
}
#endif
bool
NdbDictInterface::setTransporter(class Ndb* ndb, class TransporterFacade * tf)
{
m_blockNumber = -1;
m_reference = ndb->getReference();
m_transporter = tf;
m_waiter.m_mutex = tf->theMutexPtr;
......@@ -680,10 +683,6 @@ NdbDictInterface::setTransporter(class Ndb* ndb, class TransporterFacade * tf)
NdbDictInterface::~NdbDictInterface()
{
if (m_transporter != NULL){
if (m_blockNumber != -1)
m_transporter->close(m_blockNumber, 0);
}
}
void
......@@ -770,7 +769,7 @@ NdbDictInterface::execSignal(void* dictImpl,
}
void
NdbDictInterface::execNodeStatus(void* dictImpl, NodeId aNode,
NdbDictInterface::execNodeStatus(void* dictImpl, Uint32 aNode,
bool alive, bool nfCompleted)
{
NdbDictInterface * tmp = (NdbDictInterface*)dictImpl;
......
......@@ -240,7 +240,6 @@ public:
NdbDictInterface(NdbError& err) : m_error(err) {
m_reference = 0;
m_masterNodeId = 0;
m_blockNumber = -1;
m_transporter= NULL;
}
~NdbDictInterface();
......@@ -308,7 +307,6 @@ public:
private:
Uint32 m_reference;
Uint32 m_masterNodeId;
int m_blockNumber;
NdbWaiter m_waiter;
class TransporterFacade * m_transporter;
......@@ -318,7 +316,7 @@ private:
class NdbApiSignal* signal,
class LinearSectionPtr ptr[3]);
static void execNodeStatus(void* dictImpl, NodeId,
static void execNodeStatus(void* dictImpl, Uint32,
bool alive, bool nfCompleted);
void execGET_TABINFO_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
......
......@@ -14,6 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include "NdbImpl.hpp"
#include <NdbReceiver.hpp>
#include "NdbDictionaryImpl.hpp"
......@@ -35,10 +36,12 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) :
NdbReceiver::~NdbReceiver()
{
DBUG_ENTER("NdbReceiver::~NdbReceiver");
if (m_id != NdbObjectIdMap::InvalidId) {
m_ndb->theNdbObjectIdMap->unmap(m_id, this);
}
delete[] m_rows;
DBUG_VOID_RETURN;
}
void
......
......@@ -15,6 +15,8 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include "NdbApiSignal.hpp"
#include "NdbImpl.hpp"
#include "NdbOperation.hpp"
......@@ -53,6 +55,8 @@
int
Ndb::init(int aMaxNoOfTransactions)
{
DBUG_ENTER("Ndb::init");
int i;
int aNrOfCon;
int aNrOfOp;
......@@ -67,7 +71,7 @@ Ndb::init(int aMaxNoOfTransactions)
theError.code = 4104;
break;
}
return -1;
DBUG_RETURN(-1);
}//if
theInitState = StartingInit;
TransporterFacade * theFacade = TransporterFacade::instance();
......@@ -75,37 +79,17 @@ Ndb::init(int aMaxNoOfTransactions)
const int tBlockNo = theFacade->open(this,
executeMessage,
statusMessage);
statusMessage);
if ( tBlockNo == -1 ) {
theError.code = 4105;
theFacade->unlock_mutex();
return -1; // no more free blocknumbers
DBUG_RETURN(-1); // no more free blocknumbers
}//if
theNdbBlockNumber = tBlockNo;
theNode = theFacade->ownId();
theMyRef = numberToRef(theNdbBlockNumber, theNode);
for (i = 1; i < MAX_NDB_NODES; i++){
if (theFacade->getIsDbNode(i)){
theDBnodes[theNoOfDBnodes] = i;
theNoOfDBnodes++;
}
}
theFirstTransId = ((Uint64)theNdbBlockNumber << 52)+((Uint64)theNode << 40);
theFirstTransId += theFacade->m_max_trans_id;
theFacade->unlock_mutex();
theDictionary = new NdbDictionaryImpl(*this);
if (theDictionary == NULL) {
theError.code = 4000;
return -1;
}
theDictionary->setTransporter(this, theFacade);
aNrOfCon = theNoOfDBnodes;
......@@ -144,9 +128,6 @@ Ndb::init(int aMaxNoOfTransactions)
theSentTransactionsArray[i] = NULL;
theCompletedTransactionsArray[i] = NULL;
}//for
startTransactionNodeSelectionData.init(theNoOfDBnodes, theDBnodes);
for (i = 0; i < 16; i++){
tSignal[i] = getSignal();
if(tSignal[i] == NULL) {
......@@ -156,11 +137,8 @@ Ndb::init(int aMaxNoOfTransactions)
}
for (i = 0; i < 16; i++)
releaseSignal(tSignal[i]);
theInitState = Initialised;
theCommitAckSignal = new NdbApiSignal(theMyRef);
return 0;
DBUG_RETURN(0);
error_handler:
ndbout << "error_handler" << endl;
......@@ -176,12 +154,13 @@ Ndb::init(int aMaxNoOfTransactions)
delete theDictionary;
TransporterFacade::instance()->close(theNdbBlockNumber, 0);
return -1;
DBUG_RETURN(-1);
}
void
Ndb::releaseTransactionArrays()
{
DBUG_ENTER("Ndb::releaseTransactionArrays");
if (thePreparedTransactionsArray != NULL) {
delete [] thePreparedTransactionsArray;
}//if
......@@ -191,6 +170,7 @@ Ndb::releaseTransactionArrays()
if (theCompletedTransactionsArray != NULL) {
delete [] theCompletedTransactionsArray;
}//if
DBUG_VOID_RETURN;
}//Ndb::releaseTransactionArrays()
void
......@@ -202,13 +182,46 @@ Ndb::executeMessage(void* NdbObject,
tNdb->handleReceivedSignal(aSignal, ptr);
}
void Ndb::connected(Uint32 ref)
{
theMyRef= ref;
theNode= refToNode(theMyRef);
if (theNdbBlockNumber >= 0)
assert(theMyRef == numberToRef(theNdbBlockNumber, theNode));
TransporterFacade * theFacade = TransporterFacade::instance();
int i;
theNoOfDBnodes= 0;
for (i = 1; i < MAX_NDB_NODES; i++){
if (theFacade->getIsDbNode(i)){
theDBnodes[theNoOfDBnodes] = i;
theNoOfDBnodes++;
}
}
theFirstTransId = ((Uint64)theNdbBlockNumber << 52)+
((Uint64)theNode << 40);
theFirstTransId += theFacade->m_max_trans_id;
// assert(0);
DBUG_PRINT("info",("connected with ref=%x, id=%d, no_db_nodes=%d, first_trans_id=%d",
theMyRef,
theNode,
theNoOfDBnodes,
theFirstTransId));
startTransactionNodeSelectionData.init(theNoOfDBnodes, theDBnodes);
theCommitAckSignal = new NdbApiSignal(theMyRef);
theDictionary->m_receiver.m_reference= theMyRef;
}
void
Ndb::statusMessage(void* NdbObject, NodeId a_node, bool alive, bool nfComplete)
Ndb::statusMessage(void* NdbObject, Uint32 a_node, bool alive, bool nfComplete)
{
DBUG_ENTER("Ndb::statusMessage");
Ndb* tNdb = (Ndb*)NdbObject;
if (alive) {
if (nfComplete) {
assert(0);
tNdb->connected(a_node);
DBUG_VOID_RETURN;
}//if
} else {
if (nfComplete) {
......@@ -219,6 +232,7 @@ Ndb::statusMessage(void* NdbObject, NodeId a_node, bool alive, bool nfComplete)
}//if
NdbDictInterface::execNodeStatus(&tNdb->theDictionary->m_receiver,
a_node, alive, nfComplete);
DBUG_VOID_RETURN;
}
void
......
......@@ -42,6 +42,7 @@ void NdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *);
static int theNoOfNdbObjects = 0;
static char *ndbConnectString = 0;
static Ndb_cluster_connection *global_ndb_cluster_connection= 0;
#if defined NDB_WIN32 || defined SCO
static NdbMutex & createNdbMutex = * NdbMutex_Create();
......@@ -56,45 +57,74 @@ Ndb(const char* aDataBase);
Parameters: aDataBase : Name of the database.
Remark: Connect to the database.
***************************************************************************/
Ndb::Ndb( const char* aDataBase , const char* aSchema) :
theNdbObjectIdMap(0),
thePreparedTransactionsArray(NULL),
theSentTransactionsArray(NULL),
theCompletedTransactionsArray(NULL),
theNoOfPreparedTransactions(0),
theNoOfSentTransactions(0),
theNoOfCompletedTransactions(0),
theNoOfAllocatedTransactions(0),
theMaxNoOfTransactions(0),
theMinNoOfEventsToWakeUp(0),
prefixEnd(NULL),
theImpl(NULL),
theDictionary(NULL),
theConIdleList(NULL),
theOpIdleList(NULL),
theScanOpIdleList(NULL),
theIndexOpIdleList(NULL),
// theSchemaConIdleList(NULL),
// theSchemaConToNdbList(NULL),
theTransactionList(NULL),
theConnectionArray(NULL),
theRecAttrIdleList(NULL),
theSignalIdleList(NULL),
theLabelList(NULL),
theBranchList(NULL),
theSubroutineList(NULL),
theCallList(NULL),
theScanList(NULL),
theNdbBlobIdleList(NULL),
theNoOfDBnodes(0),
theDBnodes(NULL),
the_release_ind(NULL),
the_last_check_time(0),
theFirstTransId(0),
theRestartGCI(0),
theNdbBlockNumber(-1),
theInitState(NotConstructed)
Ndb::Ndb( const char* aDataBase , const char* aSchema) {
if (global_ndb_cluster_connection == 0) {
if (theNoOfNdbObjects > 0)
abort(); // old and new Ndb constructor used mixed
global_ndb_cluster_connection= new Ndb_cluster_connection(ndbConnectString);
global_ndb_cluster_connection->connect();
}
setup(global_ndb_cluster_connection, aDataBase, aSchema);
}
Ndb::Ndb( Ndb_cluster_connection *ndb_cluster_connection,
const char* aDataBase , const char* aSchema)
{
if (global_ndb_cluster_connection != 0 &&
global_ndb_cluster_connection != ndb_cluster_connection)
abort(); // old and new Ndb constructor used mixed
setup(ndb_cluster_connection, aDataBase, aSchema);
}
void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
const char* aDataBase , const char* aSchema)
{
DBUG_ENTER("Ndb::setup");
theNdbObjectIdMap= 0;
m_ndb_cluster_connection= ndb_cluster_connection;
thePreparedTransactionsArray= NULL;
theSentTransactionsArray= NULL;
theCompletedTransactionsArray= NULL;
theNoOfPreparedTransactions= 0;
theNoOfSentTransactions= 0;
theNoOfCompletedTransactions= 0;
theNoOfAllocatedTransactions= 0;
theMaxNoOfTransactions= 0;
theMinNoOfEventsToWakeUp= 0;
prefixEnd= NULL;
theImpl= NULL;
theDictionary= NULL;
theConIdleList= NULL;
theOpIdleList= NULL;
theScanOpIdleList= NULL;
theIndexOpIdleList= NULL;
// theSchemaConIdleList= NULL;
// theSchemaConToNdbList= NULL;
theTransactionList= NULL;
theConnectionArray= NULL;
theRecAttrIdleList= NULL;
theSignalIdleList= NULL;
theLabelList= NULL;
theBranchList= NULL;
theSubroutineList= NULL;
theCallList= NULL;
theScanList= NULL;
theNdbBlobIdleList= NULL;
theNoOfDBnodes= 0;
theDBnodes= NULL;
the_release_ind= NULL;
the_last_check_time= 0;
theFirstTransId= 0;
theRestartGCI= 0;
theNdbBlockNumber= -1;
theInitState= NotConstructed;
theNode= 0;
theFirstTransId= 0;
theMyRef= 0;
theNoOfDBnodes= 0;
fullyQualifiedNames = true;
cgetSignals =0;
......@@ -135,18 +165,8 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) :
NdbMutex_Lock(&createNdbMutex);
TransporterFacade * m_facade = 0;
if(theNoOfNdbObjects == 0){
if ((m_facade = TransporterFacade::start_instance(ndbConnectString)) == 0)
theInitState = InitConfigError;
} else {
m_facade = TransporterFacade::instance();
}
if(m_facade != 0){
theWaiter.m_mutex = m_facade->theMutexPtr;
}
theWaiter.m_mutex = TransporterFacade::instance()->theMutexPtr;
// For keeping track of how many Ndb objects that exists.
theNoOfNdbObjects += 1;
......@@ -167,6 +187,13 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) :
}
NdbMutex_Unlock(&createNdbMutex);
theDictionary = new NdbDictionaryImpl(*this);
if (theDictionary == NULL) {
ndbout_c("Ndb cailed to allocate dictionary");
exit(-1);
}
DBUG_VOID_RETURN;
}
......@@ -187,6 +214,7 @@ void Ndb::setConnectString(const char * connectString)
*****************************************************************************/
Ndb::~Ndb()
{
DBUG_ENTER("Ndb::~Ndb()");
doDisconnect();
delete theDictionary;
......@@ -203,6 +231,10 @@ Ndb::~Ndb()
theNoOfNdbObjects -= 1;
if(theNoOfNdbObjects == 0){
TransporterFacade::stop_instance();
if (global_ndb_cluster_connection != 0) {
delete global_ndb_cluster_connection;
global_ndb_cluster_connection= 0;
}
}//if
NdbMutex_Unlock(&createNdbMutex);
......@@ -271,6 +303,7 @@ Ndb::~Ndb()
assert(cnewSignals == cfreeSignals);
assert(cgetSignals == creleaseSignals);
#endif
DBUG_VOID_RETURN;
}
NdbWaiter::NdbWaiter(){
......
......@@ -783,6 +783,7 @@ Remark: Release and disconnect from DBTC a connection and seize it to th
void
Ndb::releaseConnectToNdb(NdbConnection* a_con)
{
DBUG_ENTER("Ndb::releaseConnectToNdb");
NdbApiSignal tSignal(theMyRef);
int tConPtr;
......@@ -790,7 +791,7 @@ Ndb::releaseConnectToNdb(NdbConnection* a_con)
// manage to reach NDB or not.
if (a_con == NULL)
return;
DBUG_VOID_RETURN;
Uint32 node_id = a_con->getConnectedNodeId();
Uint32 conn_seq = a_con->theNodeSequence;
......@@ -821,6 +822,6 @@ Ndb::releaseConnectToNdb(NdbConnection* a_con)
abort();
}//if
releaseNdbCon(a_con);
return;
DBUG_VOID_RETURN;
}
......@@ -37,6 +37,16 @@
//#define REPORT_TRANSPORTER
//#define API_TRACE;
static int numberToIndex(int number)
{
return number - MIN_API_BLOCK_NO;
}
static int indexToNumber(int index)
{
return index + MIN_API_BLOCK_NO;
}
#if defined DEBUG_TRANSPORTER
#define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
#else
......@@ -44,8 +54,6 @@
#endif
TransporterFacade* TransporterFacade::theFacadeInstance = NULL;
ConfigRetriever *TransporterFacade::s_config_retriever= 0;
/*****************************************************************************
* Call back functions
......@@ -324,7 +332,9 @@ copy(Uint32 * & insertPtr,
extern "C"
void
atexit_stop_instance(){
DBUG_ENTER("atexit_stop_instance");
TransporterFacade::stop_instance();
DBUG_VOID_RETURN;
}
/**
......@@ -334,57 +344,12 @@ atexit_stop_instance(){
* Which is protected by a mutex
*/
TransporterFacade*
TransporterFacade::start_instance(const char * connectString){
// TransporterFacade used from API get config from mgmt srvr
s_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API);
s_config_retriever->setConnectString(connectString);
const char* error = 0;
do {
if(s_config_retriever->init() == -1)
break;
if(s_config_retriever->do_connect() == -1)
break;
Uint32 nodeId = s_config_retriever->allocNodeId();
for(Uint32 i = 0; nodeId == 0 && i<5; i++){
NdbSleep_SecSleep(3);
nodeId = s_config_retriever->allocNodeId();
}
if(nodeId == 0)
break;
ndb_mgm_configuration * props = s_config_retriever->getConfig();
if(props == 0)
break;
TransporterFacade * tf = start_instance(nodeId, props);
free(props);
return tf;
} while(0);
ndbout << "Configuration error: ";
const char* erString = s_config_retriever->getErrorString();
if (erString == 0) {
erString = "No error specified!";
}
ndbout << erString << endl;
return 0;
}
TransporterFacade*
int
TransporterFacade::start_instance(int nodeId,
const ndb_mgm_configuration* props)
{
TransporterFacade* tf = new TransporterFacade();
if (! tf->init(nodeId, props)) {
delete tf;
return NULL;
if (! theFacadeInstance->init(nodeId, props)) {
return -1;
}
/**
......@@ -402,19 +367,7 @@ TransporterFacade::start_instance(int nodeId,
signal(SIGPIPE, SIG_IGN);
#endif
if(theFacadeInstance == NULL){
theFacadeInstance = tf;
}
return tf;
}
void
TransporterFacade::close_configuration(){
if (s_config_retriever) {
delete s_config_retriever;
s_config_retriever= 0;
}
return 0;
}
/**
......@@ -425,23 +378,21 @@ TransporterFacade::close_configuration(){
*/
void
TransporterFacade::stop_instance(){
close_configuration();
DBUG_ENTER("TransporterFacade::stop_instance");
if(theFacadeInstance == NULL){
/**
* We are called from atexit function
*/
return;
DBUG_VOID_RETURN;
}
theFacadeInstance->doStop();
delete theFacadeInstance; theFacadeInstance = NULL;
DBUG_VOID_RETURN;
}
void
TransporterFacade::doStop(){
DBUG_ENTER("TransporterFacade::doStop");
/**
* First stop the ClusterMgr because it needs to send one more signal
* and also uses theFacadeInstance to lock/unlock theMutexPtr
......@@ -458,6 +409,7 @@ TransporterFacade::doStop(){
NdbThread_WaitFor(theSendThread, &status);
NdbThread_Destroy(&theReceiveThread);
NdbThread_Destroy(&theSendThread);
DBUG_VOID_RETURN;
}
extern "C"
......@@ -540,6 +492,8 @@ TransporterFacade::TransporterFacade() :
theSendThread(NULL),
theReceiveThread(NULL)
{
theOwnId = 0;
theMutexPtr = NdbMutex_Create();
sendPerformedLastInterval = 0;
......@@ -549,6 +503,8 @@ TransporterFacade::TransporterFacade() :
theArbitMgr = NULL;
theStartNodeId = 1;
m_max_trans_id = 0;
theClusterMgr = new ClusterMgr(* this);
}
bool
......@@ -567,7 +523,6 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
iter.first();
theClusterMgr = new ClusterMgr(* this);
theClusterMgr->init(iter);
/**
......@@ -608,7 +563,6 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
32768,
"ndb_send",
NDB_THREAD_PRIO_LOW);
theClusterMgr->startThread();
#ifdef API_TRACE
......@@ -619,6 +573,21 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
}
void
TransporterFacade::connected()
{
DBUG_ENTER("TransporterFacade::connected");
Uint32 sz = m_threads.m_statusNext.size();
for (Uint32 i = 0; i < sz ; i ++) {
if (m_threads.getInUse(i)){
void * obj = m_threads.m_objectExecute[i].m_object;
NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
(*RegPC) (obj, numberToRef(indexToNumber(i), theOwnId), true, true);
}
}
DBUG_VOID_RETURN;
}
void
TransporterFacade::ReportNodeDead(NodeId tNodeId)
{
......@@ -705,7 +674,16 @@ TransporterFacade::open(void* objRef,
ExecuteFunction fun,
NodeStatusFunction statusFun)
{
return m_threads.open(objRef, fun, statusFun);
DBUG_ENTER("TransporterFacade::open");
int r= m_threads.open(objRef, fun, statusFun);
if (r < 0)
DBUG_RETURN(r);
#if 1
if (theOwnId > 0) {
(*statusFun)(objRef, numberToRef(r, theOwnId), true, true);
}
#endif
DBUG_RETURN(r);
}
TransporterFacade::~TransporterFacade(){
......@@ -748,7 +726,7 @@ TransporterFacade::calculateSendLimit()
//-------------------------------------------------
void TransporterFacade::forceSend(Uint32 block_number) {
checkCounter--;
m_threads.m_statusNext[block_number - MIN_API_BLOCK_NO] = ThreadData::ACTIVE;
m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
sendPerformedLastInterval = 1;
if (checkCounter < 0) {
calculateSendLimit();
......@@ -761,7 +739,7 @@ void TransporterFacade::forceSend(Uint32 block_number) {
//-------------------------------------------------
void
TransporterFacade::checkForceSend(Uint32 block_number) {
m_threads.m_statusNext[block_number - MIN_API_BLOCK_NO] = ThreadData::ACTIVE;
m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
//-------------------------------------------------
// This code is an adaptive algorithm to discover when
// the API should actually send its buffers. The reason
......@@ -1002,11 +980,12 @@ TransporterFacade::ThreadData::expand(Uint32 size){
m_firstFree = m_statusNext.size() - size;
}
int
TransporterFacade::ThreadData::open(void* objRef,
ExecuteFunction fun,
NodeStatusFunction fun2){
ExecuteFunction fun,
NodeStatusFunction fun2)
{
Uint32 nextFree = m_firstFree;
if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
......@@ -1026,12 +1005,12 @@ TransporterFacade::ThreadData::open(void* objRef,
m_objectExecute[nextFree] = oe;
m_statusFunction[nextFree] = fun2;
return nextFree + MIN_API_BLOCK_NO;
return indexToNumber(nextFree);
}
int
TransporterFacade::ThreadData::close(int number){
number -= MIN_API_BLOCK_NO;
number= numberToIndex(number);
assert(getInUse(number));
m_statusNext[number] = m_firstFree;
m_firstFree = number;
......
......@@ -35,7 +35,7 @@ class Ndb;
class NdbApiSignal;
typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]);
typedef void (* NodeStatusFunction)(void *, NodeId, bool nodeAlive, bool nfComplete);
typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);
extern "C" {
void* runSendRequest_C(void*);
......@@ -55,9 +55,7 @@ public:
bool init(Uint32, const ndb_mgm_configuration *);
static TransporterFacade* instance();
static TransporterFacade* start_instance(int, const ndb_mgm_configuration*);
static TransporterFacade* start_instance(const char *connectString);
static void close_configuration();
int start_instance(int, const ndb_mgm_configuration*);
static void stop_instance();
/**
......@@ -93,6 +91,8 @@ public:
// My own processor id
NodeId ownId() const;
void connected();
void doConnect(int NodeId);
void reportConnected(int NodeId);
void doDisconnect(int NodeId);
......@@ -125,6 +125,7 @@ private:
friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond
friend class GrepSS;
friend class Ndb;
friend class Ndb_cluster_connection;
int sendSignalUnCond(NdbApiSignal *, NodeId nodeId);
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <pthread.h>
#include <ndb_cluster_connection.hpp>
#include <TransporterFacade.hpp>
#include <NdbOut.hpp>
#include <NdbSleep.h>
#include <ndb_limits.h>
#include <ConfigRetriever.hpp>
#include <ndb_version.h>
Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
{
m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade();
if (connect_string)
m_connect_string= strdup(connect_string);
else
m_connect_string= 0;
m_config_retriever= 0;
}
int Ndb_cluster_connection::connect()
{
DBUG_ENTER("Ndb_cluster_connection::connect");
if (m_config_retriever != 0) {
DBUG_RETURN(0);
}
m_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API);
m_config_retriever->setConnectString(m_connect_string);
const char* error = 0;
do {
if(m_config_retriever->init() == -1)
break;
if(m_config_retriever->do_connect() == -1)
break;
Uint32 nodeId = m_config_retriever->allocNodeId();
for(Uint32 i = 0; nodeId == 0 && i<5; i++){
NdbSleep_SecSleep(3);
nodeId = m_config_retriever->allocNodeId();
}
if(nodeId == 0)
break;
ndb_mgm_configuration * props = m_config_retriever->getConfig();
if(props == 0)
break;
m_facade->start_instance(nodeId, props);
free(props);
m_facade->connected();
DBUG_RETURN(0);
} while(0);
ndbout << "Configuration error: ";
const char* erString = m_config_retriever->getErrorString();
if (erString == 0) {
erString = "No error specified!";
}
ndbout << erString << endl;
DBUG_RETURN(-1);
}
Ndb_cluster_connection::~Ndb_cluster_connection()
{
if (m_facade != 0) {
delete m_facade;
if (m_facade != TransporterFacade::theFacadeInstance)
abort();
TransporterFacade::theFacadeInstance= 0;
}
if (m_connect_string)
free(m_connect_string);
if (m_config_retriever)
delete m_config_retriever;
}
......@@ -22,12 +22,13 @@
*/
#include <ndb_global.h>
#include <my_sys.h>
#include <getarg.h>
#include <NdbApi.hpp>
#include <NDBT.hpp>
static Ndb_cluster_connection *ndb_cluster_connection= 0;
static Ndb* ndb = 0;
static NdbDictionary::Dictionary* dic = 0;
static int _unqualified = 0;
......@@ -48,6 +49,22 @@ fatal(char const* fmt, ...)
exit(1);
}
static void
fatal_dict(char const* fmt, ...)
{
va_list ap;
char buf[500];
va_start(ap, fmt);
vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
ndbout << buf;
if (dic)
ndbout << " - " << dic->getNdbError();
ndbout << endl;
NDBT_ProgramExit(NDBT_FAILED);
exit(1);
}
static void
list(const char * tabname,
NdbDictionary::Object::Type type)
......@@ -55,10 +72,10 @@ list(const char * tabname,
NdbDictionary::Dictionary::List list;
if (tabname == 0) {
if (dic->listObjects(list, type) == -1)
fatal("listObjects");
fatal_dict("listObjects");
} else {
if (dic->listIndexes(list, tabname) == -1)
fatal("listIndexes");
fatal_dict("listIndexes");
}
if (ndb->usingFullyQualifiedNames())
ndbout_c("%-5s %-20s %-8s %-7s %-12s %-8s %s", "id", "type", "state", "logging", "database", "schema", "name");
......@@ -145,12 +162,17 @@ list(const char * tabname,
}
}
#ifndef DBUG_OFF
const char *debug_option= 0;
#endif
int main(int argc, const char** argv){
int _loops = 1;
const char* _tabname = NULL;
const char* _dbname = "TEST_DB";
int _type = 0;
int _help = 0;
const char* _connect_str = NULL;
struct getargs args[] = {
{ "loops", 'l', arg_integer, &_loops, "loops",
......@@ -161,6 +183,13 @@ int main(int argc, const char** argv){
"Name of database table is in"},
{ "type", 't', arg_integer, &_type, "type",
"Type of objects to show, see NdbDictionary.hpp for numbers(default = 0)" },
{ "connect-string", 'c', arg_string, &_connect_str,
"Set connect string for connecting to ndb_mgmd. <constr>=\"host=<hostname:port>[;nodeid=<id>]\". Overides specifying entries in NDB_CONNECTSTRING and config file",
"<constr>" },
#ifndef DBUG_OFF
{ "debug", 0, arg_string, &debug_option,
"Specify debug options e.g. d:t:i:o,out.trace", "options" },
#endif
{ "usage", '?', arg_flag, &_help, "Print help", "" }
};
int num_args = sizeof(args) / sizeof(args[0]);
......@@ -179,10 +208,18 @@ int main(int argc, const char** argv){
}
_tabname = argv[optind];
ndb = new Ndb(_dbname);
#ifndef DBUG_OFF
my_init();
if (debug_option)
DBUG_PUSH(debug_option);
#endif
ndb_cluster_connection = new Ndb_cluster_connection(_connect_str);
ndb = new Ndb(ndb_cluster_connection, _dbname);
ndb->useFullyQualifiedNames(!_unqualified);
if (ndb->init() != 0)
fatal("init");
ndb_cluster_connection->connect();
if (ndb->waitUntilReady(30) < 0)
fatal("waitUntilReady");
dic = ndb->getDictionary();
......
......@@ -16,6 +16,7 @@
#include <ndb_global.h>
#include <my_sys.h>
#include <NdbOut.hpp>
......@@ -26,6 +27,9 @@
#include <getarg.h>
#include <NdbScanFilter.hpp>
#ifndef DBUG_OFF
const char *debug_option= 0;
#endif
int scanReadRecords(Ndb*,
const NdbDictionary::Table*,
......@@ -58,6 +62,10 @@ int main(int argc, const char** argv){
"Output numbers in hexadecimal format", "useHexFormat" },
{ "delimiter", 'd', arg_string, &_delimiter, "Column delimiter",
"delimiter" },
#ifndef DBUG_OFF
{ "debug", 0, arg_string, &debug_option,
"Specify debug options e.g. d:t:i:o,out.trace", "options" },
#endif
{ "usage", '?', arg_flag, &_help, "Print help", "" },
{ "lock", 'l', arg_integer, &_lock,
"Read(0), Read-hold(1), Exclusive(2)", "lock"},
......@@ -80,6 +88,12 @@ int main(int argc, const char** argv){
}
_tabname = argv[optind];
#ifndef DBUG_OFF
my_init();
if (debug_option)
DBUG_PUSH(debug_option);
#endif
// Connect to Ndb
Ndb MyNdb(_dbname);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment