Commit f507b377 authored by unknown's avatar unknown

ndb - Handle shm-transporter wo/ busy-wait + also

      handled mixed tcp/shm transporters
+ bug#7124


ndb/src/common/transporter/SHM_Transporter.cpp:
  Add remote/own pid for signaling availability of data on shm-segment
ndb/src/common/transporter/SHM_Transporter.hpp:
  Add remote/own pid for signaling availability of data on shm-segment
ndb/src/common/transporter/TransporterRegistry.cpp:
  Add remote/own pid for signaling availability of data on shm-segment
ndb/src/kernel/main.cpp:
  Set pid to use for shm-signaling
ndb/src/mgmsrv/ConfigInfo.cpp:
  bug#7124
ndb/src/ndbapi/TransporterFacade.cpp:
  Set pid to use for shm-signaling
parent f67fe154
...@@ -26,6 +26,8 @@ ...@@ -26,6 +26,8 @@
#include <InputStream.hpp> #include <InputStream.hpp>
#include <OutputStream.hpp> #include <OutputStream.hpp>
extern int g_shm_pid;
SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg, SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
const char *lHostName, const char *lHostName,
const char *rHostName, const char *rHostName,
...@@ -52,6 +54,7 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg, ...@@ -52,6 +54,7 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
#ifdef DEBUG_TRANSPORTER #ifdef DEBUG_TRANSPORTER
printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey); printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);
#endif #endif
m_signal_threshold = 4096;
} }
SHM_Transporter::~SHM_Transporter(){ SHM_Transporter::~SHM_Transporter(){
...@@ -182,42 +185,6 @@ SHM_Transporter::setupBuffers(){ ...@@ -182,42 +185,6 @@ SHM_Transporter::setupBuffers(){
#endif #endif
} }
#if 0
SendStatus
SHM_Transporter::prepareSend(const SignalHeader * const signalHeader,
Uint8 prio,
const Uint32 * const signalData,
const LinearSegmentPtr ptr[3],
bool force){
if(isConnected()){
const Uint32 lenBytes = m_packer.getMessageLength(signalHeader, ptr);
Uint32 * insertPtr = (Uint32 *)writer->getWritePtr(lenBytes);
if(insertPtr != 0){
m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
/**
* Do funky membar stuff
*/
writer->updateWritePtr(lenBytes);
return SEND_OK;
} else {
// NdbSleep_MilliSleep(3);
//goto tryagain;
return SEND_BUFFER_FULL;
}
}
return SEND_DISCONNECTED;
}
#endif
bool bool
SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
{ {
...@@ -247,10 +214,17 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) ...@@ -247,10 +214,17 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
} }
// Send ok to client // Send ok to client
s_output.println("shm server 1 ok"); s_output.println("shm server 1 ok: %d", g_shm_pid);
// Wait for ok from client // Wait for ok from client
if (s_input.gets(buf, 256) == 0) { if (s_input.gets(buf, 256) == 0)
{
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
if(sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1)
{
NDB_CLOSE_SOCKET(sockfd); NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false); DBUG_RETURN(false);
} }
...@@ -289,6 +263,12 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) ...@@ -289,6 +263,12 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
DBUG_RETURN(false); DBUG_RETURN(false);
} }
if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1)
{
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
// Create // Create
if(!_shmSegCreated){ if(!_shmSegCreated){
if (!ndb_shm_get()) { if (!ndb_shm_get()) {
...@@ -313,10 +293,10 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) ...@@ -313,10 +293,10 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
} }
// Send ok to server // Send ok to server
s_output.println("shm client 1 ok"); s_output.println("shm client 1 ok: %d", g_shm_pid);
int r= connect_common(sockfd); int r= connect_common(sockfd);
if (r) { if (r) {
// Wait for ok from server // Wait for ok from server
if (s_input.gets(buf, 256) == 0) { if (s_input.gets(buf, 256) == 0) {
...@@ -344,18 +324,33 @@ SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd) ...@@ -344,18 +324,33 @@ SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
return false; return false;
} }
if(!setupBuffersDone) { if(!setupBuffersDone)
{
setupBuffers(); setupBuffers();
setupBuffersDone=true; setupBuffersDone=true;
} }
if(setupBuffersDone) { if(setupBuffersDone)
{
NdbSleep_MilliSleep(m_timeOutMillis); NdbSleep_MilliSleep(m_timeOutMillis);
if(*serverStatusFlag == 1 && *clientStatusFlag == 1) if(*serverStatusFlag == 1 && *clientStatusFlag == 1)
{
m_last_signal = 0;
return true; return true;
}
} }
DBUG_PRINT("error", ("Failed to set up buffers to node %d", DBUG_PRINT("error", ("Failed to set up buffers to node %d",
remoteNodeId)); remoteNodeId));
return false; return false;
} }
void
SHM_Transporter::doSend()
{
if(m_last_signal)
{
m_last_signal = 0;
kill(m_remote_pid, SIGUSR1);
}
}
...@@ -47,18 +47,25 @@ public: ...@@ -47,18 +47,25 @@ public:
* SHM destructor * SHM destructor
*/ */
virtual ~SHM_Transporter(); virtual ~SHM_Transporter();
/** /**
* Do initialization * Do initialization
*/ */
bool initTransporter(); bool initTransporter();
Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio){ Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio)
{
return (Uint32 *)writer->getWritePtr(lenBytes); return (Uint32 *)writer->getWritePtr(lenBytes);
} }
void updateWritePtr(Uint32 lenBytes, Uint32 prio){ void updateWritePtr(Uint32 lenBytes, Uint32 prio)
{
writer->updateWritePtr(lenBytes); writer->updateWritePtr(lenBytes);
m_last_signal += lenBytes;
if(m_last_signal >= m_signal_threshold)
{
doSend();
}
} }
void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){ void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){
...@@ -123,28 +130,36 @@ protected: ...@@ -123,28 +130,36 @@ protected:
*/ */
void setupBuffers(); void setupBuffers();
/**
* doSend (i.e signal receiver)
*/
void doSend();
int m_remote_pid;
Uint32 m_last_signal;
Uint32 m_signal_threshold;
private: private:
bool _shmSegCreated; bool _shmSegCreated;
bool _attached; bool _attached;
bool m_connected; bool m_connected;
key_t shmKey; key_t shmKey;
volatile Uint32 * serverStatusFlag; volatile Uint32 * serverStatusFlag;
volatile Uint32 * clientStatusFlag; volatile Uint32 * clientStatusFlag;
bool setupBuffersDone; bool setupBuffersDone;
#ifdef NDB_WIN32 #ifdef NDB_WIN32
HANDLE hFileMapping; HANDLE hFileMapping;
#else #else
int shmId; int shmId;
#endif #endif
int shmSize; int shmSize;
char * shmBuf; char * shmBuf;
SHM_Reader * reader; SHM_Reader * reader;
SHM_Writer * writer; SHM_Writer * writer;
/** /**
* @return - True if the reader has data to read on its segment. * @return - True if the reader has data to read on its segment.
*/ */
......
...@@ -47,6 +47,8 @@ ...@@ -47,6 +47,8 @@
#include <InputStream.hpp> #include <InputStream.hpp>
#include <OutputStream.hpp> #include <OutputStream.hpp>
int g_shm_pid = 0;
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
{ {
DBUG_ENTER("SocketServer::Session * TransporterService::newSession"); DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
...@@ -622,11 +624,28 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){ ...@@ -622,11 +624,28 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){
return retVal; return retVal;
#endif #endif
if((nSHMTransporters+nSCITransporters) > 0) if((nSCITransporters) > 0)
{
timeOutMillis=0; timeOutMillis=0;
}
#ifdef NDB_SHM_TRANSPORTER
if(nSHMTransporters > 0)
{
Uint32 res = poll_SHM(0);
if(res)
{
retVal |= res;
timeOutMillis = 0;
}
}
#endif
#ifdef NDB_TCP_TRANSPORTER #ifdef NDB_TCP_TRANSPORTER
if(nTCPTransporters > 0) if(nTCPTransporters > 0 || retVal == 0)
{
retVal |= poll_TCP(timeOutMillis); retVal |= poll_TCP(timeOutMillis);
}
else else
tcpReadSelectReply = 0; tcpReadSelectReply = 0;
#endif #endif
...@@ -635,8 +654,11 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){ ...@@ -635,8 +654,11 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){
retVal |= poll_SCI(timeOutMillis); retVal |= poll_SCI(timeOutMillis);
#endif #endif
#ifdef NDB_SHM_TRANSPORTER #ifdef NDB_SHM_TRANSPORTER
if(nSHMTransporters > 0) if(nSHMTransporters > 0 && retVal == 0)
retVal |= poll_SHM(timeOutMillis); {
int res = poll_SHM(0);
retVal |= res;
}
#endif #endif
return retVal; return retVal;
} }
...@@ -644,8 +666,8 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){ ...@@ -644,8 +666,8 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){
#ifdef NDB_SCI_TRANSPORTER #ifdef NDB_SCI_TRANSPORTER
Uint32 Uint32
TransporterRegistry::poll_SCI(Uint32 timeOutMillis){ TransporterRegistry::poll_SCI(Uint32 timeOutMillis)
{
for (int i=0; i<nSCITransporters; i++) { for (int i=0; i<nSCITransporters; i++) {
SCI_Transporter * t = theSCITransporters[i]; SCI_Transporter * t = theSCITransporters[i];
if (t->isConnected()) { if (t->isConnected()) {
...@@ -659,73 +681,29 @@ TransporterRegistry::poll_SCI(Uint32 timeOutMillis){ ...@@ -659,73 +681,29 @@ TransporterRegistry::poll_SCI(Uint32 timeOutMillis){
#ifdef NDB_SHM_TRANSPORTER #ifdef NDB_SHM_TRANSPORTER
static int g_shm_counter = 0;
Uint32 Uint32
TransporterRegistry::poll_SHM(Uint32 timeOutMillis) TransporterRegistry::poll_SHM(Uint32 timeOutMillis)
{ {
for(int j=0; j < 20; j++) for(int j=0; j < 100; j++)
for (int i=0; i<nSHMTransporters; i++) { {
SHM_Transporter * t = theSHMTransporters[i];
if (t->isConnected()) {
if(t->hasDataToRead()) {
return 1;
}
}
}
/**
* @note: granularity of linux/i386 timer is not good enough.
* Can't sleep if using SHM as it is now.
*/
/*
if(timeOutMillis > 0)
NdbSleep_MilliSleep(timeOutMillis);
else
NdbSleep_MilliSleep(1);
*/
return 0;
#if 0
NDB_TICKS startTime = NdbTick_CurrentMillisecond();
for(int i=0; i<100; i++) {
for (int i=0; i<nSHMTransporters; i++) { for (int i=0; i<nSHMTransporters; i++) {
SHM_Transporter * t = theSHMTransporters[i]; SHM_Transporter * t = theSHMTransporters[i];
if (t->isConnected()) { if (t->isConnected()) {
if(t->hasDataToRead()){ if(t->hasDataToRead()) {
return 1; return 1;
} }
else
continue;
}
else
continue;
}
if(NdbTick_CurrentMillisecond() > (startTime +timeOutMillis))
return 0;
}
NdbSleep_MilliSleep(5);
return 0;
#endif
#if 0
for(int j=0; j < 100; j++) {
for (int i=0; i<nSHMTransporters; i++) {
SHM_Transporter * t = theSHMTransporters[i];
if (t->isConnected()) {
if(t->hasDataToRead())
return 1;
} }
} }
} }
return 0; return 0;
#endif
} }
#endif #endif
#ifdef NDB_OSE_TRANSPORTER #ifdef NDB_OSE_TRANSPORTER
Uint32 Uint32
TransporterRegistry::poll_OSE(Uint32 timeOutMillis){ TransporterRegistry::poll_OSE(Uint32 timeOutMillis)
{
if(theOSEReceiver != NULL){ if(theOSEReceiver != NULL){
return theOSEReceiver->doReceive(timeOutMillis); return theOSEReceiver->doReceive(timeOutMillis);
} }
...@@ -736,18 +714,18 @@ TransporterRegistry::poll_OSE(Uint32 timeOutMillis){ ...@@ -736,18 +714,18 @@ TransporterRegistry::poll_OSE(Uint32 timeOutMillis){
#ifdef NDB_TCP_TRANSPORTER #ifdef NDB_TCP_TRANSPORTER
Uint32 Uint32
TransporterRegistry::poll_TCP(Uint32 timeOutMillis){ TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
{
if (nTCPTransporters == 0){ if (false && nTCPTransporters == 0)
{
tcpReadSelectReply = 0; tcpReadSelectReply = 0;
return 0; return 0;
} }
struct timeval timeout; struct timeval timeout;
#ifdef NDB_OSE #ifdef NDB_OSE
// Return directly if there are no TCP transporters configured // Return directly if there are no TCP transporters configured
if(timeOutMillis <= 1){ if(timeOutMillis <= 1){
timeout.tv_sec = 0; timeout.tv_sec = 0;
timeout.tv_usec = 1025; timeout.tv_usec = 1025;
...@@ -760,7 +738,7 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis){ ...@@ -760,7 +738,7 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis){
timeout.tv_usec = (timeOutMillis % 1000) * 1000; timeout.tv_usec = (timeOutMillis % 1000) * 1000;
#endif #endif
NDB_SOCKET_TYPE maxSocketValue = 0; NDB_SOCKET_TYPE maxSocketValue = -1;
// Needed for TCP/IP connections // Needed for TCP/IP connections
// The read- and writeset are used by select // The read- and writeset are used by select
...@@ -788,23 +766,29 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis){ ...@@ -788,23 +766,29 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis){
maxSocketValue++; maxSocketValue++;
tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout); tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);
if(false && tcpReadSelectReply == -1 && errno == EINTR)
ndbout_c("woke-up by signal");
#ifdef NDB_WIN32 #ifdef NDB_WIN32
if(tcpReadSelectReply == SOCKET_ERROR) if(tcpReadSelectReply == SOCKET_ERROR)
{ {
NdbSleep_MilliSleep(timeOutMillis); NdbSleep_MilliSleep(timeOutMillis);
} }
#endif #endif
return tcpReadSelectReply; return tcpReadSelectReply;
} }
#endif #endif
void void
TransporterRegistry::performReceive(){ TransporterRegistry::performReceive()
{
#ifdef NDB_OSE_TRANSPORTER #ifdef NDB_OSE_TRANSPORTER
if(theOSEReceiver != 0){ if(theOSEReceiver != 0)
while(theOSEReceiver->hasData()){ {
while(theOSEReceiver->hasData())
{
NodeId remoteNodeId; NodeId remoteNodeId;
Uint32 * readPtr; Uint32 * readPtr;
Uint32 sz = theOSEReceiver->getReceiveData(&remoteNodeId, &readPtr); Uint32 sz = theOSEReceiver->getReceiveData(&remoteNodeId, &readPtr);
...@@ -827,16 +811,20 @@ TransporterRegistry::performReceive(){ ...@@ -827,16 +811,20 @@ TransporterRegistry::performReceive(){
#endif #endif
#ifdef NDB_TCP_TRANSPORTER #ifdef NDB_TCP_TRANSPORTER
if(tcpReadSelectReply > 0){ if(tcpReadSelectReply > 0)
for (int i=0; i<nTCPTransporters; i++) { {
for (int i=0; i<nTCPTransporters; i++)
{
checkJobBuffer(); checkJobBuffer();
TCP_Transporter *t = theTCPTransporters[i]; TCP_Transporter *t = theTCPTransporters[i];
const NodeId nodeId = t->getRemoteNodeId(); const NodeId nodeId = t->getRemoteNodeId();
const NDB_SOCKET_TYPE socket = t->getSocket(); const NDB_SOCKET_TYPE socket = t->getSocket();
if(is_connected(nodeId)){ if(is_connected(nodeId)){
if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) { if(t->isConnected() && FD_ISSET(socket, &tcpReadset))
{
const int receiveSize = t->doReceive(); const int receiveSize = t->doReceive();
if(receiveSize > 0){ if(receiveSize > 0)
{
Uint32 * ptr; Uint32 * ptr;
Uint32 sz = t->getReceiveData(&ptr); Uint32 sz = t->getReceiveData(&ptr);
Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]); Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);
...@@ -847,142 +835,165 @@ TransporterRegistry::performReceive(){ ...@@ -847,142 +835,165 @@ TransporterRegistry::performReceive(){
} }
} }
#endif #endif
#ifdef NDB_SCI_TRANSPORTER #ifdef NDB_SCI_TRANSPORTER
//performReceive //performReceive
//do prepareReceive on the SCI transporters (prepareReceive(t,,,,)) //do prepareReceive on the SCI transporters (prepareReceive(t,,,,))
for (int i=0; i<nSCITransporters; i++) { for (int i=0; i<nSCITransporters; i++)
checkJobBuffer(); {
SCI_Transporter *t = theSCITransporters[i]; checkJobBuffer();
const NodeId nodeId = t->getRemoteNodeId(); SCI_Transporter *t = theSCITransporters[i];
if(is_connected(nodeId)){ const NodeId nodeId = t->getRemoteNodeId();
if(t->isConnected() && t->checkConnected()){ if(is_connected(nodeId))
Uint32 * readPtr, * eodPtr; {
t->getReceivePtr(&readPtr, &eodPtr); if(t->isConnected() && t->checkConnected())
Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); {
t->updateReceivePtr(newPtr); Uint32 * readPtr, * eodPtr;
} t->getReceivePtr(&readPtr, &eodPtr);
} Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
} t->updateReceivePtr(newPtr);
}
}
}
#endif #endif
#ifdef NDB_SHM_TRANSPORTER #ifdef NDB_SHM_TRANSPORTER
for (int i=0; i<nSHMTransporters; i++) { for (int i=0; i<nSHMTransporters; i++)
checkJobBuffer(); {
SHM_Transporter *t = theSHMTransporters[i]; checkJobBuffer();
const NodeId nodeId = t->getRemoteNodeId(); SHM_Transporter *t = theSHMTransporters[i];
if(is_connected(nodeId)){ const NodeId nodeId = t->getRemoteNodeId();
if(t->isConnected() && t->checkConnected()){ if(is_connected(nodeId)){
Uint32 * readPtr, * eodPtr; if(t->isConnected() && t->checkConnected())
t->getReceivePtr(&readPtr, &eodPtr); {
Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); Uint32 * readPtr, * eodPtr;
t->updateReceivePtr(newPtr); t->getReceivePtr(&readPtr, &eodPtr);
} Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
} t->updateReceivePtr(newPtr);
} }
}
}
#endif #endif
} }
static int x = 0; static int x = 0;
void void
TransporterRegistry::performSend(){ TransporterRegistry::performSend()
int i; {
sendCounter = 1; int i;
sendCounter = 1;
#ifdef NDB_OSE_TRANSPORTER #ifdef NDB_OSE_TRANSPORTER
for (int i = 0; i < nOSETransporters; i++){ for (int i = 0; i < nOSETransporters; i++)
OSE_Transporter *t = theOSETransporters[i]; {
if((is_connected(t->getRemoteNodeId()) && OSE_Transporter *t = theOSETransporters[i];
(t->isConnected())) { if(is_connected(t->getRemoteNodeId()) &&& (t->isConnected()))
t->doSend(); {
}//if t->doSend();
}//for }//if
}//for
#endif #endif
#ifdef NDB_TCP_TRANSPORTER #ifdef NDB_TCP_TRANSPORTER
#ifdef NDB_OSE #ifdef NDB_OSE
{
int maxSocketValue = 0;
// Needed for TCP/IP connections
// The writeset are used by select
fd_set writeset;
FD_ZERO(&writeset);
// Prepare for sending and receiving
for (i = 0; i < nTCPTransporters; i++) {
TCP_Transporter * t = theTCPTransporters[i];
// If the transporter is connected
if ((t->hasDataToSend()) && (t->isConnected())) {
const int socket = t->getSocket();
// Find the highest socket value. It will be used by select
if (socket > maxSocketValue) {
maxSocketValue = socket;
}//if
FD_SET(socket, &writeset);
}//if
}//for
// The highest socket value plus one
if(maxSocketValue == 0)
return;
maxSocketValue++;
struct timeval timeout = { 0, 1025 };
Uint32 tmp = select(maxSocketValue, 0, &writeset, 0, &timeout);
if (tmp == 0)
{ {
int maxSocketValue = 0; return;
}//if
// Needed for TCP/IP connections for (i = 0; i < nTCPTransporters; i++) {
// The writeset are used by select TCP_Transporter *t = theTCPTransporters[i];
fd_set writeset; const NodeId nodeId = t->getRemoteNodeId();
FD_ZERO(&writeset); const int socket = t->getSocket();
if(is_connected(nodeId)){
// Prepare for sending and receiving if(t->isConnected() && FD_ISSET(socket, &writeset)) {
for (i = 0; i < nTCPTransporters; i++) { t->doSend();
TCP_Transporter * t = theTCPTransporters[i]; }//if
}//if
// If the transporter is connected }//for
if ((t->hasDataToSend()) && (t->isConnected())) {
const int socket = t->getSocket();
// Find the highest socket value. It will be used by select
if (socket > maxSocketValue) {
maxSocketValue = socket;
}//if
FD_SET(socket, &writeset);
}//if
}//for
// The highest socket value plus one
if(maxSocketValue == 0)
return;
maxSocketValue++;
struct timeval timeout = { 0, 1025 };
Uint32 tmp = select(maxSocketValue, 0, &writeset, 0, &timeout);
if (tmp == 0) {
return;
}//if
for (i = 0; i < nTCPTransporters; i++) {
TCP_Transporter *t = theTCPTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
const int socket = t->getSocket();
if(is_connected(nodeId)){
if(t->isConnected() && FD_ISSET(socket, &writeset)) {
t->doSend();
}//if
}//if
}//for
} }
#endif #endif
#ifdef NDB_TCP_TRANSPORTER #ifdef NDB_TCP_TRANSPORTER
for (i = x; i < nTCPTransporters; i++) { for (i = x; i < nTCPTransporters; i++)
TCP_Transporter *t = theTCPTransporters[i]; {
if (t && TCP_Transporter *t = theTCPTransporters[i];
(t->hasDataToSend()) && if (t && t->hasDataToSend() && t->isConnected() &&
(t->isConnected()) && is_connected(t->getRemoteNodeId()))
(is_connected(t->getRemoteNodeId()))) { {
t->doSend(); t->doSend();
}//if }
}//for }
for (i = 0; i < x && i < nTCPTransporters; i++) { for (i = 0; i < x && i < nTCPTransporters; i++)
TCP_Transporter *t = theTCPTransporters[i]; {
if (t && TCP_Transporter *t = theTCPTransporters[i];
(t->hasDataToSend()) && if (t && t->hasDataToSend() && t->isConnected() &&
(t->isConnected()) && is_connected(t->getRemoteNodeId()))
(is_connected(t->getRemoteNodeId()))) { {
t->doSend(); t->doSend();
}//if }
}//for }
x++; x++;
if (x == nTCPTransporters) x = 0; if (x == nTCPTransporters) x = 0;
#endif #endif
#endif #endif
#ifdef NDB_SCI_TRANSPORTER #ifdef NDB_SCI_TRANSPORTER
//scroll through the SCI transporters, //scroll through the SCI transporters,
// get each transporter, check if connected, send data // get each transporter, check if connected, send data
for (i=0; i<nSCITransporters; i++) { for (i=0; i<nSCITransporters; i++) {
SCI_Transporter *t = theSCITransporters[i]; SCI_Transporter *t = theSCITransporters[i];
const NodeId nodeId = t->getRemoteNodeId(); const NodeId nodeId = t->getRemoteNodeId();
if(is_connected(nodeId)){ if(is_connected(nodeId))
if(t->isConnected() && t->hasDataToSend()) { {
t->doSend(); if(t->isConnected() && t->hasDataToSend()) {
} //if t->doSend();
} //if } //if
} //if } //if
}
#endif
#ifdef NDB_SHM_TRANSPORTER
for (i=0; i<nSHMTransporters; i++)
{
SHM_Transporter *t = theSHMTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
if(is_connected(nodeId))
{
if(t->isConnected())
{
t->doSend();
}
}
}
#endif #endif
} }
...@@ -1169,7 +1180,8 @@ TransporterRegistry::stop_clients() ...@@ -1169,7 +1180,8 @@ TransporterRegistry::stop_clients()
} }
void void
TransporterRegistry::add_transporter_interface(const char *interface, unsigned short port) TransporterRegistry::add_transporter_interface(const char *interface,
unsigned short port)
{ {
DBUG_ENTER("TransporterRegistry::add_transporter_interface"); DBUG_ENTER("TransporterRegistry::add_transporter_interface");
DBUG_PRINT("enter",("interface=%s, port= %d", interface, port)); DBUG_PRINT("enter",("interface=%s, port= %d", interface, port));
...@@ -1232,6 +1244,15 @@ TransporterRegistry::start_service(SocketServer& socket_server) ...@@ -1232,6 +1244,15 @@ TransporterRegistry::start_service(SocketServer& socket_server)
return true; return true;
} }
#ifdef NDB_SHM_TRANSPORTER
static
RETSIGTYPE
shm_sig_handler(int signo)
{
g_shm_counter++;
}
#endif
void void
TransporterRegistry::startReceiving() TransporterRegistry::startReceiving()
{ {
...@@ -1250,6 +1271,13 @@ TransporterRegistry::startReceiving() ...@@ -1250,6 +1271,13 @@ TransporterRegistry::startReceiving()
for(int i = 0; i<nTCPTransporters; i++) for(int i = 0; i<nTCPTransporters; i++)
theTCPTransporters[i]->theReceiverPid = theReceiverPid; theTCPTransporters[i]->theReceiverPid = theReceiverPid;
#endif #endif
#ifdef NDB_SHM_TRANSPORTER
if(nSHMTransporters)
{
signal(SIGUSR1, shm_sig_handler);
}
#endif
} }
void void
......
...@@ -49,6 +49,8 @@ void catchsigs(bool ignore); // for process signal handling ...@@ -49,6 +49,8 @@ void catchsigs(bool ignore); // for process signal handling
extern "C" void handler_shutdown(int signum); // for process signal handling extern "C" void handler_shutdown(int signum); // for process signal handling
extern "C" void handler_error(int signum); // for process signal handling extern "C" void handler_error(int signum); // for process signal handling
extern int g_shm_pid;
// Shows system information // Shows system information
void systemInfo(const Configuration & conf, void systemInfo(const Configuration & conf,
const LogLevel & ll); const LogLevel & ll);
...@@ -145,6 +147,7 @@ int main(int argc, char** argv) ...@@ -145,6 +147,7 @@ int main(int argc, char** argv)
} }
g_eventLogger.info("Angel pid: %d ndb pid: %d", getppid(), getpid()); g_eventLogger.info("Angel pid: %d ndb pid: %d", getppid(), getpid());
g_shm_pid = getpid();
theConfig->setupConfiguration(); theConfig->setupConfiguration();
systemInfo(* theConfig, * theConfig->m_logLevel); systemInfo(* theConfig, * theConfig->m_logLevel);
......
...@@ -82,7 +82,7 @@ static bool transformConnection(InitConfigFileParser::Context & ctx, const char ...@@ -82,7 +82,7 @@ static bool transformConnection(InitConfigFileParser::Context & ctx, const char
static bool applyDefaultValues(InitConfigFileParser::Context & ctx, const char *); static bool applyDefaultValues(InitConfigFileParser::Context & ctx, const char *);
static bool checkMandatory(InitConfigFileParser::Context & ctx, const char *); static bool checkMandatory(InitConfigFileParser::Context & ctx, const char *);
static bool fixPortNumber(InitConfigFileParser::Context & ctx, const char *); static bool fixPortNumber(InitConfigFileParser::Context & ctx, const char *);
static bool fixShmkey(InitConfigFileParser::Context & ctx, const char *); static bool fixShmKey(InitConfigFileParser::Context & ctx, const char *);
static bool checkDbConstraints(InitConfigFileParser::Context & ctx, const char *); static bool checkDbConstraints(InitConfigFileParser::Context & ctx, const char *);
static bool checkConnectionConstraints(InitConfigFileParser::Context &, const char *); static bool checkConnectionConstraints(InitConfigFileParser::Context &, const char *);
static bool checkTCPConstraints(InitConfigFileParser::Context &, const char *); static bool checkTCPConstraints(InitConfigFileParser::Context &, const char *);
...@@ -131,13 +131,15 @@ ConfigInfo::m_SectionRules[] = { ...@@ -131,13 +131,15 @@ ConfigInfo::m_SectionRules[] = {
{ "TCP", fixHostname, "HostName2" }, { "TCP", fixHostname, "HostName2" },
{ "SCI", fixHostname, "HostName1" }, { "SCI", fixHostname, "HostName1" },
{ "SCI", fixHostname, "HostName2" }, { "SCI", fixHostname, "HostName2" },
{ "SHM", fixHostname, "HostName1" },
{ "SHM", fixHostname, "HostName2" },
{ "OSE", fixHostname, "HostName1" }, { "OSE", fixHostname, "HostName1" },
{ "OSE", fixHostname, "HostName2" }, { "OSE", fixHostname, "HostName2" },
{ "TCP", fixPortNumber, 0 }, // has to come after fixHostName { "TCP", fixPortNumber, 0 }, // has to come after fixHostName
{ "SHM", fixPortNumber, 0 }, // has to come after fixHostName { "SHM", fixPortNumber, 0 }, // has to come after fixHostName
{ "SCI", fixPortNumber, 0 }, // has to come after fixHostName { "SCI", fixPortNumber, 0 }, // has to come after fixHostName
//{ "SHM", fixShmKey, 0 }, { "SHM", fixShmKey, 0 },
/** /**
* fixExtConnection must be after fixNodeId * fixExtConnection must be after fixNodeId
...@@ -168,6 +170,8 @@ ConfigInfo::m_SectionRules[] = { ...@@ -168,6 +170,8 @@ ConfigInfo::m_SectionRules[] = {
{ "TCP", checkTCPConstraints, "HostName2" }, { "TCP", checkTCPConstraints, "HostName2" },
{ "SCI", checkTCPConstraints, "HostName1" }, { "SCI", checkTCPConstraints, "HostName1" },
{ "SCI", checkTCPConstraints, "HostName2" }, { "SCI", checkTCPConstraints, "HostName2" },
{ "SHM", checkTCPConstraints, "HostName1" },
{ "SHM", checkTCPConstraints, "HostName2" },
{ "*", checkMandatory, 0 }, { "*", checkMandatory, 0 },
...@@ -1687,16 +1691,27 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1687,16 +1691,27 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
0, 0 }, 0, 0 },
{ {
CFG_CONNECTION_NODE_1, CFG_CONNECTION_HOSTNAME_1,
"NodeId1", "HostName1",
"SHM", "SHM",
"Id of node ("DB_TOKEN_PRINT", "API_TOKEN_PRINT" or "MGM_TOKEN_PRINT") on one side of the connection", "Name/IP of computer on one side of the connection",
ConfigInfo::USED, ConfigInfo::INTERNAL,
false, false,
ConfigInfo::STRING, ConfigInfo::STRING,
MANDATORY, UNDEFINED,
0, 0 }, 0, 0 },
{
CFG_CONNECTION_HOSTNAME_2,
"HostName2",
"SHM",
"Name/IP of computer on one side of the connection",
ConfigInfo::INTERNAL,
false,
ConfigInfo::STRING,
UNDEFINED,
0, 0 },
{ {
CFG_CONNECTION_SERVER_PORT, CFG_CONNECTION_SERVER_PORT,
"PortNumber", "PortNumber",
...@@ -1709,6 +1724,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1709,6 +1724,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"0", "0",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_INT_RNIL) },
{
CFG_CONNECTION_NODE_1,
"NodeId1",
"SHM",
"Id of node ("DB_TOKEN_PRINT", "API_TOKEN_PRINT" or "MGM_TOKEN_PRINT") on one side of the connection",
ConfigInfo::USED,
false,
ConfigInfo::STRING,
MANDATORY,
0, 0 },
{ {
CFG_CONNECTION_NODE_2, CFG_CONNECTION_NODE_2,
"NodeId2", "NodeId2",
...@@ -3025,15 +3051,32 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){ ...@@ -3025,15 +3051,32 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){
<< "per connection, please remove from config. " << "per connection, please remove from config. "
<< "Will be changed to " << port << endl; << "Will be changed to " << port << endl;
ctx.m_currentSection->put("PortNumber", port, true); ctx.m_currentSection->put("PortNumber", port, true);
} else }
else
{
ctx.m_currentSection->put("PortNumber", port); ctx.m_currentSection->put("PortNumber", port);
}
DBUG_PRINT("info", ("connection %d-%d port %d host %s", DBUG_PRINT("info", ("connection %d-%d port %d host %s",
id1, id2, port, hostname.c_str())); id1, id2, port, hostname.c_str()));
DBUG_RETURN(true); DBUG_RETURN(true);
} }
static
bool
fixShmKey(InitConfigFileParser::Context & ctx, const char *)
{
Uint32 id1= 0, id2= 0, key= 0;
require(ctx.m_currentSection->get("NodeId1", &id1));
require(ctx.m_currentSection->get("NodeId2", &id2));
if(ctx.m_currentSection->get("ShmKey", &key))
return true;
key= (id1 > id2 ? id1 << 16 | id2 : id2 << 16 | id1);
ctx.m_currentSection->put("ShmKey", key);
return true;
}
/** /**
* DB Node rule: Check various constraints * DB Node rule: Check various constraints
*/ */
......
...@@ -450,8 +450,12 @@ runReceiveResponse_C(void * me) ...@@ -450,8 +450,12 @@ runReceiveResponse_C(void * me)
return me; return me;
} }
extern int g_shm_pid;
void TransporterFacade::threadMainReceive(void) void TransporterFacade::threadMainReceive(void)
{ {
g_shm_pid = getpid();
theTransporterRegistry->startReceiving(); theTransporterRegistry->startReceiving();
NdbMutex_Lock(theMutexPtr); NdbMutex_Lock(theMutexPtr);
theTransporterRegistry->update_connections(); theTransporterRegistry->update_connections();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment