Commit 6d5e2561 authored by unknown's avatar unknown

Impl5 of WL2278 - dynamic port allocation for cluster nodes

Treat the management server specially.

It should always be the server in a transporter as we then have a known
port to connect to on node restart.

allows a mgm connection (i.e. to the management server port, our known port)
to be transformed into a transporter connection.

Also, clean up the struct TransporterConfiguration (used to be a struct for
each transporter type. now there's just one)


ndb/include/transporter/TransporterDefinitions.hpp:
  Clean up XXX_TransporterConfiguration and create one TransporterConfiguration structure.
  
  Makes things easier to read (especially in IPCConfig.cpp)
ndb/include/transporter/TransporterRegistry.hpp:
  add get_mgm_handle (to compliment the set_mgm_handle function)
  
  clean up createTransporter to use just one TransporterConfiguration struct
ndb/include/util/SocketClient.hpp:
  Introduce connect_without_auth() to ignore any authentication method that may have been set.
ndb/src/common/mgmcommon/IPCConfig.cpp:
  Remove dead IPCConfig::configureTransporters(TransporterRegistry*)
  
  Fixup IPCConfig::configureTransporters(Uint32 nodeId...)
  - use the 'one struct TransporterConfiguration to rule them all'
  - make MGM node the server
  - fix switch statement for transporter types
    - close } in strange place
    - possible inadvertent fall through
ndb/src/common/transporter/OSE_Transporter.cpp:
  a partial fix for the introduction of new parameters.
  
  OSE shouldn't build how it is now. Better to keep the build broken than have it build and fail strangely at runtime.
ndb/src/common/transporter/OSE_Transporter.hpp:
  a partial fix for the introduction of new parameters.
  
  OSE shouldn't build how it is now. Better to keep the build broken than have it build and fail strangely at runtime.
ndb/src/common/transporter/SCI_Transporter.cpp:
  should be correct for SCI transporter.
ndb/src/common/transporter/SCI_Transporter.hpp:
  should be correct for SCI transporter
ndb/src/common/transporter/SHM_Transporter.cpp:
  add new parameters for Transporter constructor
ndb/src/common/transporter/SHM_Transporter.hpp:
  add new parameters for Transporter constructor
ndb/src/common/transporter/TCP_Transporter.cpp:
  add new parameters for Transporter constructor
ndb/src/common/transporter/TCP_Transporter.hpp:
  add new parameters for Transporter constructor
ndb/src/common/transporter/Transporter.cpp:
  Add new parameters
   - isMgmConnection
  	requires transforming from mgm to transporter
   - serverNodeId
  	node id that will serve as the server
  
  Treat connection differently if isMgmConnection (send a special mgm command first)
ndb/src/common/transporter/Transporter.hpp:
  add fields to constructor
  
  add isMgmConnection member (if true, have to transform a mgm connection)
ndb/src/common/transporter/TransporterRegistry.cpp:
  createTransporter -> createTCPTransporter (etc)
  
  add extra transporter constructor parameters (from config)
  
  modify to use changes to TransporterConfiguration
ndb/src/common/util/SocketClient.cpp:
  SocketClient::connect_without_auth()
  
  Temporarily disables authentication and connects.
  This is useful if you're trying to change what this
  SocketClient object is for (e.g. from mgm to ndb)
ndb/src/common/util/SocketServer.cpp:
  Don't runSession or close socket when entering sessionThread if m_stopped
ndb/src/mgmsrv/ConfigInfo.cpp:
  fixPortNumber
  - Get port number from the MGM node as it will always be the server
ndb/src/mgmsrv/MgmtSrvr.cpp:
  transporter_connect(sockfd)
  - transform this mgm connection into a transporter connection
ndb/src/mgmsrv/MgmtSrvr.hpp:
  prototype for transporter_connect
ndb/src/mgmsrv/Services.cpp:
  add command: "transporter connect"
  
  stops the MgmApiSession and replaces it with a transporter connection
ndb/src/mgmsrv/Services.hpp:
  prototype for transporter_connect
parent 00b6f1b9
......@@ -49,60 +49,41 @@ enum SendStatus {
const Uint32 MAX_MESSAGE_SIZE = (12+4+4+(4*25)+(3*4)+4*4096);
/**
* TCP Transporter Configuration
* TransporterConfiguration
*
* used for setting up a transporter. the union member specific is for
* information specific to a transporter type.
*/
struct TCP_TransporterConfiguration {
struct TransporterConfiguration {
Uint32 port;
const char *remoteHostName;
const char *localHostName;
NodeId remoteNodeId;
NodeId localNodeId;
Uint32 sendBufferSize; // Size of SendBuffer of priority B
Uint32 maxReceiveSize; // Maximum no of bytes to receive
NodeId serverNodeId;
bool checksum;
bool signalId;
};
bool isMgmConnection; // is a mgm connection, requires transforming
/**
* SHM Transporter Configuration
*/
struct SHM_TransporterConfiguration {
Uint32 port;
const char *remoteHostName;
const char *localHostName;
NodeId remoteNodeId;
NodeId localNodeId;
bool checksum;
bool signalId;
union { // Transporter specific configuration information
struct {
Uint32 sendBufferSize; // Size of SendBuffer of priority B
Uint32 maxReceiveSize; // Maximum no of bytes to receive
} tcp;
struct {
Uint32 shmKey;
Uint32 shmSize;
int signum;
};
/**
* OSE Transporter Configuration
*/
struct OSE_TransporterConfiguration {
const char *remoteHostName;
const char *localHostName;
NodeId remoteNodeId;
NodeId localNodeId;
bool checksum;
bool signalId;
} shm;
struct {
Uint32 prioASignalSize;
Uint32 prioBSignalSize;
Uint32 receiveBufferSize; // In number of signals
};
} ose;
/**
* SCI Transporter Configuration
*/
struct SCI_TransporterConfiguration {
const char *remoteHostName;
const char *localHostName;
Uint32 port;
struct {
Uint32 sendLimit; // Packet size
Uint32 bufferSize; // Buffer size
......@@ -110,13 +91,8 @@ struct SCI_TransporterConfiguration {
Uint32 remoteSciNodeId0; // SCInodeId for adapter 1
Uint32 remoteSciNodeId1; // SCInodeId for adapter 2
NodeId localNodeId; // Local node Id
NodeId remoteNodeId; // Remote node Id
bool checksum;
bool signalId;
} sci;
};
};
struct SignalHeader {
......
......@@ -102,6 +102,7 @@ public:
unsigned sizeOfLongSignalMemory = 100);
void set_mgm_handle(NdbMgmHandle h) { m_mgm_handle = h; };
NdbMgmHandle get_mgm_handle(void) { return m_mgm_handle; };
bool init(NodeId localNodeId);
......@@ -179,10 +180,10 @@ public:
* started, startServer is called. A transporter of the selected kind
* is created and it is put in the transporter arrays.
*/
bool createTransporter(struct TCP_TransporterConfiguration * config);
bool createTransporter(struct SCI_TransporterConfiguration * config);
bool createTransporter(struct SHM_TransporterConfiguration * config);
bool createTransporter(struct OSE_TransporterConfiguration * config);
bool createTCPTransporter(struct TransporterConfiguration * config);
bool createSCITransporter(struct TransporterConfiguration * config);
bool createSHMTransporter(struct TransporterConfiguration * config);
bool createOSETransporter(struct TransporterConfiguration * config);
/**
* prepareSend
......
......@@ -36,6 +36,7 @@ public:
m_servaddr.sin_port = htons(m_port);
};
NDB_SOCKET_TYPE connect();
NDB_SOCKET_TYPE connect_without_auth();
bool close();
};
......
......@@ -110,175 +110,6 @@ IPCConfig::addRemoteNodeId(NodeId nodeId){
return true;
}
/**
* Returns no of transporters configured
*/
int
IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
int noOfTransportersCreated = 0;
Uint32 noOfConnections;
if(!props->get("NoOfConnections", &noOfConnections)) return -1;
for (Uint32 i = 0; i < noOfConnections; i++){
const Properties * tmp;
Uint32 nodeId1, nodeId2;
const char * host1;
const char * host2;
if(!props->get("Connection", i, &tmp)) continue;
if(!tmp->get("NodeId1", &nodeId1)) continue;
if(!tmp->get("NodeId2", &nodeId2)) continue;
if(nodeId1 != the_ownId && nodeId2 != the_ownId) continue;
Uint32 sendSignalId;
Uint32 compression;
Uint32 checksum;
if(!tmp->get("SendSignalId", &sendSignalId)) continue;
if(!tmp->get("Checksum", &checksum)) continue;
const char * type;
if(!tmp->get("Type", &type)) continue;
if(strcmp("SHM", type) == 0){
SHM_TransporterConfiguration conf;
conf.localNodeId = the_ownId;
conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2);
conf.checksum = checksum;
conf.signalId = sendSignalId;
if(!tmp->get("ShmKey", &conf.shmKey)) continue;
if(!tmp->get("ShmSize", &conf.shmSize)) continue;
if(!theTransporterRegistry->createTransporter(&conf)){
ndbout << "Failed to create SHM Transporter from: "
<< conf.localNodeId << " to: " << conf.remoteNodeId << endl;
continue;
} else {
noOfTransportersCreated++;
continue;
}
} else if(strcmp("SCI", type) == 0){
SCI_TransporterConfiguration conf;
conf.localNodeId = the_ownId;
conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2);
conf.checksum = checksum;
conf.signalId = sendSignalId;
if(!tmp->get("SendLimit", &conf.sendLimit)) continue;
if(!tmp->get("SharedBufferSize", &conf.bufferSize)) continue;
if(the_ownId == nodeId1){
if(!tmp->get("Node1_NoOfAdapters", &conf.nLocalAdapters)) continue;
if(!tmp->get("Node2_Adapter", 0, &conf.remoteSciNodeId0)) continue;
if(conf.nLocalAdapters > 1){
if(!tmp->get("Node2_Adapter", 1, &conf.remoteSciNodeId1)) continue;
}
} else {
if(!tmp->get("Node2_NoOfAdapters", &conf.nLocalAdapters)) continue;
if(!tmp->get("Node1_Adapter", 0, &conf.remoteSciNodeId0)) continue;
if(conf.nLocalAdapters > 1){
if(!tmp->get("Node1_Adapter", 1, &conf.remoteSciNodeId1)) continue;
}
}
if(!theTransporterRegistry->createTransporter(&conf)){
ndbout << "Failed to create SCI Transporter from: "
<< conf.localNodeId << " to: " << conf.remoteNodeId << endl;
continue;
} else {
noOfTransportersCreated++;
continue;
}
}
if(!tmp->get("HostName1", &host1)) continue;
if(!tmp->get("HostName2", &host2)) continue;
Uint32 ownNodeId;
Uint32 remoteNodeId;
const char * ownHostName;
const char * remoteHostName;
if(nodeId1 == the_ownId){
ownNodeId = nodeId1;
ownHostName = host1;
remoteNodeId = nodeId2;
remoteHostName = host2;
} else if(nodeId2 == the_ownId){
ownNodeId = nodeId2;
ownHostName = host2;
remoteNodeId = nodeId1;
remoteHostName = host1;
} else {
continue;
}
if(strcmp("TCP", type) == 0){
TCP_TransporterConfiguration conf;
if(!tmp->get("PortNumber", &conf.port)) continue;
if(!tmp->get("SendBufferSize", &conf.sendBufferSize)) continue;
if(!tmp->get("MaxReceiveSize", &conf.maxReceiveSize)) continue;
const char * proxy;
if (tmp->get("Proxy", &proxy)) {
if (strlen(proxy) > 0 && nodeId2 == the_ownId) {
// TODO handle host:port
conf.port = atoi(proxy);
}
}
conf.sendBufferSize *= MAX_MESSAGE_SIZE;
conf.maxReceiveSize *= MAX_MESSAGE_SIZE;
conf.remoteHostName = remoteHostName;
conf.localHostName = ownHostName;
conf.remoteNodeId = remoteNodeId;
conf.localNodeId = ownNodeId;
conf.checksum = checksum;
conf.signalId = sendSignalId;
if(!theTransporterRegistry->createTransporter(&conf)){
ndbout << "Failed to create TCP Transporter from: "
<< ownNodeId << " to: " << remoteNodeId << endl;
} else {
noOfTransportersCreated++;
}
} else if(strcmp("OSE", type) == 0){
OSE_TransporterConfiguration conf;
if(!tmp->get("PrioASignalSize", &conf.prioASignalSize))
continue;
if(!tmp->get("PrioBSignalSize", &conf.prioBSignalSize))
continue;
if(!tmp->get("ReceiveArraySize", &conf.receiveBufferSize))
continue;
conf.remoteHostName = remoteHostName;
conf.localHostName = ownHostName;
conf.remoteNodeId = remoteNodeId;
conf.localNodeId = ownNodeId;
conf.checksum = checksum;
conf.signalId = sendSignalId;
if(!theTransporterRegistry->createTransporter(&conf)){
ndbout << "Failed to create OSE Transporter from: "
<< ownNodeId << " to: " << remoteNodeId << endl;
} else {
noOfTransportersCreated++;
}
} else {
continue;
}
}
return noOfTransportersCreated;
}
/**
* Supply a nodeId,
* and get next higher node id
......@@ -335,6 +166,8 @@ Uint32
IPCConfig::configureTransporters(Uint32 nodeId,
const class ndb_mgm_configuration & config,
class TransporterRegistry & tr){
TransporterConfiguration conf;
DBUG_ENTER("IPCConfig::configureTransporters");
Uint32 noOfTransportersCreated= 0;
......@@ -368,9 +201,35 @@ IPCConfig::configureTransporters(Uint32 nodeId,
Uint32 server_port= 0;
if(iter.get(CFG_CONNECTION_SERVER_PORT, &server_port)) break;
if (nodeId <= nodeId1 && nodeId <= nodeId2) {
/*
We check the node type. MGM node becomes server.
*/
Uint32 node1type, node2type;
ndb_mgm_configuration_iterator node1iter(config, CFG_SECTION_NODE);
ndb_mgm_configuration_iterator node2iter(config, CFG_SECTION_NODE);
node1iter.find(CFG_NODE_ID,nodeId1);
node2iter.find(CFG_NODE_ID,nodeId2);
node1iter.get(CFG_TYPE_OF_SECTION,&node1type);
node2iter.get(CFG_TYPE_OF_SECTION,&node2type);
conf.serverNodeId= (nodeId1 < nodeId2)? nodeId1:nodeId2;
conf.isMgmConnection= false;
if(node2type==NODE_TYPE_MGM)
{
conf.isMgmConnection= true;
conf.serverNodeId= nodeId2;
}
else if(node1type==NODE_TYPE_MGM)
{
conf.isMgmConnection= true;
conf.serverNodeId= nodeId1;
}
else if (nodeId == conf.serverNodeId) {
tr.add_transporter_interface(remoteNodeId, localHostName, server_port);
}
DBUG_PRINT("info", ("Transporter between this node %d and node %d using port %d, signalId %d, checksum %d",
nodeId, remoteNodeId, server_port, sendSignalId, checksum));
/*
......@@ -386,27 +245,24 @@ IPCConfig::configureTransporters(Uint32 nodeId,
if((int)server_port<0)
server_port= -server_port;
switch(type){
case CONNECTION_TYPE_SHM:{
SHM_TransporterConfiguration conf;
conf.localNodeId = nodeId;
conf.remoteNodeId = remoteNodeId;
conf.checksum = checksum;
conf.signalId = sendSignalId;
conf.port = server_port;
conf.localHostName = localHostName;
conf.remoteHostName = remoteHostName;
switch(type){
case CONNECTION_TYPE_SHM:
if(iter.get(CFG_SHM_KEY, &conf.shm.shmKey)) break;
if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shm.shmSize)) break;
if(iter.get(CFG_SHM_KEY, &conf.shmKey)) break;
if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shmSize)) break;
{
Uint32 tmp;
if(iter.get(CFG_SHM_SIGNUM, &tmp)) break;
conf.signum= tmp;
}
conf.port= server_port;
conf.localHostName = localHostName;
conf.remoteHostName = remoteHostName;
conf.shm.signum= tmp;
if(!tr.createTransporter(&conf)){
if(!tr.createSHMTransporter(&conf)){
DBUG_PRINT("error", ("Failed to create SHM Transporter from %d to %d",
conf.localNodeId, conf.remoteNodeId));
ndbout << "Failed to create SHM Transporter from: "
......@@ -414,60 +270,53 @@ IPCConfig::configureTransporters(Uint32 nodeId,
} else {
noOfTransportersCreated++;
}
DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, buf size = %d",
conf.shmKey, conf.shmSize));
break;
}
case CONNECTION_TYPE_SCI:{
SCI_TransporterConfiguration conf;
conf.localNodeId = nodeId;
conf.remoteNodeId = remoteNodeId;
conf.checksum = checksum;
conf.signalId = sendSignalId;
conf.port= server_port;
DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, "
"buf size = %d", conf.shm.shmKey, conf.shm.shmSize));
conf.localHostName = localHostName;
conf.remoteHostName = remoteHostName;
break;
if(iter.get(CFG_SCI_SEND_LIMIT, &conf.sendLimit)) break;
if(iter.get(CFG_SCI_BUFFER_MEM, &conf.bufferSize)) break;
case CONNECTION_TYPE_SCI:
if(iter.get(CFG_SCI_SEND_LIMIT, &conf.sci.sendLimit)) break;
if(iter.get(CFG_SCI_BUFFER_MEM, &conf.sci.bufferSize)) break;
if (nodeId == nodeId1) {
if(iter.get(CFG_SCI_HOST2_ID_0, &conf.remoteSciNodeId0)) break;
if(iter.get(CFG_SCI_HOST2_ID_1, &conf.remoteSciNodeId1)) break;
if(iter.get(CFG_SCI_HOST2_ID_0, &conf.sci.remoteSciNodeId0)) break;
if(iter.get(CFG_SCI_HOST2_ID_1, &conf.sci.remoteSciNodeId1)) break;
} else {
if(iter.get(CFG_SCI_HOST1_ID_0, &conf.remoteSciNodeId0)) break;
if(iter.get(CFG_SCI_HOST1_ID_1, &conf.remoteSciNodeId1)) break;
if(iter.get(CFG_SCI_HOST1_ID_0, &conf.sci.remoteSciNodeId0)) break;
if(iter.get(CFG_SCI_HOST1_ID_1, &conf.sci.remoteSciNodeId1)) break;
}
if (conf.remoteSciNodeId1 == 0) {
conf.nLocalAdapters = 1;
if (conf.sci.remoteSciNodeId1 == 0) {
conf.sci.nLocalAdapters = 1;
} else {
conf.nLocalAdapters = 2;
conf.sci.nLocalAdapters = 2;
}
if(!tr.createTransporter(&conf)){
if(!tr.createSCITransporter(&conf)){
DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d",
conf.localNodeId, conf.remoteNodeId));
ndbout << "Failed to create SCI Transporter from: "
<< conf.localNodeId << " to: " << conf.remoteNodeId << endl;
} else {
DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, remote SCI node id %d",
conf.nLocalAdapters, conf.remoteSciNodeId0));
DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, buf size = %d",
conf.localHostName, conf.remoteHostName, conf.sendLimit, conf.bufferSize));
if (conf.nLocalAdapters > 1) {
DBUG_PRINT("info", ("Fault-tolerant with 2 Remote Adapters, second remote SCI node id = %d",
conf.remoteSciNodeId1));
DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, "
"remote SCI node id %d",
conf.sci.nLocalAdapters, conf.sci.remoteSciNodeId0));
DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, "
"buf size = %d", conf.localHostName,
conf.remoteHostName, conf.sci.sendLimit,
conf.sci.bufferSize));
if (conf.sci.nLocalAdapters > 1) {
DBUG_PRINT("info", ("Fault-tolerant with 2 Remote Adapters, "
"second remote SCI node id = %d",
conf.sci.remoteSciNodeId1));
}
noOfTransportersCreated++;
continue;
}
}
case CONNECTION_TYPE_TCP:{
TCP_TransporterConfiguration conf;
break;
if(iter.get(CFG_TCP_SEND_BUFFER_SIZE, &conf.sendBufferSize)) break;
if(iter.get(CFG_TCP_RECEIVE_BUFFER_SIZE, &conf.maxReceiveSize)) break;
case CONNECTION_TYPE_TCP:
if(iter.get(CFG_TCP_SEND_BUFFER_SIZE, &conf.tcp.sendBufferSize)) break;
if(iter.get(CFG_TCP_RECEIVE_BUFFER_SIZE, &conf.tcp.maxReceiveSize)) break;
conf.port= server_port;
const char * proxy;
if (!iter.get(CFG_TCP_PROXY, &proxy)) {
if (strlen(proxy) > 0 && nodeId2 == nodeId) {
......@@ -476,50 +325,35 @@ IPCConfig::configureTransporters(Uint32 nodeId,
}
}
conf.localNodeId = nodeId;
conf.remoteNodeId = remoteNodeId;
conf.localHostName = localHostName;
conf.remoteHostName = remoteHostName;
conf.checksum = checksum;
conf.signalId = sendSignalId;
if(!tr.createTransporter(&conf)){
if(!tr.createTCPTransporter(&conf)){
ndbout << "Failed to create TCP Transporter from: "
<< nodeId << " to: " << remoteNodeId << endl;
} else {
noOfTransportersCreated++;
}
DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, maxReceiveSize = %d",
conf.sendBufferSize, conf.maxReceiveSize));
DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, "
"maxReceiveSize = %d", conf.tcp.sendBufferSize,
conf.tcp.maxReceiveSize));
break;
case CONNECTION_TYPE_OSE:{
OSE_TransporterConfiguration conf;
if(iter.get(CFG_OSE_PRIO_A_SIZE, &conf.prioASignalSize)) break;
if(iter.get(CFG_OSE_PRIO_B_SIZE, &conf.prioBSignalSize)) break;
if(iter.get(CFG_OSE_RECEIVE_ARRAY_SIZE, &conf.receiveBufferSize)) break;
conf.localNodeId = nodeId;
conf.remoteNodeId = remoteNodeId;
conf.localHostName = localHostName;
conf.remoteHostName = remoteHostName;
conf.checksum = checksum;
conf.signalId = sendSignalId;
case CONNECTION_TYPE_OSE:
if(iter.get(CFG_OSE_PRIO_A_SIZE, &conf.ose.prioASignalSize)) break;
if(iter.get(CFG_OSE_PRIO_B_SIZE, &conf.ose.prioBSignalSize)) break;
if(!tr.createTransporter(&conf)){
if(!tr.createOSETransporter(&conf)){
ndbout << "Failed to create OSE Transporter from: "
<< nodeId << " to: " << remoteNodeId << endl;
} else {
noOfTransportersCreated++;
}
}
break;
default:
ndbout << "Unknown transporter type from: " << nodeId <<
" to: " << remoteNodeId << endl;
break;
}
}
}
} // switch
} // for
DBUG_RETURN(noOfTransportersCreated);
}
......@@ -32,6 +32,7 @@ OSE_Transporter::OSE_Transporter(int _prioASignalSize,
NodeId localNodeId,
const char * lHostName,
NodeId remoteNodeId,
NodeId serverNodeId,
const char * rHostName,
int byteorder,
bool compression,
......@@ -40,6 +41,7 @@ OSE_Transporter::OSE_Transporter(int _prioASignalSize,
Uint32 reportFreq) :
Transporter(localNodeId,
remoteNodeId,
serverNodeId,
byteorder,
compression,
checksum,
......
......@@ -48,6 +48,7 @@ public:
NodeId localNodeId,
const char * lHostName,
NodeId remoteNodeId,
NodeId serverNodeId,
const char * rHostName,
int byteorder,
bool compression,
......
......@@ -34,6 +34,7 @@ SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
int r_port,
bool isMgmConnection,
Uint32 packetSize,
Uint32 bufferSize,
Uint32 nAdapters,
......@@ -41,12 +42,13 @@ SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg,
Uint16 remoteSciNodeId1,
NodeId _localNodeId,
NodeId _remoteNodeId,
NodeId serverNodeId,
bool chksm,
bool signalId,
Uint32 reportFreq) :
Transporter(t_reg, tt_SCI_TRANSPORTER,
lHostName, rHostName, r_port, _localNodeId,
_remoteNodeId, 0, false, chksm, signalId)
lHostName, rHostName, r_port, isMgmConnection, _localNodeId,
_remoteNodeId, serverNodeID, 0, false, chksm, signalId)
{
DBUG_ENTER("SCI_Transporter::SCI_Transporter");
m_PacketSize = (packetSize + 3)/4 ;
......
......@@ -139,6 +139,7 @@ private:
const char *local_host,
const char *remote_host,
int port,
bool isMgmConnection,
Uint32 packetSize,
Uint32 bufferSize,
Uint32 nAdapters,
......@@ -146,6 +147,7 @@ private:
Uint16 remoteSciNodeId1,
NodeId localNodeID,
NodeId remoteNodeID,
NodeId serverNodeId,
bool checksum,
bool signalId,
Uint32 reportFreq = 4096);
......
......@@ -32,14 +32,17 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
int r_port,
bool isMgmConnection,
NodeId lNodeId,
NodeId rNodeId,
NodeId serverNodeId,
bool checksum,
bool signalId,
key_t _shmKey,
Uint32 _shmSize) :
Transporter(t_reg, tt_SHM_TRANSPORTER,
lHostName, rHostName, r_port, lNodeId, rNodeId,
lHostName, rHostName, r_port, isMgmConnection,
lNodeId, rNodeId, serverNodeId,
0, false, checksum, signalId),
shmKey(_shmKey),
shmSize(_shmSize)
......
......@@ -36,8 +36,10 @@ public:
const char *lHostName,
const char *rHostName,
int r_port,
bool isMgmConnection,
NodeId lNodeId,
NodeId rNodeId,
NodeId serverNodeId,
bool checksum,
bool signalId,
key_t shmKey,
......
......@@ -68,12 +68,15 @@ TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
int r_port,
bool isMgmConnection,
NodeId lNodeId,
NodeId rNodeId,
NodeId serverNodeId,
bool chksm, bool signalId,
Uint32 _reportFreq) :
Transporter(t_reg, tt_TCP_TRANSPORTER,
lHostName, rHostName, r_port, lNodeId, rNodeId,
lHostName, rHostName, r_port, isMgmConnection,
lNodeId, rNodeId, serverNodeId,
0, false, chksm, signalId),
m_sendBuffer(sendBufSize)
{
......
......@@ -50,8 +50,10 @@ private:
const char *lHostName,
const char *rHostName,
int r_port,
bool isMgmConnection,
NodeId lHostId,
NodeId rHostId,
NodeId serverNodeId,
bool checksum, bool signalId,
Uint32 reportFreq = 4096);
......
......@@ -32,12 +32,14 @@ Transporter::Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
int r_port,
bool _isMgmConnection,
NodeId lNodeId,
NodeId rNodeId,
NodeId serverNodeId,
int _byteorder,
bool _compression, bool _checksum, bool _signalId)
: m_r_port(r_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
isServer(lNodeId < rNodeId),
isServer(lNodeId==serverNodeId), isMgmConnection(_isMgmConnection),
m_packer(_signalId, _checksum),
m_type(_type),
m_transporter_registry(t_reg)
......@@ -109,22 +111,46 @@ Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
bool
Transporter::connect_client() {
NDB_SOCKET_TYPE sockfd;
if(m_connected)
return true;
NDB_SOCKET_TYPE sockfd = m_socket_client->connect();
if (sockfd == NDB_INVALID_SOCKET)
DBUG_ENTER("Transporter::connect_client");
DBUG_PRINT("info",("port %d isMgmConnection=%d",m_r_port,isMgmConnection));
if(isMgmConnection)
sockfd= m_socket_client->connect_without_auth();
else
sockfd= m_socket_client->connect();
if(sockfd<0)
return false;
DBUG_ENTER("Transporter::connect_client");
SocketOutputStream s_output(sockfd);
SocketInputStream s_input(sockfd);
if(isMgmConnection)
{
/*
We issue the magic command to the management server to
switch into transporter mode.
*/
s_output.println("transporter connect");
s_output.println("");
}
if (sockfd == NDB_INVALID_SOCKET)
return false;
// send info about own id
// send info about own transporter type
SocketOutputStream s_output(sockfd);
s_output.println("%d %d", localNodeId, m_type);
// get remote id
int nodeId, remote_transporter_type= -1;
SocketInputStream s_input(sockfd);
char buf[256];
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
......
......@@ -89,8 +89,10 @@ protected:
const char *lHostName,
const char *rHostName,
int r_port,
bool isMgmConnection,
NodeId lNodeId,
NodeId rNodeId,
NodeId serverNodeId,
int byteorder,
bool compression,
bool checksum,
......@@ -133,6 +135,12 @@ protected:
private:
/**
* means that we transform an MGM connection into
* a transporter connection
*/
bool isMgmConnection;
SocketClient *m_socket_client;
protected:
......
......@@ -248,7 +248,7 @@ TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
}
bool
TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
#ifdef NDB_TCP_TRANSPORTER
if(!nodeIdSpecified){
......@@ -262,13 +262,15 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
return false;
TCP_Transporter * t = new TCP_Transporter(*this,
config->sendBufferSize,
config->maxReceiveSize,
config->tcp.sendBufferSize,
config->tcp.maxReceiveSize,
config->localHostName,
config->remoteHostName,
config->port,
config->isMgmConnection,
localNodeId,
config->remoteNodeId,
config->serverNodeId,
config->checksum,
config->signalId);
if (t == NULL)
......@@ -297,7 +299,7 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
}
bool
TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
TransporterRegistry::createOSETransporter(TransporterConfiguration *conf) {
#ifdef NDB_OSE_TRANSPORTER
if(!nodeIdSpecified){
......@@ -316,11 +318,12 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
localNodeId);
}
OSE_Transporter * t = new OSE_Transporter(conf->prioASignalSize,
conf->prioBSignalSize,
OSE_Transporter * t = new OSE_Transporter(conf->ose.prioASignalSize,
conf->ose.prioBSignalSize,
localNodeId,
conf->localHostName,
conf->remoteNodeId,
conf->serverNodeId,
conf->remoteHostName,
conf->checksum,
conf->signalId);
......@@ -346,7 +349,7 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
}
bool
TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
#ifdef NDB_SCI_TRANSPORTER
if(!SCI_Transporter::initSCI())
......@@ -366,13 +369,15 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
config->localHostName,
config->remoteHostName,
config->port,
config->sendLimit,
config->bufferSize,
config->nLocalAdapters,
config->remoteSciNodeId0,
config->remoteSciNodeId1,
config->isMgmConnection,
config->sci.sendLimit,
config->sci.bufferSize,
config->sci.nLocalAdapters,
config->sci.remoteSciNodeId0,
config->sci.remoteSciNodeId1,
localNodeId,
config->remoteNodeId,
config->serverNodeId,
config->checksum,
config->signalId);
......@@ -397,7 +402,7 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
}
bool
TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
DBUG_ENTER("TransporterRegistry::createTransporter SHM");
#ifdef NDB_SHM_TRANSPORTER
if(!nodeIdSpecified){
......@@ -408,7 +413,7 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
return false;
if (!g_ndb_shm_signum) {
g_ndb_shm_signum= config->signum;
g_ndb_shm_signum= config->shm.signum;
DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
/**
* Make sure to block g_ndb_shm_signum
......@@ -420,7 +425,7 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
pthread_sigmask(SIG_BLOCK, &mask, 0);
}
if(config->signum != g_ndb_shm_signum)
if(config->shm.signum != g_ndb_shm_signum)
return false;
if(theTransporters[config->remoteNodeId] != NULL)
......@@ -430,12 +435,14 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
config->localHostName,
config->remoteHostName,
config->port,
config->isMgmConnection,
localNodeId,
config->remoteNodeId,
config->serverNodeId,
config->checksum,
config->signalId,
config->shmKey,
config->shmSize
config->shm.shmKey,
config->shm.shmSize
);
if (t == NULL)
return false;
......
......@@ -60,6 +60,27 @@ SocketClient::init()
return true;
}
/**
* SocketClient::connect_without_auth()
*
* Temporarily disables authentication and connects.
* This is useful if you're trying to change what this
* SocketClient object is for (e.g. from mgm to ndb)
*/
NDB_SOCKET_TYPE
SocketClient::connect_without_auth()
{
SocketAuthenticator *tmp;
NDB_SOCKET_TYPE retval;
tmp= m_auth;
m_auth= NULL;
retval= connect();
m_auth= tmp;
return retval;
}
NDB_SOCKET_TYPE
SocketClient::connect()
{
......
......@@ -333,12 +333,19 @@ sessionThread_C(void* _sc){
return 0;
}
/**
* may have m_stopped set if we're transforming a mgm
* connection into a transporter connection.
*/
if(!si->m_stopped)
{
if(!si->m_stop){
si->m_stopped = false;
si->runSession();
} else {
NDB_CLOSE_SOCKET(si->m_socket);
}
}
si->m_stopped = true;
my_thread_end();
......
......@@ -3132,8 +3132,8 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){
const Properties * node;
require(ctx.m_config->get("Node", id1, &node));
BaseString hostname(hostName1);
// require(node->get("HostName", hostname));
if (hostname.c_str()[0] == 0) {
ctx.reportError("Hostname required on nodeid %d since it will "
......@@ -3142,6 +3142,19 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){
}
Uint32 port= 0;
const char * type1;
const char * type2;
const Properties * node2;
node->get("Type", &type1);
ctx.m_config->get("Node", id2, &node2);
node2->get("Type", &type2);
if(strcmp(type1, MGM_TOKEN)==0)
node->get("PortNumber",&port);
else if(strcmp(type2, MGM_TOKEN)==0)
node2->get("PortNumber",&port);
if (!node->get("ServerPort", &port) &&
!ctx.m_userProperties.get("ServerPort_", id1, &port)) {
ctx.m_currentSection->put("PortNumber", port);
......
......@@ -2883,6 +2883,11 @@ MgmtSrvr::getConnectionDbParameter(int node1,
DBUG_RETURN(1);
}
void MgmtSrvr::transporter_connect(NDB_SOCKET_TYPE sockfd)
{
theFacade->get_registry()->connect_server(sockfd);
}
int MgmtSrvr::set_connect_string(const char *str)
{
return ndb_mgm_set_connectstring(m_config_retriever->get_mgmHandle(),str);
......
......@@ -515,6 +515,8 @@ public:
int set_connect_string(const char *str);
void transporter_connect(NDB_SOCKET_TYPE sockfd);
ConfigRetriever *get_config_retriever() { return m_config_retriever; };
const char *get_connect_address(Uint32 node_id) { return inet_ntoa(m_connect_address[node_id]); }
......
......@@ -264,6 +264,8 @@ ParserRow<MgmApiSession> commands[] = {
MGM_CMD("check connection", &MgmApiSession::check_connection, ""),
MGM_CMD("transporter connect", &MgmApiSession::transporter_connect, ""),
MGM_END()
};
......@@ -1538,5 +1540,17 @@ MgmApiSession::check_connection(Parser_t::Context &ctx,
m_output->println("");
}
void
MgmApiSession::transporter_connect(Parser_t::Context &ctx,
Properties const &args) {
NDB_SOCKET_TYPE s= m_socket;
m_stop= true;
m_stopped= true; // force a stop (no closing socket)
m_socket= -1; // so nobody closes it
m_mgmsrv.transporter_connect(s);
}
template class MutexVector<int>;
template class Vector<ParserRow<MgmApiSession> const*>;
......@@ -98,6 +98,8 @@ public:
void purge_stale_sessions(Parser_t::Context &ctx, const class Properties &args);
void check_connection(Parser_t::Context &ctx, const class Properties &args);
void transporter_connect(Parser_t::Context &ctx, Properties const &args);
void repCommand(Parser_t::Context &ctx, const class Properties &args);
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment