Commit fd8b0f26 authored by joreland@mysql.com's avatar joreland@mysql.com

ndb - Handle shm-transporter wo/ busy-wait + also

      handled mixed tcp/shm transporters
+ bug#7124
parent 4614cdfa
......@@ -26,6 +26,8 @@
#include <InputStream.hpp>
#include <OutputStream.hpp>
extern int g_shm_pid;
SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
......@@ -52,6 +54,7 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
#ifdef DEBUG_TRANSPORTER
printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);
#endif
m_signal_threshold = 4096;
}
SHM_Transporter::~SHM_Transporter(){
......@@ -182,42 +185,6 @@ SHM_Transporter::setupBuffers(){
#endif
}
#if 0
SendStatus
SHM_Transporter::prepareSend(const SignalHeader * const signalHeader,
Uint8 prio,
const Uint32 * const signalData,
const LinearSegmentPtr ptr[3],
bool force){
if(isConnected()){
const Uint32 lenBytes = m_packer.getMessageLength(signalHeader, ptr);
Uint32 * insertPtr = (Uint32 *)writer->getWritePtr(lenBytes);
if(insertPtr != 0){
m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
/**
* Do funky membar stuff
*/
writer->updateWritePtr(lenBytes);
return SEND_OK;
} else {
// NdbSleep_MilliSleep(3);
//goto tryagain;
return SEND_BUFFER_FULL;
}
}
return SEND_DISCONNECTED;
}
#endif
bool
SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
{
......@@ -247,10 +214,17 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
}
// Send ok to client
s_output.println("shm server 1 ok");
s_output.println("shm server 1 ok: %d", g_shm_pid);
// Wait for ok from client
if (s_input.gets(buf, 256) == 0) {
if (s_input.gets(buf, 256) == 0)
{
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
if(sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1)
{
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
......@@ -289,6 +263,12 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
DBUG_RETURN(false);
}
if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1)
{
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
// Create
if(!_shmSegCreated){
if (!ndb_shm_get()) {
......@@ -313,10 +293,10 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
}
// Send ok to server
s_output.println("shm client 1 ok");
s_output.println("shm client 1 ok: %d", g_shm_pid);
int r= connect_common(sockfd);
if (r) {
// Wait for ok from server
if (s_input.gets(buf, 256) == 0) {
......@@ -344,18 +324,33 @@ SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
return false;
}
if(!setupBuffersDone) {
if(!setupBuffersDone)
{
setupBuffers();
setupBuffersDone=true;
}
if(setupBuffersDone) {
if(setupBuffersDone)
{
NdbSleep_MilliSleep(m_timeOutMillis);
if(*serverStatusFlag == 1 && *clientStatusFlag == 1)
{
m_last_signal = 0;
return true;
}
}
DBUG_PRINT("error", ("Failed to set up buffers to node %d",
remoteNodeId));
return false;
}
void
SHM_Transporter::doSend()
{
if(m_last_signal)
{
m_last_signal = 0;
kill(m_remote_pid, SIGUSR1);
}
}
......@@ -47,18 +47,25 @@ public:
* SHM destructor
*/
virtual ~SHM_Transporter();
/**
* Do initialization
*/
bool initTransporter();
Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio){
Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio)
{
return (Uint32 *)writer->getWritePtr(lenBytes);
}
void updateWritePtr(Uint32 lenBytes, Uint32 prio){
void updateWritePtr(Uint32 lenBytes, Uint32 prio)
{
writer->updateWritePtr(lenBytes);
m_last_signal += lenBytes;
if(m_last_signal >= m_signal_threshold)
{
doSend();
}
}
void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){
......@@ -123,28 +130,36 @@ protected:
*/
void setupBuffers();
/**
* doSend (i.e signal receiver)
*/
void doSend();
int m_remote_pid;
Uint32 m_last_signal;
Uint32 m_signal_threshold;
private:
bool _shmSegCreated;
bool _attached;
bool m_connected;
key_t shmKey;
volatile Uint32 * serverStatusFlag;
volatile Uint32 * clientStatusFlag;
bool setupBuffersDone;
#ifdef NDB_WIN32
HANDLE hFileMapping;
#else
int shmId;
#endif
int shmSize;
char * shmBuf;
SHM_Reader * reader;
SHM_Writer * writer;
/**
* @return - True if the reader has data to read on its segment.
*/
......
......@@ -49,6 +49,8 @@ void catchsigs(bool ignore); // for process signal handling
extern "C" void handler_shutdown(int signum); // for process signal handling
extern "C" void handler_error(int signum); // for process signal handling
extern int g_shm_pid;
// Shows system information
void systemInfo(const Configuration & conf,
const LogLevel & ll);
......@@ -145,6 +147,7 @@ int main(int argc, char** argv)
}
g_eventLogger.info("Angel pid: %d ndb pid: %d", getppid(), getpid());
g_shm_pid = getpid();
theConfig->setupConfiguration();
systemInfo(* theConfig, * theConfig->m_logLevel);
......
......@@ -82,7 +82,7 @@ static bool transformConnection(InitConfigFileParser::Context & ctx, const char
static bool applyDefaultValues(InitConfigFileParser::Context & ctx, const char *);
static bool checkMandatory(InitConfigFileParser::Context & ctx, const char *);
static bool fixPortNumber(InitConfigFileParser::Context & ctx, const char *);
static bool fixShmkey(InitConfigFileParser::Context & ctx, const char *);
static bool fixShmKey(InitConfigFileParser::Context & ctx, const char *);
static bool checkDbConstraints(InitConfigFileParser::Context & ctx, const char *);
static bool checkConnectionConstraints(InitConfigFileParser::Context &, const char *);
static bool checkTCPConstraints(InitConfigFileParser::Context &, const char *);
......@@ -131,13 +131,15 @@ ConfigInfo::m_SectionRules[] = {
{ "TCP", fixHostname, "HostName2" },
{ "SCI", fixHostname, "HostName1" },
{ "SCI", fixHostname, "HostName2" },
{ "SHM", fixHostname, "HostName1" },
{ "SHM", fixHostname, "HostName2" },
{ "OSE", fixHostname, "HostName1" },
{ "OSE", fixHostname, "HostName2" },
{ "TCP", fixPortNumber, 0 }, // has to come after fixHostName
{ "SHM", fixPortNumber, 0 }, // has to come after fixHostName
{ "SCI", fixPortNumber, 0 }, // has to come after fixHostName
//{ "SHM", fixShmKey, 0 },
{ "SHM", fixShmKey, 0 },
/**
* fixExtConnection must be after fixNodeId
......@@ -168,6 +170,8 @@ ConfigInfo::m_SectionRules[] = {
{ "TCP", checkTCPConstraints, "HostName2" },
{ "SCI", checkTCPConstraints, "HostName1" },
{ "SCI", checkTCPConstraints, "HostName2" },
{ "SHM", checkTCPConstraints, "HostName1" },
{ "SHM", checkTCPConstraints, "HostName2" },
{ "*", checkMandatory, 0 },
......@@ -1687,16 +1691,27 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
0, 0 },
{
CFG_CONNECTION_NODE_1,
"NodeId1",
CFG_CONNECTION_HOSTNAME_1,
"HostName1",
"SHM",
"Id of node ("DB_TOKEN_PRINT", "API_TOKEN_PRINT" or "MGM_TOKEN_PRINT") on one side of the connection",
ConfigInfo::USED,
"Name/IP of computer on one side of the connection",
ConfigInfo::INTERNAL,
false,
ConfigInfo::STRING,
MANDATORY,
UNDEFINED,
0, 0 },
{
CFG_CONNECTION_HOSTNAME_2,
"HostName2",
"SHM",
"Name/IP of computer on one side of the connection",
ConfigInfo::INTERNAL,
false,
ConfigInfo::STRING,
UNDEFINED,
0, 0 },
{
CFG_CONNECTION_SERVER_PORT,
"PortNumber",
......@@ -1709,6 +1724,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"0",
STR_VALUE(MAX_INT_RNIL) },
{
CFG_CONNECTION_NODE_1,
"NodeId1",
"SHM",
"Id of node ("DB_TOKEN_PRINT", "API_TOKEN_PRINT" or "MGM_TOKEN_PRINT") on one side of the connection",
ConfigInfo::USED,
false,
ConfigInfo::STRING,
MANDATORY,
0, 0 },
{
CFG_CONNECTION_NODE_2,
"NodeId2",
......@@ -3025,15 +3051,32 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){
<< "per connection, please remove from config. "
<< "Will be changed to " << port << endl;
ctx.m_currentSection->put("PortNumber", port, true);
} else
}
else
{
ctx.m_currentSection->put("PortNumber", port);
}
DBUG_PRINT("info", ("connection %d-%d port %d host %s",
id1, id2, port, hostname.c_str()));
DBUG_RETURN(true);
}
static
bool
fixShmKey(InitConfigFileParser::Context & ctx, const char *)
{
Uint32 id1= 0, id2= 0, key= 0;
require(ctx.m_currentSection->get("NodeId1", &id1));
require(ctx.m_currentSection->get("NodeId2", &id2));
if(ctx.m_currentSection->get("ShmKey", &key))
return true;
key= (id1 > id2 ? id1 << 16 | id2 : id2 << 16 | id1);
ctx.m_currentSection->put("ShmKey", key);
return true;
}
/**
* DB Node rule: Check various constraints
*/
......
......@@ -450,8 +450,12 @@ runReceiveResponse_C(void * me)
return me;
}
extern int g_shm_pid;
void TransporterFacade::threadMainReceive(void)
{
g_shm_pid = getpid();
theTransporterRegistry->startReceiving();
NdbMutex_Lock(theMutexPtr);
theTransporterRegistry->update_connections();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment