Commit f44c3ee9 authored by unknown's avatar unknown

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into poseidon.bredbandsbolaget.se:/home/tomas/mysql-4.1-ndb


ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp:
  Auto merged
ndb/src/kernel/main.cpp:
  Auto merged
ndb/src/mgmsrv/MgmtSrvr.cpp:
  Auto merged
ndb/src/ndbapi/ClusterMgr.cpp:
  Auto merged
ndb/src/ndbapi/TransporterFacade.cpp:
  Auto merged
ndb/src/ndbapi/TransporterFacade.hpp:
  Auto merged
parents 190d9e72 dc2544fd
...@@ -80,13 +80,15 @@ class ApiRegConf { ...@@ -80,13 +80,15 @@ class ApiRegConf {
friend class ClusterMgr; friend class ClusterMgr;
public: public:
STATIC_CONST( SignalLength = 3 + NodeState::DataLength ); STATIC_CONST( SignalLength = 3 + NodeState::DataLength +
NdbNodeBitmask::Size );
private: private:
Uint32 qmgrRef; Uint32 qmgrRef;
Uint32 version; // Version of NDB node Uint32 version; // Version of NDB node
Uint32 apiHeartbeatFrequency; Uint32 apiHeartbeatFrequency;
NodeState nodeState; NodeState nodeState;
Bitmask<NdbNodeBitmask::Size>::Data connected_nodes;
}; };
#endif #endif
...@@ -666,6 +666,11 @@ extern "C" { ...@@ -666,6 +666,11 @@ extern "C" {
*/ */
struct ndb_mgm_configuration * ndb_mgm_get_configuration(NdbMgmHandle handle, struct ndb_mgm_configuration * ndb_mgm_get_configuration(NdbMgmHandle handle,
unsigned version); unsigned version);
int ndb_mgm_alloc_nodeid(NdbMgmHandle handle,
unsigned version,
unsigned *pnodeid,
int nodetype);
/** /**
* Config iterator * Config iterator
*/ */
......
...@@ -76,6 +76,8 @@ ...@@ -76,6 +76,8 @@
#define CFG_DB_DISCLESS 148 #define CFG_DB_DISCLESS 148
#define CFG_DB_SERVER_PORT 149
#define CFG_NODE_ARBIT_RANK 200 #define CFG_NODE_ARBIT_RANK 200
#define CFG_NODE_ARBIT_DELAY 201 #define CFG_NODE_ARBIT_DELAY 201
......
...@@ -77,7 +77,7 @@ public: ...@@ -77,7 +77,7 @@ public:
* Get config using socket * Get config using socket
*/ */
struct ndb_mgm_configuration * getConfig(const char * mgmhost, short port, struct ndb_mgm_configuration * getConfig(const char * mgmhost, short port,
int versionId); int versionId, int nodetype);
/** /**
* Get config from file * Get config from file
*/ */
......
...@@ -64,7 +64,7 @@ typedef int socklen_t; ...@@ -64,7 +64,7 @@ typedef int socklen_t;
#define NDB_NONBLOCK O_NONBLOCK #define NDB_NONBLOCK O_NONBLOCK
#define NDB_SOCKET_TYPE int #define NDB_SOCKET_TYPE int
#define NDB_INVALID_SOCKET -1 #define NDB_INVALID_SOCKET -1
#define NDB_CLOSE_SOCKET(x) close(x) #define NDB_CLOSE_SOCKET(x) ::close(x)
#define InetErrno errno #define InetErrno errno
......
...@@ -29,20 +29,10 @@ ...@@ -29,20 +29,10 @@
#define TransporterRegistry_H #define TransporterRegistry_H
#include "TransporterDefinitions.hpp" #include "TransporterDefinitions.hpp"
#include <SocketServer.hpp>
#include <NdbTCP.h> #include <NdbTCP.h>
// A transporter is always in a PerformState.
// PerformIO is used initially and as long as any of the events
// PerformConnect, ...
enum PerformState {
PerformNothing = 4, // Does nothing
PerformIO = 0, // Is connected
PerformConnect = 1, // Is trying to connect
PerformDisconnect = 2, // Trying to disconnect
RemoveTransporter = 3 // Will be removed
};
// A transporter is always in an IOState. // A transporter is always in an IOState.
// NoHalt is used initially and as long as it is no restrictions on // NoHalt is used initially and as long as it is no restrictions on
// sending or receiving. // sending or receiving.
...@@ -60,18 +50,45 @@ enum TransporterType { ...@@ -60,18 +50,45 @@ enum TransporterType {
tt_OSE_TRANSPORTER = 4 tt_OSE_TRANSPORTER = 4
}; };
static const char *performStateString[] =
{ "is connected",
"is trying to connect",
"does nothing",
"is trying to disconnect" };
class Transporter; class Transporter;
class TCP_Transporter; class TCP_Transporter;
class SCI_Transporter; class SCI_Transporter;
class SHM_Transporter; class SHM_Transporter;
class OSE_Transporter; class OSE_Transporter;
class TransporterRegistry;
class SocketAuthenticator;
class TransporterService : public SocketServer::Service {
SocketAuthenticator * m_auth;
TransporterRegistry * m_transporter_registry;
public:
TransporterService(SocketAuthenticator *auth= 0)
{
m_auth= auth;
m_transporter_registry= 0;
}
void setTransporterRegistry(TransporterRegistry *t)
{
m_transporter_registry= t;
}
SocketServer::Session * newSession(NDB_SOCKET_TYPE socket);
};
/** /**
* @class TransporterRegistry * @class TransporterRegistry
* @brief ... * @brief ...
*/ */
class TransporterRegistry { class TransporterRegistry {
friend class OSE_Receiver; friend class OSE_Receiver;
friend class Transporter;
friend class TransporterService;
public: public:
/** /**
* Constructor * Constructor
...@@ -98,6 +115,12 @@ public: ...@@ -98,6 +115,12 @@ public:
*/ */
~TransporterRegistry(); ~TransporterRegistry();
bool start_service(SocketServer& server);
bool start_clients();
bool stop_clients();
void start_clients_thread();
void update_connections();
/** /**
* Start/Stop receiving * Start/Stop receiving
*/ */
...@@ -110,16 +133,26 @@ public: ...@@ -110,16 +133,26 @@ public:
void startSending(); void startSending();
void stopSending(); void stopSending();
/** // A transporter is always in a PerformState.
* Get and set methods for PerformState // PerformIO is used initially and as long as any of the events
*/ // PerformConnect, ...
PerformState performState(NodeId nodeId); enum PerformState {
void setPerformState(NodeId nodeId, PerformState state); CONNECTED = 0,
CONNECTING = 1,
DISCONNECTED = 2,
DISCONNECTING = 3
};
const char *getPerformStateString(NodeId nodeId) const
{ return performStateString[(unsigned)performStates[nodeId]]; };
/** /**
* Set perform state for all transporters * Get and set methods for PerformState
*/ */
void setPerformState(PerformState state); void do_connect(NodeId node_id);
void do_disconnect(NodeId node_id);
bool is_connected(NodeId node_id) { return performStates[node_id] == CONNECTED; };
void report_connect(NodeId node_id);
void report_disconnect(NodeId node_id, int errnum);
/** /**
* Get and set methods for IOState * Get and set methods for IOState
...@@ -174,8 +207,6 @@ public: ...@@ -174,8 +207,6 @@ public:
void performReceive(); void performReceive();
void performSend(); void performSend();
void checkConnections();
/** /**
* Force sending if more than or equal to sendLimit * Force sending if more than or equal to sendLimit
* number have asked for send. Returns 0 if not sending * number have asked for send. Returns 0 if not sending
...@@ -192,6 +223,12 @@ protected: ...@@ -192,6 +223,12 @@ protected:
private: private:
void * callbackObj; void * callbackObj;
TransporterService *m_transporter_service;
unsigned short m_service_port;
char *m_interface_name;
struct NdbThread *m_start_clients_thread;
bool m_run_start_clients_thread;
int sendCounter; int sendCounter;
NodeId localNodeId; NodeId localNodeId;
bool nodeIdSpecified; bool nodeIdSpecified;
...@@ -202,11 +239,6 @@ private: ...@@ -202,11 +239,6 @@ private:
int nSHMTransporters; int nSHMTransporters;
int nOSETransporters; int nOSETransporters;
int m_ccCount;
int m_ccIndex;
int m_ccStep;
int m_nTransportersPerformConnect;
bool m_ccReady;
/** /**
* Arrays holding all transporters in the order they are created * Arrays holding all transporters in the order they are created
*/ */
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef SOCKET_AUTHENTICATOR_HPP
#define SOCKET_AUTHENTICATOR_HPP
class SocketAuthenticator
{
public:
virtual ~SocketAuthenticator() {};
virtual bool client_authenticate(int sockfd) = 0;
virtual bool server_authenticate(int sockfd) = 0;
};
class SocketAuthSimple : public SocketAuthenticator
{
const char *m_passwd;
char *m_buf;
public:
SocketAuthSimple(const char *passwd);
virtual ~SocketAuthSimple();
virtual bool client_authenticate(int sockfd);
virtual bool server_authenticate(int sockfd);
};
#endif // SOCKET_AUTHENTICATOR_HPP
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef SOCKET_CLIENT_HPP
#define SOCKET_CLIENT_HPP
#include <NdbTCP.h>
class SocketAuthenticator;
class SocketClient
{
NDB_SOCKET_TYPE m_sockfd;
struct sockaddr_in m_servaddr;
unsigned short m_port;
char *m_server_name;
SocketAuthenticator *m_auth;
public:
SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa = 0);
~SocketClient();
bool init();
NDB_SOCKET_TYPE connect();
bool close();
};
#endif // SOCKET_ClIENT_HPP
...@@ -146,13 +146,17 @@ const int ConfigInfo::m_NoOfRules = sizeof(m_SectionRules)/sizeof(SectionRule); ...@@ -146,13 +146,17 @@ const int ConfigInfo::m_NoOfRules = sizeof(m_SectionRules)/sizeof(SectionRule);
/**************************************************************************** /****************************************************************************
* Config Rules declarations * Config Rules declarations
****************************************************************************/ ****************************************************************************/
bool addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>&sections, bool add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx, struct InitConfigFileParser::Context &ctx,
const char * ruleData); const char * rule_data);
bool add_db_ports(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * rule_data);
const ConfigInfo::ConfigRule const ConfigInfo::ConfigRule
ConfigInfo::m_ConfigRules[] = { ConfigInfo::m_ConfigRules[] = {
{ addNodeConnections, 0 }, { add_node_connections, 0 },
{ add_db_ports, 0 },
{ 0, 0 } { 0, 0 }
}; };
...@@ -376,6 +380,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -376,6 +380,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
1, 1,
(MAX_NODES - 1) }, (MAX_NODES - 1) },
{
CFG_DB_SERVER_PORT,
"ServerPort",
"DB",
"Port used to setup transporter",
ConfigInfo::USED,
false,
ConfigInfo::INT,
2202,
0,
0x7FFFFFFF },
{ {
CFG_DB_NO_REPLICAS, CFG_DB_NO_REPLICAS,
"NoOfReplicas", "NoOfReplicas",
...@@ -1231,7 +1247,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1231,7 +1247,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED, ConfigInfo::USED,
false, false,
ConfigInfo::STRING, ConfigInfo::STRING,
MANDATORY, 0,
0, 0,
0x7FFFFFFF }, 0x7FFFFFFF },
...@@ -1330,7 +1346,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1330,7 +1346,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED, ConfigInfo::USED,
false, false,
ConfigInfo::STRING, ConfigInfo::STRING,
MANDATORY, 0,
0, 0,
0x7FFFFFFF }, 0x7FFFFFFF },
...@@ -2510,10 +2526,14 @@ fixNodeHostname(InitConfigFileParser::Context & ctx, const char * data){ ...@@ -2510,10 +2526,14 @@ fixNodeHostname(InitConfigFileParser::Context & ctx, const char * data){
const char * compId; const char * compId;
if(!ctx.m_currentSection->get("ExecuteOnComputer", &compId)){ if(!ctx.m_currentSection->get("ExecuteOnComputer", &compId)){
require(ctx.m_currentSection->put("HostName", ""));
return true;
#if 0
ctx.reportError("Parameter \"ExecuteOnComputer\" missing from section " ctx.reportError("Parameter \"ExecuteOnComputer\" missing from section "
"[%s] starting at line: %d", "[%s] starting at line: %d",
ctx.fname, ctx.m_sectionLineno); ctx.fname, ctx.m_sectionLineno);
return false; return false;
#endif
} }
const Properties * computer; const Properties * computer;
...@@ -3158,9 +3178,9 @@ saveInConfigValues(InitConfigFileParser::Context & ctx, const char * data){ ...@@ -3158,9 +3178,9 @@ saveInConfigValues(InitConfigFileParser::Context & ctx, const char * data){
} }
bool bool
addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>&sections, add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx, struct InitConfigFileParser::Context &ctx,
const char * ruleData) const char * rule_data)
{ {
Properties * props= ctx.m_config; Properties * props= ctx.m_config;
Properties p_connections; Properties p_connections;
...@@ -3241,3 +3261,10 @@ addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>&sections, ...@@ -3241,3 +3261,10 @@ addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>&sections,
return true; return true;
} }
bool add_db_ports(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * rule_data)
{
return true;
}
...@@ -114,7 +114,8 @@ ConfigRetriever::getConfig(int verId, int nodeType) { ...@@ -114,7 +114,8 @@ ConfigRetriever::getConfig(int verId, int nodeType) {
struct ndb_mgm_configuration * p = 0; struct ndb_mgm_configuration * p = 0;
switch(m->type){ switch(m->type){
case MgmId_TCP: case MgmId_TCP:
p = getConfig(m->data.tcp.remoteHost, m->data.tcp.port, verId); p = getConfig(m->data.tcp.remoteHost, m->data.tcp.port,
verId, nodeType);
break; break;
case MgmId_File: case MgmId_File:
p = getConfig(m->data.file.filename, verId); p = getConfig(m->data.file.filename, verId);
...@@ -155,7 +156,8 @@ ConfigRetriever::getConfig(int verId, int nodeType) { ...@@ -155,7 +156,8 @@ ConfigRetriever::getConfig(int verId, int nodeType) {
ndb_mgm_configuration * ndb_mgm_configuration *
ConfigRetriever::getConfig(const char * mgmhost, ConfigRetriever::getConfig(const char * mgmhost,
short port, short port,
int versionId){ int versionId,
int nodetype){
NdbMgmHandle h; NdbMgmHandle h;
h = ndb_mgm_create_handle(); h = ndb_mgm_create_handle();
...@@ -175,6 +177,21 @@ ConfigRetriever::getConfig(const char * mgmhost, ...@@ -175,6 +177,21 @@ ConfigRetriever::getConfig(const char * mgmhost,
ndb_mgm_configuration * conf = ndb_mgm_get_configuration(h, versionId); ndb_mgm_configuration * conf = ndb_mgm_get_configuration(h, versionId);
if(conf == 0){ if(conf == 0){
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(h)); setError(CR_ERROR, ndb_mgm_get_latest_error_desc(h));
ndb_mgm_destroy_handle(&h);
return 0;
}
{
unsigned nodeid= getOwnNodeId();
int res= ndb_mgm_alloc_nodeid(h, versionId, &nodeid, nodetype);
if(res != 0) {
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(h));
ndb_mgm_destroy_handle(&h);
return 0;
}
_ownNodeId= nodeid;
} }
ndb_mgm_disconnect(h); ndb_mgm_disconnect(h);
...@@ -329,6 +346,9 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf, ...@@ -329,6 +346,9 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf,
} }
do { do {
if(strlen(hostname) == 0)
break;
if(strcasecmp(hostname, localhost) == 0) if(strcasecmp(hostname, localhost) == 0)
break; break;
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
LocalConfig::LocalConfig(){ LocalConfig::LocalConfig(){
ids = 0; size = 0; items = 0; ids = 0; size = 0; items = 0;
error_line = 0; error_msg[0] = 0; error_line = 0; error_msg[0] = 0;
_ownNodeId= 0;
} }
bool bool
...@@ -95,6 +96,11 @@ LocalConfig::init(bool onlyNodeId, ...@@ -95,6 +96,11 @@ LocalConfig::init(bool onlyNodeId,
return false; return false;
} }
//7. Check
if(readConnectString("host=localhost:2200", onlyNodeId)){
return true;
}
setError(0, ""); setError(0, "");
return false; return false;
......
...@@ -63,27 +63,23 @@ ndbstrerror::~ndbstrerror(void) ...@@ -63,27 +63,23 @@ ndbstrerror::~ndbstrerror(void)
#define ndbstrerror strerror #define ndbstrerror strerror
#endif #endif
TCP_Transporter::TCP_Transporter(int sendBufSize, int maxRecvSize, TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
int portNo, int sendBufSize, int maxRecvSize,
const char *rHostName,
const char *lHostName, const char *lHostName,
NodeId rNodeId, NodeId lNodeId, const char *rHostName,
int r_port,
NodeId lNodeId,
NodeId rNodeId,
int byte_order, int byte_order,
bool compr, bool chksm, bool signalId, bool compr, bool chksm, bool signalId,
Uint32 _reportFreq) : Uint32 _reportFreq) :
Transporter(lNodeId, rNodeId, byte_order, compr, chksm, signalId), Transporter(t_reg, lHostName, rHostName, r_port, lNodeId, rNodeId,
m_sendBuffer(sendBufSize), byte_order, compr, chksm, signalId),
isServer(lNodeId < rNodeId), m_sendBuffer(sendBufSize)
port(portNo)
{ {
maxReceiveSize = maxRecvSize; maxReceiveSize = maxRecvSize;
strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
// Initialize member variables // Initialize member variables
Ndb_getInAddr(&remoteHostAddress, rHostName);
Ndb_getInAddr(&localHostAddress, lHostName);
theSocket = NDB_INVALID_SOCKET; theSocket = NDB_INVALID_SOCKET;
sendCount = receiveCount = 0; sendCount = receiveCount = 0;
...@@ -108,6 +104,24 @@ TCP_Transporter::~TCP_Transporter() { ...@@ -108,6 +104,24 @@ TCP_Transporter::~TCP_Transporter() {
receiveBuffer.destroy(); receiveBuffer.destroy();
} }
bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
{
return connect_common(sockfd);
}
bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
{
return connect_common(sockfd);
}
bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
{
theSocket = sockfd;
setSocketOptions();
setSocketNonBlocking(theSocket);
return true;
}
bool bool
TCP_Transporter::initTransporter() { TCP_Transporter::initTransporter() {
...@@ -316,7 +330,7 @@ TCP_Transporter::doSend() { ...@@ -316,7 +330,7 @@ TCP_Transporter::doSend() {
sendCount ++; sendCount ++;
sendSize += nBytesSent; sendSize += nBytesSent;
if(sendCount == reportFreq){ if(sendCount == reportFreq){
reportSendLen(callbackObj,remoteNodeId, sendCount, sendSize); reportSendLen(get_callback_obj(), remoteNodeId, sendCount, sendSize);
sendCount = 0; sendCount = 0;
sendSize = 0; sendSize = 0;
} }
...@@ -331,7 +345,7 @@ TCP_Transporter::doSend() { ...@@ -331,7 +345,7 @@ TCP_Transporter::doSend() {
#endif #endif
if(DISCONNECT_ERRNO(InetErrno, nBytesSent)){ if(DISCONNECT_ERRNO(InetErrno, nBytesSent)){
doDisconnect(); doDisconnect();
reportDisconnect(callbackObj, remoteNodeId, InetErrno); report_disconnect(InetErrno);
} }
return false; return false;
...@@ -361,14 +375,15 @@ TCP_Transporter::doReceive() { ...@@ -361,14 +375,15 @@ TCP_Transporter::doReceive() {
#endif #endif
ndbout_c("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)", ndbout_c("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer); receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH); report_error(TE_INVALID_MESSAGE_LENGTH);
return 0; return 0;
} }
receiveCount ++; receiveCount ++;
receiveSize += nBytesRead; receiveSize += nBytesRead;
if(receiveCount == reportFreq){ if(receiveCount == reportFreq){
reportReceiveLen(callbackObj, remoteNodeId, receiveCount, receiveSize); reportReceiveLen(get_callback_obj(), remoteNodeId, receiveCount, receiveSize);
receiveCount = 0; receiveCount = 0;
receiveSize = 0; receiveSize = 0;
} }
...@@ -384,60 +399,17 @@ TCP_Transporter::doReceive() { ...@@ -384,60 +399,17 @@ TCP_Transporter::doReceive() {
if(DISCONNECT_ERRNO(InetErrno, nBytesRead)){ if(DISCONNECT_ERRNO(InetErrno, nBytesRead)){
// The remote node has closed down // The remote node has closed down
doDisconnect(); doDisconnect();
reportDisconnect(callbackObj, remoteNodeId,InetErrno); report_disconnect(InetErrno);
} }
} }
return nBytesRead; return nBytesRead;
} }
bool
TCP_Transporter::connectImpl(Uint32 timeOutMillis){
struct timeval timeout = {0, 0};
timeout.tv_sec = timeOutMillis / 1000;
timeout.tv_usec = (timeOutMillis % 1000)*1000;
bool retVal = false;
if(isServer){
if(theSocket == NDB_INVALID_SOCKET){
startTCPServer();
}
if(theSocket == NDB_INVALID_SOCKET)
{
NdbSleep_MilliSleep(timeOutMillis);
return false;
}
retVal = acceptClient(&timeout);
} else {
// Is client
retVal = connectClient(&timeout);
}
if(!retVal) {
NdbSleep_MilliSleep(timeOutMillis);
return false;
}
#if defined NDB_OSE || defined NDB_SOFTOSE
if(setsockopt(theSocket, SOL_SOCKET, SO_OSEOWNER,
&theReceiverPid, sizeof(PROCESS)) != 0){
ndbout << "Failed to transfer ownership of socket" << endl;
NDB_CLOSE_SOCKET(theSocket);
theSocket = -1;
return false;
}
#endif
return true;
}
void void
TCP_Transporter::disconnectImpl() { TCP_Transporter::disconnectImpl() {
if(theSocket != NDB_INVALID_SOCKET){ if(theSocket != NDB_INVALID_SOCKET){
if(NDB_CLOSE_SOCKET(theSocket) < 0){ if(NDB_CLOSE_SOCKET(theSocket) < 0){
reportError(callbackObj, remoteNodeId, TE_ERROR_CLOSING_SOCKET); report_error(TE_ERROR_CLOSING_SOCKET);
} }
} }
...@@ -447,155 +419,3 @@ TCP_Transporter::disconnectImpl() { ...@@ -447,155 +419,3 @@ TCP_Transporter::disconnectImpl() {
theSocket = NDB_INVALID_SOCKET; theSocket = NDB_INVALID_SOCKET;
} }
bool
TCP_Transporter::startTCPServer() {
int bindResult, listenResult;
// The server variable is the remote server when we are a client
// htonl and htons returns the parameter in network byte order
// INADDR_ANY tells the OS kernel to choose the IP address
struct sockaddr_in server;
memset((void*)&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_addr.s_addr = localHostAddress.s_addr;
server.sin_port = htons(port);
if (theSocket != NDB_INVALID_SOCKET) {
return true; // Server socket is already initialized
}
// Create the socket
theSocket = socket(AF_INET, SOCK_STREAM, 0);
if (theSocket == NDB_INVALID_SOCKET) {
reportThreadError(remoteNodeId, TE_COULD_NOT_CREATE_SOCKET);
return false;
}
// Set the socket reuse addr to true, so we are sure we can bind the
// socket
int reuseAddr = 1;
setsockopt(theSocket, SOL_SOCKET, SO_REUSEADDR,
(char*)&reuseAddr, sizeof(reuseAddr));
// Set the TCP_NODELAY option so also small packets are sent
// as soon as possible
int nodelay = 1;
setsockopt(theSocket, IPPROTO_TCP, TCP_NODELAY,
(char*)&nodelay, sizeof(nodelay));
// Bind the socket
bindResult = bind(theSocket, (struct sockaddr *) &server,
sizeof(server));
if (bindResult < 0) {
reportThreadError(remoteNodeId, TE_COULD_NOT_BIND_SOCKET);
NDB_CLOSE_SOCKET(theSocket);
theSocket = NDB_INVALID_SOCKET;
return false;
}
// Perform listen.
listenResult = listen(theSocket, 1);
if (listenResult == 1) {
reportThreadError(remoteNodeId, TE_LISTEN_FAILED);
NDB_CLOSE_SOCKET(theSocket);
theSocket = NDB_INVALID_SOCKET;
return false;
}
return true;
}
bool
TCP_Transporter::acceptClient (struct timeval * timeout){
struct sockaddr_in clientAddress;
fd_set readset;
FD_ZERO(&readset);
FD_SET(theSocket, &readset);
const int res = select(theSocket + 1, &readset, 0, 0, timeout);
if(res == 0)
return false;
if(res < 0){
reportThreadError(remoteNodeId, TE_ERROR_IN_SELECT_BEFORE_ACCEPT);
return false;
}
NDB_SOCKLEN_T clientAddressLen = sizeof(clientAddress);
const NDB_SOCKET_TYPE clientSocket = accept(theSocket,
(struct sockaddr*)&clientAddress,
&clientAddressLen);
if (clientSocket == NDB_INVALID_SOCKET) {
reportThreadError(remoteNodeId, TE_ACCEPT_RETURN_ERROR);
return false;
}
if (clientAddress.sin_addr.s_addr != remoteHostAddress.s_addr) {
ndbout_c("Wrong client connecting!");
ndbout_c("connecting address: %s", inet_ntoa(clientAddress.sin_addr));
ndbout_c("expecting address: %s", inet_ntoa(remoteHostAddress));
// The newly connected host is not the remote host
// we wanted to connect to. Disconnect it.
// XXX This is not valid. We cannot disconnect it.
NDB_CLOSE_SOCKET(clientSocket);
return false;
} else {
NDB_CLOSE_SOCKET(theSocket);
theSocket = clientSocket;
setSocketOptions();
setSocketNonBlocking(theSocket);
return true;
}
}
bool
TCP_Transporter::connectClient (struct timeval * timeout){
// Create the socket
theSocket = socket(AF_INET, SOCK_STREAM, 0);
if (theSocket == NDB_INVALID_SOCKET) {
reportThreadError(remoteNodeId, TE_COULD_NOT_CREATE_SOCKET);
return false;
}
struct sockaddr_in server;
memset((void*)&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_addr = remoteHostAddress;
server.sin_port = htons(port);
struct sockaddr_in client;
memset((void*)&client, 0, sizeof(client));
client.sin_family = AF_INET;
client.sin_addr = localHostAddress;
client.sin_port = 0; // Any port
// Bind the socket
const int bindResult = bind(theSocket, (struct sockaddr *) &client,
sizeof(client));
if (bindResult < 0) {
reportThreadError(remoteNodeId, TE_COULD_NOT_BIND_SOCKET);
NDB_CLOSE_SOCKET(theSocket);
theSocket = NDB_INVALID_SOCKET;
return false;
}
const int connectRes = ::connect(theSocket, (struct sockaddr *) &server,
sizeof(server));
if(connectRes == 0){
setSocketOptions();
setSocketNonBlocking(theSocket);
return true;
}
NDB_CLOSE_SOCKET(theSocket);
theSocket = NDB_INVALID_SOCKET;
return false;
}
...@@ -14,24 +14,8 @@ ...@@ -14,24 +14,8 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
//**************************************************************************** #ifndef TCP_TRANSPORTER_HPP
// #define TCP_TRANSPORTER_HPP
// AUTHOR
// sa Fransson
//
// NAME
// TCP_Transporter
//
// DESCRIPTION
// A TCP_Transporter instance is created when TCP/IP-communication
// shall be used (user specified). It handles connect, disconnect,
// send and receive.
//
//
//
//***************************************************************************/
#ifndef TCP_Transporter_H
#define TCP_Transporter_H
#include "Transporter.hpp" #include "Transporter.hpp"
#include "SendBuffer.hpp" #include "SendBuffer.hpp"
...@@ -61,11 +45,13 @@ class TCP_Transporter : public Transporter { ...@@ -61,11 +45,13 @@ class TCP_Transporter : public Transporter {
friend class TransporterRegistry; friend class TransporterRegistry;
private: private:
// Initialize member variables // Initialize member variables
TCP_Transporter(int sendBufferSize, int maxReceiveSize, TCP_Transporter(TransporterRegistry&,
int port, int sendBufferSize, int maxReceiveSize,
const char *rHostName,
const char *lHostName, const char *lHostName,
NodeId rHostId, NodeId lHostId, const char *rHostName,
int r_port,
NodeId lHostId,
NodeId rHostId,
int byteorder, int byteorder,
bool compression, bool checksum, bool signalId, bool compression, bool checksum, bool signalId,
Uint32 reportFreq = 4096); Uint32 reportFreq = 4096);
...@@ -121,12 +107,14 @@ protected: ...@@ -121,12 +107,14 @@ protected:
* A client connects to the remote server * A client connects to the remote server
* A server accepts any new connections * A server accepts any new connections
*/ */
bool connectImpl(Uint32 timeOutMillis); virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
bool connect_common(NDB_SOCKET_TYPE sockfd);
/** /**
* Disconnects a TCP/IP node. Empty send and receivebuffer. * Disconnects a TCP/IP node. Empty send and receivebuffer.
*/ */
void disconnectImpl(); virtual void disconnectImpl();
private: private:
/** /**
...@@ -134,21 +122,11 @@ private: ...@@ -134,21 +122,11 @@ private:
*/ */
SendBuffer m_sendBuffer; SendBuffer m_sendBuffer;
const bool isServer;
const unsigned int port;
// Sending/Receiving socket used by both client and server // Sending/Receiving socket used by both client and server
NDB_SOCKET_TYPE theSocket; NDB_SOCKET_TYPE theSocket;
Uint32 maxReceiveSize; Uint32 maxReceiveSize;
/**
* Remote host name/and address
*/
char remoteHostName[256];
struct in_addr remoteHostAddress;
struct in_addr localHostAddress;
/** /**
* Socket options * Socket options
*/ */
...@@ -163,43 +141,6 @@ private: ...@@ -163,43 +141,6 @@ private:
bool sendIsPossible(struct timeval * timeout); bool sendIsPossible(struct timeval * timeout);
/**
* startTCPServer - None blocking
*
* create a server socket
* bind
* listen
*
* Note: Does not call accept
*/
bool startTCPServer();
/**
* acceptClient - Blocking
*
* Accept a connection
* checks if "right" client has connected
* if so
* close server socket
* else
* close newly created socket and goto begin
*/
bool acceptClient(struct timeval * timeout);
/**
* Creates a client socket
*
* Note does not call connect
*/
bool createClientSocket();
/**
* connectClient - Blocking
*
* connects to remote host
*/
bool connectClient(struct timeval * timeout);
/** /**
* Statistics * Statistics
*/ */
......
...@@ -15,132 +15,125 @@ ...@@ -15,132 +15,125 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <TransporterRegistry.hpp>
#include <TransporterCallback.hpp>
#include "Transporter.hpp" #include "Transporter.hpp"
#include "TransporterInternalDefinitions.hpp" #include "TransporterInternalDefinitions.hpp"
#include <NdbSleep.h> #include <NdbSleep.h>
#include <SocketAuthenticator.hpp>
Transporter::Transporter(NodeId lNodeId, NodeId rNodeId, #include <InputStream.hpp>
#include <OutputStream.hpp>
Transporter::Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
int r_port,
NodeId lNodeId,
NodeId rNodeId,
int _byteorder, int _byteorder,
bool _compression, bool _checksum, bool _signalId) bool _compression, bool _checksum, bool _signalId)
: localNodeId(lNodeId), remoteNodeId(rNodeId), : m_r_port(r_port), localNodeId(lNodeId), remoteNodeId(rNodeId),
m_packer(_signalId, _checksum) isServer(lNodeId < rNodeId),
m_packer(_signalId, _checksum),
m_transporter_registry(t_reg)
{ {
if (rHostName && strlen(rHostName) > 0){
strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
Ndb_getInAddr(&remoteHostAddress, rHostName);
}
else
{
if (!isServer) {
ndbout << "Unable to setup transporter. Node " << rNodeId
<< " must have hostname. Update configuration." << endl;
exit(-1);
}
remoteHostName[0]= 0;
}
strncpy(localHostName, lHostName, sizeof(localHostName));
if (strlen(lHostName) > 0)
Ndb_getInAddr(&localHostAddress, lHostName);
byteOrder = _byteorder; byteOrder = _byteorder;
compressionUsed = _compression; compressionUsed = _compression;
checksumUsed = _checksum; checksumUsed = _checksum;
signalIdUsed = _signalId; signalIdUsed = _signalId;
_threadError = TE_NO_ERROR; m_connected = false;
m_timeOutMillis = 1000;
_connecting = false;
_disconnecting = false;
_connected = false;
_timeOutMillis = 1000;
theThreadPtr = NULL;
theMutexPtr = NdbMutex_Create();
}
Transporter::~Transporter(){ if (isServer)
NdbMutex_Destroy(theMutexPtr); m_socket_client= 0;
else
if(theThreadPtr != 0){ {
void * retVal; unsigned short tmp_port= 3307+rNodeId;
NdbThread_WaitFor(theThreadPtr, &retVal); m_socket_client= new SocketClient(remoteHostName, tmp_port,
NdbThread_Destroy(&theThreadPtr); new SocketAuthSimple("ndbd passwd"));
} }
} }
extern "C" Transporter::~Transporter(){
void * if (m_socket_client)
runConnect_C(void * me) delete m_socket_client;
{
runConnect(me);
NdbThread_Exit(0);
return NULL;
} }
void * bool
runConnect(void * me){ Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
Transporter * t = (Transporter *) me; if(m_connected)
return true; // TODO assert(0);
DEBUG("Connect thread to " << t->remoteNodeId << " started");
while(true){
NdbMutex_Lock(t->theMutexPtr);
if(t->_disconnecting){
t->_connecting = false;
NdbMutex_Unlock(t->theMutexPtr);
DEBUG("Connect Thread " << t->remoteNodeId << " stop due to disconnect");
return 0;
}
NdbMutex_Unlock(t->theMutexPtr);
bool res = t->connectImpl(t->_timeOutMillis); // 1000 ms bool res = connect_server_impl(sockfd);
DEBUG("Waiting for " << t->remoteNodeId << "...");
if(res){ if(res){
t->_connected = true; m_connected = true;
t->_connecting = false; m_errorCount = 0;
t->_errorCount = 0;
t->_threadError = TE_NO_ERROR;
DEBUG("Connect Thread " << t->remoteNodeId << " stop due to connect");
return 0;
} }
}
}
void
Transporter::doConnect() {
NdbMutex_Lock(theMutexPtr); return res;
if(_connecting || _disconnecting || _connected){ }
NdbMutex_Unlock(theMutexPtr);
return;
}
_connecting = true; bool
Transporter::connect_client() {
if(m_connected)
return true;
_threadError = TE_NO_ERROR; NDB_SOCKET_TYPE sockfd = m_socket_client->connect();
// Start thread if (sockfd < 0)
return false;
char buf[16]; // send info about own id
snprintf(buf, sizeof(buf), "ndb_con_%d", remoteNodeId); SocketOutputStream s_output(sockfd);
s_output.println("%d", localNodeId);
if(theThreadPtr != 0){ // get remote id
void * retVal; int nodeId;
NdbThread_WaitFor(theThreadPtr, &retVal); SocketInputStream s_input(sockfd);
NdbThread_Destroy(&theThreadPtr); char buf[256];
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
return false;
}
if (sscanf(buf, "%d", &nodeId) != 1) {
NDB_CLOSE_SOCKET(sockfd);
return false;
} }
theThreadPtr = NdbThread_Create(runConnect_C, bool res = connect_client_impl(sockfd);
(void**)this, if(res){
32768, m_connected = true;
buf, m_errorCount = 0;
NDB_THREAD_PRIO_LOW); }
return res;
NdbSleep_MilliSleep(100); // Let thread start
NdbMutex_Unlock(theMutexPtr);
} }
void void
Transporter::doDisconnect() { Transporter::doDisconnect() {
NdbMutex_Lock(theMutexPtr); if(!m_connected)
_disconnecting = true; return; //assert(0); TODO will fail
while(_connecting){
DEBUG("Waiting for connect to finish...");
NdbMutex_Unlock(theMutexPtr);
NdbSleep_MilliSleep(500);
NdbMutex_Lock(theMutexPtr);
}
_connected = false;
disconnectImpl(); disconnectImpl();
_threadError = TE_NO_ERROR;
_disconnecting = false;
NdbMutex_Unlock(theMutexPtr); m_connected= false;
} }
...@@ -19,6 +19,9 @@ ...@@ -19,6 +19,9 @@
#include <ndb_global.h> #include <ndb_global.h>
#include <SocketClient.hpp>
#include <TransporterRegistry.hpp>
#include <TransporterCallback.hpp> #include <TransporterCallback.hpp>
#include "TransporterDefinitions.hpp" #include "TransporterDefinitions.hpp"
#include "Packer.hpp" #include "Packer.hpp"
...@@ -40,7 +43,8 @@ public: ...@@ -40,7 +43,8 @@ public:
* None blocking * None blocking
* Use isConnected() to check status * Use isConnected() to check status
*/ */
virtual void doConnect(); bool connect_client();
bool connect_server(NDB_SOCKET_TYPE socket);
/** /**
* Blocking * Blocking
...@@ -60,14 +64,17 @@ public: ...@@ -60,14 +64,17 @@ public:
*/ */
NodeId getRemoteNodeId() const; NodeId getRemoteNodeId() const;
/** /**
* Set callback object * Local (own) Node Id
*/ */
void setCallbackObject(void * callback); NodeId getLocalNodeId() const;
protected: protected:
Transporter(NodeId lNodeId, Transporter(TransporterRegistry &,
const char *lHostName,
const char *rHostName,
int r_port,
NodeId lNodeId,
NodeId rNodeId, NodeId rNodeId,
int byteorder, int byteorder,
bool compression, bool compression,
...@@ -78,15 +85,28 @@ protected: ...@@ -78,15 +85,28 @@ protected:
* Blocking, for max timeOut milli seconds * Blocking, for max timeOut milli seconds
* Returns true if connect succeded * Returns true if connect succeded
*/ */
virtual bool connectImpl(Uint32 timeOut) = 0; virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd) = 0;
virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd) = 0;
/** /**
* Blocking * Blocking
*/ */
virtual void disconnectImpl() = 0; virtual void disconnectImpl() = 0;
const NodeId localNodeId; /**
* Remote host name/and address
*/
char remoteHostName[256];
char localHostName[256];
struct in_addr remoteHostAddress;
struct in_addr localHostAddress;
const unsigned int m_r_port;
const NodeId remoteNodeId; const NodeId remoteNodeId;
const NodeId localNodeId;
const bool isServer;
unsigned createIndex; unsigned createIndex;
...@@ -96,40 +116,28 @@ protected: ...@@ -96,40 +116,28 @@ protected:
bool signalIdUsed; bool signalIdUsed;
Packer m_packer; Packer m_packer;
private: private:
/**
* Thread and mutex for connect SocketClient *m_socket_client;
*/
NdbThread* theThreadPtr;
friend void* runConnect(void * me);
protected: protected:
/**
* Error reporting from connect thread(s)
*/
void reportThreadError(NodeId nodeId,
TransporterError errorCode);
Uint32 getErrorCount(); Uint32 getErrorCount();
TransporterError getThreadError(); Uint32 m_errorCount;
void resetThreadError(); Uint32 m_timeOutMillis;
TransporterError _threadError;
Uint32 _timeOutMillis;
Uint32 _errorCount;
protected: protected:
NdbMutex* theMutexPtr; bool m_connected; // Are we connected
bool _connected; // Are we connected
bool _connecting; // Connect thread is running
bool _disconnecting; // We are disconnecting
void * callbackObj; TransporterRegistry &m_transporter_registry;
void *get_callback_obj() { return m_transporter_registry.callbackObj; };
void report_disconnect(int err){m_transporter_registry.report_disconnect(remoteNodeId,err);};
void report_error(enum TransporterError err){reportError(get_callback_obj(),remoteNodeId,err);};
}; };
inline inline
bool bool
Transporter::isConnected() const { Transporter::isConnected() const {
return _connected; return m_connected;
} }
inline inline
...@@ -139,41 +147,16 @@ Transporter::getRemoteNodeId() const { ...@@ -139,41 +147,16 @@ Transporter::getRemoteNodeId() const {
} }
inline inline
void NodeId
Transporter::reportThreadError(NodeId nodeId, TransporterError errorCode) Transporter::getLocalNodeId() const {
{ return remoteNodeId;
#if 0
ndbout_c("Transporter::reportThreadError (NodeId: %d, Error code: %d)",
nodeId, errorCode);
#endif
_threadError = errorCode;
_errorCount++;
}
inline
TransporterError
Transporter::getThreadError(){
return _threadError;
} }
inline inline
Uint32 Uint32
Transporter::getErrorCount() Transporter::getErrorCount()
{ {
return _errorCount; return m_errorCount;
}
inline
void
Transporter::resetThreadError()
{
_threadError = TE_NO_ERROR;
}
inline
void
Transporter::setCallbackObject(void * callback) {
callbackObj = callback;
} }
#endif // Define of Transporter_H #endif // Define of Transporter_H
...@@ -16,10 +16,11 @@ ...@@ -16,10 +16,11 @@
#include <ndb_global.h> #include <ndb_global.h>
#include "TransporterRegistry.hpp" #include <TransporterRegistry.hpp>
#include "TransporterInternalDefinitions.hpp" #include "TransporterInternalDefinitions.hpp"
#include "Transporter.hpp" #include "Transporter.hpp"
#include <SocketAuthenticator.hpp>
#ifdef NDB_TCP_TRANSPORTER #ifdef NDB_TCP_TRANSPORTER
#include "TCP_Transporter.hpp" #include "TCP_Transporter.hpp"
...@@ -42,20 +43,67 @@ ...@@ -42,20 +43,67 @@
#include "NdbOut.hpp" #include "NdbOut.hpp"
#include <NdbSleep.h> #include <NdbSleep.h>
#include <NdbTick.h> #include <NdbTick.h>
#define STEPPING 1 #include <InputStream.hpp>
#include <OutputStream.hpp>
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
{
if (m_auth && !m_auth->server_authenticate(sockfd)){
NDB_CLOSE_SOCKET(sockfd);
return 0;
}
{
// read node id from client
int nodeId;
SocketInputStream s_input(sockfd);
char buf[256];
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
}
if (sscanf(buf, "%d", &nodeId) != 1) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
}
//check that nodeid is valid and that there is an allocated transporter
if ( nodeId < 0 || nodeId >= m_transporter_registry->maxTransporters) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
}
if (m_transporter_registry->theTransporters[nodeId] == 0) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
}
//check that the transporter should be connected
if (m_transporter_registry->performStates[nodeId] != TransporterRegistry::CONNECTING) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
}
Transporter *t= m_transporter_registry->theTransporters[nodeId];
// send info about own id (just as response to acnowledge connection)
SocketOutputStream s_output(sockfd);
s_output.println("%d", t->getLocalNodeId());
// setup transporter (transporter responsable for closing sockfd)
t->connect_server(sockfd);
}
return 0;
}
TransporterRegistry::TransporterRegistry(void * callback, TransporterRegistry::TransporterRegistry(void * callback,
unsigned _maxTransporters, unsigned _maxTransporters,
unsigned sizeOfLongSignalMemory) { unsigned sizeOfLongSignalMemory) {
m_transporter_service= 0;
nodeIdSpecified = false; nodeIdSpecified = false;
maxTransporters = _maxTransporters; maxTransporters = _maxTransporters;
sendCounter = 1; sendCounter = 1;
m_ccCount = 0;
m_ccIndex = 0;
m_ccStep = STEPPING;
m_ccReady = false;
m_nTransportersPerformConnect=0;
callbackObj=callback; callbackObj=callback;
...@@ -82,7 +130,7 @@ TransporterRegistry::TransporterRegistry(void * callback, ...@@ -82,7 +130,7 @@ TransporterRegistry::TransporterRegistry(void * callback,
theSHMTransporters[i] = NULL; theSHMTransporters[i] = NULL;
theOSETransporters[i] = NULL; theOSETransporters[i] = NULL;
theTransporters[i] = NULL; theTransporters[i] = NULL;
performStates[i] = PerformNothing; performStates[i] = DISCONNECTED;
ioStates[i] = NoHalt; ioStates[i] = NoHalt;
} }
theOSEReceiver = 0; theOSEReceiver = 0;
...@@ -154,13 +202,14 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) { ...@@ -154,13 +202,14 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
return false; return false;
TCP_Transporter * t = new TCP_Transporter(config->sendBufferSize, TCP_Transporter * t = new TCP_Transporter(*this,
config->sendBufferSize,
config->maxReceiveSize, config->maxReceiveSize,
config->port,
config->remoteHostName,
config->localHostName, config->localHostName,
config->remoteNodeId, config->remoteHostName,
config->port,
localNodeId, localNodeId,
config->remoteNodeId,
config->byteOrder, config->byteOrder,
config->compression, config->compression,
config->checksum, config->checksum,
...@@ -172,13 +221,11 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) { ...@@ -172,13 +221,11 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
return false; return false;
} }
t->setCallbackObject(callbackObj);
// Put the transporter in the transporter arrays // Put the transporter in the transporter arrays
theTCPTransporters[nTCPTransporters] = t; theTCPTransporters[nTCPTransporters] = t;
theTransporters[t->getRemoteNodeId()] = t; theTransporters[t->getRemoteNodeId()] = t;
theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER; theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
performStates[t->getRemoteNodeId()] = PerformNothing; performStates[t->getRemoteNodeId()] = DISCONNECTED;
nTransporters++; nTransporters++;
nTCPTransporters++; nTCPTransporters++;
...@@ -228,12 +275,11 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) { ...@@ -228,12 +275,11 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
delete t; delete t;
return false; return false;
} }
t->setCallbackObject(callbackObj);
// Put the transporter in the transporter arrays // Put the transporter in the transporter arrays
theOSETransporters[nOSETransporters] = t; theOSETransporters[nOSETransporters] = t;
theTransporters[t->getRemoteNodeId()] = t; theTransporters[t->getRemoteNodeId()] = t;
theTransporterTypes[t->getRemoteNodeId()] = tt_OSE_TRANSPORTER; theTransporterTypes[t->getRemoteNodeId()] = tt_OSE_TRANSPORTER;
performStates[t->getRemoteNodeId()] = PerformNothing; performStates[t->getRemoteNodeId()] = DISCONNECTED;
nTransporters++; nTransporters++;
nOSETransporters++; nOSETransporters++;
...@@ -279,12 +325,11 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) { ...@@ -279,12 +325,11 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
delete t; delete t;
return false; return false;
} }
t->setCallbackObject(callbackObj);
// Put the transporter in the transporter arrays // Put the transporter in the transporter arrays
theSCITransporters[nSCITransporters] = t; theSCITransporters[nSCITransporters] = t;
theTransporters[t->getRemoteNodeId()] = t; theTransporters[t->getRemoteNodeId()] = t;
theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER; theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
performStates[t->getRemoteNodeId()] = PerformNothing; performStates[t->getRemoteNodeId()] = DISCONNECTED;
nTransporters++; nTransporters++;
nSCITransporters++; nSCITransporters++;
...@@ -321,12 +366,11 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) { ...@@ -321,12 +366,11 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
delete t; delete t;
return false; return false;
} }
t->setCallbackObject(callbackObj);
// Put the transporter in the transporter arrays // Put the transporter in the transporter arrays
theSHMTransporters[nSHMTransporters] = t; theSHMTransporters[nSHMTransporters] = t;
theTransporters[t->getRemoteNodeId()] = t; theTransporters[t->getRemoteNodeId()] = t;
theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER; theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
performStates[t->getRemoteNodeId()] = PerformNothing; performStates[t->getRemoteNodeId()] = DISCONNECTED;
nTransporters++; nTransporters++;
nSHMTransporters++; nSHMTransporters++;
...@@ -781,7 +825,7 @@ TransporterRegistry::performReceive(){ ...@@ -781,7 +825,7 @@ TransporterRegistry::performReceive(){
TCP_Transporter *t = theTCPTransporters[i]; TCP_Transporter *t = theTCPTransporters[i];
const NodeId nodeId = t->getRemoteNodeId(); const NodeId nodeId = t->getRemoteNodeId();
const NDB_SOCKET_TYPE socket = t->getSocket(); const NDB_SOCKET_TYPE socket = t->getSocket();
if(performStates[nodeId] == PerformIO){ if(is_connected(nodeId)){
if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) { if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) {
const int receiveSize = t->doReceive(); const int receiveSize = t->doReceive();
if(receiveSize > 0){ if(receiveSize > 0){
...@@ -804,7 +848,7 @@ TransporterRegistry::performReceive(){ ...@@ -804,7 +848,7 @@ TransporterRegistry::performReceive(){
checkJobBuffer(); checkJobBuffer();
SCI_Transporter *t = theSCITransporters[i]; SCI_Transporter *t = theSCITransporters[i];
const NodeId nodeId = t->getRemoteNodeId(); const NodeId nodeId = t->getRemoteNodeId();
if(performStates[nodeId] == PerformIO){ if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){ if(t->isConnected() && t->checkConnected()){
Uint32 * readPtr, * eodPtr; Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr); t->getReceivePtr(&readPtr, &eodPtr);
...@@ -819,7 +863,7 @@ TransporterRegistry::performReceive(){ ...@@ -819,7 +863,7 @@ TransporterRegistry::performReceive(){
checkJobBuffer(); checkJobBuffer();
SHM_Transporter *t = theSHMTransporters[i]; SHM_Transporter *t = theSHMTransporters[i];
const NodeId nodeId = t->getRemoteNodeId(); const NodeId nodeId = t->getRemoteNodeId();
if(performStates[nodeId] == PerformIO){ if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){ if(t->isConnected() && t->checkConnected()){
Uint32 * readPtr, * eodPtr; Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr); t->getReceivePtr(&readPtr, &eodPtr);
...@@ -840,7 +884,7 @@ TransporterRegistry::performSend(){ ...@@ -840,7 +884,7 @@ TransporterRegistry::performSend(){
#ifdef NDB_OSE_TRANSPORTER #ifdef NDB_OSE_TRANSPORTER
for (int i = 0; i < nOSETransporters; i++){ for (int i = 0; i < nOSETransporters; i++){
OSE_Transporter *t = theOSETransporters[i]; OSE_Transporter *t = theOSETransporters[i];
if((performStates[t->getRemoteNodeId()] == PerformIO) && if((is_connected(t->getRemoteNodeId()) &&
(t->isConnected())) { (t->isConnected())) {
t->doSend(); t->doSend();
}//if }//if
...@@ -887,7 +931,7 @@ TransporterRegistry::performSend(){ ...@@ -887,7 +931,7 @@ TransporterRegistry::performSend(){
TCP_Transporter *t = theTCPTransporters[i]; TCP_Transporter *t = theTCPTransporters[i];
const NodeId nodeId = t->getRemoteNodeId(); const NodeId nodeId = t->getRemoteNodeId();
const int socket = t->getSocket(); const int socket = t->getSocket();
if(performStates[nodeId] == PerformIO){ if(is_connected(nodeId)){
if(t->isConnected() && FD_ISSET(socket, &writeset)) { if(t->isConnected() && FD_ISSET(socket, &writeset)) {
t->doSend(); t->doSend();
}//if }//if
...@@ -901,7 +945,7 @@ TransporterRegistry::performSend(){ ...@@ -901,7 +945,7 @@ TransporterRegistry::performSend(){
if (t && if (t &&
(t->hasDataToSend()) && (t->hasDataToSend()) &&
(t->isConnected()) && (t->isConnected()) &&
(performStates[t->getRemoteNodeId()] == PerformIO)) { (is_connected(t->getRemoteNodeId()))) {
t->doSend(); t->doSend();
}//if }//if
}//for }//for
...@@ -910,7 +954,7 @@ TransporterRegistry::performSend(){ ...@@ -910,7 +954,7 @@ TransporterRegistry::performSend(){
if (t && if (t &&
(t->hasDataToSend()) && (t->hasDataToSend()) &&
(t->isConnected()) && (t->isConnected()) &&
(performStates[t->getRemoteNodeId()] == PerformIO)) { (is_connected(t->getRemoteNodeId()))) {
t->doSend(); t->doSend();
}//if }//if
}//for }//for
...@@ -925,7 +969,7 @@ TransporterRegistry::performSend(){ ...@@ -925,7 +969,7 @@ TransporterRegistry::performSend(){
SCI_Transporter *t = theSCITransporters[i]; SCI_Transporter *t = theSCITransporters[i];
const NodeId nodeId = t->getRemoteNodeId(); const NodeId nodeId = t->getRemoteNodeId();
if(performStates[nodeId] == PerformIO){ if(is_connected(nodeId)){
if(t->isConnected() && t->hasDataToSend()) { if(t->isConnected() && t->hasDataToSend()) {
t->doSend(); t->doSend();
} //if } //if
...@@ -961,70 +1005,210 @@ TransporterRegistry::printState(){ ...@@ -961,70 +1005,210 @@ TransporterRegistry::printState(){
} }
#endif #endif
PerformState IOState
TransporterRegistry::performState(NodeId nodeId) { TransporterRegistry::ioState(NodeId nodeId) {
return performStates[nodeId]; return ioStates[nodeId];
} }
#ifdef DEBUG_TRANSPORTER void
const char * TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
performStateString(PerformState state){ DEBUG("TransporterRegistry::setIOState("
switch(state){ << nodeId << ", " << state << ")");
case PerformNothing: ioStates[nodeId] = state;
return "PerformNothing"; }
break;
case PerformIO: static void *
return "PerformIO"; run_start_clients_C(void * me)
{
((TransporterRegistry*) me)->start_clients_thread();
NdbThread_Exit(0);
return me;
}
// Run by kernel thread
void
TransporterRegistry::do_connect(NodeId node_id)
{
PerformState &curr_state = performStates[node_id];
switch(curr_state){
case DISCONNECTED:
break; break;
case PerformConnect: case CONNECTED:
return "PerformConnect"; return;
case CONNECTING:
return;
case DISCONNECTING:
break; break;
case PerformDisconnect: }
return "PerformDisconnect"; curr_state= CONNECTING;
}
void
TransporterRegistry::do_disconnect(NodeId node_id)
{
PerformState &curr_state = performStates[node_id];
switch(curr_state){
case DISCONNECTED:
return;
case CONNECTED:
break; break;
case RemoveTransporter: case CONNECTING:
return "RemoveTransporter";
break; break;
case DISCONNECTING:
return;
} }
return "Unknown"; curr_state= DISCONNECTING;
} }
#endif
void void
TransporterRegistry::setPerformState(NodeId nodeId, PerformState state) { TransporterRegistry::report_connect(NodeId node_id)
DEBUG("TransporterRegistry::setPerformState(" {
<< nodeId << ", " << performStateString(state) << ")"); performStates[node_id] = CONNECTED;
reportConnect(callbackObj, node_id);
}
performStates[nodeId] = state; void
TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
{
performStates[node_id] = DISCONNECTED;
reportDisconnect(callbackObj, node_id, errnum);
} }
void void
TransporterRegistry::setPerformState(PerformState state) { TransporterRegistry::update_connections()
int count = 0; {
int index = 0; for (int i= 0, n= 0; n < nTransporters; i++){
while(count < nTransporters){ Transporter * t = theTransporters[i];
if(theTransporters[index] != 0){ if (!t)
setPerformState(theTransporters[index]->getRemoteNodeId(), state); continue;
count ++; n++;
const NodeId nodeId = t->getRemoteNodeId();
switch(performStates[nodeId]){
case CONNECTED:
case DISCONNECTED:
break;
case CONNECTING:
if(t->isConnected())
report_connect(nodeId);
break;
case DISCONNECTING:
if(!t->isConnected())
report_disconnect(nodeId, 0);
break;
} }
index ++;
} }
} }
IOState // run as own thread
TransporterRegistry::ioState(NodeId nodeId) { void
return ioStates[nodeId]; TransporterRegistry::start_clients_thread()
{
while (m_run_start_clients_thread) {
NdbSleep_MilliSleep(100);
for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
Transporter * t = theTransporters[i];
if (!t)
continue;
n++;
const NodeId nodeId = t->getRemoteNodeId();
switch(performStates[nodeId]){
case CONNECTING:
if(!t->isConnected() && !t->isServer)
t->connect_client();
break;
case DISCONNECTING:
if(t->isConnected())
t->doDisconnect();
break;
default:
break;
}
}
}
} }
void bool
TransporterRegistry::setIOState(NodeId nodeId, IOState state) { TransporterRegistry::start_clients()
DEBUG("TransporterRegistry::setIOState(" {
<< nodeId << ", " << state << ")"); m_run_start_clients_thread= true;
ioStates[nodeId] = state; m_start_clients_thread= NdbThread_Create(run_start_clients_C,
(void**)this,
32768,
"ndb_start_clients",
NDB_THREAD_PRIO_LOW);
if (m_start_clients_thread == 0) {
m_run_start_clients_thread= false;
return false;
}
return true;
}
bool
TransporterRegistry::stop_clients()
{
if (m_start_clients_thread) {
m_run_start_clients_thread= false;
void* status;
int r= NdbThread_WaitFor(m_start_clients_thread, &status);
NdbThread_Destroy(&m_start_clients_thread);
}
return true;
}
bool
TransporterRegistry::start_service(SocketServer& socket_server)
{
#if 0
for (int i= 0, n= 0; n < nTransporters; i++){
Transporter * t = theTransporters[i];
if (!t)
continue;
n++;
if (t->isServer) {
t->m_service = new TransporterService(new SocketAuthSimple("ndbd passwd"));
if(!socket_server.setup(t->m_service, t->m_r_port, 0))
{
ndbout_c("Unable to setup transporter service port: %d!\n"
"Please check if the port is already used,\n"
"(perhaps a mgmtsrvrserver is already running)",
m_service_port);
delete t->m_service;
return false;
}
}
}
#endif
m_transporter_service = new TransporterService(new SocketAuthSimple("ndbd passwd"));
if (nodeIdSpecified != true) {
ndbout_c("TransporterRegistry::startReceiving: localNodeId not specified");
return false;
}
m_service_port = 3307 + localNodeId;
//m_interface_name = "ndbd";
m_interface_name = 0;
if(!socket_server.setup(m_transporter_service, m_service_port, m_interface_name))
{
ndbout_c("Unable to setup transporter service port: %d!\n"
"Please check if the port is already used,\n"
"(perhaps a mgmtsrvrserver is already running)",
m_service_port);
delete m_transporter_service;
return false;
}
m_transporter_service->setTransporterRegistry(this);
return true;
} }
void void
TransporterRegistry::startReceiving(){ TransporterRegistry::startReceiving()
{
#ifdef NDB_OSE_TRANSPORTER #ifdef NDB_OSE_TRANSPORTER
if(theOSEReceiver != NULL){ if(theOSEReceiver != NULL){
theOSEReceiver->createPhantom(); theOSEReceiver->createPhantom();
...@@ -1081,99 +1265,6 @@ TransporterRegistry::stopSending(){ ...@@ -1081,99 +1265,6 @@ TransporterRegistry::stopSending(){
#endif #endif
} }
/**
* The old implementation did not scale with a large
* number of nodes. (Watchdog killed NDB because
* it took too long time to allocated threads in
* doConnect.
*
* The new implementation only checks the connection
* for a number of transporters (STEPPING), until to
* the point where all transporters has executed
* doConnect once. After that, the behaviour is as
* in the old implemenation, i.e, checking the connection
* for all transporters.
* @todo: instead of STEPPING, maybe we should only
* allow checkConnections to execute for a certain
* time that somehow factors in heartbeat times and
* watchdog times.
*
*/
void
TransporterRegistry::checkConnections(){
if(m_ccStep > nTransporters)
m_ccStep = nTransporters;
while(m_ccCount < m_ccStep){
if(theTransporters[m_ccIndex] != 0){
Transporter * t = theTransporters[m_ccIndex];
const NodeId nodeId = t->getRemoteNodeId();
if(t->getThreadError() != 0) {
reportError(callbackObj, nodeId, t->getThreadError());
t->resetThreadError();
}
switch(performStates[nodeId]){
case PerformConnect:
if(!t->isConnected()){
t->doConnect();
if(m_nTransportersPerformConnect!=nTransporters)
m_nTransportersPerformConnect++;
} else {
performStates[nodeId] = PerformIO;
reportConnect(callbackObj, nodeId);
}
break;
case PerformDisconnect:
{
bool wasConnected = t->isConnected();
t->doDisconnect();
performStates[nodeId] = PerformNothing;
if(wasConnected){
reportDisconnect(callbackObj, nodeId,0);
}
}
break;
case RemoveTransporter:
removeTransporter(nodeId);
break;
case PerformNothing:
case PerformIO:
break;
}
m_ccCount ++;
}
m_ccIndex ++;
}
if(!m_ccReady) {
if(m_ccCount < nTransporters) {
if(nTransporters - m_ccStep < STEPPING)
m_ccStep += nTransporters-m_ccStep;
else
m_ccStep += STEPPING;
// ndbout_c("count %d step %d ", m_ccCount, m_ccStep);
}
else {
m_ccCount = 0;
m_ccIndex = 0;
m_ccStep = STEPPING;
// ndbout_c("count %d step %d ", m_ccCount, m_ccStep);
}
}
if((nTransporters == m_nTransportersPerformConnect) || m_ccReady) {
m_ccReady = true;
m_ccCount = 0;
m_ccIndex = 0;
m_ccStep = nTransporters;
// ndbout_c("alla count %d step %d ", m_ccCount, m_ccStep);
}
}//TransporterRegistry::checkConnections()
NdbOut & operator <<(NdbOut & out, SignalHeader & sh){ NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
out << "-- Signal Header --" << endl; out << "-- Signal Header --" << endl;
out << "theLength: " << sh.theLength << endl; out << "theLength: " << sh.theLength << endl;
......
...@@ -3,7 +3,8 @@ noinst_LTLIBRARIES = libgeneral.la ...@@ -3,7 +3,8 @@ noinst_LTLIBRARIES = libgeneral.la
libgeneral_la_SOURCES = \ libgeneral_la_SOURCES = \
File.cpp md5_hash.cpp Properties.cpp socket_io.cpp \ File.cpp md5_hash.cpp Properties.cpp socket_io.cpp \
SimpleProperties.cpp Parser.cpp InputStream.cpp SocketServer.cpp \ SimpleProperties.cpp Parser.cpp InputStream.cpp \
SocketServer.cpp SocketClient.cpp SocketAuthenticator.cpp\
OutputStream.cpp NdbOut.cpp BaseString.cpp Base64.cpp \ OutputStream.cpp NdbOut.cpp BaseString.cpp Base64.cpp \
NdbSqlUtil.cpp new.cpp \ NdbSqlUtil.cpp new.cpp \
uucode.c random.c getarg.c version.c \ uucode.c random.c getarg.c version.c \
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <SocketClient.hpp>
#include <SocketAuthenticator.hpp>
#include <NdbOut.hpp>
SocketAuthSimple::SocketAuthSimple(const char *passwd) {
m_passwd= strdup(passwd);
m_buf= (char*)malloc(strlen(passwd)+1);
}
SocketAuthSimple::~SocketAuthSimple()
{
if (m_passwd)
free((void*)m_passwd);
if (m_buf)
free(m_buf);
}
bool SocketAuthSimple::client_authenticate(int sockfd)
{
if (!m_passwd)
return false;
int len = strlen(m_passwd);
int r;
r= send(sockfd, m_passwd, len, 0);
r= recv(sockfd, m_buf, len, 0);
m_buf[r]= '\0';
return true;
}
bool SocketAuthSimple::server_authenticate(int sockfd)
{
if (!m_passwd)
return false;
int len = strlen(m_passwd), r;
r= recv(sockfd, m_buf, len, 0);
m_buf[r]= '\0';
r= send(sockfd, m_passwd, len, 0);
return true;
}
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <NdbOut.hpp>
#include <SocketClient.hpp>
#include <SocketAuthenticator.hpp>
SocketClient::SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa)
{
m_auth= sa;
m_port= port;
m_server_name= strdup(server_name);
m_sockfd= -1;
}
SocketClient::~SocketClient()
{
if (m_server_name)
free(m_server_name);
if (m_sockfd >= 0)
NDB_CLOSE_SOCKET(m_sockfd);
if (m_auth)
delete m_auth;
}
bool
SocketClient::init()
{
if (m_sockfd >= 0)
NDB_CLOSE_SOCKET(m_sockfd);
memset(&m_servaddr, 0, sizeof(m_servaddr));
m_servaddr.sin_family = AF_INET;
m_servaddr.sin_port = htons(m_port);
// Convert ip address presentation format to numeric format
if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
return false;
m_sockfd= socket(AF_INET, SOCK_STREAM, 0);
if (m_sockfd == NDB_INVALID_SOCKET) {
return false;
}
return true;
}
NDB_SOCKET_TYPE
SocketClient::connect()
{
if (m_sockfd < 0)
{
if (!init()) {
ndbout << "SocketClient::connect() failed " << m_server_name << " " << m_port << endl;
return -1;
}
}
const int r = ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr));
if (r == -1)
return -1;
if (m_auth)
if (!m_auth->client_authenticate(m_sockfd))
{
NDB_CLOSE_SOCKET(m_sockfd);
m_sockfd= -1;
return -1;
}
NDB_SOCKET_TYPE sockfd= m_sockfd;
m_sockfd= -1;
return sockfd;
}
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include <ndb_global.h> #include <ndb_global.h>
#include "SocketServer.hpp" #include <SocketServer.hpp>
#include <NdbTCP.h> #include <NdbTCP.h>
#include <NdbOut.hpp> #include <NdbOut.hpp>
......
...@@ -4,7 +4,7 @@ include $(top_srcdir)/ndb/config/common.mk.am ...@@ -4,7 +4,7 @@ include $(top_srcdir)/ndb/config/common.mk.am
ndbbin_PROGRAMS = ndbd ndbbin_PROGRAMS = ndbd
ndbd_SOURCES = Main.cpp SimBlockList.cpp ndbd_SOURCES = main.cpp SimBlockList.cpp
include $(top_srcdir)/ndb/config/type_kernel.mk.am include $(top_srcdir)/ndb/config/type_kernel.mk.am
......
...@@ -362,7 +362,7 @@ void Cmvmi::execCLOSE_COMREQ(Signal* signal) ...@@ -362,7 +362,7 @@ void Cmvmi::execCLOSE_COMREQ(Signal* signal)
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
globalTransporterRegistry.setIOState(i, HaltIO); globalTransporterRegistry.setIOState(i, HaltIO);
globalTransporterRegistry.setPerformState(i, PerformDisconnect); globalTransporterRegistry.do_disconnect(i);
/** /**
* Cancel possible event subscription * Cancel possible event subscription
...@@ -390,7 +390,7 @@ void Cmvmi::execOPEN_COMREQ(Signal* signal) ...@@ -390,7 +390,7 @@ void Cmvmi::execOPEN_COMREQ(Signal* signal)
const Uint32 len = signal->getLength(); const Uint32 len = signal->getLength();
if(len == 2){ if(len == 2){
globalTransporterRegistry.setPerformState(tStartingNode, PerformConnect); globalTransporterRegistry.do_connect(tStartingNode);
globalTransporterRegistry.setIOState(tStartingNode, HaltIO); globalTransporterRegistry.setIOState(tStartingNode, HaltIO);
//----------------------------------------------------- //-----------------------------------------------------
...@@ -405,7 +405,7 @@ void Cmvmi::execOPEN_COMREQ(Signal* signal) ...@@ -405,7 +405,7 @@ void Cmvmi::execOPEN_COMREQ(Signal* signal)
jam(); jam();
if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2){ if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2){
jam(); jam();
globalTransporterRegistry.setPerformState(i, PerformConnect); globalTransporterRegistry.do_connect(i);
globalTransporterRegistry.setIOState(i, HaltIO); globalTransporterRegistry.setIOState(i, HaltIO);
signal->theData[0] = EventReport::CommunicationOpened; signal->theData[0] = EventReport::CommunicationOpened;
...@@ -456,14 +456,6 @@ void Cmvmi::execDISCONNECT_REP(Signal *signal) ...@@ -456,14 +456,6 @@ void Cmvmi::execDISCONNECT_REP(Signal *signal)
const NodeInfo::NodeType type = getNodeInfo(hostId).getType(); const NodeInfo::NodeType type = getNodeInfo(hostId).getType();
ndbrequire(type != NodeInfo::INVALID); ndbrequire(type != NodeInfo::INVALID);
if (globalTransporterRegistry.performState(hostId) != PerformDisconnect) {
jam();
// -------------------------------------------------------------------
// We do not report the disconnection when disconnection is already ongoing.
// This reporting should be looked into but this secures that we avoid
// crashes due to too quick re-reporting of disconnection.
// -------------------------------------------------------------------
if(type == NodeInfo::DB || globalData.theStartLevel == NodeState::SL_STARTED){ if(type == NodeInfo::DB || globalData.theStartLevel == NodeState::SL_STARTED){
jam(); jam();
DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0]; DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0];
...@@ -471,19 +463,14 @@ void Cmvmi::execDISCONNECT_REP(Signal *signal) ...@@ -471,19 +463,14 @@ void Cmvmi::execDISCONNECT_REP(Signal *signal)
rep->err = errNo; rep->err = errNo;
sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal, sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal,
DisconnectRep::SignalLength, JBA); DisconnectRep::SignalLength, JBA);
globalTransporterRegistry.setPerformState(hostId, PerformDisconnect); } else if((globalData.theStartLevel == NodeState::SL_CMVMI ||
} else if(globalData.theStartLevel == NodeState::SL_CMVMI || globalData.theStartLevel == NodeState::SL_STARTING)
globalData.theStartLevel == NodeState::SL_STARTING) { && type == NodeInfo::MGM) {
/** /**
* Someone disconnected during cmvmi period * Someone disconnected during cmvmi period
*/ */
if(type == NodeInfo::MGM){
jam(); jam();
globalTransporterRegistry.setPerformState(hostId, PerformConnect); globalTransporterRegistry.do_connect(hostId);
} else {
globalTransporterRegistry.setPerformState(hostId, PerformDisconnect);
}
}
} }
signal->theData[0] = EventReport::Disconnected; signal->theData[0] = EventReport::Disconnected;
...@@ -522,7 +509,8 @@ void Cmvmi::execCONNECT_REP(Signal *signal){ ...@@ -522,7 +509,8 @@ void Cmvmi::execCONNECT_REP(Signal *signal){
/** /**
* Dont allow api nodes to connect * Dont allow api nodes to connect
*/ */
globalTransporterRegistry.setPerformState(hostId, PerformDisconnect); abort();
globalTransporterRegistry.do_disconnect(hostId);
} }
} }
...@@ -756,8 +744,8 @@ Cmvmi::execSTART_ORD(Signal* signal) { ...@@ -756,8 +744,8 @@ Cmvmi::execSTART_ORD(Signal* signal) {
*/ */
for(unsigned int i = 1; i < MAX_NODES; i++ ){ for(unsigned int i = 1; i < MAX_NODES; i++ ){
if (getNodeInfo(i).m_type == NodeInfo::MGM){ if (getNodeInfo(i).m_type == NodeInfo::MGM){
if(globalTransporterRegistry.performState(i) != PerformIO){ if(!globalTransporterRegistry.is_connected(i)){
globalTransporterRegistry.setPerformState(i, PerformConnect); globalTransporterRegistry.do_connect(i);
globalTransporterRegistry.setIOState(i, NoHalt); globalTransporterRegistry.setIOState(i, NoHalt);
} }
} }
...@@ -783,7 +771,7 @@ Cmvmi::execSTART_ORD(Signal* signal) { ...@@ -783,7 +771,7 @@ Cmvmi::execSTART_ORD(Signal* signal) {
// without any connected nodes. // without any connected nodes.
for(unsigned int i = 1; i < MAX_NODES; i++ ){ for(unsigned int i = 1; i < MAX_NODES; i++ ){
if (i != getOwnNodeId() && getNodeInfo(i).m_type != NodeInfo::MGM){ if (i != getOwnNodeId() && getNodeInfo(i).m_type != NodeInfo::MGM){
globalTransporterRegistry.setPerformState(i, PerformDisconnect); globalTransporterRegistry.do_disconnect(i);
globalTransporterRegistry.setIOState(i, HaltIO); globalTransporterRegistry.setIOState(i, HaltIO);
} }
} }
...@@ -1062,29 +1050,10 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal) ...@@ -1062,29 +1050,10 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal)
if(nodeTypeStr == 0) if(nodeTypeStr == 0)
continue; continue;
const char* actionStr = "";
switch (globalTransporterRegistry.performState(i)){
case PerformNothing:
actionStr = "does nothing";
break;
case PerformIO:
actionStr = "is connected";
break;
case PerformConnect:
actionStr = "is trying to connect";
break;
case PerformDisconnect:
actionStr = "is trying to disconnect";
break;
case RemoveTransporter:
actionStr = "will be removed";
break;
}
infoEvent("Connection to %d (%s) %s", infoEvent("Connection to %d (%s) %s",
i, i,
nodeTypeStr, nodeTypeStr,
actionStr); globalTransporterRegistry.getPerformStateString(i));
} }
} }
......
...@@ -1704,6 +1704,7 @@ void Qmgr::sendApiFailReq(Signal* signal, Uint16 failedNodeNo) ...@@ -1704,6 +1704,7 @@ void Qmgr::sendApiFailReq(Signal* signal, Uint16 failedNodeNo)
sendSignal(DBTC_REF, GSN_API_FAILREQ, signal, 2, JBA); sendSignal(DBTC_REF, GSN_API_FAILREQ, signal, 2, JBA);
sendSignal(DBDICT_REF, GSN_API_FAILREQ, signal, 2, JBA); sendSignal(DBDICT_REF, GSN_API_FAILREQ, signal, 2, JBA);
sendSignal(SUMA_REF, GSN_API_FAILREQ, signal, 2, JBA); sendSignal(SUMA_REF, GSN_API_FAILREQ, signal, 2, JBA);
/** /**
* GREP also need the information that an API node * GREP also need the information that an API node
* (actually a REP node) has failed. * (actually a REP node) has failed.
...@@ -1978,6 +1979,9 @@ void Qmgr::execAPI_REGREQ(Signal* signal) ...@@ -1978,6 +1979,9 @@ void Qmgr::execAPI_REGREQ(Signal* signal)
apiRegConf->nodeState.dynamicId = -dynamicId; apiRegConf->nodeState.dynamicId = -dynamicId;
} }
} }
c_connectedNodes.copyto(NdbNodeBitmask::Size,
apiRegConf->connected_nodes.data);
sendSignal(ref, GSN_API_REGCONF, signal, ApiRegConf::SignalLength, JBB); sendSignal(ref, GSN_API_REGCONF, signal, ApiRegConf::SignalLength, JBB);
if ((getNodeState().startLevel == NodeState::SL_STARTED || if ((getNodeState().startLevel == NodeState::SL_STARTED ||
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include "Configuration.hpp" #include "Configuration.hpp"
#include <TransporterRegistry.hpp> #include <TransporterRegistry.hpp>
#include "SimBlockList.hpp" #include "vm/SimBlockList.hpp"
#include "ThreadConfig.hpp" #include "ThreadConfig.hpp"
#include <SignalLoggerManager.hpp> #include <SignalLoggerManager.hpp>
#include <NdbOut.hpp> #include <NdbOut.hpp>
...@@ -171,13 +171,29 @@ NDB_MAIN(ndb_kernel){ ...@@ -171,13 +171,29 @@ NDB_MAIN(ndb_kernel){
NDB_ASSERT(0, "Illegal state globalData.theRestartFlag"); NDB_ASSERT(0, "Illegal state globalData.theRestartFlag");
} }
SocketServer socket_server;
globalTransporterRegistry.startSending(); globalTransporterRegistry.startSending();
globalTransporterRegistry.startReceiving(); globalTransporterRegistry.startReceiving();
if (!globalTransporterRegistry.start_service(socket_server))
NDB_ASSERT(0, "globalTransporterRegistry.start_service() failed");
if (!globalTransporterRegistry.start_clients())
NDB_ASSERT(0, "globalTransporterRegistry.start_clients() failed");
globalEmulatorData.theWatchDog->doStart(); globalEmulatorData.theWatchDog->doStart();
socket_server.startServer();
globalEmulatorData.theThreadConfig->ipControlLoop(); globalEmulatorData.theThreadConfig->ipControlLoop();
NdbShutdown(NST_Normal); NdbShutdown(NST_Normal);
socket_server.stopServer();
socket_server.stopSessions();
globalTransporterRegistry.stop_clients();
return NRT_Default; return NRT_Default;
} }
......
...@@ -147,8 +147,8 @@ void ThreadConfig::ipControlLoop() ...@@ -147,8 +147,8 @@ void ThreadConfig::ipControlLoop()
// plus checking for any received messages. // plus checking for any received messages.
//-------------------------------------------------------------------- //--------------------------------------------------------------------
if (i++ >= 20) { if (i++ >= 20) {
globalTransporterRegistry.update_connections();
globalData.incrementWatchDogCounter(5); globalData.incrementWatchDogCounter(5);
globalTransporterRegistry.checkConnections();
i = 0; i = 0;
}//if }//if
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include <NdbOut.hpp> #include <NdbOut.hpp>
#include <SocketServer.hpp> #include <SocketServer.hpp>
#include <SocketClient.hpp>
#include <Parser.hpp> #include <Parser.hpp>
#include <OutputStream.hpp> #include <OutputStream.hpp>
#include <InputStream.hpp> #include <InputStream.hpp>
...@@ -318,8 +319,8 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply, ...@@ -318,8 +319,8 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
/** /**
* Print some info about why the parser returns NULL * Print some info about why the parser returns NULL
*/ */
// ndbout << " status=" << ctx.m_status << ", curr=" //ndbout << " status=" << ctx.m_status << ", curr="
// << ctx.m_currentToken << endl; //<< ctx.m_currentToken << endl;
} }
#ifdef MGMAPI_LOG #ifdef MGMAPI_LOG
else { else {
...@@ -362,30 +363,11 @@ ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv) ...@@ -362,30 +363,11 @@ ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv)
/** /**
* Do connect * Do connect
*/ */
const NDB_SOCKET_TYPE sockfd = socket(AF_INET, SOCK_STREAM, 0); SocketClient s(handle->hostname, handle->port);
if (sockfd == NDB_INVALID_SOCKET) { const NDB_SOCKET_TYPE sockfd = s.connect();
SET_ERROR(handle, NDB_MGM_ILLEGAL_SOCKET, ""); if (sockfd < 0) {
return -1; setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
} "Unable to connect to %s", mgmsrv);
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(handle->port);
// Convert ip address presentation format to numeric format
const int res1 = Ndb_getInAddr(&servaddr.sin_addr, handle->hostname);
if (res1 != 0) {
DEBUG("Ndb_getInAddr(...) == -1");
setError(handle, EINVAL, __LINE__, "Invalid hostname/address");
return -1;
}
const int res2 = connect(sockfd, (struct sockaddr*) &servaddr,
sizeof(servaddr));
if (res2 == -1) {
NDB_CLOSE_SOCKET(sockfd);
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, "Unable to connect to %s",
mgmsrv);
return -1; return -1;
} }
...@@ -1523,6 +1505,55 @@ ndb_mgm_get_configuration(NdbMgmHandle handle, unsigned int version) { ...@@ -1523,6 +1505,55 @@ ndb_mgm_get_configuration(NdbMgmHandle handle, unsigned int version) {
return 0; return 0;
} }
extern "C"
int
ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodeid, int nodetype)
{
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
Properties args;
args.put("version", version);
args.put("nodetype", nodetype);
args.put("nodeid", *pnodeid);
args.put("user", "mysqld");
args.put("password", "mysqld");
args.put("public key", "a public key");
const ParserRow<ParserDummy> reply[]= {
MGM_CMD("get nodeid reply", NULL, ""),
MGM_ARG("nodeid", Int, Optional, "Error message"),
MGM_ARG("result", String, Mandatory, "Error message"),
MGM_END()
};
const Properties *prop;
prop= ndb_mgm_call(handle, reply, "get nodeid", &args);
if(prop == NULL) {
SET_ERROR(handle, EIO, "Unable to alloc nodeid");
return -1;
}
int res= -1;
do {
const char * buf;
if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){
ndbout_c("ERROR Message: %s\n", buf);
break;
}
if(!prop->get("nodeid", pnodeid) != 0){
ndbout_c("ERROR Message: <nodeid Unspecified>\n");
break;
}
res= 0;
}while(0);
delete prop;
return res;
}
/***************************************************************************** /*****************************************************************************
* Global Replication * Global Replication
******************************************************************************/ ******************************************************************************/
......
...@@ -43,7 +43,7 @@ ...@@ -43,7 +43,7 @@
#include <DebuggerNames.hpp> #include <DebuggerNames.hpp>
#include <ndb_version.h> #include <ndb_version.h>
#include "SocketServer.hpp" #include <SocketServer.hpp>
#include "NodeLogLevel.hpp" #include "NodeLogLevel.hpp"
#include <NdbConfig.h> #include <NdbConfig.h>
...@@ -390,6 +390,95 @@ MgmtSrvr::getNodeCount(enum ndb_mgm_node_type type) const ...@@ -390,6 +390,95 @@ MgmtSrvr::getNodeCount(enum ndb_mgm_node_type type) const
return count; return count;
} }
int
MgmtSrvr::getPort() const {
const Properties *mgmProps;
ndb_mgm_configuration_iterator * iter =
ndb_mgm_create_configuration_iterator(_config->m_configValues,
CFG_SECTION_NODE);
if(iter == 0)
return 0;
if(ndb_mgm_find(iter, CFG_NODE_ID, getOwnNodeId()) != 0){
ndbout << "Could not retrieve configuration for Node "
<< getOwnNodeId() << " in config file." << endl
<< "Have you set correct NodeId for this node?" << endl;
ndb_mgm_destroy_iterator(iter);
return 0;
}
unsigned type;
if(ndb_mgm_get_int_parameter(iter, CFG_TYPE_OF_SECTION, &type) != 0 ||
type != NODE_TYPE_MGM){
ndbout << "Local node id " << getOwnNodeId()
<< " is not defined as management server" << endl
<< "Have you set correct NodeId for this node?" << endl;
return 0;
}
Uint32 port = 0;
if(ndb_mgm_get_int_parameter(iter, CFG_MGM_PORT, &port) != 0){
ndbout << "Could not find PortNumber in the configuration file." << endl;
return 0;
}
/*****************
* Set Stat Port *
*****************/
#if 0
if (!mgmProps->get("PortNumberStats", &tmp)){
ndbout << "Could not find PortNumberStats in the configuration file."
<< endl;
return false;
}
glob.port_stats = tmp;
#endif
#if 0
const char * host;
if(ndb_mgm_get_string_parameter(iter, mgmProps->get("ExecuteOnComputer", host)){
ndbout << "Failed to find \"ExecuteOnComputer\" for my node" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
const char * hostname;
{
const Properties * p;
char buf[255];
snprintf(buf, sizeof(buf), "Computer_%s", host.c_str());
if(!glob.cluster_config->get(buf, &p)){
ndbout << "Failed to find computer " << host << " in config" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
if(!p->get("HostName", &hostname)){
ndbout << "Failed to find \"HostName\" for computer " << host
<< " in config" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
if(NdbHost_GetHostName(buf) != 0){
ndbout << "Unable to get own hostname" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
}
const char * ip_address;
if(mgmProps->get("IpAddress", &ip_address)){
glob.use_specific_ip = true;
glob.interface_name = strdup(ip_address);
return true;
}
glob.interface_name = strdup(hostname);
#endif
return port;
}
int int
MgmtSrvr::getStatPort() const { MgmtSrvr::getStatPort() const {
#if 0 #if 0
...@@ -419,7 +508,6 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, ...@@ -419,7 +508,6 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
theWaitState(WAIT_SUBSCRIBE_CONF), theWaitState(WAIT_SUBSCRIBE_CONF),
theConfCount(0) { theConfCount(0) {
_ownNodeId = nodeId;
_config = NULL; _config = NULL;
_isStatPortActive = false; _isStatPortActive = false;
_isClusterLogStatActive = false; _isClusterLogStatActive = false;
...@@ -429,6 +517,8 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, ...@@ -429,6 +517,8 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
_logLevelThreadSleep = 500; _logLevelThreadSleep = 500;
_startedNodeId = 0; _startedNodeId = 0;
theFacade = 0;
m_newConfig = NULL; m_newConfig = NULL;
m_configFilename = configFilename; m_configFilename = configFilename;
setCallback(CmdBackupCallback); setCallback(CmdBackupCallback);
...@@ -486,6 +576,15 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, ...@@ -486,6 +576,15 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
_clusterLogLevelList = new NodeLogLevelList(); _clusterLogLevelList = new NodeLogLevelList();
_props = NULL; _props = NULL;
_ownNodeId= 0;
NodeId tmp= nodeId > 0 ? nodeId-1 : 0;
if (getNextFreeNodeId(&tmp, NDB_MGM_NODE_TYPE_MGM)){
_ownNodeId= tmp;
if (nodeId != 0 && nodeId != tmp)
_ownNodeId= 0; // did not get nodeid requested
} else
NDB_ASSERT(0, "Unable to retrieve own node id");
} }
...@@ -510,8 +609,7 @@ MgmtSrvr::start() ...@@ -510,8 +609,7 @@ MgmtSrvr::start()
return false; return false;
} }
theFacade = TransporterFacade::start_instance theFacade = TransporterFacade::start_instance
(_ownNodeId, (_ownNodeId,(ndb_mgm_configuration*)_config->m_configValues);
(ndb_mgm_configuration*)_config->m_configValues);
if(theFacade == 0) { if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL."); DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
...@@ -1896,6 +1994,7 @@ MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal) ...@@ -1896,6 +1994,7 @@ MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal)
int returnCode; int returnCode;
int gsn = signal->readSignalNumber(); int gsn = signal->readSignalNumber();
switch (gsn) { switch (gsn) {
case GSN_API_VERSION_CONF: { case GSN_API_VERSION_CONF: {
if (theWaitState == WAIT_VERSION) { if (theWaitState == WAIT_VERSION) {
...@@ -2187,6 +2286,36 @@ MgmtSrvr::getNodeType(NodeId nodeId) const ...@@ -2187,6 +2286,36 @@ MgmtSrvr::getNodeType(NodeId nodeId) const
return nodeTypes[nodeId]; return nodeTypes[nodeId];
} }
bool
MgmtSrvr::getNextFreeNodeId(NodeId * nodeId,
enum ndb_mgm_node_type type) const
{
#if 0
ndbout << "MgmtSrvr::getNextFreeNodeId type=" << type
<< " *nodeid=" << *nodeId << endl;
#endif
NodeId tmp= *nodeId;
if (theFacade && theFacade->theClusterMgr) {
while(getNextNodeId(&tmp, type)){
if (theFacade->theClusterMgr->m_connected_nodes.get(tmp))
continue;
#if 0
ndbout << "MgmtSrvr::getNextFreeNodeId ret=" << tmp << endl;
#endif
*nodeId= tmp;
return true;
}
} else if (getNextNodeId(&tmp, type)){
#if 0
ndbout << "MgmtSrvr::getNextFreeNodeId (theFacade==0) ret=" << tmp << endl;
#endif
*nodeId= tmp;
return true;
}
return false;
}
bool bool
MgmtSrvr::getNextNodeId(NodeId * nodeId, enum ndb_mgm_node_type type) const MgmtSrvr::getNextNodeId(NodeId * nodeId, enum ndb_mgm_node_type type) const
{ {
......
...@@ -150,10 +150,12 @@ public: ...@@ -150,10 +150,12 @@ public:
enum LogMode {In, Out, InOut, Off}; enum LogMode {In, Out, InOut, Off};
/* Constructor */ /* Constructor */
MgmtSrvr(NodeId nodeId, /* Local nodeid */ MgmtSrvr(NodeId nodeId, /* Local nodeid */
const BaseString &config_filename, /* Where to save config */ const BaseString &config_filename, /* Where to save config */
const BaseString &ndb_config_filename, /* Ndb.cfg filename */ const BaseString &ndb_config_filename, /* Ndb.cfg filename */
Config * config); Config * config);
NodeId getOwnNodeId() const {return _ownNodeId;};
/** /**
* Read (initial) config file, create TransporterFacade, * Read (initial) config file, create TransporterFacade,
...@@ -448,6 +450,7 @@ public: ...@@ -448,6 +450,7 @@ public:
* @return false if none found * @return false if none found
*/ */
bool getNextNodeId(NodeId * _nodeId, enum ndb_mgm_node_type type) const ; bool getNextNodeId(NodeId * _nodeId, enum ndb_mgm_node_type type) const ;
bool getNextFreeNodeId(NodeId * _nodeId, enum ndb_mgm_node_type type) const ;
/** /**
* *
...@@ -492,6 +495,11 @@ public: ...@@ -492,6 +495,11 @@ public:
* @return statistic port number. * @return statistic port number.
*/ */
int getStatPort() const; int getStatPort() const;
/**
* Returns the port number.
* @return port number.
*/
int getPort() const;
//************************************************************************** //**************************************************************************
......
...@@ -121,6 +121,14 @@ ParserRow<MgmApiSession> commands[] = { ...@@ -121,6 +121,14 @@ ParserRow<MgmApiSession> commands[] = {
MGM_ARG("version", Int, Mandatory, "Configuration version number"), MGM_ARG("version", Int, Mandatory, "Configuration version number"),
MGM_ARG("node", Int, Optional, "Node ID"), MGM_ARG("node", Int, Optional, "Node ID"),
MGM_CMD("get nodeid", &MgmApiSession::get_nodeid, ""),
MGM_ARG("version", Int, Mandatory, "Configuration version number"),
MGM_ARG("nodetype", Int, Mandatory, "Node type"),
MGM_ARG("nodeid", Int, Optional, "Node ID"),
MGM_ARG("user", String, Mandatory, "Password"),
MGM_ARG("password", String, Mandatory, "Password"),
MGM_ARG("public key", String, Mandatory, "Public key"),
MGM_CMD("get version", &MgmApiSession::getVersion, ""), MGM_CMD("get version", &MgmApiSession::getVersion, ""),
MGM_CMD("get status", &MgmApiSession::getStatus, ""), MGM_CMD("get status", &MgmApiSession::getStatus, ""),
...@@ -332,6 +340,82 @@ backward(const char * base, const Properties* reply){ ...@@ -332,6 +340,82 @@ backward(const char * base, const Properties* reply){
return ret; return ret;
} }
void
MgmApiSession::get_nodeid(Parser_t::Context &,
const class Properties &args)
{
const char *cmd= "get nodeid reply";
Uint32 version, nodeid= 0, nodetype= 0xff;
const char * user;
const char * password;
const char * public_key;
args.get("version", &version);
args.get("nodetype", &nodetype);
args.get("nodeid", &nodeid);
args.get("user", &user);
args.get("password", &password);
args.get("public key", &public_key);
NodeId free_id= 0;
NodeId tmp= nodeid > 0 ? nodeid-1 : 0;
bool compatible;
switch (nodetype) {
case NODE_TYPE_MGM:
compatible = ndbCompatible_mgmt_api(NDB_VERSION, version);
if (m_mgmsrv.getNextFreeNodeId(&tmp, NDB_MGM_NODE_TYPE_MGM))
free_id= tmp;
break;
case NODE_TYPE_API:
compatible = ndbCompatible_mgmt_api(NDB_VERSION, version);
if (m_mgmsrv.getNextFreeNodeId(&tmp, NDB_MGM_NODE_TYPE_API))
free_id= tmp;
break;
case NODE_TYPE_DB:
compatible = ndbCompatible_mgmt_ndb(NDB_VERSION, version);
if (m_mgmsrv.getNextFreeNodeId(&tmp, NDB_MGM_NODE_TYPE_NDB))
free_id= tmp;
break;
default:
m_output->println(cmd);
m_output->println("result: unknown nodetype %d", nodetype);
m_output->println("");
return;
}
if (nodeid != 0 && free_id != nodeid){
m_output->println(cmd);
m_output->println("result: no free nodeid %d for nodetype %d",
nodeid, nodetype);
m_output->println("");
return;
}
if (free_id == 0){
m_output->println(cmd);
m_output->println("result: no free nodeid for nodetype %d", nodetype);
m_output->println("");
return;
}
#if 0
if (!compatible){
m_output->println(cmd);
m_output->println("result: incompatible version mgmt 0x%x and node 0x%x",
NDB_VERSION, version);
m_output->println("");
return;
}
#endif
m_output->println(cmd);
m_output->println("nodeid: %u", free_id);
m_output->println("result: Ok");
m_output->println("");
return;
}
void void
MgmApiSession::getConfig_common(Parser_t::Context &, MgmApiSession::getConfig_common(Parser_t::Context &,
const class Properties &args, const class Properties &args,
...@@ -432,7 +516,6 @@ MgmApiSession::getConfig_common(Parser_t::Context &, ...@@ -432,7 +516,6 @@ MgmApiSession::getConfig_common(Parser_t::Context &,
m_output->println("Content-Transfer-Encoding: base64"); m_output->println("Content-Transfer-Encoding: base64");
m_output->println(""); m_output->println("");
m_output->println(str.c_str()); m_output->println(str.c_str());
m_output->println("");
return; return;
} }
......
...@@ -51,6 +51,7 @@ public: ...@@ -51,6 +51,7 @@ public:
void getConfig_old(Parser_t::Context &ctx); void getConfig_old(Parser_t::Context &ctx);
#endif /* MGM_GET_CONFIG_BACKWARDS_COMPAT */ #endif /* MGM_GET_CONFIG_BACKWARDS_COMPAT */
void get_nodeid(Parser_t::Context &ctx, const class Properties &args);
void getVersion(Parser_t::Context &ctx, const class Properties &args); void getVersion(Parser_t::Context &ctx, const class Properties &args);
void getStatus(Parser_t::Context &ctx, const class Properties &args); void getStatus(Parser_t::Context &ctx, const class Properties &args);
void getInfoClusterLog(Parser_t::Context &ctx, const class Properties &args); void getInfoClusterLog(Parser_t::Context &ctx, const class Properties &args);
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include "MgmtSrvr.hpp" #include "MgmtSrvr.hpp"
#include "EventLogger.hpp" #include "EventLogger.hpp"
#include "Config.hpp" #include <Config.hpp>
#include "InitConfigFileParser.hpp" #include "InitConfigFileParser.hpp"
#include <SocketServer.hpp> #include <SocketServer.hpp>
#include "Services.hpp" #include "Services.hpp"
...@@ -88,7 +88,6 @@ static MgmGlobals glob; ...@@ -88,7 +88,6 @@ static MgmGlobals glob;
******************************************************************************/ ******************************************************************************/
static bool readLocalConfig(); static bool readLocalConfig();
static bool readGlobalConfig(); static bool readGlobalConfig();
static bool setPortNo();
/** /**
* Global variables * Global variables
...@@ -146,7 +145,9 @@ NDB_MAIN(mgmsrv){ ...@@ -146,7 +145,9 @@ NDB_MAIN(mgmsrv){
exit(1); exit(1);
} }
glob.socketServer = new SocketServer(); glob.socketServer = new SocketServer();
MgmApiService * mapi = new MgmApiService(); MgmApiService * mapi = new MgmApiService();
MgmStatService * mstat = new MgmStatService(); MgmStatService * mstat = new MgmStatService();
/**************************** /****************************
...@@ -157,9 +158,26 @@ NDB_MAIN(mgmsrv){ ...@@ -157,9 +158,26 @@ NDB_MAIN(mgmsrv){
if (!readGlobalConfig()) if (!readGlobalConfig())
goto error_end; goto error_end;
if (!setPortNo()) glob.mgmObject = new MgmtSrvr(glob.localNodeId,
BaseString(glob.config_filename),
BaseString(glob.local_config_filename == 0 ?
"" : glob.local_config_filename),
glob.cluster_config);
glob.cluster_config = 0;
glob.localNodeId= glob.mgmObject->getOwnNodeId();
if (glob.localNodeId == 0)
goto error_end; goto error_end;
glob.port= glob.mgmObject->getPort();
if (glob.port == 0)
goto error_end;
glob.interface_name = 0;
glob.use_specific_ip = false;
if(!glob.use_specific_ip){ if(!glob.use_specific_ip){
if(!glob.socketServer->tryBind(glob.port, glob.interface_name)){ if(!glob.socketServer->tryBind(glob.port, glob.interface_name)){
ndbout_c("Unable to setup port: %s:%d!\n" ndbout_c("Unable to setup port: %s:%d!\n"
...@@ -190,15 +208,8 @@ NDB_MAIN(mgmsrv){ ...@@ -190,15 +208,8 @@ NDB_MAIN(mgmsrv){
goto error_end; goto error_end;
} }
glob.mgmObject = new MgmtSrvr(glob.localNodeId,
BaseString(glob.config_filename),
BaseString(glob.local_config_filename == 0 ? "" : glob.local_config_filename),
glob.cluster_config);
glob.cluster_config = 0;
if(!glob.mgmObject->check_start()){ if(!glob.mgmObject->check_start()){
ndbout_c("Unable to start management server."); ndbout_c("Unable to check start management server.");
ndbout_c("Probably caused by illegal initial configuration file."); ndbout_c("Probably caused by illegal initial configuration file.");
goto error_end; goto error_end;
} }
...@@ -343,108 +354,3 @@ readGlobalConfig() { ...@@ -343,108 +354,3 @@ readGlobalConfig() {
} }
return true; return true;
} }
/**
* @fn setPortNo
* @param glob : Global variables
* @return true if success, false otherwise.
*
* Port number:
* 2. Use port number from global configuration file
* 4. Use port number for statistics from global configuration file
*/
static bool
setPortNo(){
const Properties *mgmProps;
ndb_mgm_configuration_iterator * iter =
ndb_mgm_create_configuration_iterator(glob.cluster_config->m_configValues,
CFG_SECTION_NODE);
if(iter == 0)
return false;
if(ndb_mgm_find(iter, CFG_NODE_ID, glob.localNodeId) != 0){
ndbout << "Could not retrieve configuration for Node "
<< glob.localNodeId << " in config file." << endl
<< "Have you set correct NodeId for this node?" << endl;
ndb_mgm_destroy_iterator(iter);
return false;
}
unsigned type;
if(ndb_mgm_get_int_parameter(iter, CFG_TYPE_OF_SECTION, &type) != 0 ||
type != NODE_TYPE_MGM){
ndbout << "Local node id " << glob.localNodeId
<< " is not defined as management server" << endl
<< "Have you set correct NodeId for this node?" << endl;
return false;
}
/************
* Set Port *
************/
Uint32 tmp = 0;
if(ndb_mgm_get_int_parameter(iter, CFG_MGM_PORT, &tmp) != 0){
ndbout << "Could not find PortNumber in the configuration file." << endl;
return false;
}
glob.port = tmp;
/*****************
* Set Stat Port *
*****************/
#if 0
if (!mgmProps->get("PortNumberStats", &tmp)){
ndbout << "Could not find PortNumberStats in the configuration file."
<< endl;
return false;
}
glob.port_stats = tmp;
#endif
#if 0
const char * host;
if(ndb_mgm_get_string_parameter(iter, mgmProps->get("ExecuteOnComputer", host)){
ndbout << "Failed to find \"ExecuteOnComputer\" for my node" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
const char * hostname;
{
const Properties * p;
char buf[255];
snprintf(buf, sizeof(buf), "Computer_%s", host.c_str());
if(!glob.cluster_config->get(buf, &p)){
ndbout << "Failed to find computer " << host << " in config" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
if(!p->get("HostName", &hostname)){
ndbout << "Failed to find \"HostName\" for computer " << host
<< " in config" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
if(NdbHost_GetHostName(buf) != 0){
ndbout << "Unable to get own hostname" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
}
const char * ip_address;
if(mgmProps->get("IpAddress", &ip_address)){
glob.use_specific_ip = true;
glob.interface_name = strdup(ip_address);
return true;
}
glob.interface_name = strdup(hostname);
#endif
glob.interface_name = 0;
glob.use_specific_ip = false;
return true;
}
...@@ -295,11 +295,14 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){ ...@@ -295,11 +295,14 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
} }
int global_mgmt_server_check = 0; // set to one in mgmtsrvr main; int global_mgmt_server_check = 0; // set to one in mgmtsrvr main;
void void
ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0]; const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
const NodeId nodeId = refToNode(apiRegConf->qmgrRef); const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
m_connected_nodes.assign(apiRegConf->connected_nodes);
#if 0 #if 0
ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId); ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
#endif #endif
...@@ -309,6 +312,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ ...@@ -309,6 +312,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
Node & node = theNodes[nodeId]; Node & node = theNodes[nodeId];
assert(node.defined == true); assert(node.defined == true);
assert(node.connected == true); assert(node.connected == true);
if(node.m_info.m_version != apiRegConf->version){ if(node.m_info.m_version != apiRegConf->version){
node.m_info.m_version = apiRegConf->version; node.m_info.m_version = apiRegConf->version;
if (global_mgmt_server_check == 1) if (global_mgmt_server_check == 1)
...@@ -422,6 +426,8 @@ ClusterMgr::reportDisconnected(NodeId nodeId){ ...@@ -422,6 +426,8 @@ ClusterMgr::reportDisconnected(NodeId nodeId){
void void
ClusterMgr::reportNodeFailed(NodeId nodeId){ ClusterMgr::reportNodeFailed(NodeId nodeId){
m_connected_nodes.clear(nodeId);
Node & theNode = theNodes[nodeId]; Node & theNode = theNodes[nodeId];
theNode.m_alive = false; theNode.m_alive = false;
......
...@@ -78,6 +78,7 @@ public: ...@@ -78,6 +78,7 @@ public:
const Node & getNodeInfo(NodeId) const; const Node & getNodeInfo(NodeId) const;
Uint32 getNoOfConnectedNodes() const; Uint32 getNoOfConnectedNodes() const;
NodeBitmask m_connected_nodes;
private: private:
Uint32 noOfConnectedNodes; Uint32 noOfConnectedNodes;
......
...@@ -39,6 +39,7 @@ ...@@ -39,6 +39,7 @@
#endif #endif
//#define REPORT_TRANSPORTER //#define REPORT_TRANSPORTER
//#define API_TRACE;
#if defined DEBUG_TRANSPORTER #if defined DEBUG_TRANSPORTER
#define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl; #define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
...@@ -440,7 +441,17 @@ runSendRequest_C(void * me) ...@@ -440,7 +441,17 @@ runSendRequest_C(void * me)
void TransporterFacade::threadMainSend(void) void TransporterFacade::threadMainSend(void)
{ {
SocketServer socket_server;
theTransporterRegistry->startSending(); theTransporterRegistry->startSending();
if (!theTransporterRegistry->start_service(socket_server))
NDB_ASSERT(0, "Unable to start theTransporterRegistry->start_service");
if (!theTransporterRegistry->start_clients())
NDB_ASSERT(0, "Unable to start theTransporterRegistry->start_clients");
socket_server.startServer();
while(!theStopReceive) { while(!theStopReceive) {
NdbSleep_MilliSleep(10); NdbSleep_MilliSleep(10);
NdbMutex_Lock(theMutexPtr); NdbMutex_Lock(theMutexPtr);
...@@ -451,6 +462,11 @@ void TransporterFacade::threadMainSend(void) ...@@ -451,6 +462,11 @@ void TransporterFacade::threadMainSend(void)
NdbMutex_Unlock(theMutexPtr); NdbMutex_Unlock(theMutexPtr);
} }
theTransporterRegistry->stopSending(); theTransporterRegistry->stopSending();
socket_server.stopServer();
socket_server.stopSessions();
theTransporterRegistry->stop_clients();
} }
extern "C" extern "C"
...@@ -466,7 +482,7 @@ void TransporterFacade::threadMainReceive(void) ...@@ -466,7 +482,7 @@ void TransporterFacade::threadMainReceive(void)
{ {
theTransporterRegistry->startReceiving(); theTransporterRegistry->startReceiving();
NdbMutex_Lock(theMutexPtr); NdbMutex_Lock(theMutexPtr);
theTransporterRegistry->checkConnections(); theTransporterRegistry->update_connections();
NdbMutex_Unlock(theMutexPtr); NdbMutex_Unlock(theMutexPtr);
while(!theStopReceive) { while(!theStopReceive) {
for(int i = 0; i<10; i++){ for(int i = 0; i<10; i++){
...@@ -478,7 +494,7 @@ void TransporterFacade::threadMainReceive(void) ...@@ -478,7 +494,7 @@ void TransporterFacade::threadMainReceive(void)
} }
} }
NdbMutex_Lock(theMutexPtr); NdbMutex_Lock(theMutexPtr);
theTransporterRegistry->checkConnections(); theTransporterRegistry->update_connections();
NdbMutex_Unlock(theMutexPtr); NdbMutex_Unlock(theMutexPtr);
}//while }//while
theTransporterRegistry->stopReceiving(); theTransporterRegistry->stopReceiving();
...@@ -875,13 +891,13 @@ TransporterFacade::sendFragmentedSignalUnCond(NdbApiSignal* aSignal, ...@@ -875,13 +891,13 @@ TransporterFacade::sendFragmentedSignalUnCond(NdbApiSignal* aSignal,
void void
TransporterFacade::doConnect(int aNodeId){ TransporterFacade::doConnect(int aNodeId){
theTransporterRegistry->setIOState(aNodeId, NoHalt); theTransporterRegistry->setIOState(aNodeId, NoHalt);
theTransporterRegistry->setPerformState(aNodeId, PerformConnect); theTransporterRegistry->do_connect(aNodeId);
} }
void void
TransporterFacade::doDisconnect(int aNodeId) TransporterFacade::doDisconnect(int aNodeId)
{ {
theTransporterRegistry->setPerformState(aNodeId, PerformDisconnect); theTransporterRegistry->do_disconnect(aNodeId);
} }
void void
...@@ -906,7 +922,7 @@ TransporterFacade::ownId() const ...@@ -906,7 +922,7 @@ TransporterFacade::ownId() const
bool bool
TransporterFacade::isConnected(NodeId aNodeId){ TransporterFacade::isConnected(NodeId aNodeId){
return theTransporterRegistry->performState(aNodeId) == PerformIO; return theTransporterRegistry->is_connected(aNodeId);
} }
NodeId NodeId
......
...@@ -110,7 +110,6 @@ public: ...@@ -110,7 +110,6 @@ public:
// Close this block number // Close this block number
int close_local(BlockNumber blockNumber); int close_local(BlockNumber blockNumber);
void setState(Uint32 aNodeId, PerformState aState);
private: private:
/** /**
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment