Commit b9b4338f authored by unknown's avatar unknown

added connect thread to Ndb_cluster_connection +

some other small fixes


ndb/include/mgmcommon/ConfigRetriever.hpp:
  added options to do_connect to contol how connects failures should be treated
ndb/include/mgmcommon/NdbConfig.h:
  method to retrieve datadir path (to user for chdir)
ndb/include/ndbapi/ndb_cluster_connection.hpp:
  Added connect thread
ndb/src/common/mgmcommon/ConfigRetriever.cpp:
  added options to do_connect to contol how connects failures should be treated
ndb/src/common/mgmcommon/NdbConfig.c:
  method to retrieve datadir path (to user for chdir)
ndb/src/kernel/main.cpp:
  ndbd to do chdir
ndb/src/kernel/vm/WatchDog.cpp:
  added my_thread_init for debug
ndb/src/ndbapi/ClusterMgr.cpp:
  added my_thread_init for debug
ndb/src/ndbapi/TransporterFacade.cpp:
  removed call to atexit
ndb/src/ndbapi/ndb_cluster_connection.cpp:
  added connect thread
parent fe89db88
...@@ -37,7 +37,7 @@ public: ...@@ -37,7 +37,7 @@ public:
*/ */
int init(); int init();
int do_connect(); int do_connect(int exit_on_connect_failure= false);
/** /**
* Get configuration for current (nodeId given in local config file) node. * Get configuration for current (nodeId given in local config file) node.
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
extern "C" { extern "C" {
#endif #endif
const char* NdbConfig_get_path(int *len);
void NdbConfig_SetPath(const char *path); void NdbConfig_SetPath(const char *path);
char* NdbConfig_NdbCfgName(int with_ndb_home); char* NdbConfig_NdbCfgName(int with_ndb_home);
char* NdbConfig_ErrorFileName(int node_id); char* NdbConfig_ErrorFileName(int node_id);
......
...@@ -20,16 +20,26 @@ ...@@ -20,16 +20,26 @@
class TransporterFacade; class TransporterFacade;
class ConfigRetriever; class ConfigRetriever;
class NdbThread;
extern "C" {
void* run_ndb_cluster_connection_connect_thread(void*);
}
class Ndb_cluster_connection { class Ndb_cluster_connection {
public: public:
Ndb_cluster_connection(const char * connect_string = 0); Ndb_cluster_connection(const char * connect_string = 0);
~Ndb_cluster_connection(); ~Ndb_cluster_connection();
int connect(); int connect(int reconnect= 0);
int start_connect_thread(int (*connect_callback)(void)= 0);
private: private:
friend void* run_ndb_cluster_connection_connect_thread(void*);
void connect_thread();
char *m_connect_string; char *m_connect_string;
TransporterFacade *m_facade; TransporterFacade *m_facade;
ConfigRetriever *m_config_retriever; ConfigRetriever *m_config_retriever;
NdbThread *m_connect_thread;
int (*m_connect_callback)(void);
}; };
#endif #endif
...@@ -78,7 +78,7 @@ ConfigRetriever::init() { ...@@ -78,7 +78,7 @@ ConfigRetriever::init() {
} }
int int
ConfigRetriever::do_connect(){ ConfigRetriever::do_connect(int exit_on_connect_failure){
if(!m_handle) if(!m_handle)
m_handle= ndb_mgm_create_handle(); m_handle= ndb_mgm_create_handle();
...@@ -102,6 +102,8 @@ ConfigRetriever::do_connect(){ ...@@ -102,6 +102,8 @@ ConfigRetriever::do_connect(){
if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) { if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) {
return 0; return 0;
} }
if (exit_on_connect_failure)
return 1;
setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle)); setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle));
case MgmId_File: case MgmId_File:
break; break;
......
...@@ -21,27 +21,34 @@ ...@@ -21,27 +21,34 @@
static char *datadir_path= 0; static char *datadir_path= 0;
static char* const char *
NdbConfig_AllocHomePath(int _len) NdbConfig_get_path(int *_len)
{ {
const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0); const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0);
int len= _len;
int path_len= 0; int path_len= 0;
char *buf;
if (path == 0)
path= datadir_path;
if (path) if (path)
path_len= strlen(path); path_len= strlen(path);
if (path_len == 0 && datadir_path) {
path= datadir_path;
path_len= strlen(path);
}
if (path_len == 0) {
path= ".";
path_len= strlen(path);
}
if (_len)
*_len= path_len;
return path;
}
len+= path_len; static char*
buf= NdbMem_Allocate(len); NdbConfig_AllocHomePath(int _len)
if (path_len > 0) {
snprintf(buf, len, "%s%s", path, DIR_SEPARATOR); int path_len;
else const char *path= NdbConfig_get_path(&path_len);
buf[0]= 0; int len= _len+path_len;
char *buf= NdbMem_Allocate(len);
snprintf(buf, len, "%s%s", path, DIR_SEPARATOR);
return buf; return buf;
} }
......
...@@ -74,6 +74,8 @@ NDB_MAIN(ndb_kernel){ ...@@ -74,6 +74,8 @@ NDB_MAIN(ndb_kernel){
theConfig->fetch_configuration(); theConfig->fetch_configuration();
} }
chdir(NdbConfig_get_path(0));
if (theConfig->getDaemonMode()) { if (theConfig->getDaemonMode()) {
// Become a daemon // Become a daemon
char *lockfile= NdbConfig_PidFileName(globalData.ownId); char *lockfile= NdbConfig_PidFileName(globalData.ownId);
......
...@@ -15,6 +15,9 @@ ...@@ -15,6 +15,9 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <my_pthread.h>
#include "WatchDog.hpp" #include "WatchDog.hpp"
#include "GlobalData.hpp" #include "GlobalData.hpp"
#include <NdbOut.hpp> #include <NdbOut.hpp>
...@@ -24,7 +27,9 @@ ...@@ -24,7 +27,9 @@
extern "C" extern "C"
void* void*
runWatchDog(void* w){ runWatchDog(void* w){
my_thread_init();
((WatchDog*)w)->run(); ((WatchDog*)w)->run();
my_thread_end();
NdbThread_Exit(0); NdbThread_Exit(0);
return NULL; return NULL;
} }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h> #include <ndb_global.h>
#include <my_pthread.h>
#include <ndb_limits.h> #include <ndb_limits.h>
#include <ndb_version.h> #include <ndb_version.h>
...@@ -64,7 +65,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): ...@@ -64,7 +65,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
{ {
ndbSetOwnVersion(); ndbSetOwnVersion();
clusterMgrThreadMutex = NdbMutex_Create(); clusterMgrThreadMutex = NdbMutex_Create();
noOfConnectedNodes = 0; noOfConnectedNodes= 0;
theClusterMgrThread= 0;
} }
ClusterMgr::~ClusterMgr(){ ClusterMgr::~ClusterMgr(){
...@@ -137,20 +139,21 @@ ClusterMgr::startThread() { ...@@ -137,20 +139,21 @@ ClusterMgr::startThread() {
void void
ClusterMgr::doStop( ){ ClusterMgr::doStop( ){
DBUG_ENTER("ClusterMgr::doStop");
NdbMutex_Lock(clusterMgrThreadMutex); NdbMutex_Lock(clusterMgrThreadMutex);
if(theStop){ if(theStop){
NdbMutex_Unlock(clusterMgrThreadMutex); NdbMutex_Unlock(clusterMgrThreadMutex);
return; DBUG_VOID_RETURN;
} }
void *status; void *status;
theStop = 1; theStop = 1;
if (theClusterMgrThread) {
NdbThread_WaitFor(theClusterMgrThread, &status); NdbThread_WaitFor(theClusterMgrThread, &status);
NdbThread_Destroy(&theClusterMgrThread); NdbThread_Destroy(&theClusterMgrThread);
theClusterMgrThread= 0;
}
NdbMutex_Unlock(clusterMgrThreadMutex); NdbMutex_Unlock(clusterMgrThreadMutex);
DBUG_VOID_RETURN;
} }
void void
...@@ -524,6 +527,7 @@ ArbitMgr::doChoose(const Uint32* theData) ...@@ -524,6 +527,7 @@ ArbitMgr::doChoose(const Uint32* theData)
void void
ArbitMgr::doStop(const Uint32* theData) ArbitMgr::doStop(const Uint32* theData)
{ {
DBUG_ENTER("ArbitMgr::doStop");
ArbitSignal aSignal; ArbitSignal aSignal;
NdbMutex_Lock(theThreadMutex); NdbMutex_Lock(theThreadMutex);
if (theThread != NULL) { if (theThread != NULL) {
...@@ -540,6 +544,7 @@ ArbitMgr::doStop(const Uint32* theData) ...@@ -540,6 +544,7 @@ ArbitMgr::doStop(const Uint32* theData)
theState = StateInit; theState = StateInit;
} }
NdbMutex_Unlock(theThreadMutex); NdbMutex_Unlock(theThreadMutex);
DBUG_VOID_RETURN;
} }
// private methods // private methods
...@@ -548,7 +553,9 @@ extern "C" ...@@ -548,7 +553,9 @@ extern "C"
void* void*
runArbitMgr_C(void* me) runArbitMgr_C(void* me)
{ {
my_thread_init();
((ArbitMgr*) me)->threadMain(); ((ArbitMgr*) me)->threadMain();
my_thread_end();
NdbThread_Exit(0); NdbThread_Exit(0);
return NULL; return NULL;
} }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h> #include <ndb_global.h>
#include <my_pthread.h>
#include <ndb_limits.h> #include <ndb_limits.h>
#include "TransporterFacade.hpp" #include "TransporterFacade.hpp"
#include "ClusterMgr.hpp" #include "ClusterMgr.hpp"
...@@ -329,14 +330,6 @@ copy(Uint32 * & insertPtr, ...@@ -329,14 +330,6 @@ copy(Uint32 * & insertPtr,
abort(); abort();
} }
extern "C"
void
atexit_stop_instance(){
DBUG_ENTER("atexit_stop_instance");
TransporterFacade::stop_instance();
DBUG_VOID_RETURN;
}
/** /**
* Note that this function need no locking since its * Note that this function need no locking since its
* only called from the constructor of Ndb (the NdbObject) * only called from the constructor of Ndb (the NdbObject)
...@@ -352,11 +345,6 @@ TransporterFacade::start_instance(int nodeId, ...@@ -352,11 +345,6 @@ TransporterFacade::start_instance(int nodeId,
return -1; return -1;
} }
/**
* Install atexit handler
*/
atexit(atexit_stop_instance);
/** /**
* Install signal handler for SIGPIPE * Install signal handler for SIGPIPE
* *
...@@ -379,14 +367,8 @@ TransporterFacade::start_instance(int nodeId, ...@@ -379,14 +367,8 @@ TransporterFacade::start_instance(int nodeId,
void void
TransporterFacade::stop_instance(){ TransporterFacade::stop_instance(){
DBUG_ENTER("TransporterFacade::stop_instance"); DBUG_ENTER("TransporterFacade::stop_instance");
if(theFacadeInstance == NULL){ if(theFacadeInstance)
/** theFacadeInstance->doStop();
* We are called from atexit function
*/
DBUG_VOID_RETURN;
}
theFacadeInstance->doStop();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -405,10 +387,16 @@ TransporterFacade::doStop(){ ...@@ -405,10 +387,16 @@ TransporterFacade::doStop(){
*/ */
void *status; void *status;
theStopReceive = 1; theStopReceive = 1;
NdbThread_WaitFor(theReceiveThread, &status); if (theReceiveThread) {
NdbThread_WaitFor(theSendThread, &status); NdbThread_WaitFor(theReceiveThread, &status);
NdbThread_Destroy(&theReceiveThread); NdbThread_Destroy(&theReceiveThread);
NdbThread_Destroy(&theSendThread); theReceiveThread= 0;
}
if (theSendThread) {
NdbThread_WaitFor(theSendThread, &status);
NdbThread_Destroy(&theSendThread);
theSendThread= 0;
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -416,7 +404,9 @@ extern "C" ...@@ -416,7 +404,9 @@ extern "C"
void* void*
runSendRequest_C(void * me) runSendRequest_C(void * me)
{ {
my_thread_init();
((TransporterFacade*) me)->threadMainSend(); ((TransporterFacade*) me)->threadMainSend();
my_thread_end();
NdbThread_Exit(0); NdbThread_Exit(0);
return me; return me;
} }
...@@ -459,7 +449,9 @@ extern "C" ...@@ -459,7 +449,9 @@ extern "C"
void* void*
runReceiveResponse_C(void * me) runReceiveResponse_C(void * me)
{ {
my_thread_init();
((TransporterFacade*) me)->threadMainReceive(); ((TransporterFacade*) me)->threadMainReceive();
my_thread_end();
NdbThread_Exit(0); NdbThread_Exit(0);
return me; return me;
} }
......
...@@ -15,16 +15,19 @@ ...@@ -15,16 +15,19 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h> #include <ndb_global.h>
#include <pthread.h> #include <my_pthread.h>
#include <ndb_cluster_connection.hpp> #include <ndb_cluster_connection.hpp>
#include <TransporterFacade.hpp> #include <TransporterFacade.hpp>
#include <NdbOut.hpp> #include <NdbOut.hpp>
#include <NdbSleep.h> #include <NdbSleep.h>
#include <NdbThread.h>
#include <ndb_limits.h> #include <ndb_limits.h>
#include <ConfigRetriever.hpp> #include <ConfigRetriever.hpp>
#include <ndb_version.h> #include <ndb_version.h>
static int g_run_connect_thread= 0;
Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
{ {
m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade(); m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade();
...@@ -33,26 +36,75 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) ...@@ -33,26 +36,75 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
else else
m_connect_string= 0; m_connect_string= 0;
m_config_retriever= 0; m_config_retriever= 0;
m_connect_thread= 0;
m_connect_callback= 0;
} }
int Ndb_cluster_connection::connect() extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me)
{ {
DBUG_ENTER("Ndb_cluster_connection::connect"); my_thread_init();
if (m_config_retriever != 0) { g_run_connect_thread= 1;
DBUG_RETURN(0); ((Ndb_cluster_connection*) me)->connect_thread();
my_thread_end();
NdbThread_Exit(0);
return me;
}
void Ndb_cluster_connection::connect_thread()
{
DBUG_ENTER("Ndb_cluster_connection::connect_thread");
int r;
while (g_run_connect_thread) {
if ((r = connect(1)) == 0)
break;
if (r == -1) {
printf("Ndb_cluster_connection::connect_thread error\n");
abort();
}
} }
if (m_connect_callback)
(*m_connect_callback)();
DBUG_VOID_RETURN;
}
m_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API); int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
m_config_retriever->setConnectString(m_connect_string); {
DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
m_connect_callback= connect_callback;
m_connect_thread= NdbThread_Create(run_ndb_cluster_connection_connect_thread,
(void**)this,
32768,
"ndb_cluster_connection",
NDB_THREAD_PRIO_LOW);
DBUG_RETURN(0);
}
int Ndb_cluster_connection::connect(int reconnect)
{
DBUG_ENTER("Ndb_cluster_connection::connect");
const char* error = 0; const char* error = 0;
do { do {
if(m_config_retriever->init() == -1) if (m_config_retriever == 0)
break; {
m_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API);
if(m_config_retriever->do_connect() == -1) m_config_retriever->setConnectString(m_connect_string);
break; if(m_config_retriever->init() == -1)
break;
}
else
if (reconnect == 0)
DBUG_RETURN(0);
if (reconnect)
{
int r= m_config_retriever->do_connect(1);
if (r == 1)
DBUG_RETURN(1); // mgmt server not up yet
if (r == -1)
break;
}
else
if(m_config_retriever->do_connect() == -1)
break;
Uint32 nodeId = m_config_retriever->allocNodeId(); Uint32 nodeId = m_config_retriever->allocNodeId();
for(Uint32 i = 0; nodeId == 0 && i<5; i++){ for(Uint32 i = 0; nodeId == 0 && i<5; i++){
NdbSleep_SecSleep(3); NdbSleep_SecSleep(3);
...@@ -60,15 +112,12 @@ int Ndb_cluster_connection::connect() ...@@ -60,15 +112,12 @@ int Ndb_cluster_connection::connect()
} }
if(nodeId == 0) if(nodeId == 0)
break; break;
ndb_mgm_configuration * props = m_config_retriever->getConfig(); ndb_mgm_configuration * props = m_config_retriever->getConfig();
if(props == 0) if(props == 0)
break; break;
m_facade->start_instance(nodeId, props); m_facade->start_instance(nodeId, props);
free(props); free(props);
m_facade->connected(); m_facade->connected();
DBUG_RETURN(0); DBUG_RETURN(0);
} while(0); } while(0);
...@@ -83,7 +132,16 @@ int Ndb_cluster_connection::connect() ...@@ -83,7 +132,16 @@ int Ndb_cluster_connection::connect()
Ndb_cluster_connection::~Ndb_cluster_connection() Ndb_cluster_connection::~Ndb_cluster_connection()
{ {
if (m_facade != 0) { if (m_connect_thread)
{
void *status;
g_run_connect_thread= 0;
NdbThread_WaitFor(m_connect_thread, &status);
NdbThread_Destroy(&m_connect_thread);
m_connect_thread= 0;
}
if (m_facade != 0)
{
delete m_facade; delete m_facade;
if (m_facade != TransporterFacade::theFacadeInstance) if (m_facade != TransporterFacade::theFacadeInstance)
abort(); abort();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment