Commit 0db2536e authored by unknown's avatar unknown

wl1801 - Support for handling NF during dirty read


ndb/include/ndbapi/NdbConnection.hpp:
  Support for handling NF during dirty read
ndb/include/ndbapi/NdbReceiver.hpp:
  Support for handling NF during dirty read
ndb/src/ndbapi/NdbConnection.cpp:
  Support for handling NF during dirty read
ndb/src/ndbapi/NdbReceiver.cpp:
  Support for handling NF during dirty read
ndb/src/ndbapi/Ndbif.cpp:
  Support for handling NF during dirty read
ndb/src/ndbapi/ndberror.c:
  Support for handling NF during dirty read
ndb/test/ndbapi/testNodeRestart.cpp:
  Support for handling NF during dirty read
parent 3b783caf
......@@ -658,6 +658,8 @@ private:
Uint32 m_db_nodes[2];
Uint32 m_failed_db_nodes[2];
int report_node_failure(Uint32 id);
// Scan operations
bool m_waitForReply;
NdbIndexScanOperation* m_theFirstScanOperation;
......
......@@ -60,6 +60,7 @@ public:
inline void next(NdbReceiver* next) { m_next = next;}
inline NdbReceiver* next() { return m_next; }
void setErrorCode(int);
private:
Uint32 theMagicNumber;
Ndb* m_ndb;
......
......@@ -1537,16 +1537,20 @@ from other transactions.
const Uint32* tPtr = (Uint32 *)&keyConf->operations[0];
Uint32 tNoComp = theNoOfOpCompleted;
for (Uint32 i = 0; i < tNoOfOperations ; i++) {
tOp = theNdb->void2rec(theNdb->int2void(*tPtr));
tPtr++;
const Uint32 tAttrInfoLen = *tPtr;
tPtr++;
tOp = theNdb->void2rec(theNdb->int2void(*tPtr++));
const Uint32 tAttrInfoLen = *tPtr++;
if (tOp && tOp->checkMagicNumber()) {
tNoComp += tOp->execTCOPCONF(tAttrInfoLen);
Uint32 done = tOp->execTCOPCONF(tAttrInfoLen);
if(tAttrInfoLen > TcKeyConf::SimpleReadBit){
NdbNodeBitmask::set(m_db_nodes,
tAttrInfoLen & (~TcKeyConf::SimpleReadBit));
Uint32 node = tAttrInfoLen & (~TcKeyConf::SimpleReadBit);
NdbNodeBitmask::set(m_db_nodes, node);
if(NdbNodeBitmask::get(m_failed_db_nodes, node) && !done)
{
done = 1;
tOp->setErrorCode(4119);
}
}
tNoComp += done;
} else {
return -1;
}//if
......@@ -1960,3 +1964,43 @@ NdbConnection::printState()
}
#undef CASE
#endif
int
NdbConnection::report_node_failure(Uint32 id){
NdbNodeBitmask::set(m_failed_db_nodes, id);
if(!NdbNodeBitmask::get(m_db_nodes, id))
{
return 0;
}
/**
* Arrived
* TCKEYCONF TRANSIDAI
* 1) - -
* 2) - X
* 3) X -
* 4) X X
*/
NdbOperation* tmp = theFirstExecOpInList;
const Uint32 len = TcKeyConf::SimpleReadBit | id;
Uint32 tNoComp = theNoOfOpCompleted;
Uint32 tNoSent = theNoOfOpSent;
while(tmp != 0)
{
if(tmp->theReceiver.m_expected_result_length == len &&
tmp->theReceiver.m_received_result_length == 0)
{
tNoComp++;
tmp->theError.code = 4119;
}
tmp = tmp->next();
}
theNoOfOpCompleted = tNoComp;
if(tNoComp == tNoSent)
{
theError.code = 4119;
theCompletionStatus = NdbConnection::CompletedFailure;
return 1;
}
return 0;
}
......@@ -274,3 +274,11 @@ NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
return (tmp == m_expected_result_length ? 1 : 0);
}
void
NdbReceiver::setErrorCode(int code)
{
theMagicNumber = 0;
NdbOperation* op = (NdbOperation*)getOwner();
op->setErrorCode(code);
}
......@@ -249,6 +249,7 @@ Ndb::report_node_failure(Uint32 node_id)
*/
the_release_ind[node_id] = 1;
theWaiter.nodeFail(node_id);
return;
}//Ndb::report_node_failure()
......@@ -271,9 +272,10 @@ Ndb::abortTransactionsAfterNodeFailure(Uint16 aNodeId)
Uint32 tNoSentTransactions = theNoOfSentTransactions;
for (int i = tNoSentTransactions - 1; i >= 0; i--) {
NdbConnection* localCon = theSentTransactionsArray[i];
if (localCon->getConnectedNodeId() == aNodeId ) {
if (localCon->getConnectedNodeId() == aNodeId) {
const NdbConnection::SendStatusType sendStatus = localCon->theSendStatus;
if (sendStatus == NdbConnection::sendTC_OP || sendStatus == NdbConnection::sendTC_COMMIT) {
if (sendStatus == NdbConnection::sendTC_OP ||
sendStatus == NdbConnection::sendTC_COMMIT) {
/*
A transaction was interrupted in the prepare phase by a node
failure. Since the transaction was not found in the phase
......@@ -293,7 +295,7 @@ Ndb::abortTransactionsAfterNodeFailure(Uint16 aNodeId)
printState("abortTransactionsAfterNodeFailure %x", this);
abort();
#endif
}//
}
/*
All transactions arriving here have no connection to the kernel
intact since the node was failing and they were aborted. Thus we
......@@ -302,7 +304,11 @@ Ndb::abortTransactionsAfterNodeFailure(Uint16 aNodeId)
localCon->theCommitStatus = NdbConnection::Aborted;
localCon->theReleaseOnClose = true;
completedTransaction(localCon);
}//if
}
else if(localCon->report_node_failure(aNodeId));
{
completedTransaction(localCon);
}
}//for
return;
}//Ndb::abortTransactionsAfterNodeFailure()
......
......@@ -94,6 +94,7 @@ ErrorBundle ErrorCodes[] = {
{ 4115, NR,
"Transaction was committed but all read information was not "
"received due to node crash" },
{ 4119, NR, "Simple/dirty read failed due to node failure" },
/**
* Node shutdown
......
......@@ -299,8 +299,10 @@ int runRestarts(NDBT_Context* ctx, NDBT_Step* step){
int runDirtyRead(NDBT_Context* ctx, NDBT_Step* step){
int result = NDBT_OK;
int loops = ctx->getNumLoops();
int records = ctx->getNumRecords();
NdbRestarter restarter;
HugoTransactions hugoTrans(*ctx->getTab());
HugoOperations hugoOps(*ctx->getTab());
Ndb* pNdb = GETNDB(step);
int i = 0;
while(i<loops && result != NDBT_FAILED && !ctx->isTestStopped()){
......@@ -312,14 +314,34 @@ int runDirtyRead(NDBT_Context* ctx, NDBT_Step* step){
restarter.insertErrorInNode(nodeId, 5041);
restarter.insertErrorInAllNodes(8048);
if (hugoTrans.pkReadRecords(GETNDB(step), 1, 1,
NdbOperation::LM_CommittedRead) != 0)
{
for(int j = 0; j<records; j++){
if(hugoOps.startTransaction(pNdb) != 0)
return NDBT_FAILED;
if(hugoOps.pkReadRecord(pNdb, j, 1, NdbOperation::LM_CommittedRead) != 0)
goto err;
int res;
if((res = hugoOps.execute_Commit(pNdb)) == 4119)
goto done;
if(res != 0)
goto err;
if(hugoOps.closeTransaction(pNdb) != 0)
return NDBT_FAILED;
}
done:
if(hugoOps.closeTransaction(pNdb) != 0)
return NDBT_FAILED;
i++;
restarter.waitClusterStarted(60) ;
}
return result;
err:
hugoOps.closeTransaction(pNdb);
return NDBT_FAILED;
}
NDBT_TESTSUITE(testNodeRestart);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment