Commit 8ff19719 authored by unknown's avatar unknown

Merge mysql.com:/home/jonas/src/mysql-4.1-fix

into mysql.com:/home/jonas/src/mysql-5.0-ndb


ndb/src/kernel/main.cpp:
  Auto merged
ndb/src/ndbapi/TransporterFacade.cpp:
  Auto merged
parents a59a6361 f507b377
......@@ -26,6 +26,8 @@
#include <InputStream.hpp>
#include <OutputStream.hpp>
extern int g_shm_pid;
SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
......@@ -52,6 +54,7 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
#ifdef DEBUG_TRANSPORTER
printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);
#endif
m_signal_threshold = 4096;
}
SHM_Transporter::~SHM_Transporter(){
......@@ -182,42 +185,6 @@ SHM_Transporter::setupBuffers(){
#endif
}
#if 0
SendStatus
SHM_Transporter::prepareSend(const SignalHeader * const signalHeader,
Uint8 prio,
const Uint32 * const signalData,
const LinearSegmentPtr ptr[3],
bool force){
if(isConnected()){
const Uint32 lenBytes = m_packer.getMessageLength(signalHeader, ptr);
Uint32 * insertPtr = (Uint32 *)writer->getWritePtr(lenBytes);
if(insertPtr != 0){
m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
/**
* Do funky membar stuff
*/
writer->updateWritePtr(lenBytes);
return SEND_OK;
} else {
// NdbSleep_MilliSleep(3);
//goto tryagain;
return SEND_BUFFER_FULL;
}
}
return SEND_DISCONNECTED;
}
#endif
bool
SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
{
......@@ -247,10 +214,17 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
}
// Send ok to client
s_output.println("shm server 1 ok");
s_output.println("shm server 1 ok: %d", g_shm_pid);
// Wait for ok from client
if (s_input.gets(buf, 256) == 0) {
if (s_input.gets(buf, 256) == 0)
{
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
if(sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1)
{
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
......@@ -289,6 +263,12 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
DBUG_RETURN(false);
}
if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1)
{
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
// Create
if(!_shmSegCreated){
if (!ndb_shm_get()) {
......@@ -313,7 +293,7 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
}
// Send ok to server
s_output.println("shm client 1 ok");
s_output.println("shm client 1 ok: %d", g_shm_pid);
int r= connect_common(sockfd);
......@@ -344,18 +324,33 @@ SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
return false;
}
if(!setupBuffersDone) {
if(!setupBuffersDone)
{
setupBuffers();
setupBuffersDone=true;
}
if(setupBuffersDone) {
if(setupBuffersDone)
{
NdbSleep_MilliSleep(m_timeOutMillis);
if(*serverStatusFlag == 1 && *clientStatusFlag == 1)
{
m_last_signal = 0;
return true;
}
}
DBUG_PRINT("error", ("Failed to set up buffers to node %d",
remoteNodeId));
return false;
}
void
SHM_Transporter::doSend()
{
if(m_last_signal)
{
m_last_signal = 0;
kill(m_remote_pid, SIGUSR1);
}
}
......@@ -53,12 +53,19 @@ public:
*/
bool initTransporter();
Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio){
Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio)
{
return (Uint32 *)writer->getWritePtr(lenBytes);
}
void updateWritePtr(Uint32 lenBytes, Uint32 prio){
void updateWritePtr(Uint32 lenBytes, Uint32 prio)
{
writer->updateWritePtr(lenBytes);
m_last_signal += lenBytes;
if(m_last_signal >= m_signal_threshold)
{
doSend();
}
}
void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){
......@@ -123,6 +130,14 @@ protected:
*/
void setupBuffers();
/**
* doSend (i.e signal receiver)
*/
void doSend();
int m_remote_pid;
Uint32 m_last_signal;
Uint32 m_signal_threshold;
private:
bool _shmSegCreated;
bool _attached;
......
......@@ -47,6 +47,8 @@
#include <InputStream.hpp>
#include <OutputStream.hpp>
int g_shm_pid = 0;
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
{
DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
......@@ -622,11 +624,28 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){
return retVal;
#endif
if((nSHMTransporters+nSCITransporters) > 0)
if((nSCITransporters) > 0)
{
timeOutMillis=0;
}
#ifdef NDB_SHM_TRANSPORTER
if(nSHMTransporters > 0)
{
Uint32 res = poll_SHM(0);
if(res)
{
retVal |= res;
timeOutMillis = 0;
}
}
#endif
#ifdef NDB_TCP_TRANSPORTER
if(nTCPTransporters > 0)
if(nTCPTransporters > 0 || retVal == 0)
{
retVal |= poll_TCP(timeOutMillis);
}
else
tcpReadSelectReply = 0;
#endif
......@@ -635,8 +654,11 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){
retVal |= poll_SCI(timeOutMillis);
#endif
#ifdef NDB_SHM_TRANSPORTER
if(nSHMTransporters > 0)
retVal |= poll_SHM(timeOutMillis);
if(nSHMTransporters > 0 && retVal == 0)
{
int res = poll_SHM(0);
retVal |= res;
}
#endif
return retVal;
}
......@@ -644,8 +666,8 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){
#ifdef NDB_SCI_TRANSPORTER
Uint32
TransporterRegistry::poll_SCI(Uint32 timeOutMillis){
TransporterRegistry::poll_SCI(Uint32 timeOutMillis)
{
for (int i=0; i<nSCITransporters; i++) {
SCI_Transporter * t = theSCITransporters[i];
if (t->isConnected()) {
......@@ -659,10 +681,12 @@ TransporterRegistry::poll_SCI(Uint32 timeOutMillis){
#ifdef NDB_SHM_TRANSPORTER
static int g_shm_counter = 0;
Uint32
TransporterRegistry::poll_SHM(Uint32 timeOutMillis)
{
for(int j=0; j < 20; j++)
for(int j=0; j < 100; j++)
{
for (int i=0; i<nSHMTransporters; i++) {
SHM_Transporter * t = theSHMTransporters[i];
if (t->isConnected()) {
......@@ -671,61 +695,15 @@ TransporterRegistry::poll_SHM(Uint32 timeOutMillis)
}
}
}
/**
* @note: granularity of linux/i386 timer is not good enough.
* Can't sleep if using SHM as it is now.
*/
/*
if(timeOutMillis > 0)
NdbSleep_MilliSleep(timeOutMillis);
else
NdbSleep_MilliSleep(1);
*/
return 0;
#if 0
NDB_TICKS startTime = NdbTick_CurrentMillisecond();
for(int i=0; i<100; i++) {
for (int i=0; i<nSHMTransporters; i++) {
SHM_Transporter * t = theSHMTransporters[i];
if (t->isConnected()) {
if(t->hasDataToRead()){
return 1;
}
else
continue;
}
else
continue;
}
if(NdbTick_CurrentMillisecond() > (startTime +timeOutMillis))
return 0;
}
NdbSleep_MilliSleep(5);
return 0;
#endif
#if 0
for(int j=0; j < 100; j++) {
for (int i=0; i<nSHMTransporters; i++) {
SHM_Transporter * t = theSHMTransporters[i];
if (t->isConnected()) {
if(t->hasDataToRead())
return 1;
}
}
}
return 0;
#endif
}
#endif
#ifdef NDB_OSE_TRANSPORTER
Uint32
TransporterRegistry::poll_OSE(Uint32 timeOutMillis){
TransporterRegistry::poll_OSE(Uint32 timeOutMillis)
{
if(theOSEReceiver != NULL){
return theOSEReceiver->doReceive(timeOutMillis);
}
......@@ -736,16 +714,16 @@ TransporterRegistry::poll_OSE(Uint32 timeOutMillis){
#ifdef NDB_TCP_TRANSPORTER
Uint32
TransporterRegistry::poll_TCP(Uint32 timeOutMillis){
if (nTCPTransporters == 0){
TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
{
if (false && nTCPTransporters == 0)
{
tcpReadSelectReply = 0;
return 0;
}
struct timeval timeout;
#ifdef NDB_OSE
// Return directly if there are no TCP transporters configured
if(timeOutMillis <= 1){
......@@ -760,7 +738,7 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis){
timeout.tv_usec = (timeOutMillis % 1000) * 1000;
#endif
NDB_SOCKET_TYPE maxSocketValue = 0;
NDB_SOCKET_TYPE maxSocketValue = -1;
// Needed for TCP/IP connections
// The read- and writeset are used by select
......@@ -788,6 +766,9 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis){
maxSocketValue++;
tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);
if(false && tcpReadSelectReply == -1 && errno == EINTR)
ndbout_c("woke-up by signal");
#ifdef NDB_WIN32
if(tcpReadSelectReply == SOCKET_ERROR)
{
......@@ -801,10 +782,13 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis){
void
TransporterRegistry::performReceive(){
TransporterRegistry::performReceive()
{
#ifdef NDB_OSE_TRANSPORTER
if(theOSEReceiver != 0){
while(theOSEReceiver->hasData()){
if(theOSEReceiver != 0)
{
while(theOSEReceiver->hasData())
{
NodeId remoteNodeId;
Uint32 * readPtr;
Uint32 sz = theOSEReceiver->getReceiveData(&remoteNodeId, &readPtr);
......@@ -827,16 +811,20 @@ TransporterRegistry::performReceive(){
#endif
#ifdef NDB_TCP_TRANSPORTER
if(tcpReadSelectReply > 0){
for (int i=0; i<nTCPTransporters; i++) {
if(tcpReadSelectReply > 0)
{
for (int i=0; i<nTCPTransporters; i++)
{
checkJobBuffer();
TCP_Transporter *t = theTCPTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
const NDB_SOCKET_TYPE socket = t->getSocket();
if(is_connected(nodeId)){
if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) {
if(t->isConnected() && FD_ISSET(socket, &tcpReadset))
{
const int receiveSize = t->doReceive();
if(receiveSize > 0){
if(receiveSize > 0)
{
Uint32 * ptr;
Uint32 sz = t->getReceiveData(&ptr);
Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);
......@@ -848,16 +836,18 @@ TransporterRegistry::performReceive(){
}
#endif
#ifdef NDB_SCI_TRANSPORTER
//performReceive
//do prepareReceive on the SCI transporters (prepareReceive(t,,,,))
for (int i=0; i<nSCITransporters; i++) {
for (int i=0; i<nSCITransporters; i++)
{
checkJobBuffer();
SCI_Transporter *t = theSCITransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){
if(is_connected(nodeId))
{
if(t->isConnected() && t->checkConnected())
{
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
......@@ -867,12 +857,14 @@ TransporterRegistry::performReceive(){
}
#endif
#ifdef NDB_SHM_TRANSPORTER
for (int i=0; i<nSHMTransporters; i++) {
for (int i=0; i<nSHMTransporters; i++)
{
checkJobBuffer();
SHM_Transporter *t = theSHMTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){
if(t->isConnected() && t->checkConnected())
{
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
......@@ -885,15 +877,17 @@ TransporterRegistry::performReceive(){
static int x = 0;
void
TransporterRegistry::performSend(){
TransporterRegistry::performSend()
{
int i;
sendCounter = 1;
#ifdef NDB_OSE_TRANSPORTER
for (int i = 0; i < nOSETransporters; i++){
for (int i = 0; i < nOSETransporters; i++)
{
OSE_Transporter *t = theOSETransporters[i];
if((is_connected(t->getRemoteNodeId()) &&
(t->isConnected())) {
if(is_connected(t->getRemoteNodeId()) &&& (t->isConnected()))
{
t->doSend();
}//if
}//for
......@@ -932,7 +926,8 @@ TransporterRegistry::performSend(){
struct timeval timeout = { 0, 1025 };
Uint32 tmp = select(maxSocketValue, 0, &writeset, 0, &timeout);
if (tmp == 0) {
if (tmp == 0)
{
return;
}//if
for (i = 0; i < nTCPTransporters; i++) {
......@@ -948,24 +943,24 @@ TransporterRegistry::performSend(){
}
#endif
#ifdef NDB_TCP_TRANSPORTER
for (i = x; i < nTCPTransporters; i++) {
for (i = x; i < nTCPTransporters; i++)
{
TCP_Transporter *t = theTCPTransporters[i];
if (t &&
(t->hasDataToSend()) &&
(t->isConnected()) &&
(is_connected(t->getRemoteNodeId()))) {
if (t && t->hasDataToSend() && t->isConnected() &&
is_connected(t->getRemoteNodeId()))
{
t->doSend();
}//if
}//for
for (i = 0; i < x && i < nTCPTransporters; i++) {
}
}
for (i = 0; i < x && i < nTCPTransporters; i++)
{
TCP_Transporter *t = theTCPTransporters[i];
if (t &&
(t->hasDataToSend()) &&
(t->isConnected()) &&
(is_connected(t->getRemoteNodeId()))) {
if (t && t->hasDataToSend() && t->isConnected() &&
is_connected(t->getRemoteNodeId()))
{
t->doSend();
}//if
}//for
}
}
x++;
if (x == nTCPTransporters) x = 0;
#endif
......@@ -977,12 +972,28 @@ TransporterRegistry::performSend(){
SCI_Transporter *t = theSCITransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
if(is_connected(nodeId)){
if(is_connected(nodeId))
{
if(t->isConnected() && t->hasDataToSend()) {
t->doSend();
} //if
} //if
} //if
}
#endif
#ifdef NDB_SHM_TRANSPORTER
for (i=0; i<nSHMTransporters; i++)
{
SHM_Transporter *t = theSHMTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
if(is_connected(nodeId))
{
if(t->isConnected())
{
t->doSend();
}
}
}
#endif
}
......@@ -1169,7 +1180,8 @@ TransporterRegistry::stop_clients()
}
void
TransporterRegistry::add_transporter_interface(const char *interface, unsigned short port)
TransporterRegistry::add_transporter_interface(const char *interface,
unsigned short port)
{
DBUG_ENTER("TransporterRegistry::add_transporter_interface");
DBUG_PRINT("enter",("interface=%s, port= %d", interface, port));
......@@ -1232,6 +1244,15 @@ TransporterRegistry::start_service(SocketServer& socket_server)
return true;
}
#ifdef NDB_SHM_TRANSPORTER
static
RETSIGTYPE
shm_sig_handler(int signo)
{
g_shm_counter++;
}
#endif
void
TransporterRegistry::startReceiving()
{
......@@ -1250,6 +1271,13 @@ TransporterRegistry::startReceiving()
for(int i = 0; i<nTCPTransporters; i++)
theTCPTransporters[i]->theReceiverPid = theReceiverPid;
#endif
#ifdef NDB_SHM_TRANSPORTER
if(nSHMTransporters)
{
signal(SIGUSR1, shm_sig_handler);
}
#endif
}
void
......
......@@ -48,6 +48,8 @@ void catchsigs(bool ignore); // for process signal handling
extern "C" void handler_shutdown(int signum); // for process signal handling
extern "C" void handler_error(int signum); // for process signal handling
extern int g_shm_pid;
// Shows system information
void systemInfo(const Configuration & conf,
const LogLevel & ll);
......@@ -137,6 +139,7 @@ int main(int argc, char** argv)
}
g_eventLogger.info("Angel pid: %d ndb pid: %d", getppid(), getpid());
g_shm_pid = getpid();
theConfig->setupConfiguration();
systemInfo(* theConfig, * theConfig->m_logLevel);
......
......@@ -82,7 +82,7 @@ static bool transformConnection(InitConfigFileParser::Context & ctx, const char
static bool applyDefaultValues(InitConfigFileParser::Context & ctx, const char *);
static bool checkMandatory(InitConfigFileParser::Context & ctx, const char *);
static bool fixPortNumber(InitConfigFileParser::Context & ctx, const char *);
static bool fixShmkey(InitConfigFileParser::Context & ctx, const char *);
static bool fixShmKey(InitConfigFileParser::Context & ctx, const char *);
static bool checkDbConstraints(InitConfigFileParser::Context & ctx, const char *);
static bool checkConnectionConstraints(InitConfigFileParser::Context &, const char *);
static bool checkTCPConstraints(InitConfigFileParser::Context &, const char *);
......@@ -131,13 +131,15 @@ ConfigInfo::m_SectionRules[] = {
{ "TCP", fixHostname, "HostName2" },
{ "SCI", fixHostname, "HostName1" },
{ "SCI", fixHostname, "HostName2" },
{ "SHM", fixHostname, "HostName1" },
{ "SHM", fixHostname, "HostName2" },
{ "OSE", fixHostname, "HostName1" },
{ "OSE", fixHostname, "HostName2" },
{ "TCP", fixPortNumber, 0 }, // has to come after fixHostName
{ "SHM", fixPortNumber, 0 }, // has to come after fixHostName
{ "SCI", fixPortNumber, 0 }, // has to come after fixHostName
//{ "SHM", fixShmKey, 0 },
{ "SHM", fixShmKey, 0 },
/**
* fixExtConnection must be after fixNodeId
......@@ -168,6 +170,8 @@ ConfigInfo::m_SectionRules[] = {
{ "TCP", checkTCPConstraints, "HostName2" },
{ "SCI", checkTCPConstraints, "HostName1" },
{ "SCI", checkTCPConstraints, "HostName2" },
{ "SHM", checkTCPConstraints, "HostName1" },
{ "SHM", checkTCPConstraints, "HostName2" },
{ "*", checkMandatory, 0 },
......@@ -1687,14 +1691,25 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
0, 0 },
{
CFG_CONNECTION_NODE_1,
"NodeId1",
CFG_CONNECTION_HOSTNAME_1,
"HostName1",
"SHM",
"Id of node ("DB_TOKEN_PRINT", "API_TOKEN_PRINT" or "MGM_TOKEN_PRINT") on one side of the connection",
ConfigInfo::USED,
"Name/IP of computer on one side of the connection",
ConfigInfo::INTERNAL,
false,
ConfigInfo::STRING,
MANDATORY,
UNDEFINED,
0, 0 },
{
CFG_CONNECTION_HOSTNAME_2,
"HostName2",
"SHM",
"Name/IP of computer on one side of the connection",
ConfigInfo::INTERNAL,
false,
ConfigInfo::STRING,
UNDEFINED,
0, 0 },
{
......@@ -1709,6 +1724,17 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"0",
STR_VALUE(MAX_INT_RNIL) },
{
CFG_CONNECTION_NODE_1,
"NodeId1",
"SHM",
"Id of node ("DB_TOKEN_PRINT", "API_TOKEN_PRINT" or "MGM_TOKEN_PRINT") on one side of the connection",
ConfigInfo::USED,
false,
ConfigInfo::STRING,
MANDATORY,
0, 0 },
{
CFG_CONNECTION_NODE_2,
"NodeId2",
......@@ -3025,15 +3051,32 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){
<< "per connection, please remove from config. "
<< "Will be changed to " << port << endl;
ctx.m_currentSection->put("PortNumber", port, true);
} else
}
else
{
ctx.m_currentSection->put("PortNumber", port);
}
DBUG_PRINT("info", ("connection %d-%d port %d host %s",
id1, id2, port, hostname.c_str()));
DBUG_RETURN(true);
}
static
bool
fixShmKey(InitConfigFileParser::Context & ctx, const char *)
{
Uint32 id1= 0, id2= 0, key= 0;
require(ctx.m_currentSection->get("NodeId1", &id1));
require(ctx.m_currentSection->get("NodeId2", &id2));
if(ctx.m_currentSection->get("ShmKey", &key))
return true;
key= (id1 > id2 ? id1 << 16 | id2 : id2 << 16 | id1);
ctx.m_currentSection->put("ShmKey", key);
return true;
}
/**
* DB Node rule: Check various constraints
*/
......
......@@ -450,8 +450,12 @@ runReceiveResponse_C(void * me)
return me;
}
extern int g_shm_pid;
void TransporterFacade::threadMainReceive(void)
{
g_shm_pid = getpid();
theTransporterRegistry->startReceiving();
NdbMutex_Lock(theMutexPtr);
theTransporterRegistry->update_connections();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment