Commit dc103d94 authored by stewart@mysql.com's avatar stewart@mysql.com

Impl 2 of WL2278 - Dynamic port allocation of cluster nodes.

In "client connect thread", let the client read the port to connect to using
ndb_mgm_get_connection_int_parameter.

The request for the port is resent on every connect attempt.
parent a7608be2
......@@ -165,7 +165,7 @@ extern "C" {
int node1,
int node2,
int param,
unsigned *value,
Uint32 *value,
struct ndb_mgm_reply* reply);
#ifdef __cplusplus
......
......@@ -33,6 +33,8 @@
#include <NdbTCP.h>
#include <mgmapi/mgmapi.h>
// A transporter is always in an IOState.
// NoHalt is used initially and as long as it is no restrictions on
// sending or receiving.
......@@ -94,10 +96,13 @@ public:
/**
* Constructor
*/
TransporterRegistry(void * callback = 0 ,
TransporterRegistry(NdbMgmHandle mgm_handle=NULL,
void * callback = 0 ,
unsigned maxTransporters = MAX_NTRANSPORTERS,
unsigned sizeOfLongSignalMemory = 100);
void set_mgm_handle(NdbMgmHandle h) { m_mgm_handle = h; };
bool init(NodeId localNodeId);
/**
......@@ -236,6 +241,8 @@ protected:
private:
void * callbackObj;
NdbMgmHandle m_mgm_handle;
struct NdbThread *m_start_clients_thread;
bool m_run_start_clients_thread;
......
......@@ -69,6 +69,11 @@ public:
*/
NodeId getLocalNodeId() const;
/**
* Set r_port to connect to
*/
void set_r_port(unsigned int port) { m_r_port = port; };
protected:
Transporter(TransporterRegistry &,
const char *lHostName,
......@@ -101,7 +106,7 @@ protected:
struct in_addr remoteHostAddress;
struct in_addr localHostAddress;
const unsigned int m_r_port;
unsigned int m_r_port;
const NodeId remoteNodeId;
const NodeId localNodeId;
......@@ -149,7 +154,7 @@ Transporter::getRemoteNodeId() const {
inline
NodeId
Transporter::getLocalNodeId() const {
return remoteNodeId;
return localNodeId;
}
inline
......
......@@ -47,6 +47,8 @@
#include <InputStream.hpp>
#include <OutputStream.hpp>
#include <mgmapi/mgmapi_debug.h>
int g_shm_pid = 0;
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
......@@ -105,13 +107,15 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
DBUG_RETURN(0);
}
TransporterRegistry::TransporterRegistry(void * callback,
TransporterRegistry::TransporterRegistry(NdbMgmHandle mgm_handle,
void * callback,
unsigned _maxTransporters,
unsigned sizeOfLongSignalMemory) {
nodeIdSpecified = false;
maxTransporters = _maxTransporters;
sendCounter = 1;
m_mgm_handle = mgm_handle;
callbackObj=callback;
......@@ -1136,8 +1140,27 @@ TransporterRegistry::start_clients_thread()
const NodeId nodeId = t->getRemoteNodeId();
switch(performStates[nodeId]){
case CONNECTING:
if(!t->isConnected() && !t->isServer)
if(!t->isConnected() && !t->isServer) {
if(server_port <= 0) { // Port is dynamic
Uint32 server_port=0;
struct ndb_mgm_reply mgm_reply;
int res;
res=ndb_mgm_get_connection_int_parameter(m_mgm_handle,
t->getRemoteNodeId(),
t->getLocalNodeId(),
CFG_CONNECTION_SERVER_PORT,
&server_port,
&mgm_reply);
DBUG_PRINT("info",("Got dynamic port %u for %d -> %d (ret: %d)",
server_port,t->getRemoteNodeId(),
t->getLocalNodeId()));
if(res>=0)
t->set_r_port(server_port);
else
ndbout_c("Failed to get dynamic port to connect to.");
}
t->connect_client();
}
break;
case DISCONNECTING:
if(t->isConnected())
......
......@@ -92,6 +92,10 @@ int main(int argc, char** argv)
}
}
globalTransporterRegistry.set_mgm_handle(theConfig
->get_config_retriever()
->get_mgmHandle());
#ifndef NDB_WIN32
for(pid_t child = fork(); child != 0; child = fork()){
/**
......
......@@ -307,14 +307,17 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
ParserDummy session(handle->socket);
Parser_t parser(command_reply, in, true, true, true);
#if 1
const Properties* p = parser.parse(ctx, session);
if (p == NULL){
/**
* Print some info about why the parser returns NULL
*/
//ndbout << " status=" << ctx.m_status << ", curr="
//<< ctx.m_currentToken << endl;
ndbout << "Error in mgm protocol parser. "
<< "cmd: '" << cmd
<< "' status=" << ctx.m_status
<< ", curr=" << ctx.m_currentToken
<< endl;
DBUG_PRINT("info",("parser.parse returned NULL"));
}
#ifdef MGMAPI_LOG
else {
......@@ -325,9 +328,6 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
}
#endif
return p;
#else
return parser.parse(ctx, session);
#endif
}
/**
......@@ -1998,7 +1998,8 @@ ndb_mgm_set_connection_int_parameter(NdbMgmHandle handle,
const ParserRow<ParserDummy> reply[]= {
MGM_CMD("set connection parameter reply", NULL, ""),
MGM_ARG("result", String, Mandatory, "Error message"),
MGM_ARG("message", String, Mandatory, "Error Message"),
MGM_ARG("result", String, Mandatory, "Status Result"),
MGM_END()
};
......@@ -2026,7 +2027,7 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle,
int node1,
int node2,
int param,
unsigned *value,
Uint32 *value,
struct ndb_mgm_reply* mgmreply){
DBUG_ENTER("ndb_mgm_get_connection_int_parameter");
CHECK_HANDLE(handle, -1);
......@@ -2039,14 +2040,14 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle,
const ParserRow<ParserDummy> reply[]= {
MGM_CMD("get connection parameter reply", NULL, ""),
MGM_ARG("result", String, Mandatory, "Error message"),
MGM_ARG("value", Int, Mandatory, "Current Value"),
MGM_ARG("result", String, Mandatory, "Result"),
MGM_END()
};
const Properties *prop;
prop= ndb_mgm_call(handle, reply, "get connection parameter", &args);
CHECK_REPLY(prop, -1);
prop = ndb_mgm_call(handle, reply, "get connection parameter", &args);
CHECK_REPLY(prop, -2);
int res= -1;
do {
......@@ -2058,10 +2059,13 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle,
res= 0;
} while(0);
prop->get("value",value);
if(!prop->get("value",value)){
ndbout_c("Unable to get value");
res = -3;
}
delete prop;
return res;
DBUG_RETURN(res);
}
......
......@@ -584,7 +584,8 @@ MgmtSrvr::start(BaseString &error_string)
return false;
}
}
theFacade= TransporterFacade::theFacadeInstance= new TransporterFacade();
theFacade= TransporterFacade::theFacadeInstance
= new TransporterFacade(m_config_retriever->get_mgmHandle());
if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
......@@ -2806,7 +2807,8 @@ MgmtSrvr::getConnectionDbParameter(int node1,
Uint32 n1,n2;
iter.get(CFG_CONNECTION_NODE_1, &n1);
iter.get(CFG_CONNECTION_NODE_2, &n2);
if(n1 == (unsigned)node1 && n2 == (unsigned)node2)
if((n1 == (unsigned)node1 && n2 == (unsigned)node2)
|| (n1 == (unsigned)node2 && n2 == (unsigned)node1))
break;
}
if(!iter.valid()) {
......@@ -2820,7 +2822,7 @@ MgmtSrvr::getConnectionDbParameter(int node1,
}
msg.assfmt("%u",*value);
return 1;
DBUG_RETURN(1);
}
template class Vector<SigMatch>;
......
......@@ -1399,10 +1399,9 @@ MgmApiSession::getConnectionParameter(Parser_t::Context &ctx,
&value,
result);
m_output->println("set connection parameter reply");
m_output->println("message: %s", result.c_str());
m_output->println("get connection parameter reply");
m_output->println("value: %u", value);
m_output->println("result: %s", (ret>0)?"Ok":"Failed");
m_output->println("result: %s", (ret>0)?"Ok":result.c_str());
m_output->println("");
}
......
......@@ -472,12 +472,13 @@ void TransporterFacade::threadMainReceive(void)
theTransporterRegistry->stopReceiving();
}
TransporterFacade::TransporterFacade() :
TransporterFacade::TransporterFacade(NdbMgmHandle mgm_handle) :
theTransporterRegistry(0),
theStopReceive(0),
theSendThread(NULL),
theReceiveThread(NULL),
m_fragmented_signal_id(0)
m_fragmented_signal_id(0),
m_mgm_handle(mgm_handle)
{
theOwnId = 0;
......@@ -501,7 +502,7 @@ bool
TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
{
theOwnId = nodeId;
theTransporterRegistry = new TransporterRegistry(this);
theTransporterRegistry = new TransporterRegistry(m_mgm_handle,this);
const int res = IPCConfig::configureTransporters(nodeId,
* props,
......
......@@ -24,6 +24,7 @@
#include <NdbMutex.h>
#include "DictCache.hpp"
#include <BlockNumbers.h>
#include <mgmapi.h>
class ClusterMgr;
class ArbitMgr;
......@@ -46,7 +47,7 @@ extern "C" {
class TransporterFacade
{
public:
TransporterFacade();
TransporterFacade(NdbMgmHandle mgm_handle);
virtual ~TransporterFacade();
bool init(Uint32, const ndb_mgm_configuration *);
......@@ -132,6 +133,7 @@ private:
bool isConnected(NodeId aNodeId);
void doStop();
NdbMgmHandle m_mgm_handle;
TransporterRegistry* theTransporterRegistry;
SocketServer m_socket_server;
int sendPerformedLastInterval;
......
......@@ -40,7 +40,6 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
{
DBUG_ENTER("Ndb_cluster_connection");
DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this));
m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade();
m_config_retriever= 0;
m_connect_thread= 0;
......@@ -58,6 +57,10 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
#endif
m_config_retriever=
new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API);
m_facade= TransporterFacade::theFacadeInstance
= new TransporterFacade(m_config_retriever->get_mgmHandle());
if (m_config_retriever->hasError())
{
printf("Could not connect initialize handle to management server: %s",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment