Merge tulin@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into poseidon.bredbandsbolaget.se:/home/tomas/mysql-4.1-ndb
parents 5865b63f efa105bc
......@@ -80,13 +80,15 @@ class ApiRegConf {
friend class ClusterMgr;
public:
STATIC_CONST( SignalLength = 3 + NodeState::DataLength );
STATIC_CONST( SignalLength = 3 + NodeState::DataLength +
NdbNodeBitmask::Size );
private:
Uint32 qmgrRef;
Uint32 version; // Version of NDB node
Uint32 apiHeartbeatFrequency;
NodeState nodeState;
Bitmask<NdbNodeBitmask::Size>::Data connected_nodes;
};
#endif
......@@ -666,6 +666,11 @@ extern "C" {
*/
struct ndb_mgm_configuration * ndb_mgm_get_configuration(NdbMgmHandle handle,
unsigned version);
int ndb_mgm_alloc_nodeid(NdbMgmHandle handle,
unsigned version,
unsigned *pnodeid,
int nodetype);
/**
* Config iterator
*/
......
......@@ -76,6 +76,8 @@
#define CFG_DB_DISCLESS 148
#define CFG_DB_SERVER_PORT 149
#define CFG_NODE_ARBIT_RANK 200
#define CFG_NODE_ARBIT_DELAY 201
......
......@@ -77,7 +77,7 @@ public:
* Get config using socket
*/
struct ndb_mgm_configuration * getConfig(const char * mgmhost, short port,
int versionId);
int versionId, int nodetype);
/**
* Get config from file
*/
......@@ -98,7 +98,7 @@ private:
char * m_connectString;
char * m_defaultConnectString;
/**
* Verify config
*/
......
......@@ -64,7 +64,7 @@ typedef int socklen_t;
#define NDB_NONBLOCK O_NONBLOCK
#define NDB_SOCKET_TYPE int
#define NDB_INVALID_SOCKET -1
#define NDB_CLOSE_SOCKET(x) close(x)
#define NDB_CLOSE_SOCKET(x) ::close(x)
#define InetErrno errno
......
......@@ -29,20 +29,10 @@
#define TransporterRegistry_H
#include "TransporterDefinitions.hpp"
#include <SocketServer.hpp>
#include <NdbTCP.h>
// A transporter is always in a PerformState.
// PerformIO is used initially and as long as any of the events
// PerformConnect, ...
enum PerformState {
PerformNothing = 4, // Does nothing
PerformIO = 0, // Is connected
PerformConnect = 1, // Is trying to connect
PerformDisconnect = 2, // Trying to disconnect
RemoveTransporter = 3 // Will be removed
};
// A transporter is always in an IOState.
// NoHalt is used initially and as long as it is no restrictions on
// sending or receiving.
......@@ -60,18 +50,45 @@ enum TransporterType {
tt_OSE_TRANSPORTER = 4
};
static const char *performStateString[] =
{ "is connected",
"is trying to connect",
"does nothing",
"is trying to disconnect" };
class Transporter;
class TCP_Transporter;
class SCI_Transporter;
class SHM_Transporter;
class OSE_Transporter;
class TransporterRegistry;
class SocketAuthenticator;
class TransporterService : public SocketServer::Service {
SocketAuthenticator * m_auth;
TransporterRegistry * m_transporter_registry;
public:
TransporterService(SocketAuthenticator *auth= 0)
{
m_auth= auth;
m_transporter_registry= 0;
}
void setTransporterRegistry(TransporterRegistry *t)
{
m_transporter_registry= t;
}
SocketServer::Session * newSession(NDB_SOCKET_TYPE socket);
};
/**
* @class TransporterRegistry
* @brief ...
*/
class TransporterRegistry {
friend class OSE_Receiver;
friend class Transporter;
friend class TransporterService;
public:
/**
* Constructor
......@@ -98,6 +115,12 @@ public:
*/
~TransporterRegistry();
bool start_service(SocketServer& server);
bool start_clients();
bool stop_clients();
void start_clients_thread();
void update_connections();
/**
* Start/Stop receiving
*/
......@@ -110,16 +133,26 @@ public:
void startSending();
void stopSending();
// A transporter is always in a PerformState.
// PerformIO is used initially and as long as any of the events
// PerformConnect, ...
enum PerformState {
CONNECTED = 0,
CONNECTING = 1,
DISCONNECTED = 2,
DISCONNECTING = 3
};
const char *getPerformStateString(NodeId nodeId) const
{ return performStateString[(unsigned)performStates[nodeId]]; };
/**
* Get and set methods for PerformState
*/
PerformState performState(NodeId nodeId);
void setPerformState(NodeId nodeId, PerformState state);
/**
* Set perform state for all transporters
*/
void setPerformState(PerformState state);
void do_connect(NodeId node_id);
void do_disconnect(NodeId node_id);
bool is_connected(NodeId node_id) { return performStates[node_id] == CONNECTED; };
void report_connect(NodeId node_id);
void report_disconnect(NodeId node_id, int errnum);
/**
* Get and set methods for IOState
......@@ -174,8 +207,6 @@ public:
void performReceive();
void performSend();
void checkConnections();
/**
* Force sending if more than or equal to sendLimit
* number have asked for send. Returns 0 if not sending
......@@ -192,6 +223,12 @@ protected:
private:
void * callbackObj;
TransporterService *m_transporter_service;
unsigned short m_service_port;
char *m_interface_name;
struct NdbThread *m_start_clients_thread;
bool m_run_start_clients_thread;
int sendCounter;
NodeId localNodeId;
bool nodeIdSpecified;
......@@ -202,11 +239,6 @@ private:
int nSHMTransporters;
int nOSETransporters;
int m_ccCount;
int m_ccIndex;
int m_ccStep;
int m_nTransportersPerformConnect;
bool m_ccReady;
/**
* Arrays holding all transporters in the order they are created
*/
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef SOCKET_AUTHENTICATOR_HPP
#define SOCKET_AUTHENTICATOR_HPP
class SocketAuthenticator
{
public:
virtual ~SocketAuthenticator() {};
virtual bool client_authenticate(int sockfd) = 0;
virtual bool server_authenticate(int sockfd) = 0;
};
class SocketAuthSimple : public SocketAuthenticator
{
const char *m_passwd;
char *m_buf;
public:
SocketAuthSimple(const char *passwd);
virtual ~SocketAuthSimple();
virtual bool client_authenticate(int sockfd);
virtual bool server_authenticate(int sockfd);
};
#endif // SOCKET_AUTHENTICATOR_HPP
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef SOCKET_CLIENT_HPP
#define SOCKET_CLIENT_HPP
#include <NdbTCP.h>
class SocketAuthenticator;
class SocketClient
{
NDB_SOCKET_TYPE m_sockfd;
struct sockaddr_in m_servaddr;
unsigned short m_port;
char *m_server_name;
SocketAuthenticator *m_auth;
public:
SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa = 0);
~SocketClient();
bool init();
NDB_SOCKET_TYPE connect();
bool close();
};
#endif // SOCKET_ClIENT_HPP
......@@ -146,13 +146,17 @@ const int ConfigInfo::m_NoOfRules = sizeof(m_SectionRules)/sizeof(SectionRule);
/****************************************************************************
* Config Rules declarations
****************************************************************************/
bool addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * ruleData);
bool add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * rule_data);
bool add_db_ports(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * rule_data);
const ConfigInfo::ConfigRule
ConfigInfo::m_ConfigRules[] = {
{ addNodeConnections, 0 },
{ add_node_connections, 0 },
{ add_db_ports, 0 },
{ 0, 0 }
};
......@@ -376,6 +380,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
1,
(MAX_NODES - 1) },
{
CFG_DB_SERVER_PORT,
"ServerPort",
"DB",
"Port used to setup transporter",
ConfigInfo::USED,
false,
ConfigInfo::INT,
2202,
0,
0x7FFFFFFF },
{
CFG_DB_NO_REPLICAS,
"NoOfReplicas",
......@@ -1231,7 +1247,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::STRING,
MANDATORY,
0,
0,
0x7FFFFFFF },
......@@ -1330,7 +1346,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::STRING,
MANDATORY,
0,
0,
0x7FFFFFFF },
......@@ -2510,10 +2526,14 @@ fixNodeHostname(InitConfigFileParser::Context & ctx, const char * data){
const char * compId;
if(!ctx.m_currentSection->get("ExecuteOnComputer", &compId)){
require(ctx.m_currentSection->put("HostName", ""));
return true;
#if 0
ctx.reportError("Parameter \"ExecuteOnComputer\" missing from section "
"[%s] starting at line: %d",
ctx.fname, ctx.m_sectionLineno);
return false;
#endif
}
const Properties * computer;
......@@ -3158,9 +3178,9 @@ saveInConfigValues(InitConfigFileParser::Context & ctx, const char * data){
}
bool
addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>&sections,
add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * ruleData)
const char * rule_data)
{
Properties * props= ctx.m_config;
Properties p_connections;
......@@ -3241,3 +3261,10 @@ addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>&sections,
return true;
}
bool add_db_ports(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * rule_data)
{
return true;
}
......@@ -114,7 +114,8 @@ ConfigRetriever::getConfig(int verId, int nodeType) {
struct ndb_mgm_configuration * p = 0;
switch(m->type){
case MgmId_TCP:
p = getConfig(m->data.tcp.remoteHost, m->data.tcp.port, verId);
p = getConfig(m->data.tcp.remoteHost, m->data.tcp.port,
verId, nodeType);
break;
case MgmId_File:
p = getConfig(m->data.file.filename, verId);
......@@ -155,7 +156,8 @@ ConfigRetriever::getConfig(int verId, int nodeType) {
ndb_mgm_configuration *
ConfigRetriever::getConfig(const char * mgmhost,
short port,
int versionId){
int versionId,
int nodetype){
NdbMgmHandle h;
h = ndb_mgm_create_handle();
......@@ -175,6 +177,21 @@ ConfigRetriever::getConfig(const char * mgmhost,
ndb_mgm_configuration * conf = ndb_mgm_get_configuration(h, versionId);
if(conf == 0){
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(h));
ndb_mgm_destroy_handle(&h);
return 0;
}
{
unsigned nodeid= getOwnNodeId();
int res= ndb_mgm_alloc_nodeid(h, versionId, &nodeid, nodetype);
if(res != 0) {
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(h));
ndb_mgm_destroy_handle(&h);
return 0;
}
_ownNodeId= nodeid;
}
ndb_mgm_disconnect(h);
......@@ -329,6 +346,9 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf,
}
do {
if(strlen(hostname) == 0)
break;
if(strcasecmp(hostname, localhost) == 0)
break;
......
......@@ -21,6 +21,7 @@
LocalConfig::LocalConfig(){
ids = 0; size = 0; items = 0;
error_line = 0; error_msg[0] = 0;
_ownNodeId= 0;
}
bool
......@@ -95,6 +96,11 @@ LocalConfig::init(bool onlyNodeId,
return false;
}
//7. Check
if(readConnectString("host=localhost:2200", onlyNodeId)){
return true;
}
setError(0, "");
return false;
......
......@@ -63,27 +63,23 @@ ndbstrerror::~ndbstrerror(void)
#define ndbstrerror strerror
#endif
TCP_Transporter::TCP_Transporter(int sendBufSize, int maxRecvSize,
int portNo,
const char *rHostName,
TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
int sendBufSize, int maxRecvSize,
const char *lHostName,
NodeId rNodeId, NodeId lNodeId,
const char *rHostName,
int r_port,
NodeId lNodeId,
NodeId rNodeId,
int byte_order,
bool compr, bool chksm, bool signalId,
Uint32 _reportFreq) :
Transporter(lNodeId, rNodeId, byte_order, compr, chksm, signalId),
m_sendBuffer(sendBufSize),
isServer(lNodeId < rNodeId),
port(portNo)
Transporter(t_reg, lHostName, rHostName, r_port, lNodeId, rNodeId,
byte_order, compr, chksm, signalId),
m_sendBuffer(sendBufSize)
{
maxReceiveSize = maxRecvSize;
strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
// Initialize member variables
Ndb_getInAddr(&remoteHostAddress, rHostName);
Ndb_getInAddr(&localHostAddress, lHostName);
theSocket = NDB_INVALID_SOCKET;
sendCount = receiveCount = 0;
......@@ -108,6 +104,24 @@ TCP_Transporter::~TCP_Transporter() {
receiveBuffer.destroy();
}
bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
{
return connect_common(sockfd);
}
bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
{
return connect_common(sockfd);
}
bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
{
theSocket = sockfd;
setSocketOptions();
setSocketNonBlocking(theSocket);
return true;
}
bool
TCP_Transporter::initTransporter() {
......@@ -316,7 +330,7 @@ TCP_Transporter::doSend() {
sendCount ++;
sendSize += nBytesSent;
if(sendCount == reportFreq){
reportSendLen(callbackObj,remoteNodeId, sendCount, sendSize);
reportSendLen(get_callback_obj(), remoteNodeId, sendCount, sendSize);
sendCount = 0;
sendSize = 0;
}
......@@ -331,7 +345,7 @@ TCP_Transporter::doSend() {
#endif
if(DISCONNECT_ERRNO(InetErrno, nBytesSent)){
doDisconnect();
reportDisconnect(callbackObj, remoteNodeId, InetErrno);
report_disconnect(InetErrno);
}
return false;
......@@ -361,14 +375,15 @@ TCP_Transporter::doReceive() {
#endif
ndbout_c("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
report_error(TE_INVALID_MESSAGE_LENGTH);
return 0;
}
receiveCount ++;
receiveSize += nBytesRead;
if(receiveCount == reportFreq){
reportReceiveLen(callbackObj, remoteNodeId, receiveCount, receiveSize);
reportReceiveLen(get_callback_obj(), remoteNodeId, receiveCount, receiveSize);
receiveCount = 0;
receiveSize = 0;
}
......@@ -384,60 +399,17 @@ TCP_Transporter::doReceive() {
if(DISCONNECT_ERRNO(InetErrno, nBytesRead)){
// The remote node has closed down
doDisconnect();
reportDisconnect(callbackObj, remoteNodeId,InetErrno);
report_disconnect(InetErrno);
}
}
return nBytesRead;
}
bool
TCP_Transporter::connectImpl(Uint32 timeOutMillis){
struct timeval timeout = {0, 0};
timeout.tv_sec = timeOutMillis / 1000;
timeout.tv_usec = (timeOutMillis % 1000)*1000;
bool retVal = false;
if(isServer){
if(theSocket == NDB_INVALID_SOCKET){
startTCPServer();
}
if(theSocket == NDB_INVALID_SOCKET)
{
NdbSleep_MilliSleep(timeOutMillis);
return false;
}
retVal = acceptClient(&timeout);
} else {
// Is client
retVal = connectClient(&timeout);
}
if(!retVal) {
NdbSleep_MilliSleep(timeOutMillis);
return false;
}
#if defined NDB_OSE || defined NDB_SOFTOSE
if(setsockopt(theSocket, SOL_SOCKET, SO_OSEOWNER,
&theReceiverPid, sizeof(PROCESS)) != 0){
ndbout << "Failed to transfer ownership of socket" << endl;
NDB_CLOSE_SOCKET(theSocket);
theSocket = -1;
return false;
}
#endif
return true;
}
void
TCP_Transporter::disconnectImpl() {
TCP_Transporter::disconnectImpl() {
if(theSocket != NDB_INVALID_SOCKET){
if(NDB_CLOSE_SOCKET(theSocket) < 0){
reportError(callbackObj, remoteNodeId, TE_ERROR_CLOSING_SOCKET);
report_error(TE_ERROR_CLOSING_SOCKET);
}
}
......@@ -447,155 +419,3 @@ TCP_Transporter::disconnectImpl() {
theSocket = NDB_INVALID_SOCKET;
}
bool
TCP_Transporter::startTCPServer() {
int bindResult, listenResult;
// The server variable is the remote server when we are a client
// htonl and htons returns the parameter in network byte order
// INADDR_ANY tells the OS kernel to choose the IP address
struct sockaddr_in server;
memset((void*)&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_addr.s_addr = localHostAddress.s_addr;
server.sin_port = htons(port);
if (theSocket != NDB_INVALID_SOCKET) {
return true; // Server socket is already initialized
}
// Create the socket
theSocket = socket(AF_INET, SOCK_STREAM, 0);
if (theSocket == NDB_INVALID_SOCKET) {
reportThreadError(remoteNodeId, TE_COULD_NOT_CREATE_SOCKET);
return false;
}
// Set the socket reuse addr to true, so we are sure we can bind the
// socket
int reuseAddr = 1;
setsockopt(theSocket, SOL_SOCKET, SO_REUSEADDR,
(char*)&reuseAddr, sizeof(reuseAddr));
// Set the TCP_NODELAY option so also small packets are sent
// as soon as possible
int nodelay = 1;
setsockopt(theSocket, IPPROTO_TCP, TCP_NODELAY,
(char*)&nodelay, sizeof(nodelay));
// Bind the socket
bindResult = bind(theSocket, (struct sockaddr *) &server,
sizeof(server));
if (bindResult < 0) {
reportThreadError(remoteNodeId, TE_COULD_NOT_BIND_SOCKET);
NDB_CLOSE_SOCKET(theSocket);
theSocket = NDB_INVALID_SOCKET;
return false;
}
// Perform listen.
listenResult = listen(theSocket, 1);
if (listenResult == 1) {
reportThreadError(remoteNodeId, TE_LISTEN_FAILED);
NDB_CLOSE_SOCKET(theSocket);
theSocket = NDB_INVALID_SOCKET;
return false;
}
return true;
}
bool
TCP_Transporter::acceptClient (struct timeval * timeout){
struct sockaddr_in clientAddress;
fd_set readset;
FD_ZERO(&readset);
FD_SET(theSocket, &readset);
const int res = select(theSocket + 1, &readset, 0, 0, timeout);
if(res == 0)
return false;
if(res < 0){
reportThreadError(remoteNodeId, TE_ERROR_IN_SELECT_BEFORE_ACCEPT);
return false;
}
NDB_SOCKLEN_T clientAddressLen = sizeof(clientAddress);
const NDB_SOCKET_TYPE clientSocket = accept(theSocket,
(struct sockaddr*)&clientAddress,
&clientAddressLen);
if (clientSocket == NDB_INVALID_SOCKET) {
reportThreadError(remoteNodeId, TE_ACCEPT_RETURN_ERROR);
return false;
}
if (clientAddress.sin_addr.s_addr != remoteHostAddress.s_addr) {
ndbout_c("Wrong client connecting!");
ndbout_c("connecting address: %s", inet_ntoa(clientAddress.sin_addr));
ndbout_c("expecting address: %s", inet_ntoa(remoteHostAddress));
// The newly connected host is not the remote host
// we wanted to connect to. Disconnect it.
// XXX This is not valid. We cannot disconnect it.
NDB_CLOSE_SOCKET(clientSocket);
return false;
} else {
NDB_CLOSE_SOCKET(theSocket);
theSocket = clientSocket;
setSocketOptions();
setSocketNonBlocking(theSocket);
return true;
}
}
bool
TCP_Transporter::connectClient (struct timeval * timeout){
// Create the socket
theSocket = socket(AF_INET, SOCK_STREAM, 0);
if (theSocket == NDB_INVALID_SOCKET) {
reportThreadError(remoteNodeId, TE_COULD_NOT_CREATE_SOCKET);
return false;
}
struct sockaddr_in server;
memset((void*)&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_addr = remoteHostAddress;
server.sin_port = htons(port);
struct sockaddr_in client;
memset((void*)&client, 0, sizeof(client));
client.sin_family = AF_INET;
client.sin_addr = localHostAddress;
client.sin_port = 0; // Any port
// Bind the socket
const int bindResult = bind(theSocket, (struct sockaddr *) &client,
sizeof(client));
if (bindResult < 0) {
reportThreadError(remoteNodeId, TE_COULD_NOT_BIND_SOCKET);
NDB_CLOSE_SOCKET(theSocket);
theSocket = NDB_INVALID_SOCKET;
return false;
}
const int connectRes = ::connect(theSocket, (struct sockaddr *) &server,
sizeof(server));
if(connectRes == 0){
setSocketOptions();
setSocketNonBlocking(theSocket);
return true;
}
NDB_CLOSE_SOCKET(theSocket);
theSocket = NDB_INVALID_SOCKET;
return false;
}
......@@ -14,24 +14,8 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
//****************************************************************************
//
// AUTHOR
// sa Fransson
//
// NAME
// TCP_Transporter
//
// DESCRIPTION
// A TCP_Transporter instance is created when TCP/IP-communication
// shall be used (user specified). It handles connect, disconnect,
// send and receive.
//
//
//
//***************************************************************************/
#ifndef TCP_Transporter_H
#define TCP_Transporter_H
#ifndef TCP_TRANSPORTER_HPP
#define TCP_TRANSPORTER_HPP
#include "Transporter.hpp"
#include "SendBuffer.hpp"
......@@ -61,11 +45,13 @@ class TCP_Transporter : public Transporter {
friend class TransporterRegistry;
private:
// Initialize member variables
TCP_Transporter(int sendBufferSize, int maxReceiveSize,
int port,
const char *rHostName,
TCP_Transporter(TransporterRegistry&,
int sendBufferSize, int maxReceiveSize,
const char *lHostName,
NodeId rHostId, NodeId lHostId,
const char *rHostName,
int r_port,
NodeId lHostId,
NodeId rHostId,
int byteorder,
bool compression, bool checksum, bool signalId,
Uint32 reportFreq = 4096);
......@@ -121,12 +107,14 @@ protected:
* A client connects to the remote server
* A server accepts any new connections
*/
bool connectImpl(Uint32 timeOutMillis);
virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
bool connect_common(NDB_SOCKET_TYPE sockfd);
/**
* Disconnects a TCP/IP node. Empty send and receivebuffer.
*/
void disconnectImpl();
virtual void disconnectImpl();
private:
/**
......@@ -134,21 +122,11 @@ private:
*/
SendBuffer m_sendBuffer;
const bool isServer;
const unsigned int port;
// Sending/Receiving socket used by both client and server
NDB_SOCKET_TYPE theSocket;
Uint32 maxReceiveSize;
/**
* Remote host name/and address
*/
char remoteHostName[256];
struct in_addr remoteHostAddress;
struct in_addr localHostAddress;
/**
* Socket options
*/
......@@ -163,43 +141,6 @@ private:
bool sendIsPossible(struct timeval * timeout);
/**
* startTCPServer - None blocking
*
* create a server socket
* bind
* listen
*
* Note: Does not call accept
*/
bool startTCPServer();
/**
* acceptClient - Blocking
*
* Accept a connection
* checks if "right" client has connected
* if so
* close server socket
* else
* close newly created socket and goto begin
*/
bool acceptClient(struct timeval * timeout);
/**
* Creates a client socket
*
* Note does not call connect
*/
bool createClientSocket();
/**
* connectClient - Blocking
*
* connects to remote host
*/
bool connectClient(struct timeval * timeout);
/**
* Statistics
*/
......
......@@ -15,132 +15,125 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <TransporterRegistry.hpp>
#include <TransporterCallback.hpp>
#include "Transporter.hpp"
#include "TransporterInternalDefinitions.hpp"
#include <NdbSleep.h>
Transporter::Transporter(NodeId lNodeId, NodeId rNodeId,
#include <SocketAuthenticator.hpp>
#include <InputStream.hpp>
#include <OutputStream.hpp>
Transporter::Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
int r_port,
NodeId lNodeId,
NodeId rNodeId,
int _byteorder,
bool _compression, bool _checksum, bool _signalId)
: localNodeId(lNodeId), remoteNodeId(rNodeId),
m_packer(_signalId, _checksum)
: m_r_port(r_port), localNodeId(lNodeId), remoteNodeId(rNodeId),
isServer(lNodeId < rNodeId),
m_packer(_signalId, _checksum),
m_transporter_registry(t_reg)
{
if (rHostName && strlen(rHostName) > 0){
strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
Ndb_getInAddr(&remoteHostAddress, rHostName);
}
else
{
if (!isServer) {
ndbout << "Unable to setup transporter. Node " << rNodeId
<< " must have hostname. Update configuration." << endl;
exit(-1);
}
remoteHostName[0]= 0;
}
strncpy(localHostName, lHostName, sizeof(localHostName));
if (strlen(lHostName) > 0)
Ndb_getInAddr(&localHostAddress, lHostName);
byteOrder = _byteorder;
compressionUsed = _compression;
checksumUsed = _checksum;
signalIdUsed = _signalId;
_threadError = TE_NO_ERROR;
_connecting = false;
_disconnecting = false;
_connected = false;
_timeOutMillis = 1000;
theThreadPtr = NULL;
theMutexPtr = NdbMutex_Create();
}
Transporter::~Transporter(){
NdbMutex_Destroy(theMutexPtr);
m_connected = false;
m_timeOutMillis = 1000;
if(theThreadPtr != 0){
void * retVal;
NdbThread_WaitFor(theThreadPtr, &retVal);
NdbThread_Destroy(&theThreadPtr);
if (isServer)
m_socket_client= 0;
else
{
unsigned short tmp_port= 3307+rNodeId;
m_socket_client= new SocketClient(remoteHostName, tmp_port,
new SocketAuthSimple("ndbd passwd"));
}
}
extern "C"
void *
runConnect_C(void * me)
{
runConnect(me);
NdbThread_Exit(0);
return NULL;
}
void *
runConnect(void * me){
Transporter * t = (Transporter *) me;
DEBUG("Connect thread to " << t->remoteNodeId << " started");
while(true){
NdbMutex_Lock(t->theMutexPtr);
if(t->_disconnecting){
t->_connecting = false;
NdbMutex_Unlock(t->theMutexPtr);
DEBUG("Connect Thread " << t->remoteNodeId << " stop due to disconnect");
return 0;
}
NdbMutex_Unlock(t->theMutexPtr);
bool res = t->connectImpl(t->_timeOutMillis); // 1000 ms
DEBUG("Waiting for " << t->remoteNodeId << "...");
if(res){
t->_connected = true;
t->_connecting = false;
t->_errorCount = 0;
t->_threadError = TE_NO_ERROR;
DEBUG("Connect Thread " << t->remoteNodeId << " stop due to connect");
return 0;
}
}
Transporter::~Transporter(){
if (m_socket_client)
delete m_socket_client;
}
void
Transporter::doConnect() {
bool
Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
if(m_connected)
return true; // TODO assert(0);
NdbMutex_Lock(theMutexPtr);
if(_connecting || _disconnecting || _connected){
NdbMutex_Unlock(theMutexPtr);
return;
bool res = connect_server_impl(sockfd);
if(res){
m_connected = true;
m_errorCount = 0;
}
_connecting = true;
_threadError = TE_NO_ERROR;
return res;
}
// Start thread
bool
Transporter::connect_client() {
if(m_connected)
return true;
NDB_SOCKET_TYPE sockfd = m_socket_client->connect();
char buf[16];
snprintf(buf, sizeof(buf), "ndb_con_%d", remoteNodeId);
if (sockfd < 0)
return false;
// send info about own id
SocketOutputStream s_output(sockfd);
s_output.println("%d", localNodeId);
// get remote id
int nodeId;
SocketInputStream s_input(sockfd);
char buf[256];
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
return false;
}
if (sscanf(buf, "%d", &nodeId) != 1) {
NDB_CLOSE_SOCKET(sockfd);
return false;
}
if(theThreadPtr != 0){
void * retVal;
NdbThread_WaitFor(theThreadPtr, &retVal);
NdbThread_Destroy(&theThreadPtr);
bool res = connect_client_impl(sockfd);
if(res){
m_connected = true;
m_errorCount = 0;
}
theThreadPtr = NdbThread_Create(runConnect_C,
(void**)this,
32768,
buf,
NDB_THREAD_PRIO_LOW);
NdbSleep_MilliSleep(100); // Let thread start
NdbMutex_Unlock(theMutexPtr);
return res;
}
void
Transporter::doDisconnect() {
NdbMutex_Lock(theMutexPtr);
_disconnecting = true;
while(_connecting){
DEBUG("Waiting for connect to finish...");
NdbMutex_Unlock(theMutexPtr);
NdbSleep_MilliSleep(500);
NdbMutex_Lock(theMutexPtr);
}
_connected = false;
Transporter::doDisconnect() {
if(!m_connected)
return; //assert(0); TODO will fail
disconnectImpl();
_threadError = TE_NO_ERROR;
_disconnecting = false;
NdbMutex_Unlock(theMutexPtr);
m_connected= false;
}
......@@ -19,6 +19,9 @@
#include <ndb_global.h>
#include <SocketClient.hpp>
#include <TransporterRegistry.hpp>
#include <TransporterCallback.hpp>
#include "TransporterDefinitions.hpp"
#include "Packer.hpp"
......@@ -40,8 +43,9 @@ public:
* None blocking
* Use isConnected() to check status
*/
virtual void doConnect();
bool connect_client();
bool connect_server(NDB_SOCKET_TYPE socket);
/**
* Blocking
*/
......@@ -60,14 +64,17 @@ public:
*/
NodeId getRemoteNodeId() const;
/**
* Set callback object
* Local (own) Node Id
*/
void setCallbackObject(void * callback);
NodeId getLocalNodeId() const;
protected:
Transporter(NodeId lNodeId,
Transporter(TransporterRegistry &,
const char *lHostName,
const char *rHostName,
int r_port,
NodeId lNodeId,
NodeId rNodeId,
int byteorder,
bool compression,
......@@ -78,58 +85,59 @@ protected:
* Blocking, for max timeOut milli seconds
* Returns true if connect succeded
*/
virtual bool connectImpl(Uint32 timeOut) = 0;
virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd) = 0;
virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd) = 0;
/**
* Blocking
*/
virtual void disconnectImpl() = 0;
const NodeId localNodeId;
/**
* Remote host name/and address
*/
char remoteHostName[256];
char localHostName[256];
struct in_addr remoteHostAddress;
struct in_addr localHostAddress;
const unsigned int m_r_port;
const NodeId remoteNodeId;
const NodeId localNodeId;
const bool isServer;
unsigned createIndex;
int byteOrder;
bool compressionUsed;
bool checksumUsed;
bool signalIdUsed;
Packer m_packer;
Packer m_packer;
private:
/**
* Thread and mutex for connect
*/
NdbThread* theThreadPtr;
friend void* runConnect(void * me);
SocketClient *m_socket_client;
protected:
/**
* Error reporting from connect thread(s)
*/
void reportThreadError(NodeId nodeId,
TransporterError errorCode);
Uint32 getErrorCount();
TransporterError getThreadError();
void resetThreadError();
TransporterError _threadError;
Uint32 _timeOutMillis;
Uint32 _errorCount;
protected:
NdbMutex* theMutexPtr;
bool _connected; // Are we connected
bool _connecting; // Connect thread is running
bool _disconnecting; // We are disconnecting
void * callbackObj;
Uint32 m_errorCount;
Uint32 m_timeOutMillis;
protected:
bool m_connected; // Are we connected
TransporterRegistry &m_transporter_registry;
void *get_callback_obj() { return m_transporter_registry.callbackObj; };
void report_disconnect(int err){m_transporter_registry.report_disconnect(remoteNodeId,err);};
void report_error(enum TransporterError err){reportError(get_callback_obj(),remoteNodeId,err);};
};
inline
bool
Transporter::isConnected() const {
return _connected;
return m_connected;
}
inline
......@@ -138,42 +146,17 @@ Transporter::getRemoteNodeId() const {
return remoteNodeId;
}
inline
void
Transporter::reportThreadError(NodeId nodeId, TransporterError errorCode)
{
#if 0
ndbout_c("Transporter::reportThreadError (NodeId: %d, Error code: %d)",
nodeId, errorCode);
#endif
_threadError = errorCode;
_errorCount++;
}
inline
TransporterError
Transporter::getThreadError(){
return _threadError;
NodeId
Transporter::getLocalNodeId() const {
return remoteNodeId;
}
inline
Uint32
Transporter::getErrorCount()
{
return _errorCount;
}
inline
void
Transporter::resetThreadError()
{
_threadError = TE_NO_ERROR;
}
inline
void
Transporter::setCallbackObject(void * callback) {
callbackObj = callback;
return m_errorCount;
}
#endif // Define of Transporter_H
......@@ -3,7 +3,8 @@ noinst_LTLIBRARIES = libgeneral.la
libgeneral_la_SOURCES = \
File.cpp md5_hash.cpp Properties.cpp socket_io.cpp \
SimpleProperties.cpp Parser.cpp InputStream.cpp SocketServer.cpp \
SimpleProperties.cpp Parser.cpp InputStream.cpp \
SocketServer.cpp SocketClient.cpp SocketAuthenticator.cpp\
OutputStream.cpp NdbOut.cpp BaseString.cpp Base64.cpp \
NdbSqlUtil.cpp new.cpp \
uucode.c random.c getarg.c version.c \
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <SocketClient.hpp>
#include <SocketAuthenticator.hpp>
#include <NdbOut.hpp>
SocketAuthSimple::SocketAuthSimple(const char *passwd) {
m_passwd= strdup(passwd);
m_buf= (char*)malloc(strlen(passwd)+1);
}
SocketAuthSimple::~SocketAuthSimple()
{
if (m_passwd)
free((void*)m_passwd);
if (m_buf)
free(m_buf);
}
bool SocketAuthSimple::client_authenticate(int sockfd)
{
if (!m_passwd)
return false;
int len = strlen(m_passwd);
int r;
r= send(sockfd, m_passwd, len, 0);
r= recv(sockfd, m_buf, len, 0);
m_buf[r]= '\0';
return true;
}
bool SocketAuthSimple::server_authenticate(int sockfd)
{
if (!m_passwd)
return false;
int len = strlen(m_passwd), r;
r= recv(sockfd, m_buf, len, 0);
m_buf[r]= '\0';
r= send(sockfd, m_passwd, len, 0);
return true;
}
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <NdbOut.hpp>
#include <SocketClient.hpp>
#include <SocketAuthenticator.hpp>
SocketClient::SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa)
{
m_auth= sa;
m_port= port;
m_server_name= strdup(server_name);
m_sockfd= -1;
}
SocketClient::~SocketClient()
{
if (m_server_name)
free(m_server_name);
if (m_sockfd >= 0)
NDB_CLOSE_SOCKET(m_sockfd);
if (m_auth)
delete m_auth;
}
bool
SocketClient::init()
{
if (m_sockfd >= 0)
NDB_CLOSE_SOCKET(m_sockfd);
memset(&m_servaddr, 0, sizeof(m_servaddr));
m_servaddr.sin_family = AF_INET;
m_servaddr.sin_port = htons(m_port);
// Convert ip address presentation format to numeric format
if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
return false;
m_sockfd= socket(AF_INET, SOCK_STREAM, 0);
if (m_sockfd == NDB_INVALID_SOCKET) {
return false;
}
return true;
}
NDB_SOCKET_TYPE
SocketClient::connect()
{
if (m_sockfd < 0)
{
if (!init()) {
ndbout << "SocketClient::connect() failed " << m_server_name << " " << m_port << endl;
return -1;
}
}
const int r = ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr));
if (r == -1)
return -1;
if (m_auth)
if (!m_auth->client_authenticate(m_sockfd))
{
NDB_CLOSE_SOCKET(m_sockfd);
m_sockfd= -1;
return -1;
}
NDB_SOCKET_TYPE sockfd= m_sockfd;
m_sockfd= -1;
return sockfd;
}
......@@ -17,7 +17,7 @@
#include <ndb_global.h>
#include "SocketServer.hpp"
#include <SocketServer.hpp>
#include <NdbTCP.h>
#include <NdbOut.hpp>
......
......@@ -4,7 +4,7 @@ include $(top_srcdir)/ndb/config/common.mk.am
ndbbin_PROGRAMS = ndbd
ndbd_SOURCES = Main.cpp SimBlockList.cpp
ndbd_SOURCES = main.cpp SimBlockList.cpp
include $(top_srcdir)/ndb/config/type_kernel.mk.am
......
......@@ -362,7 +362,7 @@ void Cmvmi::execCLOSE_COMREQ(Signal* signal)
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
globalTransporterRegistry.setIOState(i, HaltIO);
globalTransporterRegistry.setPerformState(i, PerformDisconnect);
globalTransporterRegistry.do_disconnect(i);
/**
* Cancel possible event subscription
......@@ -390,7 +390,7 @@ void Cmvmi::execOPEN_COMREQ(Signal* signal)
const Uint32 len = signal->getLength();
if(len == 2){
globalTransporterRegistry.setPerformState(tStartingNode, PerformConnect);
globalTransporterRegistry.do_connect(tStartingNode);
globalTransporterRegistry.setIOState(tStartingNode, HaltIO);
//-----------------------------------------------------
......@@ -405,7 +405,7 @@ void Cmvmi::execOPEN_COMREQ(Signal* signal)
jam();
if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2){
jam();
globalTransporterRegistry.setPerformState(i, PerformConnect);
globalTransporterRegistry.do_connect(i);
globalTransporterRegistry.setIOState(i, HaltIO);
signal->theData[0] = EventReport::CommunicationOpened;
......@@ -456,34 +456,21 @@ void Cmvmi::execDISCONNECT_REP(Signal *signal)
const NodeInfo::NodeType type = getNodeInfo(hostId).getType();
ndbrequire(type != NodeInfo::INVALID);
if (globalTransporterRegistry.performState(hostId) != PerformDisconnect) {
if(type == NodeInfo::DB || globalData.theStartLevel == NodeState::SL_STARTED){
jam();
// -------------------------------------------------------------------
// We do not report the disconnection when disconnection is already ongoing.
// This reporting should be looked into but this secures that we avoid
// crashes due to too quick re-reporting of disconnection.
// -------------------------------------------------------------------
if(type == NodeInfo::DB || globalData.theStartLevel == NodeState::SL_STARTED){
jam();
DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0];
rep->nodeId = hostId;
rep->err = errNo;
sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal,
DisconnectRep::SignalLength, JBA);
globalTransporterRegistry.setPerformState(hostId, PerformDisconnect);
} else if(globalData.theStartLevel == NodeState::SL_CMVMI ||
globalData.theStartLevel == NodeState::SL_STARTING) {
/**
* Someone disconnected during cmvmi period
*/
if(type == NodeInfo::MGM){
jam();
globalTransporterRegistry.setPerformState(hostId, PerformConnect);
} else {
globalTransporterRegistry.setPerformState(hostId, PerformDisconnect);
}
}
DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0];
rep->nodeId = hostId;
rep->err = errNo;
sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal,
DisconnectRep::SignalLength, JBA);
} else if((globalData.theStartLevel == NodeState::SL_CMVMI ||
globalData.theStartLevel == NodeState::SL_STARTING)
&& type == NodeInfo::MGM) {
/**
* Someone disconnected during cmvmi period
*/
jam();
globalTransporterRegistry.do_connect(hostId);
}
signal->theData[0] = EventReport::Disconnected;
......@@ -522,7 +509,8 @@ void Cmvmi::execCONNECT_REP(Signal *signal){
/**
* Dont allow api nodes to connect
*/
globalTransporterRegistry.setPerformState(hostId, PerformDisconnect);
abort();
globalTransporterRegistry.do_disconnect(hostId);
}
}
......@@ -756,8 +744,8 @@ Cmvmi::execSTART_ORD(Signal* signal) {
*/
for(unsigned int i = 1; i < MAX_NODES; i++ ){
if (getNodeInfo(i).m_type == NodeInfo::MGM){
if(globalTransporterRegistry.performState(i) != PerformIO){
globalTransporterRegistry.setPerformState(i, PerformConnect);
if(!globalTransporterRegistry.is_connected(i)){
globalTransporterRegistry.do_connect(i);
globalTransporterRegistry.setIOState(i, NoHalt);
}
}
......@@ -783,7 +771,7 @@ Cmvmi::execSTART_ORD(Signal* signal) {
// without any connected nodes.
for(unsigned int i = 1; i < MAX_NODES; i++ ){
if (i != getOwnNodeId() && getNodeInfo(i).m_type != NodeInfo::MGM){
globalTransporterRegistry.setPerformState(i, PerformDisconnect);
globalTransporterRegistry.do_disconnect(i);
globalTransporterRegistry.setIOState(i, HaltIO);
}
}
......@@ -1062,29 +1050,10 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal)
if(nodeTypeStr == 0)
continue;
const char* actionStr = "";
switch (globalTransporterRegistry.performState(i)){
case PerformNothing:
actionStr = "does nothing";
break;
case PerformIO:
actionStr = "is connected";
break;
case PerformConnect:
actionStr = "is trying to connect";
break;
case PerformDisconnect:
actionStr = "is trying to disconnect";
break;
case RemoveTransporter:
actionStr = "will be removed";
break;
}
infoEvent("Connection to %d (%s) %s",
i,
nodeTypeStr,
actionStr);
globalTransporterRegistry.getPerformStateString(i));
}
}
......
......@@ -1704,6 +1704,7 @@ void Qmgr::sendApiFailReq(Signal* signal, Uint16 failedNodeNo)
sendSignal(DBTC_REF, GSN_API_FAILREQ, signal, 2, JBA);
sendSignal(DBDICT_REF, GSN_API_FAILREQ, signal, 2, JBA);
sendSignal(SUMA_REF, GSN_API_FAILREQ, signal, 2, JBA);
/**
* GREP also need the information that an API node
* (actually a REP node) has failed.
......@@ -1978,8 +1979,11 @@ void Qmgr::execAPI_REGREQ(Signal* signal)
apiRegConf->nodeState.dynamicId = -dynamicId;
}
}
c_connectedNodes.copyto(NdbNodeBitmask::Size,
apiRegConf->connected_nodes.data);
sendSignal(ref, GSN_API_REGCONF, signal, ApiRegConf::SignalLength, JBB);
if ((getNodeState().startLevel == NodeState::SL_STARTED ||
getNodeState().getSingleUserMode())
&& apiNodePtr.p->phase == ZAPI_INACTIVE) {
......
......@@ -20,7 +20,7 @@
#include "Configuration.hpp"
#include <TransporterRegistry.hpp>
#include "SimBlockList.hpp"
#include "vm/SimBlockList.hpp"
#include "ThreadConfig.hpp"
#include <SignalLoggerManager.hpp>
#include <NdbOut.hpp>
......@@ -171,13 +171,29 @@ NDB_MAIN(ndb_kernel){
NDB_ASSERT(0, "Illegal state globalData.theRestartFlag");
}
SocketServer socket_server;
globalTransporterRegistry.startSending();
globalTransporterRegistry.startReceiving();
if (!globalTransporterRegistry.start_service(socket_server))
NDB_ASSERT(0, "globalTransporterRegistry.start_service() failed");
if (!globalTransporterRegistry.start_clients())
NDB_ASSERT(0, "globalTransporterRegistry.start_clients() failed");
globalEmulatorData.theWatchDog->doStart();
socket_server.startServer();
globalEmulatorData.theThreadConfig->ipControlLoop();
NdbShutdown(NST_Normal);
socket_server.stopServer();
socket_server.stopSessions();
globalTransporterRegistry.stop_clients();
return NRT_Default;
}
......
......@@ -147,8 +147,8 @@ void ThreadConfig::ipControlLoop()
// plus checking for any received messages.
//--------------------------------------------------------------------
if (i++ >= 20) {
globalTransporterRegistry.update_connections();
globalData.incrementWatchDogCounter(5);
globalTransporterRegistry.checkConnections();
i = 0;
}//if
......
......@@ -24,6 +24,7 @@
#include <NdbOut.hpp>
#include <SocketServer.hpp>
#include <SocketClient.hpp>
#include <Parser.hpp>
#include <OutputStream.hpp>
#include <InputStream.hpp>
......@@ -318,8 +319,8 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
/**
* Print some info about why the parser returns NULL
*/
// ndbout << " status=" << ctx.m_status << ", curr="
// << ctx.m_currentToken << endl;
//ndbout << " status=" << ctx.m_status << ", curr="
//<< ctx.m_currentToken << endl;
}
#ifdef MGMAPI_LOG
else {
......@@ -362,30 +363,11 @@ ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv)
/**
* Do connect
*/
const NDB_SOCKET_TYPE sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == NDB_INVALID_SOCKET) {
SET_ERROR(handle, NDB_MGM_ILLEGAL_SOCKET, "");
return -1;
}
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(handle->port);
// Convert ip address presentation format to numeric format
const int res1 = Ndb_getInAddr(&servaddr.sin_addr, handle->hostname);
if (res1 != 0) {
DEBUG("Ndb_getInAddr(...) == -1");
setError(handle, EINVAL, __LINE__, "Invalid hostname/address");
return -1;
}
const int res2 = connect(sockfd, (struct sockaddr*) &servaddr,
sizeof(servaddr));
if (res2 == -1) {
NDB_CLOSE_SOCKET(sockfd);
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, "Unable to connect to %s",
mgmsrv);
SocketClient s(handle->hostname, handle->port);
const NDB_SOCKET_TYPE sockfd = s.connect();
if (sockfd < 0) {
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
"Unable to connect to %s", mgmsrv);
return -1;
}
......@@ -1523,6 +1505,55 @@ ndb_mgm_get_configuration(NdbMgmHandle handle, unsigned int version) {
return 0;
}
extern "C"
int
ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodeid, int nodetype)
{
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
Properties args;
args.put("version", version);
args.put("nodetype", nodetype);
args.put("nodeid", *pnodeid);
args.put("user", "mysqld");
args.put("password", "mysqld");
args.put("public key", "a public key");
const ParserRow<ParserDummy> reply[]= {
MGM_CMD("get nodeid reply", NULL, ""),
MGM_ARG("nodeid", Int, Optional, "Error message"),
MGM_ARG("result", String, Mandatory, "Error message"),
MGM_END()
};
const Properties *prop;
prop= ndb_mgm_call(handle, reply, "get nodeid", &args);
if(prop == NULL) {
SET_ERROR(handle, EIO, "Unable to alloc nodeid");
return -1;
}
int res= -1;
do {
const char * buf;
if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){
ndbout_c("ERROR Message: %s\n", buf);
break;
}
if(!prop->get("nodeid", pnodeid) != 0){
ndbout_c("ERROR Message: <nodeid Unspecified>\n");
break;
}
res= 0;
}while(0);
delete prop;
return res;
}
/*****************************************************************************
* Global Replication
******************************************************************************/
......
......@@ -43,7 +43,7 @@
#include <DebuggerNames.hpp>
#include <ndb_version.h>
#include "SocketServer.hpp"
#include <SocketServer.hpp>
#include "NodeLogLevel.hpp"
#include <NdbConfig.h>
......@@ -390,6 +390,95 @@ MgmtSrvr::getNodeCount(enum ndb_mgm_node_type type) const
return count;
}
int
MgmtSrvr::getPort() const {
const Properties *mgmProps;
ndb_mgm_configuration_iterator * iter =
ndb_mgm_create_configuration_iterator(_config->m_configValues,
CFG_SECTION_NODE);
if(iter == 0)
return 0;
if(ndb_mgm_find(iter, CFG_NODE_ID, getOwnNodeId()) != 0){
ndbout << "Could not retrieve configuration for Node "
<< getOwnNodeId() << " in config file." << endl
<< "Have you set correct NodeId for this node?" << endl;
ndb_mgm_destroy_iterator(iter);
return 0;
}
unsigned type;
if(ndb_mgm_get_int_parameter(iter, CFG_TYPE_OF_SECTION, &type) != 0 ||
type != NODE_TYPE_MGM){
ndbout << "Local node id " << getOwnNodeId()
<< " is not defined as management server" << endl
<< "Have you set correct NodeId for this node?" << endl;
return 0;
}
Uint32 port = 0;
if(ndb_mgm_get_int_parameter(iter, CFG_MGM_PORT, &port) != 0){
ndbout << "Could not find PortNumber in the configuration file." << endl;
return 0;
}
/*****************
* Set Stat Port *
*****************/
#if 0
if (!mgmProps->get("PortNumberStats", &tmp)){
ndbout << "Could not find PortNumberStats in the configuration file."
<< endl;
return false;
}
glob.port_stats = tmp;
#endif
#if 0
const char * host;
if(ndb_mgm_get_string_parameter(iter, mgmProps->get("ExecuteOnComputer", host)){
ndbout << "Failed to find \"ExecuteOnComputer\" for my node" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
const char * hostname;
{
const Properties * p;
char buf[255];
snprintf(buf, sizeof(buf), "Computer_%s", host.c_str());
if(!glob.cluster_config->get(buf, &p)){
ndbout << "Failed to find computer " << host << " in config" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
if(!p->get("HostName", &hostname)){
ndbout << "Failed to find \"HostName\" for computer " << host
<< " in config" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
if(NdbHost_GetHostName(buf) != 0){
ndbout << "Unable to get own hostname" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
}
const char * ip_address;
if(mgmProps->get("IpAddress", &ip_address)){
glob.use_specific_ip = true;
glob.interface_name = strdup(ip_address);
return true;
}
glob.interface_name = strdup(hostname);
#endif
return port;
}
int
MgmtSrvr::getStatPort() const {
#if 0
......@@ -419,7 +508,6 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
theWaitState(WAIT_SUBSCRIBE_CONF),
theConfCount(0) {
_ownNodeId = nodeId;
_config = NULL;
_isStatPortActive = false;
_isClusterLogStatActive = false;
......@@ -429,6 +517,8 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
_logLevelThreadSleep = 500;
_startedNodeId = 0;
theFacade = 0;
m_newConfig = NULL;
m_configFilename = configFilename;
setCallback(CmdBackupCallback);
......@@ -486,6 +576,15 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
_clusterLogLevelList = new NodeLogLevelList();
_props = NULL;
_ownNodeId= 0;
NodeId tmp= nodeId > 0 ? nodeId-1 : 0;
if (getNextFreeNodeId(&tmp, NDB_MGM_NODE_TYPE_MGM)){
_ownNodeId= tmp;
if (nodeId != 0 && nodeId != tmp)
_ownNodeId= 0; // did not get nodeid requested
} else
NDB_ASSERT(0, "Unable to retrieve own node id");
}
......@@ -510,8 +609,7 @@ MgmtSrvr::start()
return false;
}
theFacade = TransporterFacade::start_instance
(_ownNodeId,
(ndb_mgm_configuration*)_config->m_configValues);
(_ownNodeId,(ndb_mgm_configuration*)_config->m_configValues);
if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
......@@ -1896,6 +1994,7 @@ MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal)
int returnCode;
int gsn = signal->readSignalNumber();
switch (gsn) {
case GSN_API_VERSION_CONF: {
if (theWaitState == WAIT_VERSION) {
......@@ -2187,6 +2286,36 @@ MgmtSrvr::getNodeType(NodeId nodeId) const
return nodeTypes[nodeId];
}
bool
MgmtSrvr::getNextFreeNodeId(NodeId * nodeId,
enum ndb_mgm_node_type type) const
{
#if 0
ndbout << "MgmtSrvr::getNextFreeNodeId type=" << type
<< " *nodeid=" << *nodeId << endl;
#endif
NodeId tmp= *nodeId;
if (theFacade && theFacade->theClusterMgr) {
while(getNextNodeId(&tmp, type)){
if (theFacade->theClusterMgr->m_connected_nodes.get(tmp))
continue;
#if 0
ndbout << "MgmtSrvr::getNextFreeNodeId ret=" << tmp << endl;
#endif
*nodeId= tmp;
return true;
}
} else if (getNextNodeId(&tmp, type)){
#if 0
ndbout << "MgmtSrvr::getNextFreeNodeId (theFacade==0) ret=" << tmp << endl;
#endif
*nodeId= tmp;
return true;
}
return false;
}
bool
MgmtSrvr::getNextNodeId(NodeId * nodeId, enum ndb_mgm_node_type type) const
{
......
......@@ -150,10 +150,12 @@ public:
enum LogMode {In, Out, InOut, Off};
/* Constructor */
MgmtSrvr(NodeId nodeId, /* Local nodeid */
const BaseString &config_filename, /* Where to save config */
const BaseString &ndb_config_filename, /* Ndb.cfg filename */
Config * config);
NodeId getOwnNodeId() const {return _ownNodeId;};
/**
* Read (initial) config file, create TransporterFacade,
......@@ -448,6 +450,7 @@ public:
* @return false if none found
*/
bool getNextNodeId(NodeId * _nodeId, enum ndb_mgm_node_type type) const ;
bool getNextFreeNodeId(NodeId * _nodeId, enum ndb_mgm_node_type type) const ;
/**
*
......@@ -492,6 +495,11 @@ public:
* @return statistic port number.
*/
int getStatPort() const;
/**
* Returns the port number.
* @return port number.
*/
int getPort() const;
//**************************************************************************
......
......@@ -121,6 +121,14 @@ ParserRow<MgmApiSession> commands[] = {
MGM_ARG("version", Int, Mandatory, "Configuration version number"),
MGM_ARG("node", Int, Optional, "Node ID"),
MGM_CMD("get nodeid", &MgmApiSession::get_nodeid, ""),
MGM_ARG("version", Int, Mandatory, "Configuration version number"),
MGM_ARG("nodetype", Int, Mandatory, "Node type"),
MGM_ARG("nodeid", Int, Optional, "Node ID"),
MGM_ARG("user", String, Mandatory, "Password"),
MGM_ARG("password", String, Mandatory, "Password"),
MGM_ARG("public key", String, Mandatory, "Public key"),
MGM_CMD("get version", &MgmApiSession::getVersion, ""),
MGM_CMD("get status", &MgmApiSession::getStatus, ""),
......@@ -332,6 +340,82 @@ backward(const char * base, const Properties* reply){
return ret;
}
void
MgmApiSession::get_nodeid(Parser_t::Context &,
const class Properties &args)
{
const char *cmd= "get nodeid reply";
Uint32 version, nodeid= 0, nodetype= 0xff;
const char * user;
const char * password;
const char * public_key;
args.get("version", &version);
args.get("nodetype", &nodetype);
args.get("nodeid", &nodeid);
args.get("user", &user);
args.get("password", &password);
args.get("public key", &public_key);
NodeId free_id= 0;
NodeId tmp= nodeid > 0 ? nodeid-1 : 0;
bool compatible;
switch (nodetype) {
case NODE_TYPE_MGM:
compatible = ndbCompatible_mgmt_api(NDB_VERSION, version);
if (m_mgmsrv.getNextFreeNodeId(&tmp, NDB_MGM_NODE_TYPE_MGM))
free_id= tmp;
break;
case NODE_TYPE_API:
compatible = ndbCompatible_mgmt_api(NDB_VERSION, version);
if (m_mgmsrv.getNextFreeNodeId(&tmp, NDB_MGM_NODE_TYPE_API))
free_id= tmp;
break;
case NODE_TYPE_DB:
compatible = ndbCompatible_mgmt_ndb(NDB_VERSION, version);
if (m_mgmsrv.getNextFreeNodeId(&tmp, NDB_MGM_NODE_TYPE_NDB))
free_id= tmp;
break;
default:
m_output->println(cmd);
m_output->println("result: unknown nodetype %d", nodetype);
m_output->println("");
return;
}
if (nodeid != 0 && free_id != nodeid){
m_output->println(cmd);
m_output->println("result: no free nodeid %d for nodetype %d",
nodeid, nodetype);
m_output->println("");
return;
}
if (free_id == 0){
m_output->println(cmd);
m_output->println("result: no free nodeid for nodetype %d", nodetype);
m_output->println("");
return;
}
#if 0
if (!compatible){
m_output->println(cmd);
m_output->println("result: incompatible version mgmt 0x%x and node 0x%x",
NDB_VERSION, version);
m_output->println("");
return;
}
#endif
m_output->println(cmd);
m_output->println("nodeid: %u", free_id);
m_output->println("result: Ok");
m_output->println("");
return;
}
void
MgmApiSession::getConfig_common(Parser_t::Context &,
const class Properties &args,
......@@ -432,7 +516,6 @@ MgmApiSession::getConfig_common(Parser_t::Context &,
m_output->println("Content-Transfer-Encoding: base64");
m_output->println("");
m_output->println(str.c_str());
m_output->println("");
return;
}
......
......@@ -51,6 +51,7 @@ public:
void getConfig_old(Parser_t::Context &ctx);
#endif /* MGM_GET_CONFIG_BACKWARDS_COMPAT */
void get_nodeid(Parser_t::Context &ctx, const class Properties &args);
void getVersion(Parser_t::Context &ctx, const class Properties &args);
void getStatus(Parser_t::Context &ctx, const class Properties &args);
void getInfoClusterLog(Parser_t::Context &ctx, const class Properties &args);
......
......@@ -20,7 +20,7 @@
#include "MgmtSrvr.hpp"
#include "EventLogger.hpp"
#include "Config.hpp"
#include <Config.hpp>
#include "InitConfigFileParser.hpp"
#include <SocketServer.hpp>
#include "Services.hpp"
......@@ -88,7 +88,6 @@ static MgmGlobals glob;
******************************************************************************/
static bool readLocalConfig();
static bool readGlobalConfig();
static bool setPortNo();
/**
* Global variables
......@@ -146,7 +145,9 @@ NDB_MAIN(mgmsrv){
exit(1);
}
glob.socketServer = new SocketServer();
MgmApiService * mapi = new MgmApiService();
MgmStatService * mstat = new MgmStatService();
/****************************
......@@ -157,9 +158,26 @@ NDB_MAIN(mgmsrv){
if (!readGlobalConfig())
goto error_end;
if (!setPortNo())
glob.mgmObject = new MgmtSrvr(glob.localNodeId,
BaseString(glob.config_filename),
BaseString(glob.local_config_filename == 0 ?
"" : glob.local_config_filename),
glob.cluster_config);
glob.cluster_config = 0;
glob.localNodeId= glob.mgmObject->getOwnNodeId();
if (glob.localNodeId == 0)
goto error_end;
glob.port= glob.mgmObject->getPort();
if (glob.port == 0)
goto error_end;
glob.interface_name = 0;
glob.use_specific_ip = false;
if(!glob.use_specific_ip){
if(!glob.socketServer->tryBind(glob.port, glob.interface_name)){
ndbout_c("Unable to setup port: %s:%d!\n"
......@@ -190,15 +208,8 @@ NDB_MAIN(mgmsrv){
goto error_end;
}
glob.mgmObject = new MgmtSrvr(glob.localNodeId,
BaseString(glob.config_filename),
BaseString(glob.local_config_filename == 0 ? "" : glob.local_config_filename),
glob.cluster_config);
glob.cluster_config = 0;
if(!glob.mgmObject->check_start()){
ndbout_c("Unable to start management server.");
ndbout_c("Unable to check start management server.");
ndbout_c("Probably caused by illegal initial configuration file.");
goto error_end;
}
......@@ -343,108 +354,3 @@ readGlobalConfig() {
}
return true;
}
/**
* @fn setPortNo
* @param glob : Global variables
* @return true if success, false otherwise.
*
* Port number:
* 2. Use port number from global configuration file
* 4. Use port number for statistics from global configuration file
*/
static bool
setPortNo(){
const Properties *mgmProps;
ndb_mgm_configuration_iterator * iter =
ndb_mgm_create_configuration_iterator(glob.cluster_config->m_configValues,
CFG_SECTION_NODE);
if(iter == 0)
return false;
if(ndb_mgm_find(iter, CFG_NODE_ID, glob.localNodeId) != 0){
ndbout << "Could not retrieve configuration for Node "
<< glob.localNodeId << " in config file." << endl
<< "Have you set correct NodeId for this node?" << endl;
ndb_mgm_destroy_iterator(iter);
return false;
}
unsigned type;
if(ndb_mgm_get_int_parameter(iter, CFG_TYPE_OF_SECTION, &type) != 0 ||
type != NODE_TYPE_MGM){
ndbout << "Local node id " << glob.localNodeId
<< " is not defined as management server" << endl
<< "Have you set correct NodeId for this node?" << endl;
return false;
}
/************
* Set Port *
************/
Uint32 tmp = 0;
if(ndb_mgm_get_int_parameter(iter, CFG_MGM_PORT, &tmp) != 0){
ndbout << "Could not find PortNumber in the configuration file." << endl;
return false;
}
glob.port = tmp;
/*****************
* Set Stat Port *
*****************/
#if 0
if (!mgmProps->get("PortNumberStats", &tmp)){
ndbout << "Could not find PortNumberStats in the configuration file."
<< endl;
return false;
}
glob.port_stats = tmp;
#endif
#if 0
const char * host;
if(ndb_mgm_get_string_parameter(iter, mgmProps->get("ExecuteOnComputer", host)){
ndbout << "Failed to find \"ExecuteOnComputer\" for my node" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
const char * hostname;
{
const Properties * p;
char buf[255];
snprintf(buf, sizeof(buf), "Computer_%s", host.c_str());
if(!glob.cluster_config->get(buf, &p)){
ndbout << "Failed to find computer " << host << " in config" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
if(!p->get("HostName", &hostname)){
ndbout << "Failed to find \"HostName\" for computer " << host
<< " in config" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
if(NdbHost_GetHostName(buf) != 0){
ndbout << "Unable to get own hostname" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
}
const char * ip_address;
if(mgmProps->get("IpAddress", &ip_address)){
glob.use_specific_ip = true;
glob.interface_name = strdup(ip_address);
return true;
}
glob.interface_name = strdup(hostname);
#endif
glob.interface_name = 0;
glob.use_specific_ip = false;
return true;
}
......@@ -295,11 +295,14 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
}
int global_mgmt_server_check = 0; // set to one in mgmtsrvr main;
void
ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
m_connected_nodes.assign(apiRegConf->connected_nodes);
#if 0
ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
#endif
......@@ -309,6 +312,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
Node & node = theNodes[nodeId];
assert(node.defined == true);
assert(node.connected == true);
if(node.m_info.m_version != apiRegConf->version){
node.m_info.m_version = apiRegConf->version;
if (global_mgmt_server_check == 1)
......@@ -422,6 +426,8 @@ ClusterMgr::reportDisconnected(NodeId nodeId){
void
ClusterMgr::reportNodeFailed(NodeId nodeId){
m_connected_nodes.clear(nodeId);
Node & theNode = theNodes[nodeId];
theNode.m_alive = false;
......
......@@ -78,6 +78,7 @@ public:
const Node & getNodeInfo(NodeId) const;
Uint32 getNoOfConnectedNodes() const;
NodeBitmask m_connected_nodes;
private:
Uint32 noOfConnectedNodes;
......
......@@ -39,6 +39,7 @@
#endif
//#define REPORT_TRANSPORTER
//#define API_TRACE;
#if defined DEBUG_TRANSPORTER
#define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
......@@ -440,7 +441,17 @@ runSendRequest_C(void * me)
void TransporterFacade::threadMainSend(void)
{
SocketServer socket_server;
theTransporterRegistry->startSending();
if (!theTransporterRegistry->start_service(socket_server))
NDB_ASSERT(0, "Unable to start theTransporterRegistry->start_service");
if (!theTransporterRegistry->start_clients())
NDB_ASSERT(0, "Unable to start theTransporterRegistry->start_clients");
socket_server.startServer();
while(!theStopReceive) {
NdbSleep_MilliSleep(10);
NdbMutex_Lock(theMutexPtr);
......@@ -451,6 +462,11 @@ void TransporterFacade::threadMainSend(void)
NdbMutex_Unlock(theMutexPtr);
}
theTransporterRegistry->stopSending();
socket_server.stopServer();
socket_server.stopSessions();
theTransporterRegistry->stop_clients();
}
extern "C"
......@@ -466,7 +482,7 @@ void TransporterFacade::threadMainReceive(void)
{
theTransporterRegistry->startReceiving();
NdbMutex_Lock(theMutexPtr);
theTransporterRegistry->checkConnections();
theTransporterRegistry->update_connections();
NdbMutex_Unlock(theMutexPtr);
while(!theStopReceive) {
for(int i = 0; i<10; i++){
......@@ -478,7 +494,7 @@ void TransporterFacade::threadMainReceive(void)
}
}
NdbMutex_Lock(theMutexPtr);
theTransporterRegistry->checkConnections();
theTransporterRegistry->update_connections();
NdbMutex_Unlock(theMutexPtr);
}//while
theTransporterRegistry->stopReceiving();
......@@ -875,13 +891,13 @@ TransporterFacade::sendFragmentedSignalUnCond(NdbApiSignal* aSignal,
void
TransporterFacade::doConnect(int aNodeId){
theTransporterRegistry->setIOState(aNodeId, NoHalt);
theTransporterRegistry->setPerformState(aNodeId, PerformConnect);
theTransporterRegistry->do_connect(aNodeId);
}
void
TransporterFacade::doDisconnect(int aNodeId)
{
theTransporterRegistry->setPerformState(aNodeId, PerformDisconnect);
theTransporterRegistry->do_disconnect(aNodeId);
}
void
......@@ -906,7 +922,7 @@ TransporterFacade::ownId() const
bool
TransporterFacade::isConnected(NodeId aNodeId){
return theTransporterRegistry->performState(aNodeId) == PerformIO;
return theTransporterRegistry->is_connected(aNodeId);
}
NodeId
......
......@@ -110,7 +110,6 @@ public:
// Close this block number
int close_local(BlockNumber blockNumber);
void setState(Uint32 aNodeId, PerformState aState);
private:
/**
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment