Commit e795feca authored by stewart@mysql.com's avatar stewart@mysql.com

Impl5 of WL2278 - dynamic port allocation for cluster nodes

Treat the management server specially.

It should always be the server in a transporter as we then have a known
port to connect to on node restart.

allows a mgm connection (i.e. to the management server port, our known port)
to be transformed into a transporter connection.

Also, clean up the struct TransporterConfiguration (used to be a struct for
each transporter type. now there's just one)
parent 0b24883e
......@@ -49,74 +49,50 @@ enum SendStatus {
const Uint32 MAX_MESSAGE_SIZE = (12+4+4+(4*25)+(3*4)+4*4096);
/**
* TCP Transporter Configuration
* TransporterConfiguration
*
* used for setting up a transporter. the union member specific is for
* information specific to a transporter type.
*/
struct TCP_TransporterConfiguration {
Uint32 port;
struct TransporterConfiguration {
Uint32 port;
const char *remoteHostName;
const char *localHostName;
NodeId remoteNodeId;
NodeId localNodeId;
Uint32 sendBufferSize; // Size of SendBuffer of priority B
Uint32 maxReceiveSize; // Maximum no of bytes to receive
NodeId serverNodeId;
bool checksum;
bool signalId;
};
/**
* SHM Transporter Configuration
*/
struct SHM_TransporterConfiguration {
Uint32 port;
const char *remoteHostName;
const char *localHostName;
NodeId remoteNodeId;
NodeId localNodeId;
bool checksum;
bool signalId;
Uint32 shmKey;
Uint32 shmSize;
int signum;
};
/**
* OSE Transporter Configuration
*/
struct OSE_TransporterConfiguration {
const char *remoteHostName;
const char *localHostName;
NodeId remoteNodeId;
NodeId localNodeId;
bool checksum;
bool signalId;
Uint32 prioASignalSize;
Uint32 prioBSignalSize;
Uint32 receiveBufferSize; // In number of signals
};
/**
* SCI Transporter Configuration
*/
struct SCI_TransporterConfiguration {
const char *remoteHostName;
const char *localHostName;
Uint32 port;
Uint32 sendLimit; // Packet size
Uint32 bufferSize; // Buffer size
Uint32 nLocalAdapters; // 1 or 2, the number of adapters on local host
Uint32 remoteSciNodeId0; // SCInodeId for adapter 1
Uint32 remoteSciNodeId1; // SCInodeId for adapter 2
NodeId localNodeId; // Local node Id
NodeId remoteNodeId; // Remote node Id
bool checksum;
bool signalId;
bool isMgmConnection; // is a mgm connection, requires transforming
union { // Transporter specific configuration information
struct {
Uint32 sendBufferSize; // Size of SendBuffer of priority B
Uint32 maxReceiveSize; // Maximum no of bytes to receive
} tcp;
struct {
Uint32 shmKey;
Uint32 shmSize;
int signum;
} shm;
struct {
Uint32 prioASignalSize;
Uint32 prioBSignalSize;
} ose;
struct {
Uint32 sendLimit; // Packet size
Uint32 bufferSize; // Buffer size
Uint32 nLocalAdapters; // 1 or 2, the number of adapters on local host
Uint32 remoteSciNodeId0; // SCInodeId for adapter 1
Uint32 remoteSciNodeId1; // SCInodeId for adapter 2
} sci;
};
};
struct SignalHeader {
......
......@@ -102,6 +102,7 @@ public:
unsigned sizeOfLongSignalMemory = 100);
void set_mgm_handle(NdbMgmHandle h) { m_mgm_handle = h; };
NdbMgmHandle get_mgm_handle(void) { return m_mgm_handle; };
bool init(NodeId localNodeId);
......@@ -179,10 +180,10 @@ public:
* started, startServer is called. A transporter of the selected kind
* is created and it is put in the transporter arrays.
*/
bool createTransporter(struct TCP_TransporterConfiguration * config);
bool createTransporter(struct SCI_TransporterConfiguration * config);
bool createTransporter(struct SHM_TransporterConfiguration * config);
bool createTransporter(struct OSE_TransporterConfiguration * config);
bool createTCPTransporter(struct TransporterConfiguration * config);
bool createSCITransporter(struct TransporterConfiguration * config);
bool createSHMTransporter(struct TransporterConfiguration * config);
bool createOSETransporter(struct TransporterConfiguration * config);
/**
* prepareSend
......
......@@ -36,6 +36,7 @@ public:
m_servaddr.sin_port = htons(m_port);
};
NDB_SOCKET_TYPE connect();
NDB_SOCKET_TYPE connect_without_auth();
bool close();
};
......
This diff is collapsed.
......@@ -32,6 +32,7 @@ OSE_Transporter::OSE_Transporter(int _prioASignalSize,
NodeId localNodeId,
const char * lHostName,
NodeId remoteNodeId,
NodeId serverNodeId,
const char * rHostName,
int byteorder,
bool compression,
......@@ -40,6 +41,7 @@ OSE_Transporter::OSE_Transporter(int _prioASignalSize,
Uint32 reportFreq) :
Transporter(localNodeId,
remoteNodeId,
serverNodeId,
byteorder,
compression,
checksum,
......
......@@ -48,6 +48,7 @@ public:
NodeId localNodeId,
const char * lHostName,
NodeId remoteNodeId,
NodeId serverNodeId,
const char * rHostName,
int byteorder,
bool compression,
......
......@@ -34,19 +34,21 @@ SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
int r_port,
bool isMgmConnection,
Uint32 packetSize,
Uint32 bufferSize,
Uint32 nAdapters,
Uint16 remoteSciNodeId0,
Uint16 remoteSciNodeId1,
NodeId _localNodeId,
NodeId _remoteNodeId,
NodeId _remoteNodeId,
NodeId serverNodeId,
bool chksm,
bool signalId,
Uint32 reportFreq) :
Transporter(t_reg, tt_SCI_TRANSPORTER,
lHostName, rHostName, r_port, _localNodeId,
_remoteNodeId, 0, false, chksm, signalId)
lHostName, rHostName, r_port, isMgmConnection, _localNodeId,
_remoteNodeId, serverNodeID, 0, false, chksm, signalId)
{
DBUG_ENTER("SCI_Transporter::SCI_Transporter");
m_PacketSize = (packetSize + 3)/4 ;
......
......@@ -139,13 +139,15 @@ private:
const char *local_host,
const char *remote_host,
int port,
bool isMgmConnection,
Uint32 packetSize,
Uint32 bufferSize,
Uint32 nAdapters,
Uint16 remoteSciNodeId0,
Uint16 remoteSciNodeId1,
NodeId localNodeID,
NodeId remoteNodeID,
NodeId remoteNodeID,
NodeId serverNodeId,
bool checksum,
bool signalId,
Uint32 reportFreq = 4096);
......
......@@ -32,14 +32,17 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
int r_port,
bool isMgmConnection,
NodeId lNodeId,
NodeId rNodeId,
NodeId rNodeId,
NodeId serverNodeId,
bool checksum,
bool signalId,
key_t _shmKey,
Uint32 _shmSize) :
Transporter(t_reg, tt_SHM_TRANSPORTER,
lHostName, rHostName, r_port, lNodeId, rNodeId,
lHostName, rHostName, r_port, isMgmConnection,
lNodeId, rNodeId, serverNodeId,
0, false, checksum, signalId),
shmKey(_shmKey),
shmSize(_shmSize)
......
......@@ -36,8 +36,10 @@ public:
const char *lHostName,
const char *rHostName,
int r_port,
bool isMgmConnection,
NodeId lNodeId,
NodeId rNodeId,
NodeId rNodeId,
NodeId serverNodeId,
bool checksum,
bool signalId,
key_t shmKey,
......
......@@ -68,12 +68,15 @@ TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
int r_port,
bool isMgmConnection,
NodeId lNodeId,
NodeId rNodeId,
NodeId serverNodeId,
bool chksm, bool signalId,
Uint32 _reportFreq) :
Transporter(t_reg, tt_TCP_TRANSPORTER,
lHostName, rHostName, r_port, lNodeId, rNodeId,
lHostName, rHostName, r_port, isMgmConnection,
lNodeId, rNodeId, serverNodeId,
0, false, chksm, signalId),
m_sendBuffer(sendBufSize)
{
......
......@@ -49,9 +49,11 @@ private:
int sendBufferSize, int maxReceiveSize,
const char *lHostName,
const char *rHostName,
int r_port,
int r_port,
bool isMgmConnection,
NodeId lHostId,
NodeId rHostId,
NodeId serverNodeId,
bool checksum, bool signalId,
Uint32 reportFreq = 4096);
......
......@@ -32,12 +32,14 @@ Transporter::Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
int r_port,
bool _isMgmConnection,
NodeId lNodeId,
NodeId rNodeId,
NodeId rNodeId,
NodeId serverNodeId,
int _byteorder,
bool _compression, bool _checksum, bool _signalId)
: m_r_port(r_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
isServer(lNodeId < rNodeId),
isServer(lNodeId==serverNodeId), isMgmConnection(_isMgmConnection),
m_packer(_signalId, _checksum),
m_type(_type),
m_transporter_registry(t_reg)
......@@ -109,22 +111,46 @@ Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
bool
Transporter::connect_client() {
NDB_SOCKET_TYPE sockfd;
if(m_connected)
return true;
NDB_SOCKET_TYPE sockfd = m_socket_client->connect();
if (sockfd == NDB_INVALID_SOCKET)
return false;
DBUG_ENTER("Transporter::connect_client");
DBUG_PRINT("info",("port %d isMgmConnection=%d",m_r_port,isMgmConnection));
if(isMgmConnection)
sockfd= m_socket_client->connect_without_auth();
else
sockfd= m_socket_client->connect();
if(sockfd<0)
return false;
SocketOutputStream s_output(sockfd);
SocketInputStream s_input(sockfd);
if(isMgmConnection)
{
/*
We issue the magic command to the management server to
switch into transporter mode.
*/
s_output.println("transporter connect");
s_output.println("");
}
if (sockfd == NDB_INVALID_SOCKET)
return false;
// send info about own id
// send info about own transporter type
SocketOutputStream s_output(sockfd);
s_output.println("%d %d", localNodeId, m_type);
// get remote id
int nodeId, remote_transporter_type= -1;
SocketInputStream s_input(sockfd);
char buf[256];
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
......
......@@ -89,8 +89,10 @@ protected:
const char *lHostName,
const char *rHostName,
int r_port,
bool isMgmConnection,
NodeId lNodeId,
NodeId rNodeId,
NodeId rNodeId,
NodeId serverNodeId,
int byteorder,
bool compression,
bool checksum,
......@@ -133,6 +135,12 @@ protected:
private:
/**
* means that we transform an MGM connection into
* a transporter connection
*/
bool isMgmConnection;
SocketClient *m_socket_client;
protected:
......
......@@ -248,7 +248,7 @@ TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
}
bool
TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
#ifdef NDB_TCP_TRANSPORTER
if(!nodeIdSpecified){
......@@ -262,13 +262,15 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
return false;
TCP_Transporter * t = new TCP_Transporter(*this,
config->sendBufferSize,
config->maxReceiveSize,
config->tcp.sendBufferSize,
config->tcp.maxReceiveSize,
config->localHostName,
config->remoteHostName,
config->port,
config->isMgmConnection,
localNodeId,
config->remoteNodeId,
config->serverNodeId,
config->checksum,
config->signalId);
if (t == NULL)
......@@ -297,7 +299,7 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
}
bool
TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
TransporterRegistry::createOSETransporter(TransporterConfiguration *conf) {
#ifdef NDB_OSE_TRANSPORTER
if(!nodeIdSpecified){
......@@ -316,11 +318,12 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
localNodeId);
}
OSE_Transporter * t = new OSE_Transporter(conf->prioASignalSize,
conf->prioBSignalSize,
OSE_Transporter * t = new OSE_Transporter(conf->ose.prioASignalSize,
conf->ose.prioBSignalSize,
localNodeId,
conf->localHostName,
conf->remoteNodeId,
conf->serverNodeId,
conf->remoteHostName,
conf->checksum,
conf->signalId);
......@@ -346,7 +349,7 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
}
bool
TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
#ifdef NDB_SCI_TRANSPORTER
if(!SCI_Transporter::initSCI())
......@@ -366,13 +369,15 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
config->localHostName,
config->remoteHostName,
config->port,
config->sendLimit,
config->bufferSize,
config->nLocalAdapters,
config->remoteSciNodeId0,
config->remoteSciNodeId1,
config->isMgmConnection,
config->sci.sendLimit,
config->sci.bufferSize,
config->sci.nLocalAdapters,
config->sci.remoteSciNodeId0,
config->sci.remoteSciNodeId1,
localNodeId,
config->remoteNodeId,
config->serverNodeId,
config->checksum,
config->signalId);
......@@ -397,7 +402,7 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
}
bool
TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
DBUG_ENTER("TransporterRegistry::createTransporter SHM");
#ifdef NDB_SHM_TRANSPORTER
if(!nodeIdSpecified){
......@@ -408,7 +413,7 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
return false;
if (!g_ndb_shm_signum) {
g_ndb_shm_signum= config->signum;
g_ndb_shm_signum= config->shm.signum;
DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
/**
* Make sure to block g_ndb_shm_signum
......@@ -420,7 +425,7 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
pthread_sigmask(SIG_BLOCK, &mask, 0);
}
if(config->signum != g_ndb_shm_signum)
if(config->shm.signum != g_ndb_shm_signum)
return false;
if(theTransporters[config->remoteNodeId] != NULL)
......@@ -430,12 +435,14 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
config->localHostName,
config->remoteHostName,
config->port,
config->isMgmConnection,
localNodeId,
config->remoteNodeId,
config->serverNodeId,
config->checksum,
config->signalId,
config->shmKey,
config->shmSize
config->shm.shmKey,
config->shm.shmSize
);
if (t == NULL)
return false;
......
......@@ -60,6 +60,27 @@ SocketClient::init()
return true;
}
/**
* SocketClient::connect_without_auth()
*
* Temporarily disables authentication and connects.
* This is useful if you're trying to change what this
* SocketClient object is for (e.g. from mgm to ndb)
*/
NDB_SOCKET_TYPE
SocketClient::connect_without_auth()
{
SocketAuthenticator *tmp;
NDB_SOCKET_TYPE retval;
tmp= m_auth;
m_auth= NULL;
retval= connect();
m_auth= tmp;
return retval;
}
NDB_SOCKET_TYPE
SocketClient::connect()
{
......
......@@ -333,11 +333,18 @@ sessionThread_C(void* _sc){
return 0;
}
if(!si->m_stop){
si->m_stopped = false;
si->runSession();
} else {
NDB_CLOSE_SOCKET(si->m_socket);
/**
* may have m_stopped set if we're transforming a mgm
* connection into a transporter connection.
*/
if(!si->m_stopped)
{
if(!si->m_stop){
si->m_stopped = false;
si->runSession();
} else {
NDB_CLOSE_SOCKET(si->m_socket);
}
}
si->m_stopped = true;
......
......@@ -3132,8 +3132,8 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){
const Properties * node;
require(ctx.m_config->get("Node", id1, &node));
BaseString hostname(hostName1);
// require(node->get("HostName", hostname));
if (hostname.c_str()[0] == 0) {
ctx.reportError("Hostname required on nodeid %d since it will "
......@@ -3142,6 +3142,19 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){
}
Uint32 port= 0;
const char * type1;
const char * type2;
const Properties * node2;
node->get("Type", &type1);
ctx.m_config->get("Node", id2, &node2);
node2->get("Type", &type2);
if(strcmp(type1, MGM_TOKEN)==0)
node->get("PortNumber",&port);
else if(strcmp(type2, MGM_TOKEN)==0)
node2->get("PortNumber",&port);
if (!node->get("ServerPort", &port) &&
!ctx.m_userProperties.get("ServerPort_", id1, &port)) {
ctx.m_currentSection->put("PortNumber", port);
......
......@@ -2883,6 +2883,11 @@ MgmtSrvr::getConnectionDbParameter(int node1,
DBUG_RETURN(1);
}
void MgmtSrvr::transporter_connect(NDB_SOCKET_TYPE sockfd)
{
theFacade->get_registry()->connect_server(sockfd);
}
int MgmtSrvr::set_connect_string(const char *str)
{
return ndb_mgm_set_connectstring(m_config_retriever->get_mgmHandle(),str);
......
......@@ -515,6 +515,8 @@ public:
int set_connect_string(const char *str);
void transporter_connect(NDB_SOCKET_TYPE sockfd);
ConfigRetriever *get_config_retriever() { return m_config_retriever; };
const char *get_connect_address(Uint32 node_id) { return inet_ntoa(m_connect_address[node_id]); }
......
......@@ -264,6 +264,8 @@ ParserRow<MgmApiSession> commands[] = {
MGM_CMD("check connection", &MgmApiSession::check_connection, ""),
MGM_CMD("transporter connect", &MgmApiSession::transporter_connect, ""),
MGM_END()
};
......@@ -1538,5 +1540,17 @@ MgmApiSession::check_connection(Parser_t::Context &ctx,
m_output->println("");
}
void
MgmApiSession::transporter_connect(Parser_t::Context &ctx,
Properties const &args) {
NDB_SOCKET_TYPE s= m_socket;
m_stop= true;
m_stopped= true; // force a stop (no closing socket)
m_socket= -1; // so nobody closes it
m_mgmsrv.transporter_connect(s);
}
template class MutexVector<int>;
template class Vector<ParserRow<MgmApiSession> const*>;
......@@ -97,6 +97,8 @@ public:
void purge_stale_sessions(Parser_t::Context &ctx, const class Properties &args);
void check_connection(Parser_t::Context &ctx, const class Properties &args);
void transporter_connect(Parser_t::Context &ctx, Properties const &args);
void repCommand(Parser_t::Context &ctx, const class Properties &args);
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment