fixed compile of shm transporter

parent dddcc371
...@@ -88,11 +88,11 @@ ...@@ -88,11 +88,11 @@
#define CFG_CONNECTION_CHECKSUM 403 #define CFG_CONNECTION_CHECKSUM 403
#define CFG_CONNECTION_NODE_1_SYSTEM 404 #define CFG_CONNECTION_NODE_1_SYSTEM 404
#define CFG_CONNECTION_NODE_2_SYSTEM 405 #define CFG_CONNECTION_NODE_2_SYSTEM 405
#define CFG_CONNECTION_SERVER_PORT 406
#define CFG_TCP_HOSTNAME_1 450 #define CFG_TCP_HOSTNAME_1 450
#define CFG_TCP_HOSTNAME_2 451 #define CFG_TCP_HOSTNAME_2 451
#define CFG_TCP_SERVER 452 #define CFG_TCP_SERVER 452
#define CFG_TCP_SERVER_PORT 453
#define CFG_TCP_SEND_BUFFER_SIZE 454 #define CFG_TCP_SEND_BUFFER_SIZE 454
#define CFG_TCP_RECEIVE_BUFFER_SIZE 455 #define CFG_TCP_RECEIVE_BUFFER_SIZE 455
#define CFG_TCP_PROXY 456 #define CFG_TCP_PROXY 456
......
...@@ -69,6 +69,7 @@ struct TCP_TransporterConfiguration { ...@@ -69,6 +69,7 @@ struct TCP_TransporterConfiguration {
* SHM Transporter Configuration * SHM Transporter Configuration
*/ */
struct SHM_TransporterConfiguration { struct SHM_TransporterConfiguration {
Uint32 port;
NodeId remoteNodeId; NodeId remoteNodeId;
NodeId localNodeId; NodeId localNodeId;
bool compression; bool compression;
......
...@@ -104,6 +104,7 @@ ConfigInfo::m_SectionRules[] = { ...@@ -104,6 +104,7 @@ ConfigInfo::m_SectionRules[] = {
{ "OSE", fixHostname, "HostName2" }, { "OSE", fixHostname, "HostName2" },
{ "TCP", fixPortNumber, 0 }, // has to come after fixHostName { "TCP", fixPortNumber, 0 }, // has to come after fixHostName
{ "SHM", fixPortNumber, 0 }, // has to come after fixHostName
//{ "SHM", fixShmKey, 0 }, //{ "SHM", fixShmKey, 0 },
/** /**
...@@ -1559,14 +1560,14 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1559,14 +1560,14 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
0x7FFFFFFF }, 0x7FFFFFFF },
{ {
CFG_TCP_SERVER_PORT, CFG_CONNECTION_SERVER_PORT,
"PortNumber", "PortNumber",
"TCP", "TCP",
"Port used for this transporter", "Port used for this transporter",
ConfigInfo::USED, ConfigInfo::USED,
false, false,
ConfigInfo::INT, ConfigInfo::INT,
NDB_BASE_PORT+2, MANDATORY,
0, 0,
0x7FFFFFFF }, 0x7FFFFFFF },
...@@ -1695,6 +1696,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1695,6 +1696,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
0, 0,
0x7FFFFFFF }, 0x7FFFFFFF },
{
CFG_CONNECTION_SERVER_PORT,
"PortNumber",
"SHM",
"Port used for this transporter",
ConfigInfo::USED,
false,
ConfigInfo::INT,
MANDATORY,
0,
0x7FFFFFFF },
{ {
KEY_INTERNAL, KEY_INTERNAL,
"ProcessId1", "ProcessId1",
......
...@@ -365,6 +365,16 @@ IPCConfig::configureTransporters(Uint32 nodeId, ...@@ -365,6 +365,16 @@ IPCConfig::configureTransporters(Uint32 nodeId,
Uint32 type = ~0; Uint32 type = ~0;
if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue; if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue;
Uint32 tmp_server_port= 0;
if(iter.get(CFG_CONNECTION_SERVER_PORT, &tmp_server_port)) break;
if (nodeId <= nodeId1 && nodeId <= nodeId2) {
if (server_port && server_port != tmp_server_port) {
ndbout << "internal error in config setup of server ports line= " << __LINE__ << endl;
exit(-1);
}
server_port= tmp_server_port;
}
switch(type){ switch(type){
case CONNECTION_TYPE_SHM:{ case CONNECTION_TYPE_SHM:{
SHM_TransporterConfiguration conf; SHM_TransporterConfiguration conf;
...@@ -378,6 +388,8 @@ IPCConfig::configureTransporters(Uint32 nodeId, ...@@ -378,6 +388,8 @@ IPCConfig::configureTransporters(Uint32 nodeId,
if(iter.get(CFG_SHM_KEY, &conf.shmKey)) break; if(iter.get(CFG_SHM_KEY, &conf.shmKey)) break;
if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shmSize)) break; if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shmSize)) break;
conf.port= tmp_server_port;
if(!tr.createTransporter(&conf)){ if(!tr.createTransporter(&conf)){
ndbout << "Failed to create SHM Transporter from: " ndbout << "Failed to create SHM Transporter from: "
<< conf.localNodeId << " to: " << conf.remoteNodeId << endl; << conf.localNodeId << " to: " << conf.remoteNodeId << endl;
...@@ -429,10 +441,11 @@ IPCConfig::configureTransporters(Uint32 nodeId, ...@@ -429,10 +441,11 @@ IPCConfig::configureTransporters(Uint32 nodeId,
if(iter.get(CFG_TCP_HOSTNAME_1, &host1)) break; if(iter.get(CFG_TCP_HOSTNAME_1, &host1)) break;
if(iter.get(CFG_TCP_HOSTNAME_2, &host2)) break; if(iter.get(CFG_TCP_HOSTNAME_2, &host2)) break;
if(iter.get(CFG_TCP_SERVER_PORT, &conf.port)) break;
if(iter.get(CFG_TCP_SEND_BUFFER_SIZE, &conf.sendBufferSize)) break; if(iter.get(CFG_TCP_SEND_BUFFER_SIZE, &conf.sendBufferSize)) break;
if(iter.get(CFG_TCP_RECEIVE_BUFFER_SIZE, &conf.maxReceiveSize)) break; if(iter.get(CFG_TCP_RECEIVE_BUFFER_SIZE, &conf.maxReceiveSize)) break;
conf.port= tmp_server_port;
const char * proxy; const char * proxy;
if (!iter.get(CFG_TCP_PROXY, &proxy)) { if (!iter.get(CFG_TCP_PROXY, &proxy)) {
if (strlen(proxy) > 0 && nodeId2 == nodeId) { if (strlen(proxy) > 0 && nodeId2 == nodeId) {
...@@ -441,14 +454,6 @@ IPCConfig::configureTransporters(Uint32 nodeId, ...@@ -441,14 +454,6 @@ IPCConfig::configureTransporters(Uint32 nodeId,
} }
} }
if (nodeId <= nodeId1 && nodeId <= nodeId2) {
if (server_port && server_port != conf.port) {
ndbout << "internal error in config setup of server ports line= " << __LINE__ << endl;
exit(-1);
}
server_port= conf.port;
}
conf.localNodeId = nodeId; conf.localNodeId = nodeId;
conf.remoteNodeId = remoteNodeId; conf.remoteNodeId = remoteNodeId;
conf.localHostName = (nodeId == nodeId1 ? host1 : host2); conf.localHostName = (nodeId == nodeId1 ? host1 : host2);
......
...@@ -24,12 +24,13 @@ NdbConfig_AllocHomePath(int _len) ...@@ -24,12 +24,13 @@ NdbConfig_AllocHomePath(int _len)
const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0); const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0);
int len= _len; int len= _len;
int path_len= 0; int path_len= 0;
char *buf;
if (path) if (path)
path_len= strlen(path); path_len= strlen(path);
len+= path_len; len+= path_len;
char *buf= malloc(len); buf= malloc(len);
if (path_len > 0) if (path_len > 0)
snprintf(buf, len, "%s%c", path, DIR_SEPARATOR); snprintf(buf, len, "%s%c", path, DIR_SEPARATOR);
else else
......
...@@ -29,20 +29,19 @@ ...@@ -29,20 +29,19 @@
#endif #endif
SHM_Transporter::SHM_Transporter(NodeId lNodeId, SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
int r_port,
NodeId lNodeId,
NodeId rNodeId, NodeId rNodeId,
key_t _shmKey,
Uint32 _shmSize,
bool compression, bool compression,
bool checksum, bool checksum,
bool signalId) : bool signalId,
Transporter(lNodeId, key_t _shmKey,
rNodeId, Uint32 _shmSize) :
0, Transporter(t_reg, lHostName, rHostName, r_port, lNodeId, rNodeId,
compression, 0, compression, checksum, signalId),
checksum,
signalId),
isServer(lNodeId < rNodeId),
shmKey(_shmKey), shmKey(_shmKey),
shmSize(_shmSize) shmSize(_shmSize)
{ {
...@@ -68,16 +67,6 @@ SHM_Transporter::initTransporter(){ ...@@ -68,16 +67,6 @@ SHM_Transporter::initTransporter(){
return true; return true;
} }
bool
SHM_Transporter::connectImpl(Uint32 timeOutMillis){
bool res;
if(isServer)
res = connectServer(timeOutMillis);
else
res = connectClient(timeOutMillis);
return res;
}
void void
SHM_Transporter::setupBuffers(){ SHM_Transporter::setupBuffers(){
Uint32 sharedSize = 0; Uint32 sharedSize = 0;
......
...@@ -32,13 +32,17 @@ typedef Uint32 key_t; ...@@ -32,13 +32,17 @@ typedef Uint32 key_t;
class SHM_Transporter : public Transporter { class SHM_Transporter : public Transporter {
friend class TransporterRegistry; friend class TransporterRegistry;
public: public:
SHM_Transporter(NodeId lNodeId, SHM_Transporter(TransporterRegistry &,
const char *lHostName,
const char *rHostName,
int r_port,
NodeId lNodeId,
NodeId rNodeId, NodeId rNodeId,
key_t shmKey,
Uint32 shmSize,
bool compression, bool compression,
bool checksum, bool checksum,
bool signalId); bool signalId,
key_t shmKey,
Uint32 shmSize);
/** /**
* SHM destructor * SHM destructor
...@@ -74,14 +78,6 @@ protected: ...@@ -74,14 +78,6 @@ protected:
*/ */
void disconnectImpl(); void disconnectImpl();
/**
* Invokes the connectServer or connectClient.
* @param timeOutMillis - the timeout the connect thread waits before
* retrying.
* @return True if connectImpl successful, otherwise false.
*/
bool connectImpl(Uint32 timeOutMillis);
/** /**
* Blocking * Blocking
* *
...@@ -94,7 +90,7 @@ protected: ...@@ -94,7 +90,7 @@ protected:
* i.e., both agrees that the other one has setup the segment. * i.e., both agrees that the other one has setup the segment.
* Otherwise false. * Otherwise false.
*/ */
bool connectServer(Uint32 timeOutMillis); virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
/** /**
* Blocking * Blocking
...@@ -108,7 +104,9 @@ protected: ...@@ -108,7 +104,9 @@ protected:
* i.e., both agrees that the other one has setup the segment. * i.e., both agrees that the other one has setup the segment.
* Otherwise false. * Otherwise false.
*/ */
bool connectClient(Uint32 timeOutMillis); virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
bool connect_common(NDB_SOCKET_TYPE sockfd);
/** /**
...@@ -127,7 +125,6 @@ private: ...@@ -127,7 +125,6 @@ private:
bool _shmSegCreated; bool _shmSegCreated;
bool _attached; bool _attached;
const bool isServer;
key_t shmKey; key_t shmKey;
volatile Uint32 * serverStatusFlag; volatile Uint32 * serverStatusFlag;
volatile Uint32 * clientStatusFlag; volatile Uint32 * clientStatusFlag;
......
...@@ -23,83 +23,98 @@ ...@@ -23,83 +23,98 @@
#include <NdbSleep.h> #include <NdbSleep.h>
#include <NdbOut.hpp> #include <NdbOut.hpp>
#include <InputStream.hpp>
#include <OutputStream.hpp>
#include <sys/ipc.h> #include <sys/ipc.h>
#include <sys/shm.h> #include <sys/shm.h>
bool bool
SHM_Transporter::connectServer(Uint32 timeOutMillis){ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
{
SocketOutputStream s_output(sockfd);
SocketInputStream s_input(sockfd);
if(!_shmSegCreated){ if(!_shmSegCreated){
shmId = shmget(shmKey, shmSize, IPC_CREAT | 960); shmId = shmget(shmKey, shmSize, IPC_CREAT | 960);
if(shmId == -1){ if(shmId == -1){
perror("shmget: "); perror("shmget: ");
reportThreadError(remoteNodeId, TE_SHM_UNABLE_TO_CREATE_SEGMENT); report_error(TE_SHM_UNABLE_TO_CREATE_SEGMENT);
NdbSleep_MilliSleep(timeOutMillis); NdbSleep_MilliSleep(m_timeOutMillis);
NDB_CLOSE_SOCKET(sockfd);
return false; return false;
} }
_shmSegCreated = true; _shmSegCreated = true;
} }
if(!_attached){ s_output.println("shm server 1 ok");
shmBuf = (char *)shmat(shmId, 0, 0);
if(shmBuf == 0){ char buf[256];
perror("shmat: "); if (s_input.gets(buf, 256) == 0) {
reportThreadError(remoteNodeId, TE_SHM_UNABLE_TO_ATTACH_SEGMENT); NDB_CLOSE_SOCKET(sockfd);
NdbSleep_MilliSleep(timeOutMillis);
return false;
}
_attached = true;
}
struct shmid_ds info;
const int res = shmctl(shmId, IPC_STAT, &info);
if(res == -1){
perror("shmctl: ");
reportThreadError(remoteNodeId, TE_SHM_IPC_STAT);
NdbSleep_MilliSleep(timeOutMillis);
return false; return false;
} }
if(info.shm_nattch == 2 && !setupBuffersDone) {
setupBuffers();
setupBuffersDone=true;
}
if(setupBuffersDone) { int r= connect_common(sockfd);
NdbSleep_MilliSleep(timeOutMillis);
if(*serverStatusFlag==1 && *clientStatusFlag==1)
return true;
}
if(info.shm_nattch > 2){ if (r) {
reportThreadError(remoteNodeId, TE_SHM_DISCONNECT); s_output.println("shm server 2 ok");
NdbSleep_MilliSleep(timeOutMillis); if (s_input.gets(buf, 256) == 0) {
return false; NDB_CLOSE_SOCKET(sockfd);
return false;
}
} }
NdbSleep_MilliSleep(timeOutMillis); NDB_CLOSE_SOCKET(sockfd);
return false; return r;
} }
bool bool
SHM_Transporter::connectClient(Uint32 timeOutMillis){ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
if(!_shmSegCreated){ {
SocketInputStream s_input(sockfd);
SocketOutputStream s_output(sockfd);
char buf[256];
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
return false;
}
if(!_shmSegCreated){
shmId = shmget(shmKey, shmSize, 0); shmId = shmget(shmKey, shmSize, 0);
if(shmId == -1){ if(shmId == -1){
NdbSleep_MilliSleep(timeOutMillis); NdbSleep_MilliSleep(m_timeOutMillis);
NDB_CLOSE_SOCKET(sockfd);
return false; return false;
} }
_shmSegCreated = true; _shmSegCreated = true;
} }
s_output.println("shm client 1 ok");
int r= connect_common(sockfd);
if (r) {
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
return false;
}
s_output.println("shm client 2 ok");
}
NDB_CLOSE_SOCKET(sockfd);
return r;
}
bool
SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
{
if(!_attached){ if(!_attached){
shmBuf = (char *)shmat(shmId, 0, 0); shmBuf = (char *)shmat(shmId, 0, 0);
if(shmBuf == 0){ if(shmBuf == 0){
reportThreadError(remoteNodeId, TE_SHM_UNABLE_TO_ATTACH_SEGMENT); report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT);
NdbSleep_MilliSleep(timeOutMillis); NdbSleep_MilliSleep(m_timeOutMillis);
return false; return false;
} }
_attached = true; _attached = true;
...@@ -109,8 +124,8 @@ SHM_Transporter::connectClient(Uint32 timeOutMillis){ ...@@ -109,8 +124,8 @@ SHM_Transporter::connectClient(Uint32 timeOutMillis){
const int res = shmctl(shmId, IPC_STAT, &info); const int res = shmctl(shmId, IPC_STAT, &info);
if(res == -1){ if(res == -1){
reportThreadError(remoteNodeId, TE_SHM_IPC_STAT); report_error(TE_SHM_IPC_STAT);
NdbSleep_MilliSleep(timeOutMillis); NdbSleep_MilliSleep(m_timeOutMillis);
return false; return false;
} }
...@@ -121,18 +136,18 @@ SHM_Transporter::connectClient(Uint32 timeOutMillis){ ...@@ -121,18 +136,18 @@ SHM_Transporter::connectClient(Uint32 timeOutMillis){
} }
if(setupBuffersDone) { if(setupBuffersDone) {
NdbSleep_MilliSleep(timeOutMillis); NdbSleep_MilliSleep(m_timeOutMillis);
if(*serverStatusFlag==1 && *clientStatusFlag==1) if(*serverStatusFlag==1 && *clientStatusFlag==1)
return true; return true;
} }
if(info.shm_nattch > 2){ if(info.shm_nattch > 2){
reportThreadError(remoteNodeId, TE_SHM_DISCONNECT); report_error(TE_SHM_DISCONNECT);
NdbSleep_MilliSleep(timeOutMillis); NdbSleep_MilliSleep(m_timeOutMillis);
return false; return false;
} }
NdbSleep_MilliSleep(timeOutMillis); NdbSleep_MilliSleep(m_timeOutMillis);
return false; return false;
} }
...@@ -141,12 +156,12 @@ SHM_Transporter::checkConnected(){ ...@@ -141,12 +156,12 @@ SHM_Transporter::checkConnected(){
struct shmid_ds info; struct shmid_ds info;
const int res = shmctl(shmId, IPC_STAT, &info); const int res = shmctl(shmId, IPC_STAT, &info);
if(res == -1){ if(res == -1){
reportError(callbackObj, remoteNodeId, TE_SHM_IPC_STAT); report_error(TE_SHM_IPC_STAT);
return false; return false;
} }
if(info.shm_nattch != 2){ if(info.shm_nattch != 2){
reportError(callbackObj, remoteNodeId, TE_SHM_DISCONNECT); report_error(TE_SHM_DISCONNECT);
return false; return false;
} }
return true; return true;
...@@ -168,7 +183,7 @@ SHM_Transporter::disconnectImpl(){ ...@@ -168,7 +183,7 @@ SHM_Transporter::disconnectImpl(){
if(isServer && _shmSegCreated){ if(isServer && _shmSegCreated){
const int res = shmctl(shmId, IPC_RMID, 0); const int res = shmctl(shmId, IPC_RMID, 0);
if(res == -1){ if(res == -1){
reportError(callbackObj, remoteNodeId, TE_SHM_UNABLE_TO_REMOVE_SEGMENT); report_error(TE_SHM_UNABLE_TO_REMOVE_SEGMENT);
return; return;
} }
_shmSegCreated = false; _shmSegCreated = false;
......
...@@ -352,13 +352,17 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) { ...@@ -352,13 +352,17 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
if(theTransporters[config->remoteNodeId] != NULL) if(theTransporters[config->remoteNodeId] != NULL)
return false; return false;
SHM_Transporter * t = new SHM_Transporter(config->localNodeId, SHM_Transporter * t = new SHM_Transporter(*this,
"localhost",
"localhost",
config->port,
localNodeId,
config->remoteNodeId, config->remoteNodeId,
config->shmKey,
config->shmSize,
config->compression, config->compression,
config->checksum, config->checksum,
config->signalId config->signalId,
config->shmKey,
config->shmSize
); );
if (t == NULL) if (t == NULL)
return false; return false;
......
...@@ -390,11 +390,11 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ...@@ -390,11 +390,11 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
} }
struct sockaddr addr; struct sockaddr addr;
socklen_t addrlen; socklen_t addrlen= sizeof(addr);
int r; int r;
if (r= getsockname(m_socket, &addr, &addrlen)) { if (r= getpeername(m_socket, &addr, &addrlen)) {
m_output->println(cmd); m_output->println(cmd);
m_output->println("result: getsockname(%d) failed, err= %d", m_socket, r); m_output->println("result: getpeername(%d) failed, err= %d", m_socket, r);
m_output->println(""); m_output->println("");
return; return;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment