Commit cea15340 authored by tomas@poseidon.(none)'s avatar tomas@poseidon.(none)

added connect thread to Ndb_cluster_connection +

some other small fixes
parent 12c40078
......@@ -37,7 +37,7 @@ public:
*/
int init();
int do_connect();
int do_connect(int exit_on_connect_failure= false);
/**
* Get configuration for current (nodeId given in local config file) node.
......
......@@ -21,6 +21,7 @@
extern "C" {
#endif
const char* NdbConfig_get_path(int *len);
void NdbConfig_SetPath(const char *path);
char* NdbConfig_NdbCfgName(int with_ndb_home);
char* NdbConfig_ErrorFileName(int node_id);
......
......@@ -20,16 +20,26 @@
class TransporterFacade;
class ConfigRetriever;
class NdbThread;
extern "C" {
void* run_ndb_cluster_connection_connect_thread(void*);
}
class Ndb_cluster_connection {
public:
Ndb_cluster_connection(const char * connect_string = 0);
~Ndb_cluster_connection();
int connect();
int connect(int reconnect= 0);
int start_connect_thread(int (*connect_callback)(void)= 0);
private:
friend void* run_ndb_cluster_connection_connect_thread(void*);
void connect_thread();
char *m_connect_string;
TransporterFacade *m_facade;
ConfigRetriever *m_config_retriever;
NdbThread *m_connect_thread;
int (*m_connect_callback)(void);
};
#endif
......@@ -78,7 +78,7 @@ ConfigRetriever::init() {
}
int
ConfigRetriever::do_connect(){
ConfigRetriever::do_connect(int exit_on_connect_failure){
if(!m_handle)
m_handle= ndb_mgm_create_handle();
......@@ -102,6 +102,8 @@ ConfigRetriever::do_connect(){
if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) {
return 0;
}
if (exit_on_connect_failure)
return 1;
setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle));
case MgmId_File:
break;
......
......@@ -21,27 +21,34 @@
static char *datadir_path= 0;
static char*
NdbConfig_AllocHomePath(int _len)
const char *
NdbConfig_get_path(int *_len)
{
const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0);
int len= _len;
int path_len= 0;
char *buf;
if (path == 0)
path= datadir_path;
if (path)
path_len= strlen(path);
if (path_len == 0 && datadir_path) {
path= datadir_path;
path_len= strlen(path);
}
if (path_len == 0) {
path= ".";
path_len= strlen(path);
}
if (_len)
*_len= path_len;
return path;
}
len+= path_len;
buf= NdbMem_Allocate(len);
if (path_len > 0)
snprintf(buf, len, "%s%s", path, DIR_SEPARATOR);
else
buf[0]= 0;
static char*
NdbConfig_AllocHomePath(int _len)
{
int path_len;
const char *path= NdbConfig_get_path(&path_len);
int len= _len+path_len;
char *buf= NdbMem_Allocate(len);
snprintf(buf, len, "%s%s", path, DIR_SEPARATOR);
return buf;
}
......
......@@ -74,6 +74,8 @@ NDB_MAIN(ndb_kernel){
theConfig->fetch_configuration();
}
chdir(NdbConfig_get_path(0));
if (theConfig->getDaemonMode()) {
// Become a daemon
char *lockfile= NdbConfig_PidFileName(globalData.ownId);
......
......@@ -15,6 +15,9 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <my_pthread.h>
#include "WatchDog.hpp"
#include "GlobalData.hpp"
#include <NdbOut.hpp>
......@@ -24,7 +27,9 @@
extern "C"
void*
runWatchDog(void* w){
my_thread_init();
((WatchDog*)w)->run();
my_thread_end();
NdbThread_Exit(0);
return NULL;
}
......
......@@ -15,6 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <my_pthread.h>
#include <ndb_limits.h>
#include <ndb_version.h>
......@@ -64,7 +65,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
{
ndbSetOwnVersion();
clusterMgrThreadMutex = NdbMutex_Create();
noOfConnectedNodes = 0;
noOfConnectedNodes= 0;
theClusterMgrThread= 0;
}
ClusterMgr::~ClusterMgr(){
......@@ -137,20 +139,21 @@ ClusterMgr::startThread() {
void
ClusterMgr::doStop( ){
DBUG_ENTER("ClusterMgr::doStop");
NdbMutex_Lock(clusterMgrThreadMutex);
if(theStop){
NdbMutex_Unlock(clusterMgrThreadMutex);
return;
DBUG_VOID_RETURN;
}
void *status;
theStop = 1;
NdbThread_WaitFor(theClusterMgrThread, &status);
NdbThread_Destroy(&theClusterMgrThread);
if (theClusterMgrThread) {
NdbThread_WaitFor(theClusterMgrThread, &status);
NdbThread_Destroy(&theClusterMgrThread);
theClusterMgrThread= 0;
}
NdbMutex_Unlock(clusterMgrThreadMutex);
DBUG_VOID_RETURN;
}
void
......@@ -524,6 +527,7 @@ ArbitMgr::doChoose(const Uint32* theData)
void
ArbitMgr::doStop(const Uint32* theData)
{
DBUG_ENTER("ArbitMgr::doStop");
ArbitSignal aSignal;
NdbMutex_Lock(theThreadMutex);
if (theThread != NULL) {
......@@ -540,6 +544,7 @@ ArbitMgr::doStop(const Uint32* theData)
theState = StateInit;
}
NdbMutex_Unlock(theThreadMutex);
DBUG_VOID_RETURN;
}
// private methods
......@@ -548,7 +553,9 @@ extern "C"
void*
runArbitMgr_C(void* me)
{
my_thread_init();
((ArbitMgr*) me)->threadMain();
my_thread_end();
NdbThread_Exit(0);
return NULL;
}
......
......@@ -15,6 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <my_pthread.h>
#include <ndb_limits.h>
#include "TransporterFacade.hpp"
#include "ClusterMgr.hpp"
......@@ -329,14 +330,6 @@ copy(Uint32 * & insertPtr,
abort();
}
extern "C"
void
atexit_stop_instance(){
DBUG_ENTER("atexit_stop_instance");
TransporterFacade::stop_instance();
DBUG_VOID_RETURN;
}
/**
* Note that this function need no locking since its
* only called from the constructor of Ndb (the NdbObject)
......@@ -352,11 +345,6 @@ TransporterFacade::start_instance(int nodeId,
return -1;
}
/**
* Install atexit handler
*/
atexit(atexit_stop_instance);
/**
* Install signal handler for SIGPIPE
*
......@@ -379,14 +367,8 @@ TransporterFacade::start_instance(int nodeId,
void
TransporterFacade::stop_instance(){
DBUG_ENTER("TransporterFacade::stop_instance");
if(theFacadeInstance == NULL){
/**
* We are called from atexit function
*/
DBUG_VOID_RETURN;
}
theFacadeInstance->doStop();
if(theFacadeInstance)
theFacadeInstance->doStop();
DBUG_VOID_RETURN;
}
......@@ -405,10 +387,16 @@ TransporterFacade::doStop(){
*/
void *status;
theStopReceive = 1;
NdbThread_WaitFor(theReceiveThread, &status);
NdbThread_WaitFor(theSendThread, &status);
NdbThread_Destroy(&theReceiveThread);
NdbThread_Destroy(&theSendThread);
if (theReceiveThread) {
NdbThread_WaitFor(theReceiveThread, &status);
NdbThread_Destroy(&theReceiveThread);
theReceiveThread= 0;
}
if (theSendThread) {
NdbThread_WaitFor(theSendThread, &status);
NdbThread_Destroy(&theSendThread);
theSendThread= 0;
}
DBUG_VOID_RETURN;
}
......@@ -416,7 +404,9 @@ extern "C"
void*
runSendRequest_C(void * me)
{
my_thread_init();
((TransporterFacade*) me)->threadMainSend();
my_thread_end();
NdbThread_Exit(0);
return me;
}
......@@ -459,7 +449,9 @@ extern "C"
void*
runReceiveResponse_C(void * me)
{
my_thread_init();
((TransporterFacade*) me)->threadMainReceive();
my_thread_end();
NdbThread_Exit(0);
return me;
}
......
......@@ -15,16 +15,19 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <pthread.h>
#include <my_pthread.h>
#include <ndb_cluster_connection.hpp>
#include <TransporterFacade.hpp>
#include <NdbOut.hpp>
#include <NdbSleep.h>
#include <NdbThread.h>
#include <ndb_limits.h>
#include <ConfigRetriever.hpp>
#include <ndb_version.h>
static int g_run_connect_thread= 0;
Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
{
m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade();
......@@ -33,26 +36,75 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
else
m_connect_string= 0;
m_config_retriever= 0;
m_connect_thread= 0;
m_connect_callback= 0;
}
int Ndb_cluster_connection::connect()
extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me)
{
DBUG_ENTER("Ndb_cluster_connection::connect");
if (m_config_retriever != 0) {
DBUG_RETURN(0);
my_thread_init();
g_run_connect_thread= 1;
((Ndb_cluster_connection*) me)->connect_thread();
my_thread_end();
NdbThread_Exit(0);
return me;
}
void Ndb_cluster_connection::connect_thread()
{
DBUG_ENTER("Ndb_cluster_connection::connect_thread");
int r;
while (g_run_connect_thread) {
if ((r = connect(1)) == 0)
break;
if (r == -1) {
printf("Ndb_cluster_connection::connect_thread error\n");
abort();
}
}
if (m_connect_callback)
(*m_connect_callback)();
DBUG_VOID_RETURN;
}
m_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API);
m_config_retriever->setConnectString(m_connect_string);
int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
{
DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
m_connect_callback= connect_callback;
m_connect_thread= NdbThread_Create(run_ndb_cluster_connection_connect_thread,
(void**)this,
32768,
"ndb_cluster_connection",
NDB_THREAD_PRIO_LOW);
DBUG_RETURN(0);
}
int Ndb_cluster_connection::connect(int reconnect)
{
DBUG_ENTER("Ndb_cluster_connection::connect");
const char* error = 0;
do {
if(m_config_retriever->init() == -1)
break;
if(m_config_retriever->do_connect() == -1)
break;
if (m_config_retriever == 0)
{
m_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API);
m_config_retriever->setConnectString(m_connect_string);
if(m_config_retriever->init() == -1)
break;
}
else
if (reconnect == 0)
DBUG_RETURN(0);
if (reconnect)
{
int r= m_config_retriever->do_connect(1);
if (r == 1)
DBUG_RETURN(1); // mgmt server not up yet
if (r == -1)
break;
}
else
if(m_config_retriever->do_connect() == -1)
break;
Uint32 nodeId = m_config_retriever->allocNodeId();
for(Uint32 i = 0; nodeId == 0 && i<5; i++){
NdbSleep_SecSleep(3);
......@@ -60,15 +112,12 @@ int Ndb_cluster_connection::connect()
}
if(nodeId == 0)
break;
ndb_mgm_configuration * props = m_config_retriever->getConfig();
if(props == 0)
break;
m_facade->start_instance(nodeId, props);
free(props);
m_facade->connected();
DBUG_RETURN(0);
} while(0);
......@@ -83,7 +132,16 @@ int Ndb_cluster_connection::connect()
Ndb_cluster_connection::~Ndb_cluster_connection()
{
if (m_facade != 0) {
if (m_connect_thread)
{
void *status;
g_run_connect_thread= 0;
NdbThread_WaitFor(m_connect_thread, &status);
NdbThread_Destroy(&m_connect_thread);
m_connect_thread= 0;
}
if (m_facade != 0)
{
delete m_facade;
if (m_facade != TransporterFacade::theFacadeInstance)
abort();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment