removed NdbMgmHandle from TransporterRegistry constructor

changed to set NdbMgmHandle from IPPConfig, and use info from configuration, instead of mgm handle from config retrieval
parent 6e53d8f7
......@@ -97,12 +97,11 @@ public:
/**
* Constructor
*/
TransporterRegistry(NdbMgmHandle mgm_handle=NULL,
void * callback = 0 ,
TransporterRegistry(void * callback = 0 ,
unsigned maxTransporters = MAX_NTRANSPORTERS,
unsigned sizeOfLongSignalMemory = 100);
void set_mgm_handle(NdbMgmHandle h) { m_mgm_handle = h; };
void set_mgm_handle(NdbMgmHandle h);
NdbMgmHandle get_mgm_handle(void) { return m_mgm_handle; };
bool init(NodeId localNodeId);
......
......@@ -170,6 +170,35 @@ IPCConfig::configureTransporters(Uint32 nodeId,
DBUG_ENTER("IPCConfig::configureTransporters");
/**
* Iterate over all MGM's an construct a connectstring
* create mgm_handle and give it to the Transporter Registry
*/
{
const char *separator= "";
BaseString connect_string;
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
for(iter.first(); iter.valid(); iter.next())
{
Uint32 type;
if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue;
if(type != NODE_TYPE_MGM) continue;
const char* hostname;
Uint32 port;
if(iter.get(CFG_NODE_HOST, &hostname)) continue;
if( strlen(hostname) == 0 ) continue;
if(iter.get(CFG_MGM_PORT, &port)) continue;
connect_string.appfmt("%s%s:port",separator,hostname,port);
separator= ",";
}
NdbMgmHandle h= ndb_mgm_create_handle();
if ( h && connect_string.length() > 0 )
{
ndb_mgm_set_connectstring(h,connect_string.c_str());
tr.set_mgm_handle(h);
}
}
Uint32 noOfTransportersCreated= 0;
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
......
......@@ -71,15 +71,14 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
DBUG_RETURN(0);
}
TransporterRegistry::TransporterRegistry(NdbMgmHandle mgm_handle,
void * callback,
TransporterRegistry::TransporterRegistry(void * callback,
unsigned _maxTransporters,
unsigned sizeOfLongSignalMemory) {
nodeIdSpecified = false;
maxTransporters = _maxTransporters;
sendCounter = 1;
m_mgm_handle = mgm_handle;
m_mgm_handle= 0;
callbackObj=callback;
......@@ -114,6 +113,27 @@ TransporterRegistry::TransporterRegistry(NdbMgmHandle mgm_handle,
theOSEJunkSocketRecv = 0;
}
void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
{
DBUG_ENTER("TransporterRegistry::set_mgm_handle");
if (m_mgm_handle)
ndb_mgm_destroy_handle(&m_mgm_handle);
m_mgm_handle= h;
#ifndef DBUG_OFF
if (h)
{
char buf[256];
DBUG_PRINT("info",("handle set with connectstring: %s",
ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
}
else
{
DBUG_PRINT("info",("handle set to NULL"));
}
#endif
DBUG_VOID_RETURN;
};
TransporterRegistry::~TransporterRegistry() {
removeAll();
......@@ -134,6 +154,8 @@ TransporterRegistry::~TransporterRegistry() {
theOSEReceiver = 0;
}
#endif
if (m_mgm_handle)
ndb_mgm_destroy_handle(&m_mgm_handle);
}
void
......@@ -1211,40 +1233,32 @@ TransporterRegistry::start_clients_thread()
switch(performStates[nodeId]){
case CONNECTING:
if(!t->isConnected() && !t->isServer) {
int result= 0;
bool connected= false;
/**
* First, we try to connect (if we have a port number).
*/
if (t->get_s_port())
result= t->connect_client();
connected= t->connect_client();
if (result<0 && t->get_s_port()!=0)
g_eventLogger.warning("Error while trying to make connection "
"(Node %u to %u via port %u) "
"error: %d. Retrying...",
t->getRemoteNodeId(),
t->getLocalNodeId(),
t->get_s_port());
/**
* If dynamic, get the port for connecting from the management server
*/
if(t->get_s_port() <= 0) { // Port is dynamic
if( !connected && t->get_s_port() <= 0) { // Port is dynamic
int server_port= 0;
struct ndb_mgm_reply mgm_reply;
int res;
int res= -1;
if(!ndb_mgm_is_connected(m_mgm_handle))
if(ndb_mgm_connect(m_mgm_handle, 0, 0, 0)<0)
ndbout_c("Failed to reconnect to management server");
res= ndb_mgm_get_connection_int_parameter(m_mgm_handle,
t->getRemoteNodeId(),
t->getLocalNodeId(),
CFG_CONNECTION_SERVER_PORT,
&server_port,
&mgm_reply);
else
res=
ndb_mgm_get_connection_int_parameter(m_mgm_handle,
t->getRemoteNodeId(),
t->getLocalNodeId(),
CFG_CONNECTION_SERVER_PORT,
&server_port,
&mgm_reply);
DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
server_port,t->getRemoteNodeId(),
t->getLocalNodeId(),res));
......
......@@ -92,10 +92,6 @@ int main(int argc, char** argv)
}
}
globalTransporterRegistry.set_mgm_handle(theConfig
->get_config_retriever()
->get_mgmHandle());
#ifndef NDB_WIN32
for(pid_t child = fork(); child != 0; child = fork()){
/**
......
......@@ -576,7 +576,7 @@ MgmtSrvr::start(BaseString &error_string)
}
}
theFacade= TransporterFacade::theFacadeInstance
= new TransporterFacade(m_config_retriever->get_mgmHandle());
= new TransporterFacade();
if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
......
......@@ -84,7 +84,9 @@ NdbTransaction* Ndb::doConnect(Uint32 tConNode)
} else if (TretCode != 0) {
tAnyAlive= 1;
}//if
DBUG_PRINT("info",("tried node %d TretCode %d", tNode, TretCode));
DBUG_PRINT("info",("tried node %d, TretCode %d, error code %d, %s",
tNode, TretCode, getNdbError().code,
getNdbError().message));
}
}
else // just do a regular round robin
......
......@@ -1420,14 +1420,18 @@ Remark: Sets TC Connect pointer.
int
NdbTransaction::receiveTCSEIZEREF(NdbApiSignal* aSignal)
{
DBUG_ENTER("NdbTransaction::receiveTCSEIZEREF");
if (theStatus != Connecting)
{
return -1;
DBUG_RETURN(-1);
} else
{
theStatus = ConnectFailure;
theNdb->theError.code = aSignal->readData(2);
return 0;
DBUG_PRINT("info",("error code %d, %s",
theNdb->getNdbError().code,
theNdb->getNdbError().message));
DBUG_RETURN(0);
}
}//NdbTransaction::receiveTCSEIZEREF()
......
......@@ -466,8 +466,7 @@ void TransporterFacade::threadMainReceive(void)
theTransporterRegistry->stopReceiving();
}
TransporterFacade::TransporterFacade(NdbMgmHandle mgm_handle) :
m_mgm_handle(mgm_handle),
TransporterFacade::TransporterFacade() :
theTransporterRegistry(0),
theStopReceive(0),
theSendThread(NULL),
......@@ -496,7 +495,7 @@ bool
TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
{
theOwnId = nodeId;
theTransporterRegistry = new TransporterRegistry(m_mgm_handle,this);
theTransporterRegistry = new TransporterRegistry(this);
const int res = IPCConfig::configureTransporters(nodeId,
* props,
......
......@@ -47,7 +47,7 @@ extern "C" {
class TransporterFacade
{
public:
TransporterFacade(NdbMgmHandle mgm_handle);
TransporterFacade();
virtual ~TransporterFacade();
bool init(Uint32, const ndb_mgm_configuration *);
......@@ -133,7 +133,6 @@ private:
bool isConnected(NodeId aNodeId);
void doStop();
NdbMgmHandle m_mgm_handle;
TransporterRegistry* theTransporterRegistry;
SocketServer m_socket_server;
int sendPerformedLastInterval;
......
......@@ -284,7 +284,7 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
m_transporter_facade=
TransporterFacade::theFacadeInstance=
new TransporterFacade(m_config_retriever->get_mgmHandle());
new TransporterFacade();
DBUG_VOID_RETURN;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment