Commit 233a33da authored by unknown's avatar unknown

Impl 2 of WL2278 - Dynamic port allocation of cluster nodes.

In "client connect thread", let the client read the port to connect to using
ndb_mgm_get_connection_int_parameter.

The request for the port is resent on every connect attempt.


ndb/include/mgmapi/mgmapi_debug.h:
  Make ndb_mgm_get_connection_int_parameter return a Uint32 value - this is what Properties etc use, so we'll be consistent.
ndb/include/transporter/TransporterRegistry.hpp:
  Add NdbMgmHandle to constructor. This is used to get the port number
  to connect to from mgmd. Defaults to NULL, although things will go badly
  if you don't change this (by calling the new set_mgm_handle method) pretty
  quickly.
  
  Add set_mgm_handle(NdbMgmHandle) method.
   - sets the MgmHandle to use when requesting from mgmd what port to connect to a node on.
ndb/src/common/transporter/Transporter.hpp:
  Make remote port not a const.
  Add method to set remote port - set_r_port(unsigned int)
  
  Make getLocalNodeId return localNodeId, not remoteNodeId.
ndb/src/common/transporter/TransporterRegistry.cpp:
  TransporterRegistry::TransporterRegistry()
   - accept NdbMgmHandle parameter
   - set m_mgm_handle to this
  
  TransporterRegistry::start_clients_thread()
   - If we're connecting to a node, and the server_port (from the config) is <=0,
  	we request the port number to connect to from mgmd.
  
  (note: in testing, the <=0 check was commented out so the code was run.
  There is no harm in always running it, it's just an extra round-trip to mgmd
  that we may not need).
ndb/src/kernel/main.cpp:
  Set the mgm_handle for globalTransporterRegistry soon after we have set up theConfig (which sets up the mgmHandle).
ndb/src/mgmapi/mgmapi.cpp:
  - Remove dead #else on #if 1
  
  - Print an error message and warning if the parser returns NULL.
    this will no longer silently fail, it will give output with
    information to help the programmer find out where things went wrong.
    In normal operation, this codepath should never be hit.
  
  - fix handlers for 'get|set connection parameter' calls.
ndb/src/mgmsrv/MgmtSrvr.cpp:
  - Create TransporterFacade with the mgmHandle.
  - Don't worry about the order of node1 and node2 in getConnectionDbParameter
  - use a proper DBUG_RETURN in getConnectionParameter
ndb/src/mgmsrv/Services.cpp:
  - fix reply to 'get connection parameter'
  - optimise reply size.
ndb/src/ndbapi/TransporterFacade.cpp:
  - create TransporterRegistry with m_mgm_handle
  - set m_mgm_handle in constructor
ndb/src/ndbapi/TransporterFacade.hpp:
  Introduce m_mgm_handle member.
ndb/src/ndbapi/ndb_cluster_connection.cpp:
  create TransporterFacade (with mgmHandle) after the ConfigRetriever has been created
parent 168e5719
...@@ -165,7 +165,7 @@ extern "C" { ...@@ -165,7 +165,7 @@ extern "C" {
int node1, int node1,
int node2, int node2,
int param, int param,
unsigned *value, Uint32 *value,
struct ndb_mgm_reply* reply); struct ndb_mgm_reply* reply);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -33,6 +33,8 @@ ...@@ -33,6 +33,8 @@
#include <NdbTCP.h> #include <NdbTCP.h>
#include <mgmapi/mgmapi.h>
// A transporter is always in an IOState. // A transporter is always in an IOState.
// NoHalt is used initially and as long as it is no restrictions on // NoHalt is used initially and as long as it is no restrictions on
// sending or receiving. // sending or receiving.
...@@ -94,10 +96,13 @@ public: ...@@ -94,10 +96,13 @@ public:
/** /**
* Constructor * Constructor
*/ */
TransporterRegistry(void * callback = 0 , TransporterRegistry(NdbMgmHandle mgm_handle=NULL,
void * callback = 0 ,
unsigned maxTransporters = MAX_NTRANSPORTERS, unsigned maxTransporters = MAX_NTRANSPORTERS,
unsigned sizeOfLongSignalMemory = 100); unsigned sizeOfLongSignalMemory = 100);
void set_mgm_handle(NdbMgmHandle h) { m_mgm_handle = h; };
bool init(NodeId localNodeId); bool init(NodeId localNodeId);
/** /**
...@@ -236,6 +241,8 @@ protected: ...@@ -236,6 +241,8 @@ protected:
private: private:
void * callbackObj; void * callbackObj;
NdbMgmHandle m_mgm_handle;
struct NdbThread *m_start_clients_thread; struct NdbThread *m_start_clients_thread;
bool m_run_start_clients_thread; bool m_run_start_clients_thread;
......
...@@ -69,6 +69,11 @@ public: ...@@ -69,6 +69,11 @@ public:
*/ */
NodeId getLocalNodeId() const; NodeId getLocalNodeId() const;
/**
* Set r_port to connect to
*/
void set_r_port(unsigned int port) { m_r_port = port; };
protected: protected:
Transporter(TransporterRegistry &, Transporter(TransporterRegistry &,
const char *lHostName, const char *lHostName,
...@@ -101,7 +106,7 @@ protected: ...@@ -101,7 +106,7 @@ protected:
struct in_addr remoteHostAddress; struct in_addr remoteHostAddress;
struct in_addr localHostAddress; struct in_addr localHostAddress;
const unsigned int m_r_port; unsigned int m_r_port;
const NodeId remoteNodeId; const NodeId remoteNodeId;
const NodeId localNodeId; const NodeId localNodeId;
...@@ -149,7 +154,7 @@ Transporter::getRemoteNodeId() const { ...@@ -149,7 +154,7 @@ Transporter::getRemoteNodeId() const {
inline inline
NodeId NodeId
Transporter::getLocalNodeId() const { Transporter::getLocalNodeId() const {
return remoteNodeId; return localNodeId;
} }
inline inline
......
...@@ -47,6 +47,8 @@ ...@@ -47,6 +47,8 @@
#include <InputStream.hpp> #include <InputStream.hpp>
#include <OutputStream.hpp> #include <OutputStream.hpp>
#include <mgmapi/mgmapi_debug.h>
int g_shm_pid = 0; int g_shm_pid = 0;
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
...@@ -105,13 +107,15 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) ...@@ -105,13 +107,15 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
TransporterRegistry::TransporterRegistry(void * callback, TransporterRegistry::TransporterRegistry(NdbMgmHandle mgm_handle,
void * callback,
unsigned _maxTransporters, unsigned _maxTransporters,
unsigned sizeOfLongSignalMemory) { unsigned sizeOfLongSignalMemory) {
nodeIdSpecified = false; nodeIdSpecified = false;
maxTransporters = _maxTransporters; maxTransporters = _maxTransporters;
sendCounter = 1; sendCounter = 1;
m_mgm_handle = mgm_handle;
callbackObj=callback; callbackObj=callback;
...@@ -1136,8 +1140,27 @@ TransporterRegistry::start_clients_thread() ...@@ -1136,8 +1140,27 @@ TransporterRegistry::start_clients_thread()
const NodeId nodeId = t->getRemoteNodeId(); const NodeId nodeId = t->getRemoteNodeId();
switch(performStates[nodeId]){ switch(performStates[nodeId]){
case CONNECTING: case CONNECTING:
if(!t->isConnected() && !t->isServer) if(!t->isConnected() && !t->isServer) {
t->connect_client(); if(server_port <= 0) { // Port is dynamic
Uint32 server_port=0;
struct ndb_mgm_reply mgm_reply;
int res;
res=ndb_mgm_get_connection_int_parameter(m_mgm_handle,
t->getRemoteNodeId(),
t->getLocalNodeId(),
CFG_CONNECTION_SERVER_PORT,
&server_port,
&mgm_reply);
DBUG_PRINT("info",("Got dynamic port %u for %d -> %d (ret: %d)",
server_port,t->getRemoteNodeId(),
t->getLocalNodeId()));
if(res>=0)
t->set_r_port(server_port);
else
ndbout_c("Failed to get dynamic port to connect to.");
}
t->connect_client();
}
break; break;
case DISCONNECTING: case DISCONNECTING:
if(t->isConnected()) if(t->isConnected())
......
...@@ -91,6 +91,10 @@ int main(int argc, char** argv) ...@@ -91,6 +91,10 @@ int main(int argc, char** argv)
return 1; return 1;
} }
} }
globalTransporterRegistry.set_mgm_handle(theConfig
->get_config_retriever()
->get_mgmHandle());
#ifndef NDB_WIN32 #ifndef NDB_WIN32
for(pid_t child = fork(); child != 0; child = fork()){ for(pid_t child = fork(); child != 0; child = fork()){
......
...@@ -307,14 +307,17 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply, ...@@ -307,14 +307,17 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
ParserDummy session(handle->socket); ParserDummy session(handle->socket);
Parser_t parser(command_reply, in, true, true, true); Parser_t parser(command_reply, in, true, true, true);
#if 1
const Properties* p = parser.parse(ctx, session); const Properties* p = parser.parse(ctx, session);
if (p == NULL){ if (p == NULL){
/** /**
* Print some info about why the parser returns NULL * Print some info about why the parser returns NULL
*/ */
//ndbout << " status=" << ctx.m_status << ", curr=" ndbout << "Error in mgm protocol parser. "
//<< ctx.m_currentToken << endl; << "cmd: '" << cmd
<< "' status=" << ctx.m_status
<< ", curr=" << ctx.m_currentToken
<< endl;
DBUG_PRINT("info",("parser.parse returned NULL"));
} }
#ifdef MGMAPI_LOG #ifdef MGMAPI_LOG
else { else {
...@@ -325,9 +328,6 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply, ...@@ -325,9 +328,6 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
} }
#endif #endif
return p; return p;
#else
return parser.parse(ctx, session);
#endif
} }
/** /**
...@@ -1998,7 +1998,8 @@ ndb_mgm_set_connection_int_parameter(NdbMgmHandle handle, ...@@ -1998,7 +1998,8 @@ ndb_mgm_set_connection_int_parameter(NdbMgmHandle handle,
const ParserRow<ParserDummy> reply[]= { const ParserRow<ParserDummy> reply[]= {
MGM_CMD("set connection parameter reply", NULL, ""), MGM_CMD("set connection parameter reply", NULL, ""),
MGM_ARG("result", String, Mandatory, "Error message"), MGM_ARG("message", String, Mandatory, "Error Message"),
MGM_ARG("result", String, Mandatory, "Status Result"),
MGM_END() MGM_END()
}; };
...@@ -2026,7 +2027,7 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle, ...@@ -2026,7 +2027,7 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle,
int node1, int node1,
int node2, int node2,
int param, int param,
unsigned *value, Uint32 *value,
struct ndb_mgm_reply* mgmreply){ struct ndb_mgm_reply* mgmreply){
DBUG_ENTER("ndb_mgm_get_connection_int_parameter"); DBUG_ENTER("ndb_mgm_get_connection_int_parameter");
CHECK_HANDLE(handle, -1); CHECK_HANDLE(handle, -1);
...@@ -2036,17 +2037,17 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle, ...@@ -2036,17 +2037,17 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle,
args.put("node1", node1); args.put("node1", node1);
args.put("node2", node2); args.put("node2", node2);
args.put("param", param); args.put("param", param);
const ParserRow<ParserDummy> reply[]= { const ParserRow<ParserDummy> reply[]= {
MGM_CMD("get connection parameter reply", NULL, ""), MGM_CMD("get connection parameter reply", NULL, ""),
MGM_ARG("result", String, Mandatory, "Error message"),
MGM_ARG("value", Int, Mandatory, "Current Value"), MGM_ARG("value", Int, Mandatory, "Current Value"),
MGM_ARG("result", String, Mandatory, "Result"),
MGM_END() MGM_END()
}; };
const Properties *prop; const Properties *prop;
prop= ndb_mgm_call(handle, reply, "get connection parameter", &args); prop = ndb_mgm_call(handle, reply, "get connection parameter", &args);
CHECK_REPLY(prop, -1); CHECK_REPLY(prop, -2);
int res= -1; int res= -1;
do { do {
...@@ -2058,10 +2059,13 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle, ...@@ -2058,10 +2059,13 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle,
res= 0; res= 0;
} while(0); } while(0);
prop->get("value",value); if(!prop->get("value",value)){
ndbout_c("Unable to get value");
res = -3;
}
delete prop; delete prop;
return res; DBUG_RETURN(res);
} }
......
...@@ -584,7 +584,8 @@ MgmtSrvr::start(BaseString &error_string) ...@@ -584,7 +584,8 @@ MgmtSrvr::start(BaseString &error_string)
return false; return false;
} }
} }
theFacade= TransporterFacade::theFacadeInstance= new TransporterFacade(); theFacade= TransporterFacade::theFacadeInstance
= new TransporterFacade(m_config_retriever->get_mgmHandle());
if(theFacade == 0) { if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL."); DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
...@@ -2806,7 +2807,8 @@ MgmtSrvr::getConnectionDbParameter(int node1, ...@@ -2806,7 +2807,8 @@ MgmtSrvr::getConnectionDbParameter(int node1,
Uint32 n1,n2; Uint32 n1,n2;
iter.get(CFG_CONNECTION_NODE_1, &n1); iter.get(CFG_CONNECTION_NODE_1, &n1);
iter.get(CFG_CONNECTION_NODE_2, &n2); iter.get(CFG_CONNECTION_NODE_2, &n2);
if(n1 == (unsigned)node1 && n2 == (unsigned)node2) if((n1 == (unsigned)node1 && n2 == (unsigned)node2)
|| (n1 == (unsigned)node2 && n2 == (unsigned)node1))
break; break;
} }
if(!iter.valid()) { if(!iter.valid()) {
...@@ -2820,7 +2822,7 @@ MgmtSrvr::getConnectionDbParameter(int node1, ...@@ -2820,7 +2822,7 @@ MgmtSrvr::getConnectionDbParameter(int node1,
} }
msg.assfmt("%u",*value); msg.assfmt("%u",*value);
return 1; DBUG_RETURN(1);
} }
template class Vector<SigMatch>; template class Vector<SigMatch>;
......
...@@ -1399,10 +1399,9 @@ MgmApiSession::getConnectionParameter(Parser_t::Context &ctx, ...@@ -1399,10 +1399,9 @@ MgmApiSession::getConnectionParameter(Parser_t::Context &ctx,
&value, &value,
result); result);
m_output->println("set connection parameter reply"); m_output->println("get connection parameter reply");
m_output->println("message: %s", result.c_str());
m_output->println("value: %u", value); m_output->println("value: %u", value);
m_output->println("result: %s", (ret>0)?"Ok":"Failed"); m_output->println("result: %s", (ret>0)?"Ok":result.c_str());
m_output->println(""); m_output->println("");
} }
......
...@@ -472,12 +472,13 @@ void TransporterFacade::threadMainReceive(void) ...@@ -472,12 +472,13 @@ void TransporterFacade::threadMainReceive(void)
theTransporterRegistry->stopReceiving(); theTransporterRegistry->stopReceiving();
} }
TransporterFacade::TransporterFacade() : TransporterFacade::TransporterFacade(NdbMgmHandle mgm_handle) :
theTransporterRegistry(0), theTransporterRegistry(0),
theStopReceive(0), theStopReceive(0),
theSendThread(NULL), theSendThread(NULL),
theReceiveThread(NULL), theReceiveThread(NULL),
m_fragmented_signal_id(0) m_fragmented_signal_id(0),
m_mgm_handle(mgm_handle)
{ {
theOwnId = 0; theOwnId = 0;
...@@ -501,7 +502,7 @@ bool ...@@ -501,7 +502,7 @@ bool
TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
{ {
theOwnId = nodeId; theOwnId = nodeId;
theTransporterRegistry = new TransporterRegistry(this); theTransporterRegistry = new TransporterRegistry(m_mgm_handle,this);
const int res = IPCConfig::configureTransporters(nodeId, const int res = IPCConfig::configureTransporters(nodeId,
* props, * props,
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include <NdbMutex.h> #include <NdbMutex.h>
#include "DictCache.hpp" #include "DictCache.hpp"
#include <BlockNumbers.h> #include <BlockNumbers.h>
#include <mgmapi.h>
class ClusterMgr; class ClusterMgr;
class ArbitMgr; class ArbitMgr;
...@@ -46,7 +47,7 @@ extern "C" { ...@@ -46,7 +47,7 @@ extern "C" {
class TransporterFacade class TransporterFacade
{ {
public: public:
TransporterFacade(); TransporterFacade(NdbMgmHandle mgm_handle);
virtual ~TransporterFacade(); virtual ~TransporterFacade();
bool init(Uint32, const ndb_mgm_configuration *); bool init(Uint32, const ndb_mgm_configuration *);
...@@ -131,7 +132,8 @@ private: ...@@ -131,7 +132,8 @@ private:
bool isConnected(NodeId aNodeId); bool isConnected(NodeId aNodeId);
void doStop(); void doStop();
NdbMgmHandle m_mgm_handle;
TransporterRegistry* theTransporterRegistry; TransporterRegistry* theTransporterRegistry;
SocketServer m_socket_server; SocketServer m_socket_server;
int sendPerformedLastInterval; int sendPerformedLastInterval;
......
...@@ -40,7 +40,6 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) ...@@ -40,7 +40,6 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
{ {
DBUG_ENTER("Ndb_cluster_connection"); DBUG_ENTER("Ndb_cluster_connection");
DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this)); DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this));
m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade();
m_config_retriever= 0; m_config_retriever= 0;
m_connect_thread= 0; m_connect_thread= 0;
...@@ -58,6 +57,10 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) ...@@ -58,6 +57,10 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
#endif #endif
m_config_retriever= m_config_retriever=
new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API); new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API);
m_facade= TransporterFacade::theFacadeInstance
= new TransporterFacade(m_config_retriever->get_mgmHandle());
if (m_config_retriever->hasError()) if (m_config_retriever->hasError())
{ {
printf("Could not connect initialize handle to management server: %s", printf("Could not connect initialize handle to management server: %s",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment