Commit 94836195 authored by unknown's avatar unknown

Bug #13054 start backup command return may be interrpted

parent d0b81ba3
......@@ -33,7 +33,7 @@ class DefineBackupReq {
friend bool printDEFINE_BACKUP_REQ(FILE *, const Uint32 *, Uint32, Uint16);
public:
STATIC_CONST( SignalLength = 8 + NdbNodeBitmask::Size);
STATIC_CONST( SignalLength = 9 + NdbNodeBitmask::Size);
private:
/**
......@@ -60,6 +60,13 @@ private:
* Length of backup data
*/
Uint32 backupDataLen;
/**
* Backup flags
*/
/* & 0x3 - waitCompleted
*/
Uint32 flags;
};
class DefineBackupRef {
......
......@@ -36,11 +36,14 @@ class BackupReq {
friend bool printBACKUP_REQ(FILE *, const Uint32 *, Uint32, Uint16);
public:
STATIC_CONST( SignalLength = 2 );
STATIC_CONST( SignalLength = 3 );
private:
Uint32 senderData;
Uint32 backupDataLen;
/* & 0x3 - waitCompleted
*/
Uint32 flags;
};
class BackupData {
......
......@@ -30,28 +30,12 @@
* from the failed NDB node
*
*/
class NFCompleteRep {
/**
* Sender(s)
*/
friend class Dbdict;
friend class Dblqh;
friend class Dbtc;
friend class Qmgr;
/**
* Sender/Reciver
*/
friend class Dbdih;
friend class ClusterMgr;
struct NFCompleteRep {
friend bool printNF_COMPLETE_REP(FILE *, const Uint32 *, Uint32, Uint16);
public:
STATIC_CONST( SignalLength = 5 );
private:
/**
* Which block has completed...
*
......
......@@ -24,34 +24,8 @@
* This signals is sent by Qmgr to NdbCntr
* and then from NdbCntr sent to: dih, dict, lqh, tc & API
*/
class NodeFailRep {
/**
* Sender(s)
*/
friend class Qmgr;
/**
* Sender(s) / Reciver(s)
*/
friend class Ndbcntr;
friend class Dbdict;
/**
* Reciver(s)
*/
friend class Dbdih;
friend class Dblqh;
friend class Dbtc;
friend class ClusterMgr;
friend class Trix;
friend class Backup;
friend class Suma;
friend class Grep;
friend class SafeCounterManager;
public:
struct NodeFailRep {
STATIC_CONST( SignalLength = 3 + NodeBitmask::Size );
private:
Uint32 failNo;
......
......@@ -69,6 +69,9 @@ static const Uint32 BACKUP_SEQUENCE = 0x1F000000;
static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE;
#define SEND_BACKUP_STARTED_FLAG(A) (((A) & 0x3) > 0)
#define SEND_BACKUP_COMPLETED_FLAG(A) (((A) & 0x3) > 1)
void
Backup::execSTTOR(Signal* signal)
{
......@@ -852,7 +855,8 @@ Backup::execBACKUP_REQ(Signal* signal)
const Uint32 senderData = req->senderData;
const BlockReference senderRef = signal->senderBlockRef();
const Uint32 dataLen32 = req->backupDataLen; // In 32 bit words
const Uint32 flags = signal->getLength() > 2 ? req->flags : 2;
if(getOwnNodeId() != getMasterNodeId()) {
jam();
sendBackupRef(senderRef, signal, senderData, BackupRef::IAmNotMaster);
......@@ -894,6 +898,7 @@ Backup::execBACKUP_REQ(Signal* signal)
ptr.p->errorCode = 0;
ptr.p->clientRef = senderRef;
ptr.p->clientData = senderData;
ptr.p->flags = flags;
ptr.p->masterRef = reference();
ptr.p->nodes = c_aliveNodes;
ptr.p->backupId = 0;
......@@ -931,7 +936,10 @@ void
Backup::sendBackupRef(Signal* signal, BackupRecordPtr ptr, Uint32 errorCode)
{
jam();
sendBackupRef(ptr.p->clientRef, signal, ptr.p->clientData, errorCode);
if (SEND_BACKUP_STARTED_FLAG(ptr.p->flags))
{
sendBackupRef(ptr.p->clientRef, signal, ptr.p->clientData, errorCode);
}
cleanup(signal, ptr);
}
......@@ -1098,6 +1106,7 @@ Backup::sendDefineBackupReq(Signal *signal, BackupRecordPtr ptr)
req->backupKey[1] = ptr.p->backupKey[1];
req->nodes = ptr.p->nodes;
req->backupDataLen = ptr.p->backupDataLen;
req->flags = ptr.p->flags;
ptr.p->masterData.gsn = GSN_DEFINE_BACKUP_REQ;
ptr.p->masterData.sendCounter = ptr.p->nodes;
......@@ -1193,13 +1202,18 @@ Backup::defineBackupReply(Signal* signal, BackupRecordPtr ptr, Uint32 nodeId)
/**
* Reply to client
*/
CRASH_INSERTION((10034));
BackupConf * conf = (BackupConf*)signal->getDataPtrSend();
conf->backupId = ptr.p->backupId;
conf->senderData = ptr.p->clientData;
conf->nodes = ptr.p->nodes;
sendSignal(ptr.p->clientRef, GSN_BACKUP_CONF, signal,
BackupConf::SignalLength, JBB);
if (SEND_BACKUP_STARTED_FLAG(ptr.p->flags))
{
sendSignal(ptr.p->clientRef, GSN_BACKUP_CONF, signal,
BackupConf::SignalLength, JBB);
}
signal->theData[0] = EventReport::BackupStarted;
signal->theData[1] = ptr.p->clientRef;
signal->theData[2] = ptr.p->backupId;
......@@ -2090,9 +2104,12 @@ Backup::stopBackupReply(Signal* signal, BackupRecordPtr ptr, Uint32 nodeId)
rep->noOfLogBytes = ptr.p->noOfLogBytes;
rep->noOfLogRecords = ptr.p->noOfLogRecords;
rep->nodes = ptr.p->nodes;
sendSignal(ptr.p->clientRef, GSN_BACKUP_COMPLETE_REP, signal,
BackupCompleteRep::SignalLength, JBB);
if (SEND_BACKUP_COMPLETED_FLAG(ptr.p->flags))
{
sendSignal(ptr.p->clientRef, GSN_BACKUP_COMPLETE_REP, signal,
BackupCompleteRep::SignalLength, JBB);
}
signal->theData[0] = EventReport::BackupCompleted;
signal->theData[1] = ptr.p->clientRef;
signal->theData[2] = ptr.p->backupId;
......@@ -2133,9 +2150,11 @@ Backup::masterAbort(Signal* signal, BackupRecordPtr ptr)
rep->backupId = ptr.p->backupId;
rep->senderData = ptr.p->clientData;
rep->reason = ptr.p->errorCode;
sendSignal(ptr.p->clientRef, GSN_BACKUP_ABORT_REP, signal,
BackupAbortRep::SignalLength, JBB);
if (SEND_BACKUP_COMPLETED_FLAG(ptr.p->flags))
{
sendSignal(ptr.p->clientRef, GSN_BACKUP_ABORT_REP, signal,
BackupAbortRep::SignalLength, JBB);
}
signal->theData[0] = EventReport::BackupAborted;
signal->theData[1] = ptr.p->clientRef;
signal->theData[2] = ptr.p->backupId;
......@@ -2267,6 +2286,7 @@ Backup::execDEFINE_BACKUP_REQ(Signal* signal)
ptr.p->errorCode = 0;
ptr.p->clientRef = req->clientRef;
ptr.p->clientData = req->clientData;
ptr.p->flags = req->flags;
ptr.p->masterRef = senderRef;
ptr.p->nodes = req->nodes;
ptr.p->backupId = backupId;
......
......@@ -413,6 +413,7 @@ public:
Uint32 clientRef;
Uint32 clientData;
Uint32 flags;
Uint32 backupId;
Uint32 backupKey[2];
Uint32 masterRef;
......
......@@ -39,6 +39,8 @@
#include <signaldata/BackupSignalData.hpp>
#include <signaldata/GrepImpl.hpp>
#include <signaldata/ManagementServer.hpp>
#include <signaldata/NFCompleteRep.hpp>
#include <signaldata/NodeFailRep.hpp>
#include <NdbSleep.h>
#include <EventLogger.hpp>
#include <DebuggerNames.hpp>
......@@ -56,6 +58,8 @@
#include <mgmapi_config_parameters.h>
#include <m_string.h>
#include <SignalSender.hpp>
//#define MGM_SRV_DEBUG
#ifdef MGM_SRV_DEBUG
#define DEBUG(x) do ndbout << x << endl; while(0)
......@@ -709,6 +713,15 @@ int MgmtSrvr::okToSendTo(NodeId processId, bool unCond)
}
}
void report_unknown_signal(SimpleSignal *signal)
{
g_eventLogger.error("Unknown signal received. SignalNumber: "
"%i from (%d, %x)",
signal->readSignalNumber(),
refToNode(signal->header.theSendersBlockRef),
refToBlock(signal->header.theSendersBlockRef));
}
/*****************************************************************************
* Starting and stopping database nodes
****************************************************************************/
......@@ -1911,81 +1924,6 @@ MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal)
}
break;
case GSN_BACKUP_CONF:{
const BackupConf * const conf =
CAST_CONSTPTR(BackupConf, signal->getDataPtr());
BackupEvent event;
event.Event = BackupEvent::BackupStarted;
event.Started.BackupId = conf->backupId;
event.Nodes = conf->nodes;
#ifdef VM_TRACE
ndbout_c("Backup master is %d", refToNode(signal->theSendersBlockRef));
#endif
backupCallback(event);
}
break;
case GSN_BACKUP_REF:{
const BackupRef * const ref =
CAST_CONSTPTR(BackupRef, signal->getDataPtr());
Uint32 errCode = ref->errorCode;
if(ref->errorCode == BackupRef::IAmNotMaster){
const Uint32 aNodeId = refToNode(ref->masterRef);
#ifdef VM_TRACE
ndbout_c("I'm not master resending to %d", aNodeId);
#endif
theWaitNode= aNodeId;
NdbApiSignal aSignal(_ownReference);
BackupReq* req = CAST_PTR(BackupReq, aSignal.getDataPtrSend());
aSignal.set(TestOrd::TraceAPI, BACKUP, GSN_BACKUP_REQ,
BackupReq::SignalLength);
req->senderData = 19;
req->backupDataLen = 0;
int i = theFacade->sendSignalUnCond(&aSignal, aNodeId);
if(i == 0){
return;
}
errCode = 5030;
}
BackupEvent event;
event.Event = BackupEvent::BackupFailedToStart;
event.FailedToStart.ErrorCode = errCode;
backupCallback(event);
break;
}
case GSN_BACKUP_ABORT_REP:{
const BackupAbortRep * const rep =
CAST_CONSTPTR(BackupAbortRep, signal->getDataPtr());
BackupEvent event;
event.Event = BackupEvent::BackupAborted;
event.Aborted.Reason = rep->reason;
event.Aborted.BackupId = rep->backupId;
event.Aborted.ErrorCode = rep->reason;
backupCallback(event);
}
break;
case GSN_BACKUP_COMPLETE_REP:{
const BackupCompleteRep * const rep =
CAST_CONSTPTR(BackupCompleteRep, signal->getDataPtr());
BackupEvent event;
event.Event = BackupEvent::BackupCompleted;
event.Completed.BackupId = rep->backupId;
event.Completed.NoOfBytes = rep->noOfBytes;
event.Completed.NoOfLogBytes = rep->noOfLogBytes;
event.Completed.NoOfRecords = rep->noOfRecords;
event.Completed.NoOfLogRecords = rep->noOfLogRecords;
event.Completed.stopGCP = rep->stopGCP;
event.Completed.startGCP = rep->startGCP;
event.Nodes = rep->nodes;
backupCallback(event);
}
break;
case GSN_MGM_LOCK_CONFIG_REP:
case GSN_MGM_LOCK_CONFIG_REQ:
case GSN_MGM_UNLOCK_CONFIG_REP:
......@@ -2446,6 +2384,9 @@ MgmtSrvr::eventReport(NodeId nodeId, const Uint32 * theData)
int
MgmtSrvr::startBackup(Uint32& backupId, int waitCompleted)
{
SignalSender ss(theFacade);
ss.lock(); // lock will be released on exit
bool next;
NodeId nodeId = 0;
while((next = getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true &&
......@@ -2453,50 +2394,126 @@ MgmtSrvr::startBackup(Uint32& backupId, int waitCompleted)
if(!next) return NO_CONTACT_WITH_DB_NODES;
NdbApiSignal* signal = getSignal();
if (signal == NULL) {
return COULD_NOT_ALLOCATE_MEMORY;
}
SimpleSignal ssig;
BackupReq* req = CAST_PTR(BackupReq, signal->getDataPtrSend());
signal->set(TestOrd::TraceAPI, BACKUP, GSN_BACKUP_REQ,
BackupReq::SignalLength);
BackupReq* req = CAST_PTR(BackupReq, ssig.getDataPtrSend());
ssig.set(ss, TestOrd::TraceAPI, BACKUP, GSN_BACKUP_REQ,
BackupReq::SignalLength);
req->senderData = 19;
req->backupDataLen = 0;
assert(waitCompleted < 3);
req->flags = waitCompleted & 0x3;
int result;
if (waitCompleted == 2) {
result = sendRecSignal(nodeId, WAIT_BACKUP_COMPLETED,
signal, true, 48*60*60*1000 /* 48 hours */);
}
else if (waitCompleted == 1) {
result = sendRecSignal(nodeId, WAIT_BACKUP_STARTED,
signal, true, 5*60*1000 /*5 mins*/);
}
else {
result = sendRecSignal(nodeId, NO_WAIT, signal, true);
}
if (result == -1) {
return SEND_OR_RECEIVE_FAILED;
}
if (waitCompleted){
switch(m_lastBackupEvent.Event){
case BackupEvent::BackupCompleted:
backupId = m_lastBackupEvent.Completed.BackupId;
BackupEvent event;
int do_send = 1;
while (1) {
if (do_send)
{
SendStatus result = ss.sendSignal(nodeId, &ssig);
if (result != SEND_OK) {
return SEND_OR_RECEIVE_FAILED;
}
if (waitCompleted == 0)
return 0;
do_send = 0;
}
SimpleSignal *signal = ss.waitFor();
int gsn = signal->readSignalNumber();
switch (gsn) {
case GSN_BACKUP_CONF:{
const BackupConf * const conf =
CAST_CONSTPTR(BackupConf, signal->getDataPtr());
event.Event = BackupEvent::BackupStarted;
event.Started.BackupId = conf->backupId;
event.Nodes = conf->nodes;
#ifdef VM_TRACE
ndbout_c("Backup(%d) master is %d", conf->backupId,
refToNode(signal->header.theSendersBlockRef));
#endif
backupId = conf->backupId;
if (waitCompleted == 1)
return 0;
// wait for next signal
break;
case BackupEvent::BackupStarted:
backupId = m_lastBackupEvent.Started.BackupId;
}
case GSN_BACKUP_COMPLETE_REP:{
const BackupCompleteRep * const rep =
CAST_CONSTPTR(BackupCompleteRep, signal->getDataPtr());
#ifdef VM_TRACE
ndbout_c("Backup(%d) completed %d", rep->backupId);
#endif
event.Event = BackupEvent::BackupCompleted;
event.Completed.BackupId = rep->backupId;
event.Completed.NoOfBytes = rep->noOfBytes;
event.Completed.NoOfLogBytes = rep->noOfLogBytes;
event.Completed.NoOfRecords = rep->noOfRecords;
event.Completed.NoOfLogRecords = rep->noOfLogRecords;
event.Completed.stopGCP = rep->stopGCP;
event.Completed.startGCP = rep->startGCP;
event.Nodes = rep->nodes;
backupId = rep->backupId;
return 0;
}
case GSN_BACKUP_REF:{
const BackupRef * const ref =
CAST_CONSTPTR(BackupRef, signal->getDataPtr());
if(ref->errorCode == BackupRef::IAmNotMaster){
nodeId = refToNode(ref->masterRef);
#ifdef VM_TRACE
ndbout_c("I'm not master resending to %d", nodeId);
#endif
do_send = 1; // try again
continue;
}
event.Event = BackupEvent::BackupFailedToStart;
event.FailedToStart.ErrorCode = ref->errorCode;
return ref->errorCode;
}
case GSN_BACKUP_ABORT_REP:{
const BackupAbortRep * const rep =
CAST_CONSTPTR(BackupAbortRep, signal->getDataPtr());
event.Event = BackupEvent::BackupAborted;
event.Aborted.Reason = rep->reason;
event.Aborted.BackupId = rep->backupId;
event.Aborted.ErrorCode = rep->reason;
#ifdef VM_TRACE
ndbout_c("Backup %d aborted", rep->backupId);
#endif
return rep->reason;
}
case GSN_NF_COMPLETEREP:{
const NFCompleteRep * const rep =
CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr());
#ifdef VM_TRACE
ndbout_c("Node %d fail completed", rep->failedNodeId);
#endif
if (rep->failedNodeId == nodeId ||
waitCompleted == 1)
return 1326;
// wait for next signal
// master node will report aborted backup
break;
case BackupEvent::BackupFailedToStart:
return m_lastBackupEvent.FailedToStart.ErrorCode;
case BackupEvent::BackupAborted:
return m_lastBackupEvent.Aborted.ErrorCode;
default:
return -1;
}
case GSN_NODE_FAILREP:{
const NodeFailRep * const rep =
CAST_CONSTPTR(NodeFailRep, signal->getDataPtr());
#ifdef VM_TRACE
ndbout_c("Node %d failed", rep->failNo);
#endif
if (rep->failNo == nodeId ||
waitCompleted == 1)
// wait for next signal
// master node will report aborted backup
break;
}
default:
report_unknown_signal(signal);
return SEND_OR_RECEIVE_FAILED;
}
}
return 0;
......@@ -2535,36 +2552,6 @@ MgmtSrvr::abortBackup(Uint32 backupId)
return 0;
}
void
MgmtSrvr::backupCallback(BackupEvent & event)
{
DBUG_ENTER("MgmtSrvr::backupCallback");
m_lastBackupEvent = event;
switch(event.Event){
case BackupEvent::BackupFailedToStart:
DBUG_PRINT("info",("BackupEvent::BackupFailedToStart"));
theWaitState = NO_WAIT;
break;
case BackupEvent::BackupAborted:
DBUG_PRINT("info",("BackupEvent::BackupAborted"));
theWaitState = NO_WAIT;
break;
case BackupEvent::BackupCompleted:
DBUG_PRINT("info",("BackupEvent::BackupCompleted"));
theWaitState = NO_WAIT;
break;
case BackupEvent::BackupStarted:
if(theWaitState == WAIT_BACKUP_STARTED)
{
DBUG_PRINT("info",("BackupEvent::BackupStarted NO_WAIT"));
theWaitState = NO_WAIT;
} else {
DBUG_PRINT("info",("BackupEvent::BackupStarted"));
}
}
DBUG_VOID_RETURN;
}
/*****************************************************************************
* Global Replication
......
......@@ -757,9 +757,6 @@ private:
static void *signalRecvThread_C(void *);
void signalRecvThreadRun();
void backupCallback(BackupEvent &);
BackupEvent m_lastBackupEvent;
Config *_props;
int send(class NdbApiSignal* signal, Uint32 node, Uint32 node_type);
......
......@@ -35,7 +35,8 @@ libndbapi_la_SOURCES = \
NdbDictionaryImpl.cpp \
DictCache.cpp \
ndb_cluster_connection.cpp \
NdbBlob.cpp
NdbBlob.cpp \
SignalSender.cpp
INCLUDES_LOC = -I$(top_srcdir)/ndb/src/mgmapi
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "SignalSender.hpp"
#include <NdbSleep.h>
#include <SignalLoggerManager.hpp>
#include <signaldata/NFCompleteRep.hpp>
#include <signaldata/NodeFailRep.hpp>
SimpleSignal::SimpleSignal(bool dealloc){
memset(this, 0, sizeof(* this));
deallocSections = dealloc;
}
SimpleSignal::~SimpleSignal(){
if(!deallocSections)
return;
if(ptr[0].p != 0) delete []ptr[0].p;
if(ptr[1].p != 0) delete []ptr[1].p;
if(ptr[2].p != 0) delete []ptr[2].p;
}
void
SimpleSignal::set(class SignalSender& ss,
Uint8 trace, Uint16 recBlock, Uint16 gsn, Uint32 len){
header.theTrace = trace;
header.theReceiversBlockNumber = recBlock;
header.theVerId_signalNumber = gsn;
header.theLength = len;
header.theSendersBlockRef = refToBlock(ss.getOwnRef());
}
void
SimpleSignal::print(FILE * out){
fprintf(out, "---- Signal ----------------\n");
SignalLoggerManager::printSignalHeader(out, header, 0, 0, false);
SignalLoggerManager::printSignalData(out, header, theData);
for(Uint32 i = 0; i<header.m_noOfSections; i++){
Uint32 len = ptr[i].sz;
fprintf(out, " --- Section %d size=%d ---\n", i, len);
Uint32 * signalData = ptr[i].p;
while(len >= 7){
fprintf(out,
" H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x\n",
signalData[0], signalData[1], signalData[2], signalData[3],
signalData[4], signalData[5], signalData[6]);
len -= 7;
signalData += 7;
}
if(len > 0){
fprintf(out, " H\'%.8x", signalData[0]);
for(Uint32 i = 1; i<len; i++)
fprintf(out, " H\'%.8x", signalData[i]);
fprintf(out, "\n");
}
}
}
SignalSender::SignalSender(TransporterFacade *facade)
: m_lock(0)
{
m_cond = NdbCondition_Create();
theFacade = facade;
m_blockNo = theFacade->open(this, execSignal, execNodeStatus);
assert(m_blockNo > 0);
}
SignalSender::~SignalSender(){
int i;
if (m_lock)
unlock();
theFacade->close(m_blockNo,0);
// free these _after_ closing theFacade to ensure that
// we delete all signals
for (i= m_jobBuffer.size()-1; i>= 0; i--)
delete m_jobBuffer[i];
for (i= m_usedBuffer.size()-1; i>= 0; i--)
delete m_usedBuffer[i];
NdbCondition_Destroy(m_cond);
}
int SignalSender::lock()
{
if (NdbMutex_Lock(theFacade->theMutexPtr))
return -1;
m_lock= 1;
return 0;
}
int SignalSender::unlock()
{
if (NdbMutex_Unlock(theFacade->theMutexPtr))
return -1;
m_lock= 0;
return 0;
}
Uint32
SignalSender::getOwnRef() const {
return numberToRef(m_blockNo, theFacade->ownId());
}
Uint32
SignalSender::getAliveNode() const{
return theFacade->get_an_alive_node();
}
const ClusterMgr::Node &
SignalSender::getNodeInfo(Uint16 nodeId) const {
return theFacade->theClusterMgr->getNodeInfo(nodeId);
}
Uint32
SignalSender::getNoOfConnectedNodes() const {
return theFacade->theClusterMgr->getNoOfConnectedNodes();
}
SendStatus
SignalSender::sendSignal(Uint16 nodeId, const SimpleSignal * s){
return theFacade->theTransporterRegistry->prepareSend(&s->header,
1, // JBB
&s->theData[0],
nodeId,
&s->ptr[0]);
}
template<class T>
SimpleSignal *
SignalSender::waitFor(Uint32 timeOutMillis, T & t)
{
SimpleSignal * s = t.check(m_jobBuffer);
if(s != 0){
return s;
}
NDB_TICKS now = NdbTick_CurrentMillisecond();
NDB_TICKS stop = now + timeOutMillis;
Uint32 wait = (timeOutMillis == 0 ? 10 : timeOutMillis);
do {
NdbCondition_WaitTimeout(m_cond,
theFacade->theMutexPtr,
wait);
SimpleSignal * s = t.check(m_jobBuffer);
if(s != 0){
m_usedBuffer.push_back(s);
return s;
}
now = NdbTick_CurrentMillisecond();
wait = (timeOutMillis == 0 ? 10 : stop - now);
} while(stop > now || timeOutMillis == 0);
return 0;
}
class WaitForAny {
public:
SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
if(m_jobBuffer.size() > 0){
SimpleSignal * s = m_jobBuffer[0];
m_jobBuffer.erase(0);
return s;
}
return 0;
}
};
SimpleSignal *
SignalSender::waitFor(Uint32 timeOutMillis){
WaitForAny w;
return waitFor(timeOutMillis, w);
}
class WaitForNode {
public:
Uint32 m_nodeId;
SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
Uint32 len = m_jobBuffer.size();
for(Uint32 i = 0; i<len; i++){
if(refToNode(m_jobBuffer[i]->header.theSendersBlockRef) == m_nodeId){
SimpleSignal * s = m_jobBuffer[i];
m_jobBuffer.erase(i);
return s;
}
}
return 0;
}
};
SimpleSignal *
SignalSender::waitFor(Uint16 nodeId, Uint32 timeOutMillis){
WaitForNode w;
w.m_nodeId = nodeId;
return waitFor(timeOutMillis, w);
}
#include <NdbApiSignal.hpp>
void
SignalSender::execSignal(void* signalSender,
NdbApiSignal* signal,
class LinearSectionPtr ptr[3]){
SimpleSignal * s = new SimpleSignal(true);
s->header = * signal;
memcpy(&s->theData[0], signal->getDataPtr(), 4 * s->header.theLength);
for(Uint32 i = 0; i<s->header.m_noOfSections; i++){
s->ptr[i].p = new Uint32[ptr[i].sz];
s->ptr[i].sz = ptr[i].sz;
memcpy(s->ptr[i].p, ptr[i].p, 4 * ptr[i].sz);
}
SignalSender * ss = (SignalSender*)signalSender;
ss->m_jobBuffer.push_back(s);
NdbCondition_Signal(ss->m_cond);
}
void
SignalSender::execNodeStatus(void* signalSender,
Uint32 nodeId,
bool alive,
bool nfCompleted){
if (alive) {
// node connected
return;
}
SimpleSignal * s = new SimpleSignal(true);
SignalSender * ss = (SignalSender*)signalSender;
// node disconnected
if(nfCompleted)
{
// node shutdown complete
s->header.theVerId_signalNumber = GSN_NF_COMPLETEREP;
NFCompleteRep *rep = (NFCompleteRep *)s->getDataPtrSend();
rep->failedNodeId = nodeId;
}
else
{
// node failure
s->header.theVerId_signalNumber = GSN_NODE_FAILREP;
NodeFailRep *rep = (NodeFailRep *)s->getDataPtrSend();
rep->failNo = nodeId;
}
ss->m_jobBuffer.push_back(s);
NdbCondition_Signal(ss->m_cond);
}
template SimpleSignal* SignalSender::waitFor<WaitForNode>(unsigned, WaitForNode&);
template SimpleSignal* SignalSender::waitFor<WaitForAny>(unsigned, WaitForAny&);
template Vector<SimpleSignal*>;
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef SIGNAL_SENDER_HPP
#define SIGNAL_SENDER_HPP
#include <ndb_global.h>
#include "TransporterFacade.hpp"
#include <Vector.hpp>
struct SimpleSignal {
public:
SimpleSignal(bool dealloc = false);
~SimpleSignal();
void set(class SignalSender&,
Uint8 trace, Uint16 recBlock, Uint16 gsn, Uint32 len);
struct SignalHeader header;
Uint32 theData[25];
LinearSectionPtr ptr[3];
int readSignalNumber() {return header.theVerId_signalNumber; }
Uint32 *getDataPtrSend() { return theData; }
const Uint32 *getDataPtr() const { return theData; }
void print(FILE * out = stdout);
private:
bool deallocSections;
};
class SignalSender {
public:
SignalSender(TransporterFacade *facade);
virtual ~SignalSender();
int lock();
int unlock();
Uint32 getOwnRef() const;
Uint32 getAliveNode() const;
const ClusterMgr::Node &getNodeInfo(Uint16 nodeId) const;
Uint32 getNoOfConnectedNodes() const;
SendStatus sendSignal(Uint16 nodeId, const SimpleSignal *);
SimpleSignal * waitFor(Uint32 timeOutMillis = 0);
SimpleSignal * waitFor(Uint16 nodeId, Uint32 timeOutMillis = 0);
SimpleSignal * waitFor(Uint16 nodeId, Uint16 gsn, Uint32 timeOutMillis = 0);
private:
int m_blockNo;
TransporterFacade * theFacade;
static void execSignal(void* signalSender,
NdbApiSignal* signal,
class LinearSectionPtr ptr[3]);
static void execNodeStatus(void* signalSender, Uint32 nodeId,
bool alive, bool nfCompleted);
int m_lock;
struct NdbCondition * m_cond;
Vector<SimpleSignal *> m_jobBuffer;
Vector<SimpleSignal *> m_usedBuffer;
template<class T>
SimpleSignal * waitFor(Uint32 timeOutMillis, T & t);
};
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment