Commit c69eb876 authored by unknown's avatar unknown

Merge


ndb/src/ndbapi/NdbDictionaryImpl.hpp:
  Auto merged
ndb/src/ndbapi/NdbReceiver.cpp:
  Auto merged
ndb/src/ndbapi/TransporterFacade.cpp:
  Auto merged
ndb/src/ndbapi/TransporterFacade.hpp:
  Auto merged
ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  SCCS merged
parents f856038c b9b4338f
...@@ -16,6 +16,7 @@ ndbapi/NdbError.hpp \ ...@@ -16,6 +16,7 @@ ndbapi/NdbError.hpp \
ndbapi/NdbEventOperation.hpp \ ndbapi/NdbEventOperation.hpp \
ndbapi/NdbIndexOperation.hpp \ ndbapi/NdbIndexOperation.hpp \
ndbapi/NdbOperation.hpp \ ndbapi/NdbOperation.hpp \
ndbapi/ndb_cluster_connection.hpp \
ndbapi/NdbBlob.hpp \ ndbapi/NdbBlob.hpp \
ndbapi/NdbPool.hpp \ ndbapi/NdbPool.hpp \
ndbapi/NdbRecAttr.hpp \ ndbapi/NdbRecAttr.hpp \
......
...@@ -37,7 +37,7 @@ public: ...@@ -37,7 +37,7 @@ public:
*/ */
int init(); int init();
int do_connect(); int do_connect(int exit_on_connect_failure= false);
/** /**
* Get configuration for current (nodeId given in local config file) node. * Get configuration for current (nodeId given in local config file) node.
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
extern "C" { extern "C" {
#endif #endif
const char* NdbConfig_get_path(int *len);
void NdbConfig_SetPath(const char *path); void NdbConfig_SetPath(const char *path);
char* NdbConfig_NdbCfgName(int with_ndb_home); char* NdbConfig_NdbCfgName(int with_ndb_home);
char* NdbConfig_ErrorFileName(int node_id); char* NdbConfig_ErrorFileName(int node_id);
......
...@@ -860,6 +860,7 @@ ...@@ -860,6 +860,7 @@
#include <ndb_types.h> #include <ndb_types.h>
#include <ndbapi_limits.h> #include <ndbapi_limits.h>
#include <ndb_cluster_connection.hpp>
#include <NdbError.hpp> #include <NdbError.hpp>
#include <NdbDictionary.hpp> #include <NdbDictionary.hpp>
...@@ -992,6 +993,8 @@ public: ...@@ -992,6 +993,8 @@ public:
* deprecated. * deprecated.
*/ */
Ndb(const char* aCatalogName = "", const char* aSchemaName = "def"); Ndb(const char* aCatalogName = "", const char* aSchemaName = "def");
Ndb(Ndb_cluster_connection *ndb_cluster_connection,
const char* aCatalogName = "", const char* aSchemaName = "def");
~Ndb(); ~Ndb();
...@@ -1081,7 +1084,10 @@ public: ...@@ -1081,7 +1084,10 @@ public:
* @return 0: Ndb is ready and timeout has not occurred.<br> * @return 0: Ndb is ready and timeout has not occurred.<br>
* -1: Timeout has expired * -1: Timeout has expired
*/ */
int waitUntilReady(int timeout = 60); int waitUntilReady(int timeout = 60);
void connected(Uint32 block_reference);
/** @} *********************************************************************/ /** @} *********************************************************************/
...@@ -1447,6 +1453,9 @@ public: ...@@ -1447,6 +1453,9 @@ public:
****************************************************************************/ ****************************************************************************/
private: private:
void setup(Ndb_cluster_connection *ndb_cluster_connection,
const char* aCatalogName, const char* aSchemaName);
NdbConnection* startTransactionLocal(Uint32 aPrio, Uint32 aFragmentId); NdbConnection* startTransactionLocal(Uint32 aPrio, Uint32 aFragmentId);
// Connect the connection object to the Database. // Connect the connection object to the Database.
...@@ -1585,6 +1594,7 @@ private: ...@@ -1585,6 +1594,7 @@ private:
* These are the private variables in this class. * These are the private variables in this class.
*****************************************************************************/ *****************************************************************************/
NdbObjectIdMap* theNdbObjectIdMap; NdbObjectIdMap* theNdbObjectIdMap;
Ndb_cluster_connection *m_ndb_cluster_connection;
NdbConnection** thePreparedTransactionsArray; NdbConnection** thePreparedTransactionsArray;
NdbConnection** theSentTransactionsArray; NdbConnection** theSentTransactionsArray;
...@@ -1703,7 +1713,7 @@ private: ...@@ -1703,7 +1713,7 @@ private:
static void executeMessage(void*, NdbApiSignal *, static void executeMessage(void*, NdbApiSignal *,
struct LinearSectionPtr ptr[3]); struct LinearSectionPtr ptr[3]);
static void statusMessage(void*, Uint16, bool, bool); static void statusMessage(void*, Uint32, bool, bool);
#ifdef VM_TRACE #ifdef VM_TRACE
void printState(const char* fmt, ...); void printState(const char* fmt, ...);
#endif #endif
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef CLUSTER_CONNECTION_HPP
#define CLUSTER_CONNECTION_HPP
class TransporterFacade;
class ConfigRetriever;
class NdbThread;
extern "C" {
void* run_ndb_cluster_connection_connect_thread(void*);
}
class Ndb_cluster_connection {
public:
Ndb_cluster_connection(const char * connect_string = 0);
~Ndb_cluster_connection();
int connect(int reconnect= 0);
int start_connect_thread(int (*connect_callback)(void)= 0);
private:
friend void* run_ndb_cluster_connection_connect_thread(void*);
void connect_thread();
char *m_connect_string;
TransporterFacade *m_facade;
ConfigRetriever *m_config_retriever;
NdbThread *m_connect_thread;
int (*m_connect_callback)(void);
};
#endif
...@@ -78,7 +78,7 @@ ConfigRetriever::init() { ...@@ -78,7 +78,7 @@ ConfigRetriever::init() {
} }
int int
ConfigRetriever::do_connect(){ ConfigRetriever::do_connect(int exit_on_connect_failure){
if(!m_handle) if(!m_handle)
m_handle= ndb_mgm_create_handle(); m_handle= ndb_mgm_create_handle();
...@@ -102,6 +102,8 @@ ConfigRetriever::do_connect(){ ...@@ -102,6 +102,8 @@ ConfigRetriever::do_connect(){
if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) { if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) {
return 0; return 0;
} }
if (exit_on_connect_failure)
return 1;
setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle)); setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle));
case MgmId_File: case MgmId_File:
break; break;
......
...@@ -21,27 +21,34 @@ ...@@ -21,27 +21,34 @@
static char *datadir_path= 0; static char *datadir_path= 0;
static char* const char *
NdbConfig_AllocHomePath(int _len) NdbConfig_get_path(int *_len)
{ {
const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0); const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0);
int len= _len;
int path_len= 0; int path_len= 0;
char *buf;
if (path == 0)
path= datadir_path;
if (path) if (path)
path_len= strlen(path); path_len= strlen(path);
if (path_len == 0 && datadir_path) {
path= datadir_path;
path_len= strlen(path);
}
if (path_len == 0) {
path= ".";
path_len= strlen(path);
}
if (_len)
*_len= path_len;
return path;
}
len+= path_len; static char*
buf= NdbMem_Allocate(len); NdbConfig_AllocHomePath(int _len)
if (path_len > 0) {
snprintf(buf, len, "%s%s", path, DIR_SEPARATOR); int path_len;
else const char *path= NdbConfig_get_path(&path_len);
buf[0]= 0; int len= _len+path_len;
char *buf= NdbMem_Allocate(len);
snprintf(buf, len, "%s%s", path, DIR_SEPARATOR);
return buf; return buf;
} }
......
...@@ -74,6 +74,8 @@ NDB_MAIN(ndb_kernel){ ...@@ -74,6 +74,8 @@ NDB_MAIN(ndb_kernel){
theConfig->fetch_configuration(); theConfig->fetch_configuration();
} }
chdir(NdbConfig_get_path(0));
if (theConfig->getDaemonMode()) { if (theConfig->getDaemonMode()) {
// Become a daemon // Become a daemon
char *lockfile= NdbConfig_PidFileName(globalData.ownId); char *lockfile= NdbConfig_PidFileName(globalData.ownId);
......
...@@ -15,6 +15,9 @@ ...@@ -15,6 +15,9 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <my_pthread.h>
#include "WatchDog.hpp" #include "WatchDog.hpp"
#include "GlobalData.hpp" #include "GlobalData.hpp"
#include <NdbOut.hpp> #include <NdbOut.hpp>
...@@ -24,7 +27,9 @@ ...@@ -24,7 +27,9 @@
extern "C" extern "C"
void* void*
runWatchDog(void* w){ runWatchDog(void* w){
my_thread_init();
((WatchDog*)w)->run(); ((WatchDog*)w)->run();
my_thread_end();
NdbThread_Exit(0); NdbThread_Exit(0);
return NULL; return NULL;
} }
......
MYSQLDATAdir = $(localstatedir) MYSQLDATAdir = $(localstatedir)
MYSQLSHAREdir = $(pkgdatadir) MYSQLSHAREdir = $(pkgdatadir)
MYSQLBASEdir= $(prefix) MYSQLBASEdir= $(prefix)
MYSQLCLUSTERdir= $(prefix)/mysql-cluster #MYSQLCLUSTERdir= $(prefix)/mysql-cluster
MYSQLCLUSTERdir= .
ndbbin_PROGRAMS = ndb_mgmd ndbbin_PROGRAMS = ndb_mgmd
......
...@@ -629,13 +629,16 @@ MgmtSrvr::start() ...@@ -629,13 +629,16 @@ MgmtSrvr::start()
if (!check_start()) if (!check_start())
return false; return false;
} }
theFacade = TransporterFacade::start_instance theFacade= TransporterFacade::theFacadeInstance = new TransporterFacade();
(_ownNodeId,(ndb_mgm_configuration*)_config->m_configValues);
if(theFacade == 0) { if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL."); DEBUG("MgmtSrvr.cpp: theFacade == 0.");
return false; return false;
} }
if ( theFacade->start_instance
(_ownNodeId, (ndb_mgm_configuration*)_config->m_configValues) < 0) {
DEBUG("MgmtSrvr.cpp: TransporterFacade::start_instance < 0.");
return false;
}
MGM_REQUIRE(_blockNumber == 1); MGM_REQUIRE(_blockNumber == 1);
...@@ -2295,7 +2298,7 @@ MgmtSrvr::signalReceivedNotification(void* mgmtSrvr, ...@@ -2295,7 +2298,7 @@ MgmtSrvr::signalReceivedNotification(void* mgmtSrvr,
//**************************************************************************** //****************************************************************************
//**************************************************************************** //****************************************************************************
void void
MgmtSrvr::nodeStatusNotification(void* mgmSrv, NodeId nodeId, MgmtSrvr::nodeStatusNotification(void* mgmSrv, Uint32 nodeId,
bool alive, bool nfComplete) bool alive, bool nfComplete)
{ {
if(!(!alive && nfComplete)) if(!(!alive && nfComplete))
......
...@@ -699,7 +699,7 @@ private: ...@@ -699,7 +699,7 @@ private:
* shall receive the notification. * shall receive the notification.
* @param processId: Id of the dead process. * @param processId: Id of the dead process.
*/ */
static void nodeStatusNotification(void* mgmSrv, NodeId nodeId, static void nodeStatusNotification(void* mgmSrv, Uint32 nodeId,
bool alive, bool nfCompleted); bool alive, bool nfCompleted);
/** /**
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h> #include <ndb_global.h>
#include <my_pthread.h>
#include <ndb_limits.h> #include <ndb_limits.h>
#include <ndb_version.h> #include <ndb_version.h>
...@@ -64,7 +65,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): ...@@ -64,7 +65,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
{ {
ndbSetOwnVersion(); ndbSetOwnVersion();
clusterMgrThreadMutex = NdbMutex_Create(); clusterMgrThreadMutex = NdbMutex_Create();
noOfConnectedNodes = 0; noOfConnectedNodes= 0;
theClusterMgrThread= 0;
} }
ClusterMgr::~ClusterMgr(){ ClusterMgr::~ClusterMgr(){
...@@ -137,20 +139,21 @@ ClusterMgr::startThread() { ...@@ -137,20 +139,21 @@ ClusterMgr::startThread() {
void void
ClusterMgr::doStop( ){ ClusterMgr::doStop( ){
DBUG_ENTER("ClusterMgr::doStop");
NdbMutex_Lock(clusterMgrThreadMutex); NdbMutex_Lock(clusterMgrThreadMutex);
if(theStop){ if(theStop){
NdbMutex_Unlock(clusterMgrThreadMutex); NdbMutex_Unlock(clusterMgrThreadMutex);
return; DBUG_VOID_RETURN;
} }
void *status; void *status;
theStop = 1; theStop = 1;
if (theClusterMgrThread) {
NdbThread_WaitFor(theClusterMgrThread, &status); NdbThread_WaitFor(theClusterMgrThread, &status);
NdbThread_Destroy(&theClusterMgrThread); NdbThread_Destroy(&theClusterMgrThread);
theClusterMgrThread= 0;
}
NdbMutex_Unlock(clusterMgrThreadMutex); NdbMutex_Unlock(clusterMgrThreadMutex);
DBUG_VOID_RETURN;
} }
void void
...@@ -524,6 +527,7 @@ ArbitMgr::doChoose(const Uint32* theData) ...@@ -524,6 +527,7 @@ ArbitMgr::doChoose(const Uint32* theData)
void void
ArbitMgr::doStop(const Uint32* theData) ArbitMgr::doStop(const Uint32* theData)
{ {
DBUG_ENTER("ArbitMgr::doStop");
ArbitSignal aSignal; ArbitSignal aSignal;
NdbMutex_Lock(theThreadMutex); NdbMutex_Lock(theThreadMutex);
if (theThread != NULL) { if (theThread != NULL) {
...@@ -540,6 +544,7 @@ ArbitMgr::doStop(const Uint32* theData) ...@@ -540,6 +544,7 @@ ArbitMgr::doStop(const Uint32* theData)
theState = StateInit; theState = StateInit;
} }
NdbMutex_Unlock(theThreadMutex); NdbMutex_Unlock(theThreadMutex);
DBUG_VOID_RETURN;
} }
// private methods // private methods
...@@ -548,7 +553,9 @@ extern "C" ...@@ -548,7 +553,9 @@ extern "C"
void* void*
runArbitMgr_C(void* me) runArbitMgr_C(void* me)
{ {
my_thread_init();
((ArbitMgr*) me)->threadMain(); ((ArbitMgr*) me)->threadMain();
my_thread_end();
NdbThread_Exit(0); NdbThread_Exit(0);
return NULL; return NULL;
} }
......
...@@ -34,6 +34,7 @@ libndbapi_la_SOURCES = \ ...@@ -34,6 +34,7 @@ libndbapi_la_SOURCES = \
NdbDictionary.cpp \ NdbDictionary.cpp \
NdbDictionaryImpl.cpp \ NdbDictionaryImpl.cpp \
DictCache.cpp \ DictCache.cpp \
ndb_cluster_connection.cpp \
NdbBlob.cpp NdbBlob.cpp
INCLUDES_LOC = -I$(top_srcdir)/ndb/src/mgmapi INCLUDES_LOC = -I$(top_srcdir)/ndb/src/mgmapi
......
...@@ -207,9 +207,11 @@ Remark: Disconnect all connections to the database. ...@@ -207,9 +207,11 @@ Remark: Disconnect all connections to the database.
void void
Ndb::doDisconnect() Ndb::doDisconnect()
{ {
DBUG_ENTER("Ndb::doDisconnect");
NdbConnection* tNdbCon; NdbConnection* tNdbCon;
CHECK_STATUS_MACRO_VOID; CHECK_STATUS_MACRO_VOID;
DBUG_PRINT("info", ("theNoOfDBnodes=%d", theNoOfDBnodes));
Uint32 tNoOfDbNodes = theNoOfDBnodes; Uint32 tNoOfDbNodes = theNoOfDBnodes;
UintR i; UintR i;
for (i = 0; i < tNoOfDbNodes; i++) { for (i = 0; i < tNoOfDbNodes; i++) {
...@@ -227,6 +229,7 @@ Ndb::doDisconnect() ...@@ -227,6 +229,7 @@ Ndb::doDisconnect()
tNdbCon = tNdbCon->theNext; tNdbCon = tNdbCon->theNext;
releaseConnectToNdb(tmpNdbCon); releaseConnectToNdb(tmpNdbCon);
}//while }//while
DBUG_VOID_RETURN;
}//Ndb::disconnect() }//Ndb::disconnect()
/***************************************************************************** /*****************************************************************************
...@@ -239,6 +242,7 @@ Remark: Waits until a node has status != 0 ...@@ -239,6 +242,7 @@ Remark: Waits until a node has status != 0
int int
Ndb::waitUntilReady(int timeout) Ndb::waitUntilReady(int timeout)
{ {
DBUG_ENTER("Ndb::waitUntilReady");
int secondsCounter = 0; int secondsCounter = 0;
int milliCounter = 0; int milliCounter = 0;
int noChecksSinceFirstAliveFound = 0; int noChecksSinceFirstAliveFound = 0;
...@@ -246,7 +250,7 @@ Ndb::waitUntilReady(int timeout) ...@@ -246,7 +250,7 @@ Ndb::waitUntilReady(int timeout)
if (theInitState != Initialised) { if (theInitState != Initialised) {
// Ndb::init is not called // Ndb::init is not called
theError.code = 4256; theError.code = 4256;
return -1; DBUG_RETURN(-1);
} }
do { do {
...@@ -265,13 +269,13 @@ Ndb::waitUntilReady(int timeout) ...@@ -265,13 +269,13 @@ Ndb::waitUntilReady(int timeout)
tp->unlock_mutex(); tp->unlock_mutex();
if (foundAliveNode == theNoOfDBnodes) { if (foundAliveNode == theNoOfDBnodes) {
return 0; DBUG_RETURN(0);
}//if }//if
if (foundAliveNode > 0) { if (foundAliveNode > 0) {
noChecksSinceFirstAliveFound++; noChecksSinceFirstAliveFound++;
}//if }//if
if (noChecksSinceFirstAliveFound > 30) { if (noChecksSinceFirstAliveFound > 30) {
return 0; DBUG_RETURN(0);
}//if }//if
NdbSleep_MilliSleep(100); NdbSleep_MilliSleep(100);
milliCounter += 100; milliCounter += 100;
...@@ -281,9 +285,9 @@ Ndb::waitUntilReady(int timeout) ...@@ -281,9 +285,9 @@ Ndb::waitUntilReady(int timeout)
}//if }//if
} while ( secondsCounter < timeout ); } while ( secondsCounter < timeout );
if (noChecksSinceFirstAliveFound > 0) { if (noChecksSinceFirstAliveFound > 0) {
return 0; DBUG_RETURN(0);
}//if }//if
return -1; DBUG_RETURN(-1);
} }
/***************************************************************************** /*****************************************************************************
...@@ -1060,6 +1064,9 @@ Ndb::StartTransactionNodeSelectionData::init(Uint32 noOfNodes, ...@@ -1060,6 +1064,9 @@ Ndb::StartTransactionNodeSelectionData::init(Uint32 noOfNodes,
* This algorithm should be implemented in Dbdih * This algorithm should be implemented in Dbdih
*/ */
{ {
if (fragment2PrimaryNodeMap != 0)
abort();
fragment2PrimaryNodeMap = new Uint32[noOfFragments]; fragment2PrimaryNodeMap = new Uint32[noOfFragments];
Uint32 i; Uint32 i;
for(i = 0; i<noOfNodes; i++){ for(i = 0; i<noOfNodes; i++){
......
...@@ -14,19 +14,7 @@ ...@@ -14,19 +14,7 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
/*****************************************************************************
Name: NdbConnection.C
Include:
Link:
Author: UABMNST Mona Natterkvist UAB/B/UL
Date: 970829
Version: 0.1
Description: Interface between TIS and NDB
Documentation:
Adjust: 971022 UABMNST First version.
*****************************************************************************/
#include <NdbOut.hpp> #include <NdbOut.hpp>
#include <NdbConnection.hpp> #include <NdbConnection.hpp>
#include <NdbOperation.hpp> #include <NdbOperation.hpp>
...@@ -104,7 +92,9 @@ Remark: Deletes the connection object. ...@@ -104,7 +92,9 @@ Remark: Deletes the connection object.
*****************************************************************************/ *****************************************************************************/
NdbConnection::~NdbConnection() NdbConnection::~NdbConnection()
{ {
DBUG_ENTER("NdbConnection::~NdbConnection");
theNdb->theNdbObjectIdMap->unmap(theId, this); theNdb->theNdbObjectIdMap->unmap(theId, this);
DBUG_VOID_RETURN;
}//NdbConnection::~NdbConnection() }//NdbConnection::~NdbConnection()
/***************************************************************************** /*****************************************************************************
......
...@@ -660,6 +660,7 @@ NdbDictionaryImpl::getIndexTable(NdbIndexImpl * index, ...@@ -660,6 +660,7 @@ NdbDictionaryImpl::getIndexTable(NdbIndexImpl * index,
return getTable(m_ndb.externalizeTableName(internalName)); return getTable(m_ndb.externalizeTableName(internalName));
} }
#if 0
bool bool
NdbDictInterface::setTransporter(class TransporterFacade * tf) NdbDictInterface::setTransporter(class TransporterFacade * tf)
{ {
...@@ -683,11 +684,11 @@ NdbDictInterface::setTransporter(class TransporterFacade * tf) ...@@ -683,11 +684,11 @@ NdbDictInterface::setTransporter(class TransporterFacade * tf)
return true; return true;
} }
#endif
bool bool
NdbDictInterface::setTransporter(class Ndb* ndb, class TransporterFacade * tf) NdbDictInterface::setTransporter(class Ndb* ndb, class TransporterFacade * tf)
{ {
m_blockNumber = -1;
m_reference = ndb->getReference(); m_reference = ndb->getReference();
m_transporter = tf; m_transporter = tf;
m_waiter.m_mutex = tf->theMutexPtr; m_waiter.m_mutex = tf->theMutexPtr;
...@@ -697,10 +698,6 @@ NdbDictInterface::setTransporter(class Ndb* ndb, class TransporterFacade * tf) ...@@ -697,10 +698,6 @@ NdbDictInterface::setTransporter(class Ndb* ndb, class TransporterFacade * tf)
NdbDictInterface::~NdbDictInterface() NdbDictInterface::~NdbDictInterface()
{ {
if (m_transporter != NULL){
if (m_blockNumber != -1)
m_transporter->close(m_blockNumber, 0);
}
} }
void void
...@@ -787,7 +784,7 @@ NdbDictInterface::execSignal(void* dictImpl, ...@@ -787,7 +784,7 @@ NdbDictInterface::execSignal(void* dictImpl,
} }
void void
NdbDictInterface::execNodeStatus(void* dictImpl, NodeId aNode, NdbDictInterface::execNodeStatus(void* dictImpl, Uint32 aNode,
bool alive, bool nfCompleted) bool alive, bool nfCompleted)
{ {
NdbDictInterface * tmp = (NdbDictInterface*)dictImpl; NdbDictInterface * tmp = (NdbDictInterface*)dictImpl;
......
...@@ -241,7 +241,6 @@ public: ...@@ -241,7 +241,6 @@ public:
NdbDictInterface(NdbError& err) : m_error(err) { NdbDictInterface(NdbError& err) : m_error(err) {
m_reference = 0; m_reference = 0;
m_masterNodeId = 0; m_masterNodeId = 0;
m_blockNumber = -1;
m_transporter= NULL; m_transporter= NULL;
} }
~NdbDictInterface(); ~NdbDictInterface();
...@@ -309,7 +308,6 @@ public: ...@@ -309,7 +308,6 @@ public:
private: private:
Uint32 m_reference; Uint32 m_reference;
Uint32 m_masterNodeId; Uint32 m_masterNodeId;
int m_blockNumber;
NdbWaiter m_waiter; NdbWaiter m_waiter;
class TransporterFacade * m_transporter; class TransporterFacade * m_transporter;
...@@ -319,7 +317,7 @@ private: ...@@ -319,7 +317,7 @@ private:
class NdbApiSignal* signal, class NdbApiSignal* signal,
class LinearSectionPtr ptr[3]); class LinearSectionPtr ptr[3]);
static void execNodeStatus(void* dictImpl, NodeId, static void execNodeStatus(void* dictImpl, Uint32,
bool alive, bool nfCompleted); bool alive, bool nfCompleted);
void execGET_TABINFO_REF(NdbApiSignal *, LinearSectionPtr ptr[3]); void execGET_TABINFO_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include "NdbImpl.hpp" #include "NdbImpl.hpp"
#include <NdbReceiver.hpp> #include <NdbReceiver.hpp>
#include "NdbDictionaryImpl.hpp" #include "NdbDictionaryImpl.hpp"
...@@ -36,10 +37,12 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) : ...@@ -36,10 +37,12 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) :
NdbReceiver::~NdbReceiver() NdbReceiver::~NdbReceiver()
{ {
DBUG_ENTER("NdbReceiver::~NdbReceiver");
if (m_id != NdbObjectIdMap::InvalidId) { if (m_id != NdbObjectIdMap::InvalidId) {
m_ndb->theNdbObjectIdMap->unmap(m_id, this); m_ndb->theNdbObjectIdMap->unmap(m_id, this);
} }
delete[] m_rows; delete[] m_rows;
DBUG_VOID_RETURN;
} }
void void
......
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include "NdbApiSignal.hpp" #include "NdbApiSignal.hpp"
#include "NdbImpl.hpp" #include "NdbImpl.hpp"
#include "NdbOperation.hpp" #include "NdbOperation.hpp"
...@@ -53,6 +55,8 @@ ...@@ -53,6 +55,8 @@
int int
Ndb::init(int aMaxNoOfTransactions) Ndb::init(int aMaxNoOfTransactions)
{ {
DBUG_ENTER("Ndb::init");
int i; int i;
int aNrOfCon; int aNrOfCon;
int aNrOfOp; int aNrOfOp;
...@@ -67,7 +71,7 @@ Ndb::init(int aMaxNoOfTransactions) ...@@ -67,7 +71,7 @@ Ndb::init(int aMaxNoOfTransactions)
theError.code = 4104; theError.code = 4104;
break; break;
} }
return -1; DBUG_RETURN(-1);
}//if }//if
theInitState = StartingInit; theInitState = StartingInit;
TransporterFacade * theFacade = TransporterFacade::instance(); TransporterFacade * theFacade = TransporterFacade::instance();
...@@ -75,37 +79,17 @@ Ndb::init(int aMaxNoOfTransactions) ...@@ -75,37 +79,17 @@ Ndb::init(int aMaxNoOfTransactions)
const int tBlockNo = theFacade->open(this, const int tBlockNo = theFacade->open(this,
executeMessage, executeMessage,
statusMessage); statusMessage);
if ( tBlockNo == -1 ) { if ( tBlockNo == -1 ) {
theError.code = 4105; theError.code = 4105;
theFacade->unlock_mutex(); theFacade->unlock_mutex();
return -1; // no more free blocknumbers DBUG_RETURN(-1); // no more free blocknumbers
}//if }//if
theNdbBlockNumber = tBlockNo; theNdbBlockNumber = tBlockNo;
theNode = theFacade->ownId();
theMyRef = numberToRef(theNdbBlockNumber, theNode);
for (i = 1; i < MAX_NDB_NODES; i++){
if (theFacade->getIsDbNode(i)){
theDBnodes[theNoOfDBnodes] = i;
theNoOfDBnodes++;
}
}
theFirstTransId = ((Uint64)theNdbBlockNumber << 52)+((Uint64)theNode << 40);
theFirstTransId += theFacade->m_max_trans_id;
theFacade->unlock_mutex(); theFacade->unlock_mutex();
theDictionary = new NdbDictionaryImpl(*this);
if (theDictionary == NULL) {
theError.code = 4000;
return -1;
}
theDictionary->setTransporter(this, theFacade); theDictionary->setTransporter(this, theFacade);
aNrOfCon = theNoOfDBnodes; aNrOfCon = theNoOfDBnodes;
...@@ -144,9 +128,6 @@ Ndb::init(int aMaxNoOfTransactions) ...@@ -144,9 +128,6 @@ Ndb::init(int aMaxNoOfTransactions)
theSentTransactionsArray[i] = NULL; theSentTransactionsArray[i] = NULL;
theCompletedTransactionsArray[i] = NULL; theCompletedTransactionsArray[i] = NULL;
}//for }//for
startTransactionNodeSelectionData.init(theNoOfDBnodes, theDBnodes);
for (i = 0; i < 16; i++){ for (i = 0; i < 16; i++){
tSignal[i] = getSignal(); tSignal[i] = getSignal();
if(tSignal[i] == NULL) { if(tSignal[i] == NULL) {
...@@ -156,11 +137,8 @@ Ndb::init(int aMaxNoOfTransactions) ...@@ -156,11 +137,8 @@ Ndb::init(int aMaxNoOfTransactions)
} }
for (i = 0; i < 16; i++) for (i = 0; i < 16; i++)
releaseSignal(tSignal[i]); releaseSignal(tSignal[i]);
theInitState = Initialised; theInitState = Initialised;
DBUG_RETURN(0);
theCommitAckSignal = new NdbApiSignal(theMyRef);
return 0;
error_handler: error_handler:
ndbout << "error_handler" << endl; ndbout << "error_handler" << endl;
...@@ -176,12 +154,13 @@ Ndb::init(int aMaxNoOfTransactions) ...@@ -176,12 +154,13 @@ Ndb::init(int aMaxNoOfTransactions)
delete theDictionary; delete theDictionary;
TransporterFacade::instance()->close(theNdbBlockNumber, 0); TransporterFacade::instance()->close(theNdbBlockNumber, 0);
return -1; DBUG_RETURN(-1);
} }
void void
Ndb::releaseTransactionArrays() Ndb::releaseTransactionArrays()
{ {
DBUG_ENTER("Ndb::releaseTransactionArrays");
if (thePreparedTransactionsArray != NULL) { if (thePreparedTransactionsArray != NULL) {
delete [] thePreparedTransactionsArray; delete [] thePreparedTransactionsArray;
}//if }//if
...@@ -191,6 +170,7 @@ Ndb::releaseTransactionArrays() ...@@ -191,6 +170,7 @@ Ndb::releaseTransactionArrays()
if (theCompletedTransactionsArray != NULL) { if (theCompletedTransactionsArray != NULL) {
delete [] theCompletedTransactionsArray; delete [] theCompletedTransactionsArray;
}//if }//if
DBUG_VOID_RETURN;
}//Ndb::releaseTransactionArrays() }//Ndb::releaseTransactionArrays()
void void
...@@ -202,13 +182,46 @@ Ndb::executeMessage(void* NdbObject, ...@@ -202,13 +182,46 @@ Ndb::executeMessage(void* NdbObject,
tNdb->handleReceivedSignal(aSignal, ptr); tNdb->handleReceivedSignal(aSignal, ptr);
} }
void Ndb::connected(Uint32 ref)
{
theMyRef= ref;
theNode= refToNode(theMyRef);
if (theNdbBlockNumber >= 0)
assert(theMyRef == numberToRef(theNdbBlockNumber, theNode));
TransporterFacade * theFacade = TransporterFacade::instance();
int i;
theNoOfDBnodes= 0;
for (i = 1; i < MAX_NDB_NODES; i++){
if (theFacade->getIsDbNode(i)){
theDBnodes[theNoOfDBnodes] = i;
theNoOfDBnodes++;
}
}
theFirstTransId = ((Uint64)theNdbBlockNumber << 52)+
((Uint64)theNode << 40);
theFirstTransId += theFacade->m_max_trans_id;
// assert(0);
DBUG_PRINT("info",("connected with ref=%x, id=%d, no_db_nodes=%d, first_trans_id=%d",
theMyRef,
theNode,
theNoOfDBnodes,
theFirstTransId));
startTransactionNodeSelectionData.init(theNoOfDBnodes, theDBnodes);
theCommitAckSignal = new NdbApiSignal(theMyRef);
theDictionary->m_receiver.m_reference= theMyRef;
}
void void
Ndb::statusMessage(void* NdbObject, NodeId a_node, bool alive, bool nfComplete) Ndb::statusMessage(void* NdbObject, Uint32 a_node, bool alive, bool nfComplete)
{ {
DBUG_ENTER("Ndb::statusMessage");
Ndb* tNdb = (Ndb*)NdbObject; Ndb* tNdb = (Ndb*)NdbObject;
if (alive) { if (alive) {
if (nfComplete) { if (nfComplete) {
assert(0); tNdb->connected(a_node);
DBUG_VOID_RETURN;
}//if }//if
} else { } else {
if (nfComplete) { if (nfComplete) {
...@@ -219,6 +232,7 @@ Ndb::statusMessage(void* NdbObject, NodeId a_node, bool alive, bool nfComplete) ...@@ -219,6 +232,7 @@ Ndb::statusMessage(void* NdbObject, NodeId a_node, bool alive, bool nfComplete)
}//if }//if
NdbDictInterface::execNodeStatus(&tNdb->theDictionary->m_receiver, NdbDictInterface::execNodeStatus(&tNdb->theDictionary->m_receiver,
a_node, alive, nfComplete); a_node, alive, nfComplete);
DBUG_VOID_RETURN;
} }
void void
......
...@@ -42,6 +42,7 @@ void NdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *); ...@@ -42,6 +42,7 @@ void NdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *);
static int theNoOfNdbObjects = 0; static int theNoOfNdbObjects = 0;
static char *ndbConnectString = 0; static char *ndbConnectString = 0;
static Ndb_cluster_connection *global_ndb_cluster_connection= 0;
#if defined NDB_WIN32 || defined SCO #if defined NDB_WIN32 || defined SCO
static NdbMutex & createNdbMutex = * NdbMutex_Create(); static NdbMutex & createNdbMutex = * NdbMutex_Create();
...@@ -56,45 +57,74 @@ Ndb(const char* aDataBase); ...@@ -56,45 +57,74 @@ Ndb(const char* aDataBase);
Parameters: aDataBase : Name of the database. Parameters: aDataBase : Name of the database.
Remark: Connect to the database. Remark: Connect to the database.
***************************************************************************/ ***************************************************************************/
Ndb::Ndb( const char* aDataBase , const char* aSchema) : Ndb::Ndb( const char* aDataBase , const char* aSchema) {
theNdbObjectIdMap(0), if (global_ndb_cluster_connection == 0) {
thePreparedTransactionsArray(NULL), if (theNoOfNdbObjects > 0)
theSentTransactionsArray(NULL), abort(); // old and new Ndb constructor used mixed
theCompletedTransactionsArray(NULL), global_ndb_cluster_connection= new Ndb_cluster_connection(ndbConnectString);
theNoOfPreparedTransactions(0), global_ndb_cluster_connection->connect();
theNoOfSentTransactions(0), }
theNoOfCompletedTransactions(0), setup(global_ndb_cluster_connection, aDataBase, aSchema);
theNoOfAllocatedTransactions(0), }
theMaxNoOfTransactions(0),
theMinNoOfEventsToWakeUp(0), Ndb::Ndb( Ndb_cluster_connection *ndb_cluster_connection,
prefixEnd(NULL), const char* aDataBase , const char* aSchema)
theImpl(NULL), {
theDictionary(NULL), if (global_ndb_cluster_connection != 0 &&
theConIdleList(NULL), global_ndb_cluster_connection != ndb_cluster_connection)
theOpIdleList(NULL), abort(); // old and new Ndb constructor used mixed
theScanOpIdleList(NULL), setup(ndb_cluster_connection, aDataBase, aSchema);
theIndexOpIdleList(NULL), }
// theSchemaConIdleList(NULL),
// theSchemaConToNdbList(NULL), void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theTransactionList(NULL), const char* aDataBase , const char* aSchema)
theConnectionArray(NULL),
theRecAttrIdleList(NULL),
theSignalIdleList(NULL),
theLabelList(NULL),
theBranchList(NULL),
theSubroutineList(NULL),
theCallList(NULL),
theScanList(NULL),
theNdbBlobIdleList(NULL),
theNoOfDBnodes(0),
theDBnodes(NULL),
the_release_ind(NULL),
the_last_check_time(0),
theFirstTransId(0),
theRestartGCI(0),
theNdbBlockNumber(-1),
theInitState(NotConstructed)
{ {
DBUG_ENTER("Ndb::setup");
theNdbObjectIdMap= 0;
m_ndb_cluster_connection= ndb_cluster_connection;
thePreparedTransactionsArray= NULL;
theSentTransactionsArray= NULL;
theCompletedTransactionsArray= NULL;
theNoOfPreparedTransactions= 0;
theNoOfSentTransactions= 0;
theNoOfCompletedTransactions= 0;
theNoOfAllocatedTransactions= 0;
theMaxNoOfTransactions= 0;
theMinNoOfEventsToWakeUp= 0;
prefixEnd= NULL;
theImpl= NULL;
theDictionary= NULL;
theConIdleList= NULL;
theOpIdleList= NULL;
theScanOpIdleList= NULL;
theIndexOpIdleList= NULL;
// theSchemaConIdleList= NULL;
// theSchemaConToNdbList= NULL;
theTransactionList= NULL;
theConnectionArray= NULL;
theRecAttrIdleList= NULL;
theSignalIdleList= NULL;
theLabelList= NULL;
theBranchList= NULL;
theSubroutineList= NULL;
theCallList= NULL;
theScanList= NULL;
theNdbBlobIdleList= NULL;
theNoOfDBnodes= 0;
theDBnodes= NULL;
the_release_ind= NULL;
the_last_check_time= 0;
theFirstTransId= 0;
theRestartGCI= 0;
theNdbBlockNumber= -1;
theInitState= NotConstructed;
theNode= 0;
theFirstTransId= 0;
theMyRef= 0;
theNoOfDBnodes= 0;
fullyQualifiedNames = true; fullyQualifiedNames = true;
cgetSignals =0; cgetSignals =0;
...@@ -135,18 +165,8 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) : ...@@ -135,18 +165,8 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) :
NdbMutex_Lock(&createNdbMutex); NdbMutex_Lock(&createNdbMutex);
TransporterFacade * m_facade = 0; theWaiter.m_mutex = TransporterFacade::instance()->theMutexPtr;
if(theNoOfNdbObjects == 0){
if ((m_facade = TransporterFacade::start_instance(ndbConnectString)) == 0)
theInitState = InitConfigError;
} else {
m_facade = TransporterFacade::instance();
}
if(m_facade != 0){
theWaiter.m_mutex = m_facade->theMutexPtr;
}
// For keeping track of how many Ndb objects that exists. // For keeping track of how many Ndb objects that exists.
theNoOfNdbObjects += 1; theNoOfNdbObjects += 1;
...@@ -167,6 +187,13 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) : ...@@ -167,6 +187,13 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) :
} }
NdbMutex_Unlock(&createNdbMutex); NdbMutex_Unlock(&createNdbMutex);
theDictionary = new NdbDictionaryImpl(*this);
if (theDictionary == NULL) {
ndbout_c("Ndb cailed to allocate dictionary");
exit(-1);
}
DBUG_VOID_RETURN;
} }
...@@ -187,6 +214,7 @@ void Ndb::setConnectString(const char * connectString) ...@@ -187,6 +214,7 @@ void Ndb::setConnectString(const char * connectString)
*****************************************************************************/ *****************************************************************************/
Ndb::~Ndb() Ndb::~Ndb()
{ {
DBUG_ENTER("Ndb::~Ndb()");
doDisconnect(); doDisconnect();
delete theDictionary; delete theDictionary;
...@@ -203,6 +231,10 @@ Ndb::~Ndb() ...@@ -203,6 +231,10 @@ Ndb::~Ndb()
theNoOfNdbObjects -= 1; theNoOfNdbObjects -= 1;
if(theNoOfNdbObjects == 0){ if(theNoOfNdbObjects == 0){
TransporterFacade::stop_instance(); TransporterFacade::stop_instance();
if (global_ndb_cluster_connection != 0) {
delete global_ndb_cluster_connection;
global_ndb_cluster_connection= 0;
}
}//if }//if
NdbMutex_Unlock(&createNdbMutex); NdbMutex_Unlock(&createNdbMutex);
...@@ -271,6 +303,7 @@ Ndb::~Ndb() ...@@ -271,6 +303,7 @@ Ndb::~Ndb()
assert(cnewSignals == cfreeSignals); assert(cnewSignals == cfreeSignals);
assert(cgetSignals == creleaseSignals); assert(cgetSignals == creleaseSignals);
#endif #endif
DBUG_VOID_RETURN;
} }
NdbWaiter::NdbWaiter(){ NdbWaiter::NdbWaiter(){
......
...@@ -783,6 +783,7 @@ Remark: Release and disconnect from DBTC a connection and seize it to th ...@@ -783,6 +783,7 @@ Remark: Release and disconnect from DBTC a connection and seize it to th
void void
Ndb::releaseConnectToNdb(NdbConnection* a_con) Ndb::releaseConnectToNdb(NdbConnection* a_con)
{ {
DBUG_ENTER("Ndb::releaseConnectToNdb");
NdbApiSignal tSignal(theMyRef); NdbApiSignal tSignal(theMyRef);
int tConPtr; int tConPtr;
...@@ -790,7 +791,7 @@ Ndb::releaseConnectToNdb(NdbConnection* a_con) ...@@ -790,7 +791,7 @@ Ndb::releaseConnectToNdb(NdbConnection* a_con)
// manage to reach NDB or not. // manage to reach NDB or not.
if (a_con == NULL) if (a_con == NULL)
return; DBUG_VOID_RETURN;
Uint32 node_id = a_con->getConnectedNodeId(); Uint32 node_id = a_con->getConnectedNodeId();
Uint32 conn_seq = a_con->theNodeSequence; Uint32 conn_seq = a_con->theNodeSequence;
...@@ -821,6 +822,6 @@ Ndb::releaseConnectToNdb(NdbConnection* a_con) ...@@ -821,6 +822,6 @@ Ndb::releaseConnectToNdb(NdbConnection* a_con)
abort(); abort();
}//if }//if
releaseNdbCon(a_con); releaseNdbCon(a_con);
return; DBUG_VOID_RETURN;
} }
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h> #include <ndb_global.h>
#include <my_pthread.h>
#include <ndb_limits.h> #include <ndb_limits.h>
#include "TransporterFacade.hpp" #include "TransporterFacade.hpp"
#include "ClusterMgr.hpp" #include "ClusterMgr.hpp"
...@@ -37,6 +38,16 @@ ...@@ -37,6 +38,16 @@
//#define REPORT_TRANSPORTER //#define REPORT_TRANSPORTER
//#define API_TRACE; //#define API_TRACE;
static int numberToIndex(int number)
{
return number - MIN_API_BLOCK_NO;
}
static int indexToNumber(int index)
{
return index + MIN_API_BLOCK_NO;
}
#if defined DEBUG_TRANSPORTER #if defined DEBUG_TRANSPORTER
#define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl; #define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
#else #else
...@@ -44,8 +55,6 @@ ...@@ -44,8 +55,6 @@
#endif #endif
TransporterFacade* TransporterFacade::theFacadeInstance = NULL; TransporterFacade* TransporterFacade::theFacadeInstance = NULL;
ConfigRetriever *TransporterFacade::s_config_retriever= 0;
/***************************************************************************** /*****************************************************************************
* Call back functions * Call back functions
...@@ -321,12 +330,6 @@ copy(Uint32 * & insertPtr, ...@@ -321,12 +330,6 @@ copy(Uint32 * & insertPtr,
abort(); abort();
} }
extern "C"
void
atexit_stop_instance(){
TransporterFacade::stop_instance();
}
/** /**
* Note that this function need no locking since its * Note that this function need no locking since its
* only called from the constructor of Ndb (the NdbObject) * only called from the constructor of Ndb (the NdbObject)
...@@ -334,64 +337,14 @@ atexit_stop_instance(){ ...@@ -334,64 +337,14 @@ atexit_stop_instance(){
* Which is protected by a mutex * Which is protected by a mutex
*/ */
int
TransporterFacade*
TransporterFacade::start_instance(const char * connectString){
// TransporterFacade used from API get config from mgmt srvr
s_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API);
s_config_retriever->setConnectString(connectString);
const char* error = 0;
do {
if(s_config_retriever->init() == -1)
break;
if(s_config_retriever->do_connect() == -1)
break;
Uint32 nodeId = s_config_retriever->allocNodeId();
for(Uint32 i = 0; nodeId == 0 && i<5; i++){
NdbSleep_SecSleep(3);
nodeId = s_config_retriever->allocNodeId();
}
if(nodeId == 0)
break;
ndb_mgm_configuration * props = s_config_retriever->getConfig();
if(props == 0)
break;
TransporterFacade * tf = start_instance(nodeId, props);
free(props);
return tf;
} while(0);
ndbout << "Configuration error: ";
const char* erString = s_config_retriever->getErrorString();
if (erString == 0) {
erString = "No error specified!";
}
ndbout << erString << endl;
return 0;
}
TransporterFacade*
TransporterFacade::start_instance(int nodeId, TransporterFacade::start_instance(int nodeId,
const ndb_mgm_configuration* props) const ndb_mgm_configuration* props)
{ {
TransporterFacade* tf = new TransporterFacade(); if (! theFacadeInstance->init(nodeId, props)) {
if (! tf->init(nodeId, props)) { return -1;
delete tf;
return NULL;
} }
/**
* Install atexit handler
*/
atexit(atexit_stop_instance);
/** /**
* Install signal handler for SIGPIPE * Install signal handler for SIGPIPE
* *
...@@ -402,19 +355,7 @@ TransporterFacade::start_instance(int nodeId, ...@@ -402,19 +355,7 @@ TransporterFacade::start_instance(int nodeId,
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
#endif #endif
if(theFacadeInstance == NULL){ return 0;
theFacadeInstance = tf;
}
return tf;
}
void
TransporterFacade::close_configuration(){
if (s_config_retriever) {
delete s_config_retriever;
s_config_retriever= 0;
}
} }
/** /**
...@@ -425,23 +366,15 @@ TransporterFacade::close_configuration(){ ...@@ -425,23 +366,15 @@ TransporterFacade::close_configuration(){
*/ */
void void
TransporterFacade::stop_instance(){ TransporterFacade::stop_instance(){
DBUG_ENTER("TransporterFacade::stop_instance");
close_configuration(); if(theFacadeInstance)
theFacadeInstance->doStop();
if(theFacadeInstance == NULL){ DBUG_VOID_RETURN;
/**
* We are called from atexit function
*/
return;
}
theFacadeInstance->doStop();
delete theFacadeInstance; theFacadeInstance = NULL;
} }
void void
TransporterFacade::doStop(){ TransporterFacade::doStop(){
DBUG_ENTER("TransporterFacade::doStop");
/** /**
* First stop the ClusterMgr because it needs to send one more signal * First stop the ClusterMgr because it needs to send one more signal
* and also uses theFacadeInstance to lock/unlock theMutexPtr * and also uses theFacadeInstance to lock/unlock theMutexPtr
...@@ -454,17 +387,26 @@ TransporterFacade::doStop(){ ...@@ -454,17 +387,26 @@ TransporterFacade::doStop(){
*/ */
void *status; void *status;
theStopReceive = 1; theStopReceive = 1;
NdbThread_WaitFor(theReceiveThread, &status); if (theReceiveThread) {
NdbThread_WaitFor(theSendThread, &status); NdbThread_WaitFor(theReceiveThread, &status);
NdbThread_Destroy(&theReceiveThread); NdbThread_Destroy(&theReceiveThread);
NdbThread_Destroy(&theSendThread); theReceiveThread= 0;
}
if (theSendThread) {
NdbThread_WaitFor(theSendThread, &status);
NdbThread_Destroy(&theSendThread);
theSendThread= 0;
}
DBUG_VOID_RETURN;
} }
extern "C" extern "C"
void* void*
runSendRequest_C(void * me) runSendRequest_C(void * me)
{ {
my_thread_init();
((TransporterFacade*) me)->threadMainSend(); ((TransporterFacade*) me)->threadMainSend();
my_thread_end();
NdbThread_Exit(0); NdbThread_Exit(0);
return me; return me;
} }
...@@ -507,7 +449,9 @@ extern "C" ...@@ -507,7 +449,9 @@ extern "C"
void* void*
runReceiveResponse_C(void * me) runReceiveResponse_C(void * me)
{ {
my_thread_init();
((TransporterFacade*) me)->threadMainReceive(); ((TransporterFacade*) me)->threadMainReceive();
my_thread_end();
NdbThread_Exit(0); NdbThread_Exit(0);
return me; return me;
} }
...@@ -540,6 +484,8 @@ TransporterFacade::TransporterFacade() : ...@@ -540,6 +484,8 @@ TransporterFacade::TransporterFacade() :
theSendThread(NULL), theSendThread(NULL),
theReceiveThread(NULL) theReceiveThread(NULL)
{ {
theOwnId = 0;
theMutexPtr = NdbMutex_Create(); theMutexPtr = NdbMutex_Create();
sendPerformedLastInterval = 0; sendPerformedLastInterval = 0;
...@@ -552,6 +498,8 @@ TransporterFacade::TransporterFacade() : ...@@ -552,6 +498,8 @@ TransporterFacade::TransporterFacade() :
m_batch_byte_size= SCAN_BATCH_SIZE; m_batch_byte_size= SCAN_BATCH_SIZE;
m_batch_size= DEF_BATCH_SIZE; m_batch_size= DEF_BATCH_SIZE;
m_max_trans_id = 0; m_max_trans_id = 0;
theClusterMgr = new ClusterMgr(* this);
} }
bool bool
...@@ -570,7 +518,6 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) ...@@ -570,7 +518,6 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE); ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
iter.first(); iter.first();
theClusterMgr = new ClusterMgr(* this);
theClusterMgr->init(iter); theClusterMgr->init(iter);
/** /**
...@@ -622,7 +569,6 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) ...@@ -622,7 +569,6 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
32768, 32768,
"ndb_send", "ndb_send",
NDB_THREAD_PRIO_LOW); NDB_THREAD_PRIO_LOW);
theClusterMgr->startThread(); theClusterMgr->startThread();
#ifdef API_TRACE #ifdef API_TRACE
...@@ -633,6 +579,21 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) ...@@ -633,6 +579,21 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
} }
void
TransporterFacade::connected()
{
DBUG_ENTER("TransporterFacade::connected");
Uint32 sz = m_threads.m_statusNext.size();
for (Uint32 i = 0; i < sz ; i ++) {
if (m_threads.getInUse(i)){
void * obj = m_threads.m_objectExecute[i].m_object;
NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
(*RegPC) (obj, numberToRef(indexToNumber(i), theOwnId), true, true);
}
}
DBUG_VOID_RETURN;
}
void void
TransporterFacade::ReportNodeDead(NodeId tNodeId) TransporterFacade::ReportNodeDead(NodeId tNodeId)
{ {
...@@ -719,7 +680,16 @@ TransporterFacade::open(void* objRef, ...@@ -719,7 +680,16 @@ TransporterFacade::open(void* objRef,
ExecuteFunction fun, ExecuteFunction fun,
NodeStatusFunction statusFun) NodeStatusFunction statusFun)
{ {
return m_threads.open(objRef, fun, statusFun); DBUG_ENTER("TransporterFacade::open");
int r= m_threads.open(objRef, fun, statusFun);
if (r < 0)
DBUG_RETURN(r);
#if 1
if (theOwnId > 0) {
(*statusFun)(objRef, numberToRef(r, theOwnId), true, true);
}
#endif
DBUG_RETURN(r);
} }
TransporterFacade::~TransporterFacade(){ TransporterFacade::~TransporterFacade(){
...@@ -762,7 +732,7 @@ TransporterFacade::calculateSendLimit() ...@@ -762,7 +732,7 @@ TransporterFacade::calculateSendLimit()
//------------------------------------------------- //-------------------------------------------------
void TransporterFacade::forceSend(Uint32 block_number) { void TransporterFacade::forceSend(Uint32 block_number) {
checkCounter--; checkCounter--;
m_threads.m_statusNext[block_number - MIN_API_BLOCK_NO] = ThreadData::ACTIVE; m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
sendPerformedLastInterval = 1; sendPerformedLastInterval = 1;
if (checkCounter < 0) { if (checkCounter < 0) {
calculateSendLimit(); calculateSendLimit();
...@@ -775,7 +745,7 @@ void TransporterFacade::forceSend(Uint32 block_number) { ...@@ -775,7 +745,7 @@ void TransporterFacade::forceSend(Uint32 block_number) {
//------------------------------------------------- //-------------------------------------------------
void void
TransporterFacade::checkForceSend(Uint32 block_number) { TransporterFacade::checkForceSend(Uint32 block_number) {
m_threads.m_statusNext[block_number - MIN_API_BLOCK_NO] = ThreadData::ACTIVE; m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
//------------------------------------------------- //-------------------------------------------------
// This code is an adaptive algorithm to discover when // This code is an adaptive algorithm to discover when
// the API should actually send its buffers. The reason // the API should actually send its buffers. The reason
...@@ -1016,11 +986,12 @@ TransporterFacade::ThreadData::expand(Uint32 size){ ...@@ -1016,11 +986,12 @@ TransporterFacade::ThreadData::expand(Uint32 size){
m_firstFree = m_statusNext.size() - size; m_firstFree = m_statusNext.size() - size;
} }
int int
TransporterFacade::ThreadData::open(void* objRef, TransporterFacade::ThreadData::open(void* objRef,
ExecuteFunction fun, ExecuteFunction fun,
NodeStatusFunction fun2){ NodeStatusFunction fun2)
{
Uint32 nextFree = m_firstFree; Uint32 nextFree = m_firstFree;
if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){ if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
...@@ -1040,12 +1011,12 @@ TransporterFacade::ThreadData::open(void* objRef, ...@@ -1040,12 +1011,12 @@ TransporterFacade::ThreadData::open(void* objRef,
m_objectExecute[nextFree] = oe; m_objectExecute[nextFree] = oe;
m_statusFunction[nextFree] = fun2; m_statusFunction[nextFree] = fun2;
return nextFree + MIN_API_BLOCK_NO; return indexToNumber(nextFree);
} }
int int
TransporterFacade::ThreadData::close(int number){ TransporterFacade::ThreadData::close(int number){
number -= MIN_API_BLOCK_NO; number= numberToIndex(number);
assert(getInUse(number)); assert(getInUse(number));
m_statusNext[number] = m_firstFree; m_statusNext[number] = m_firstFree;
m_firstFree = number; m_firstFree = number;
......
...@@ -35,7 +35,7 @@ class Ndb; ...@@ -35,7 +35,7 @@ class Ndb;
class NdbApiSignal; class NdbApiSignal;
typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]); typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]);
typedef void (* NodeStatusFunction)(void *, NodeId, bool nodeAlive, bool nfComplete); typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);
extern "C" { extern "C" {
void* runSendRequest_C(void*); void* runSendRequest_C(void*);
...@@ -55,9 +55,7 @@ public: ...@@ -55,9 +55,7 @@ public:
bool init(Uint32, const ndb_mgm_configuration *); bool init(Uint32, const ndb_mgm_configuration *);
static TransporterFacade* instance(); static TransporterFacade* instance();
static TransporterFacade* start_instance(int, const ndb_mgm_configuration*); int start_instance(int, const ndb_mgm_configuration*);
static TransporterFacade* start_instance(const char *connectString);
static void close_configuration();
static void stop_instance(); static void stop_instance();
/** /**
...@@ -93,6 +91,8 @@ public: ...@@ -93,6 +91,8 @@ public:
// My own processor id // My own processor id
NodeId ownId() const; NodeId ownId() const;
void connected();
void doConnect(int NodeId); void doConnect(int NodeId);
void reportConnected(int NodeId); void reportConnected(int NodeId);
void doDisconnect(int NodeId); void doDisconnect(int NodeId);
...@@ -130,6 +130,7 @@ private: ...@@ -130,6 +130,7 @@ private:
friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond
friend class GrepSS; friend class GrepSS;
friend class Ndb; friend class Ndb;
friend class Ndb_cluster_connection;
int sendSignalUnCond(NdbApiSignal *, NodeId nodeId); int sendSignalUnCond(NdbApiSignal *, NodeId nodeId);
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <my_pthread.h>
#include <ndb_cluster_connection.hpp>
#include <TransporterFacade.hpp>
#include <NdbOut.hpp>
#include <NdbSleep.h>
#include <NdbThread.h>
#include <ndb_limits.h>
#include <ConfigRetriever.hpp>
#include <ndb_version.h>
static int g_run_connect_thread= 0;
Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
{
m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade();
if (connect_string)
m_connect_string= strdup(connect_string);
else
m_connect_string= 0;
m_config_retriever= 0;
m_connect_thread= 0;
m_connect_callback= 0;
}
extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me)
{
my_thread_init();
g_run_connect_thread= 1;
((Ndb_cluster_connection*) me)->connect_thread();
my_thread_end();
NdbThread_Exit(0);
return me;
}
void Ndb_cluster_connection::connect_thread()
{
DBUG_ENTER("Ndb_cluster_connection::connect_thread");
int r;
while (g_run_connect_thread) {
if ((r = connect(1)) == 0)
break;
if (r == -1) {
printf("Ndb_cluster_connection::connect_thread error\n");
abort();
}
}
if (m_connect_callback)
(*m_connect_callback)();
DBUG_VOID_RETURN;
}
int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
{
DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
m_connect_callback= connect_callback;
m_connect_thread= NdbThread_Create(run_ndb_cluster_connection_connect_thread,
(void**)this,
32768,
"ndb_cluster_connection",
NDB_THREAD_PRIO_LOW);
DBUG_RETURN(0);
}
int Ndb_cluster_connection::connect(int reconnect)
{
DBUG_ENTER("Ndb_cluster_connection::connect");
const char* error = 0;
do {
if (m_config_retriever == 0)
{
m_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API);
m_config_retriever->setConnectString(m_connect_string);
if(m_config_retriever->init() == -1)
break;
}
else
if (reconnect == 0)
DBUG_RETURN(0);
if (reconnect)
{
int r= m_config_retriever->do_connect(1);
if (r == 1)
DBUG_RETURN(1); // mgmt server not up yet
if (r == -1)
break;
}
else
if(m_config_retriever->do_connect() == -1)
break;
Uint32 nodeId = m_config_retriever->allocNodeId();
for(Uint32 i = 0; nodeId == 0 && i<5; i++){
NdbSleep_SecSleep(3);
nodeId = m_config_retriever->allocNodeId();
}
if(nodeId == 0)
break;
ndb_mgm_configuration * props = m_config_retriever->getConfig();
if(props == 0)
break;
m_facade->start_instance(nodeId, props);
free(props);
m_facade->connected();
DBUG_RETURN(0);
} while(0);
ndbout << "Configuration error: ";
const char* erString = m_config_retriever->getErrorString();
if (erString == 0) {
erString = "No error specified!";
}
ndbout << erString << endl;
DBUG_RETURN(-1);
}
Ndb_cluster_connection::~Ndb_cluster_connection()
{
if (m_connect_thread)
{
void *status;
g_run_connect_thread= 0;
NdbThread_WaitFor(m_connect_thread, &status);
NdbThread_Destroy(&m_connect_thread);
m_connect_thread= 0;
}
if (m_facade != 0)
{
delete m_facade;
if (m_facade != TransporterFacade::theFacadeInstance)
abort();
TransporterFacade::theFacadeInstance= 0;
}
if (m_connect_string)
free(m_connect_string);
if (m_config_retriever)
delete m_config_retriever;
}
...@@ -22,12 +22,13 @@ ...@@ -22,12 +22,13 @@
*/ */
#include <ndb_global.h> #include <ndb_global.h>
#include <my_sys.h>
#include <getarg.h> #include <getarg.h>
#include <NdbApi.hpp> #include <NdbApi.hpp>
#include <NDBT.hpp> #include <NDBT.hpp>
static Ndb_cluster_connection *ndb_cluster_connection= 0;
static Ndb* ndb = 0; static Ndb* ndb = 0;
static NdbDictionary::Dictionary* dic = 0; static NdbDictionary::Dictionary* dic = 0;
static int _unqualified = 0; static int _unqualified = 0;
...@@ -48,6 +49,22 @@ fatal(char const* fmt, ...) ...@@ -48,6 +49,22 @@ fatal(char const* fmt, ...)
exit(1); exit(1);
} }
static void
fatal_dict(char const* fmt, ...)
{
va_list ap;
char buf[500];
va_start(ap, fmt);
vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
ndbout << buf;
if (dic)
ndbout << " - " << dic->getNdbError();
ndbout << endl;
NDBT_ProgramExit(NDBT_FAILED);
exit(1);
}
static void static void
list(const char * tabname, list(const char * tabname,
NdbDictionary::Object::Type type) NdbDictionary::Object::Type type)
...@@ -55,10 +72,10 @@ list(const char * tabname, ...@@ -55,10 +72,10 @@ list(const char * tabname,
NdbDictionary::Dictionary::List list; NdbDictionary::Dictionary::List list;
if (tabname == 0) { if (tabname == 0) {
if (dic->listObjects(list, type) == -1) if (dic->listObjects(list, type) == -1)
fatal("listObjects"); fatal_dict("listObjects");
} else { } else {
if (dic->listIndexes(list, tabname) == -1) if (dic->listIndexes(list, tabname) == -1)
fatal("listIndexes"); fatal_dict("listIndexes");
} }
if (ndb->usingFullyQualifiedNames()) if (ndb->usingFullyQualifiedNames())
ndbout_c("%-5s %-20s %-8s %-7s %-12s %-8s %s", "id", "type", "state", "logging", "database", "schema", "name"); ndbout_c("%-5s %-20s %-8s %-7s %-12s %-8s %s", "id", "type", "state", "logging", "database", "schema", "name");
...@@ -145,12 +162,17 @@ list(const char * tabname, ...@@ -145,12 +162,17 @@ list(const char * tabname,
} }
} }
#ifndef DBUG_OFF
const char *debug_option= 0;
#endif
int main(int argc, const char** argv){ int main(int argc, const char** argv){
int _loops = 1; int _loops = 1;
const char* _tabname = NULL; const char* _tabname = NULL;
const char* _dbname = "TEST_DB"; const char* _dbname = "TEST_DB";
int _type = 0; int _type = 0;
int _help = 0; int _help = 0;
const char* _connect_str = NULL;
struct getargs args[] = { struct getargs args[] = {
{ "loops", 'l', arg_integer, &_loops, "loops", { "loops", 'l', arg_integer, &_loops, "loops",
...@@ -161,6 +183,13 @@ int main(int argc, const char** argv){ ...@@ -161,6 +183,13 @@ int main(int argc, const char** argv){
"Name of database table is in"}, "Name of database table is in"},
{ "type", 't', arg_integer, &_type, "type", { "type", 't', arg_integer, &_type, "type",
"Type of objects to show, see NdbDictionary.hpp for numbers(default = 0)" }, "Type of objects to show, see NdbDictionary.hpp for numbers(default = 0)" },
{ "connect-string", 'c', arg_string, &_connect_str,
"Set connect string for connecting to ndb_mgmd. <constr>=\"host=<hostname:port>[;nodeid=<id>]\". Overides specifying entries in NDB_CONNECTSTRING and config file",
"<constr>" },
#ifndef DBUG_OFF
{ "debug", 0, arg_string, &debug_option,
"Specify debug options e.g. d:t:i:o,out.trace", "options" },
#endif
{ "usage", '?', arg_flag, &_help, "Print help", "" } { "usage", '?', arg_flag, &_help, "Print help", "" }
}; };
int num_args = sizeof(args) / sizeof(args[0]); int num_args = sizeof(args) / sizeof(args[0]);
...@@ -179,10 +208,18 @@ int main(int argc, const char** argv){ ...@@ -179,10 +208,18 @@ int main(int argc, const char** argv){
} }
_tabname = argv[optind]; _tabname = argv[optind];
ndb = new Ndb(_dbname); #ifndef DBUG_OFF
my_init();
if (debug_option)
DBUG_PUSH(debug_option);
#endif
ndb_cluster_connection = new Ndb_cluster_connection(_connect_str);
ndb = new Ndb(ndb_cluster_connection, _dbname);
ndb->useFullyQualifiedNames(!_unqualified); ndb->useFullyQualifiedNames(!_unqualified);
if (ndb->init() != 0) if (ndb->init() != 0)
fatal("init"); fatal("init");
ndb_cluster_connection->connect();
if (ndb->waitUntilReady(30) < 0) if (ndb->waitUntilReady(30) < 0)
fatal("waitUntilReady"); fatal("waitUntilReady");
dic = ndb->getDictionary(); dic = ndb->getDictionary();
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include <ndb_global.h> #include <ndb_global.h>
#include <my_sys.h>
#include <NdbOut.hpp> #include <NdbOut.hpp>
...@@ -26,6 +27,9 @@ ...@@ -26,6 +27,9 @@
#include <getarg.h> #include <getarg.h>
#include <NdbScanFilter.hpp> #include <NdbScanFilter.hpp>
#ifndef DBUG_OFF
const char *debug_option= 0;
#endif
int scanReadRecords(Ndb*, int scanReadRecords(Ndb*,
const NdbDictionary::Table*, const NdbDictionary::Table*,
...@@ -58,6 +62,10 @@ int main(int argc, const char** argv){ ...@@ -58,6 +62,10 @@ int main(int argc, const char** argv){
"Output numbers in hexadecimal format", "useHexFormat" }, "Output numbers in hexadecimal format", "useHexFormat" },
{ "delimiter", 'd', arg_string, &_delimiter, "Column delimiter", { "delimiter", 'd', arg_string, &_delimiter, "Column delimiter",
"delimiter" }, "delimiter" },
#ifndef DBUG_OFF
{ "debug", 0, arg_string, &debug_option,
"Specify debug options e.g. d:t:i:o,out.trace", "options" },
#endif
{ "usage", '?', arg_flag, &_help, "Print help", "" }, { "usage", '?', arg_flag, &_help, "Print help", "" },
{ "lock", 'l', arg_integer, &_lock, { "lock", 'l', arg_integer, &_lock,
"Read(0), Read-hold(1), Exclusive(2)", "lock"}, "Read(0), Read-hold(1), Exclusive(2)", "lock"},
...@@ -80,6 +88,12 @@ int main(int argc, const char** argv){ ...@@ -80,6 +88,12 @@ int main(int argc, const char** argv){
} }
_tabname = argv[optind]; _tabname = argv[optind];
#ifndef DBUG_OFF
my_init();
if (debug_option)
DBUG_PUSH(debug_option);
#endif
// Connect to Ndb // Connect to Ndb
Ndb MyNdb(_dbname); Ndb MyNdb(_dbname);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment