Commit 007e29e2 authored by unknown's avatar unknown

ndb - bug#31482

  (re)impl. simple-read (read that releases lock just before LQHKEYCONF)
  use simple-read for blobs


storage/ndb/include/kernel/signaldata/TcKeyConf.hpp:
  rename bit
storage/ndb/include/ndbapi/NdbOperation.hpp:
  add new lock-mode
storage/ndb/src/common/debugger/signaldata/TcKeyConf.cpp:
  rename bit
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  remove aggregate
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  impl. simple-read = normal read + release lock just before LQHKEYCONF
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp:
  impl. simple-read = normal read + release lock just before LQHKEYCONF
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
  impl. simple-read = normal read + release lock just before LQHKEYCONF
storage/ndb/src/ndbapi/NdbBlob.cpp:
  use simple read for blobs
storage/ndb/src/ndbapi/NdbIndexOperation.cpp:
  no simple-read for ui (yet)
storage/ndb/src/ndbapi/NdbOperationDefine.cpp:
  impl. simple-read
storage/ndb/src/ndbapi/NdbOperationExec.cpp:
  impl. simple-read
storage/ndb/src/ndbapi/NdbReceiver.cpp:
  impl. simple-read
storage/ndb/src/ndbapi/NdbScanOperation.cpp:
  no simple-read for scan (yet)
storage/ndb/src/ndbapi/NdbTransaction.cpp:
  rename bit
storage/ndb/test/ndbapi/testBasic.cpp:
  add testcase for simlpe-read
storage/ndb/test/run-test/daily-basic-tests.txt:
  add testcase
storage/ndb/test/src/HugoOperations.cpp:
  add simple-read
parent 72a432e2
...@@ -46,7 +46,7 @@ public: ...@@ -46,7 +46,7 @@ public:
*/ */
STATIC_CONST( StaticLength = 5 ); STATIC_CONST( StaticLength = 5 );
STATIC_CONST( OperationLength = 2 ); STATIC_CONST( OperationLength = 2 );
STATIC_CONST( SimpleReadBit = (((Uint32)1) << 31) ); STATIC_CONST( DirtyReadBit = (((Uint32)1) << 31) );
private: private:
......
...@@ -93,8 +93,9 @@ public: ...@@ -93,8 +93,9 @@ public:
,LM_CommittedRead ///< Ignore locks, read last committed value ,LM_CommittedRead ///< Ignore locks, read last committed value
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
= 2, = 2,
LM_Dirty = 2 LM_Dirty = 2,
#endif #endif
LM_SimpleRead = 3 ///< Read with shared lock, but release lock directly
}; };
/** /**
......
...@@ -51,11 +51,11 @@ printTCKEYCONF(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receive ...@@ -51,11 +51,11 @@ printTCKEYCONF(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receive
(TcKeyConf::getMarkerFlag(confInfo) == 0)?"false":"true"); (TcKeyConf::getMarkerFlag(confInfo) == 0)?"false":"true");
fprintf(output, "Operations:\n"); fprintf(output, "Operations:\n");
for(i = 0; i < noOfOp; i++) { for(i = 0; i < noOfOp; i++) {
if(sig->operations[i].attrInfoLen > TcKeyConf::SimpleReadBit) if(sig->operations[i].attrInfoLen > TcKeyConf::DirtyReadBit)
fprintf(output, fprintf(output,
" apiOperationPtr: H'%.8x, simplereadnode: %u\n", " apiOperationPtr: H'%.8x, simplereadnode: %u\n",
sig->operations[i].apiOperationPtr, sig->operations[i].apiOperationPtr,
sig->operations[i].attrInfoLen & (~TcKeyConf::SimpleReadBit)); sig->operations[i].attrInfoLen & (~TcKeyConf::DirtyReadBit));
else else
fprintf(output, fprintf(output,
" apiOperationPtr: H'%.8x, attrInfoLen: %u\n", " apiOperationPtr: H'%.8x, attrInfoLen: %u\n",
......
...@@ -2025,7 +2025,6 @@ public: ...@@ -2025,7 +2025,6 @@ public:
Uint8 reclenAiLqhkey; Uint8 reclenAiLqhkey;
Uint8 m_offset_current_keybuf; Uint8 m_offset_current_keybuf;
Uint8 replicaType; Uint8 replicaType;
Uint8 simpleRead;
Uint8 seqNoReplica; Uint8 seqNoReplica;
Uint8 tcNodeFailrec; Uint8 tcNodeFailrec;
Uint8 m_disk_table; Uint8 m_disk_table;
......
...@@ -3496,7 +3496,6 @@ void Dblqh::execLQHKEYREQ(Signal* signal) ...@@ -3496,7 +3496,6 @@ void Dblqh::execLQHKEYREQ(Signal* signal)
regTcPtr->dirtyOp = LqhKeyReq::getDirtyFlag(Treqinfo); regTcPtr->dirtyOp = LqhKeyReq::getDirtyFlag(Treqinfo);
regTcPtr->opExec = LqhKeyReq::getInterpretedFlag(Treqinfo); regTcPtr->opExec = LqhKeyReq::getInterpretedFlag(Treqinfo);
regTcPtr->opSimple = LqhKeyReq::getSimpleFlag(Treqinfo); regTcPtr->opSimple = LqhKeyReq::getSimpleFlag(Treqinfo);
regTcPtr->simpleRead = op == ZREAD && regTcPtr->opSimple;
regTcPtr->seqNoReplica = LqhKeyReq::getSeqNoReplica(Treqinfo); regTcPtr->seqNoReplica = LqhKeyReq::getSeqNoReplica(Treqinfo);
UintR TreclenAiLqhkey = LqhKeyReq::getAIInLqhKeyReq(Treqinfo); UintR TreclenAiLqhkey = LqhKeyReq::getAIInLqhKeyReq(Treqinfo);
regTcPtr->apiVersionNo = 0; regTcPtr->apiVersionNo = 0;
...@@ -3514,8 +3513,14 @@ void Dblqh::execLQHKEYREQ(Signal* signal) ...@@ -3514,8 +3513,14 @@ void Dblqh::execLQHKEYREQ(Signal* signal)
op == ZREAD_EX ? ZUPDATE : (Operation_t) op == ZWRITE ? ZINSERT : (Operation_t) op; op == ZREAD_EX ? ZUPDATE : (Operation_t) op == ZWRITE ? ZINSERT : (Operation_t) op;
} }
CRASH_INSERTION2(5041, regTcPtr->simpleRead && if (regTcPtr->dirtyOp)
refToNode(signal->senderBlockRef()) != cownNodeid); {
ndbrequire(regTcPtr->opSimple);
}
CRASH_INSERTION2(5041, (op == ZREAD &&
(regTcPtr->opSimple || regTcPtr->dirtyOp) &&
refToNode(signal->senderBlockRef()) != cownNodeid));
regTcPtr->reclenAiLqhkey = TreclenAiLqhkey; regTcPtr->reclenAiLqhkey = TreclenAiLqhkey;
regTcPtr->currReclenAi = TreclenAiLqhkey; regTcPtr->currReclenAi = TreclenAiLqhkey;
...@@ -3687,8 +3692,8 @@ void Dblqh::execLQHKEYREQ(Signal* signal) ...@@ -3687,8 +3692,8 @@ void Dblqh::execLQHKEYREQ(Signal* signal)
Uint8 TdistKey = LqhKeyReq::getDistributionKey(TtotReclenAi); Uint8 TdistKey = LqhKeyReq::getDistributionKey(TtotReclenAi);
if ((tfragDistKey != TdistKey) && if ((tfragDistKey != TdistKey) &&
(regTcPtr->seqNoReplica == 0) && (regTcPtr->seqNoReplica == 0) &&
(regTcPtr->dirtyOp == ZFALSE) && (regTcPtr->dirtyOp == ZFALSE))
(regTcPtr->simpleRead == ZFALSE)) { {
/* ---------------------------------------------------------------------- /* ----------------------------------------------------------------------
* WE HAVE DIFFERENT OPINION THAN THE DIH THAT STARTED THE TRANSACTION. * WE HAVE DIFFERENT OPINION THAN THE DIH THAT STARTED THE TRANSACTION.
* THE REASON COULD BE THAT THIS IS AN OLD DISTRIBUTION WHICH IS NO LONGER * THE REASON COULD BE THAT THIS IS AN OLD DISTRIBUTION WHICH IS NO LONGER
...@@ -4778,7 +4783,18 @@ void Dblqh::tupkeyConfLab(Signal* signal) ...@@ -4778,7 +4783,18 @@ void Dblqh::tupkeyConfLab(Signal* signal)
TRACE_OP(regTcPtr, "TUPKEYCONF"); TRACE_OP(regTcPtr, "TUPKEYCONF");
if (regTcPtr->simpleRead) { if (readLen != 0)
{
jam();
/* SET BIT 15 IN REQINFO */
LqhKeyReq::setApplicationAddressFlag(regTcPtr->reqinfo, 1);
regTcPtr->readlenAi = readLen;
}//if
if (regTcPtr->operation == ZREAD &&
(regTcPtr->opSimple || regTcPtr->dirtyOp))
{
jam(); jam();
/* ---------------------------------------------------------------------- /* ----------------------------------------------------------------------
* THE OPERATION IS A SIMPLE READ. * THE OPERATION IS A SIMPLE READ.
...@@ -4792,14 +4808,6 @@ void Dblqh::tupkeyConfLab(Signal* signal) ...@@ -4792,14 +4808,6 @@ void Dblqh::tupkeyConfLab(Signal* signal)
commitContinueAfterBlockedLab(signal); commitContinueAfterBlockedLab(signal);
return; return;
}//if }//if
if (readLen != 0)
{
jam();
/* SET BIT 15 IN REQINFO */
LqhKeyReq::setApplicationAddressFlag(regTcPtr->reqinfo, 1);
regTcPtr->readlenAi = readLen;
}//if
regTcPtr->totSendlenAi = writeLen; regTcPtr->totSendlenAi = writeLen;
ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen); ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
...@@ -5178,12 +5186,15 @@ void Dblqh::packLqhkeyreqLab(Signal* signal) ...@@ -5178,12 +5186,15 @@ void Dblqh::packLqhkeyreqLab(Signal* signal)
/* */ /* */
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
sendLqhkeyconfTc(signal, regTcPtr->tcBlockref); sendLqhkeyconfTc(signal, regTcPtr->tcBlockref);
if (regTcPtr->dirtyOp != ZTRUE) { if (! (regTcPtr->dirtyOp ||
(regTcPtr->operation == ZREAD && regTcPtr->opSimple)))
{
jam(); jam();
regTcPtr->transactionState = TcConnectionrec::PREPARED; regTcPtr->transactionState = TcConnectionrec::PREPARED;
releaseOprec(signal); releaseOprec(signal);
} else { } else {
jam(); jam();
/*************************************************************>*/ /*************************************************************>*/
/* DIRTY WRITES ARE USED IN TWO SITUATIONS. THE FIRST */ /* DIRTY WRITES ARE USED IN TWO SITUATIONS. THE FIRST */
/* SITUATION IS WHEN THEY ARE USED TO UPDATE COUNTERS AND*/ /* SITUATION IS WHEN THEY ARE USED TO UPDATE COUNTERS AND*/
...@@ -6406,8 +6417,8 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal) ...@@ -6406,8 +6417,8 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal)
Ptr<TcConnectionrec> regTcPtr = tcConnectptr; Ptr<TcConnectionrec> regTcPtr = tcConnectptr;
Ptr<Fragrecord> regFragptr = fragptr; Ptr<Fragrecord> regFragptr = fragptr;
Uint32 operation = regTcPtr.p->operation; Uint32 operation = regTcPtr.p->operation;
Uint32 simpleRead = regTcPtr.p->simpleRead;
Uint32 dirtyOp = regTcPtr.p->dirtyOp; Uint32 dirtyOp = regTcPtr.p->dirtyOp;
Uint32 opSimple = regTcPtr.p->opSimple;
if (regTcPtr.p->activeCreat != Fragrecord::AC_IGNORED) { if (regTcPtr.p->activeCreat != Fragrecord::AC_IGNORED) {
if (operation != ZREAD) { if (operation != ZREAD) {
TupCommitReq * const tupCommitReq = TupCommitReq * const tupCommitReq =
...@@ -6465,20 +6476,29 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal) ...@@ -6465,20 +6476,29 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal)
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1); EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
} }
if (simpleRead) { if (dirtyOp)
{
jam(); jam();
/* ------------------------------------------------------------------------- */ /**
/*THE OPERATION WAS A SIMPLE READ THUS THE COMMIT PHASE IS ONLY NEEDED TO */ * The dirtyRead does not send anything but TRANSID_AI from LDM
/*RELEASE THE LOCKS. AT THIS POINT IN THE CODE THE LOCKS ARE RELEASED AND WE */ */
/*ARE IN A POSITION TO SEND LQHKEYCONF TO TC. WE WILL ALSO RELEASE ALL */
/*RESOURCES BELONGING TO THIS OPERATION SINCE NO MORE WORK WILL BE */
/*PERFORMED. */
/* ------------------------------------------------------------------------- */
fragptr = regFragptr; fragptr = regFragptr;
tcConnectptr = regTcPtr; tcConnectptr = regTcPtr;
cleanUp(signal); cleanUp(signal);
return; return;
}//if }
/**
* The simpleRead will send a LQHKEYCONF
* but have already released the locks
*/
if (opSimple)
{
fragptr = regFragptr;
tcConnectptr = regTcPtr;
packLqhkeyreqLab(signal);
return;
}
} }
}//if }//if
jamEntry(); jamEntry();
...@@ -7088,7 +7108,7 @@ void Dblqh::abortStateHandlerLab(Signal* signal) ...@@ -7088,7 +7108,7 @@ void Dblqh::abortStateHandlerLab(Signal* signal)
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
return; return;
}//if }//if
if (regTcPtr->simpleRead) { if (regTcPtr->opSimple) {
jam(); jam();
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
/*A SIMPLE READ IS CURRENTLY RELEASING THE LOCKS OR WAITING FOR ACCESS TO */ /*A SIMPLE READ IS CURRENTLY RELEASING THE LOCKS OR WAITING FOR ACCESS TO */
...@@ -7356,7 +7376,8 @@ void Dblqh::continueAbortLab(Signal* signal) ...@@ -7356,7 +7376,8 @@ void Dblqh::continueAbortLab(Signal* signal)
void Dblqh::continueAfterLogAbortWriteLab(Signal* signal) void Dblqh::continueAfterLogAbortWriteLab(Signal* signal)
{ {
TcConnectionrec * const regTcPtr = tcConnectptr.p; TcConnectionrec * const regTcPtr = tcConnectptr.p;
if (regTcPtr->simpleRead) { if (regTcPtr->operation == ZREAD && regTcPtr->dirtyOp)
{
jam(); jam();
TcKeyRef * const tcKeyRef = (TcKeyRef *) signal->getDataPtrSend(); TcKeyRef * const tcKeyRef = (TcKeyRef *) signal->getDataPtrSend();
...@@ -19027,7 +19048,6 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) ...@@ -19027,7 +19048,6 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
ndbout << " operation = " << tcRec.p->operation<<endl; ndbout << " operation = " << tcRec.p->operation<<endl;
ndbout << " tcNodeFailrec = " << tcRec.p->tcNodeFailrec ndbout << " tcNodeFailrec = " << tcRec.p->tcNodeFailrec
<< " seqNoReplica = " << tcRec.p->seqNoReplica << " seqNoReplica = " << tcRec.p->seqNoReplica
<< " simpleRead = " << tcRec.p->simpleRead
<< endl; << endl;
ndbout << " replicaType = " << tcRec.p->replicaType ndbout << " replicaType = " << tcRec.p->replicaType
<< " reclenAiLqhkey = " << tcRec.p->reclenAiLqhkey << " reclenAiLqhkey = " << tcRec.p->reclenAiLqhkey
......
...@@ -786,6 +786,7 @@ public: ...@@ -786,6 +786,7 @@ public:
UintR apiConnect; /* POINTER TO API CONNECT RECORD */ UintR apiConnect; /* POINTER TO API CONNECT RECORD */
UintR nextTcConnect; /* NEXT TC RECORD*/ UintR nextTcConnect; /* NEXT TC RECORD*/
Uint8 dirtyOp; Uint8 dirtyOp;
Uint8 opSimple;
Uint8 lastReplicaNo; /* NUMBER OF THE LAST REPLICA IN THE OPERATION */ Uint8 lastReplicaNo; /* NUMBER OF THE LAST REPLICA IN THE OPERATION */
Uint8 noOfNodes; /* TOTAL NUMBER OF NODES IN OPERATION */ Uint8 noOfNodes; /* TOTAL NUMBER OF NODES IN OPERATION */
Uint8 operation; /* OPERATION TYPE */ Uint8 operation; /* OPERATION TYPE */
...@@ -886,12 +887,7 @@ public: ...@@ -886,12 +887,7 @@ public:
Uint8 opExec; Uint8 opExec;
Uint8 unused; Uint8 unused;
Uint8 unused1;
/**
* IS THE OPERATION A SIMPLE TRANSACTION
* 0 = NO, 1 = YES
*/
Uint8 opSimple;
//--------------------------------------------------- //---------------------------------------------------
// Second 16 byte cache line in second 64 byte cache // Second 16 byte cache line in second 64 byte cache
...@@ -1464,7 +1460,7 @@ private: ...@@ -1464,7 +1460,7 @@ private:
void releaseAttrinfo(); void releaseAttrinfo();
void releaseGcp(Signal* signal); void releaseGcp(Signal* signal);
void releaseKeys(); void releaseKeys();
void releaseSimpleRead(Signal*, ApiConnectRecordPtr, TcConnectRecord*); void releaseDirtyRead(Signal*, ApiConnectRecordPtr, TcConnectRecord*);
void releaseDirtyWrite(Signal* signal); void releaseDirtyWrite(Signal* signal);
void releaseTcCon(); void releaseTcCon();
void releaseTcConnectFail(Signal* signal); void releaseTcConnectFail(Signal* signal);
...@@ -1620,7 +1616,7 @@ private: ...@@ -1620,7 +1616,7 @@ private:
void startphase1x010Lab(Signal* signal); void startphase1x010Lab(Signal* signal);
void lqhKeyConf_checkTransactionState(Signal * signal, void lqhKeyConf_checkTransactionState(Signal * signal,
ApiConnectRecord * const regApiPtr); Ptr<ApiConnectRecord> regApiPtr);
void checkDropTab(Signal* signal); void checkDropTab(Signal* signal);
......
...@@ -1141,7 +1141,7 @@ NdbBlob::readTableParts(char* buf, Uint32 part, Uint32 count) ...@@ -1141,7 +1141,7 @@ NdbBlob::readTableParts(char* buf, Uint32 part, Uint32 count)
* table tuple does not fully protect blob parts since DBTUP * table tuple does not fully protect blob parts since DBTUP
* commits each tuple separately. * commits each tuple separately.
*/ */
tOp->readTuple() == -1 || tOp->readTuple(NdbOperation::LM_SimpleRead) == -1 ||
setPartKeyValue(tOp, part + n) == -1 || setPartKeyValue(tOp, part + n) == -1 ||
tOp->getValue((Uint32)3, buf) == NULL) { tOp->getValue((Uint32)3, buf) == NULL) {
setErrorCode(tOp); setErrorCode(tOp);
......
...@@ -85,6 +85,9 @@ int NdbIndexOperation::readTuple(NdbOperation::LockMode lm) ...@@ -85,6 +85,9 @@ int NdbIndexOperation::readTuple(NdbOperation::LockMode lm)
case LM_CommittedRead: case LM_CommittedRead:
return readTuple(); return readTuple();
break; break;
case LM_SimpleRead:
return readTuple();
break;
default: default:
return -1; return -1;
}; };
......
...@@ -131,6 +131,8 @@ NdbOperation::readTuple(NdbOperation::LockMode lm) ...@@ -131,6 +131,8 @@ NdbOperation::readTuple(NdbOperation::LockMode lm)
case LM_CommittedRead: case LM_CommittedRead:
return committedRead(); return committedRead();
break; break;
case LM_SimpleRead:
return simpleRead();
default: default:
return -1; return -1;
}; };
...@@ -185,24 +187,22 @@ NdbOperation::readTupleExclusive() ...@@ -185,24 +187,22 @@ NdbOperation::readTupleExclusive()
int int
NdbOperation::simpleRead() NdbOperation::simpleRead()
{ {
/** NdbTransaction* tNdbCon = theNdbCon;
* Currently/still disabled
*/
return readTuple();
#if 0
int tErrorLine = theErrorLine; int tErrorLine = theErrorLine;
if (theStatus == Init) { if (theStatus == Init) {
theStatus = OperationDefined; theStatus = OperationDefined;
theOperationType = ReadRequest; theOperationType = ReadRequest;
theSimpleIndicator = 1; theSimpleIndicator = 1;
theDirtyIndicator = 0;
theErrorLine = tErrorLine++; theErrorLine = tErrorLine++;
theLockMode = LM_Read; theLockMode = LM_SimpleRead;
m_abortOption = AO_IgnoreError;
tNdbCon->theSimpleState = 0;
return 0; return 0;
} else { } else {
setErrorCode(4200); setErrorCode(4200);
return -1; return -1;
}//if }//if
#endif
}//NdbOperation::simpleRead() }//NdbOperation::simpleRead()
/***************************************************************************** /*****************************************************************************
...@@ -338,13 +338,17 @@ NdbOperation::setReadLockMode(LockMode lockMode) ...@@ -338,13 +338,17 @@ NdbOperation::setReadLockMode(LockMode lockMode)
{ {
/* We only support changing lock mode for read operations at this time. */ /* We only support changing lock mode for read operations at this time. */
assert(theOperationType == ReadRequest || theOperationType == ReadExclusive); assert(theOperationType == ReadRequest || theOperationType == ReadExclusive);
switch (lockMode) switch (lockMode) {
{ case LM_CommittedRead: /* TODO, check theNdbCon->theSimpleState */
case LM_CommittedRead:
theOperationType= ReadRequest; theOperationType= ReadRequest;
theSimpleIndicator= 1; theSimpleIndicator= 1;
theDirtyIndicator= 1; theDirtyIndicator= 1;
break; break;
case LM_SimpleRead: /* TODO, check theNdbCon->theSimpleState */
theOperationType= ReadRequest;
theSimpleIndicator= 1;
theDirtyIndicator= 0;
break;
case LM_Read: case LM_Read:
theNdbCon->theSimpleState= 0; theNdbCon->theSimpleState= 0;
theOperationType= ReadRequest; theOperationType= ReadRequest;
......
...@@ -175,12 +175,11 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, ...@@ -175,12 +175,11 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr,
Uint8 tInterpretIndicator = theInterpretIndicator; Uint8 tInterpretIndicator = theInterpretIndicator;
Uint8 tNoDisk = m_no_disk_flag; Uint8 tNoDisk = m_no_disk_flag;
//------------------------------------------------------------- /**
// Simple state is set if start and commit is set and it is * A dirty read, can not abort the transaction
// a read request. Otherwise it is set to zero. */
//-------------------------------------------------------------
Uint8 tReadInd = (theOperationType == ReadRequest); Uint8 tReadInd = (theOperationType == ReadRequest);
Uint8 tSimpleState = tReadInd & tSimpleIndicator; Uint8 tDirtyState = tReadInd & tDirtyIndicator;
tcKeyReq->transId1 = tTransId1; tcKeyReq->transId1 = tTransId1;
tcKeyReq->transId2 = tTransId2; tcKeyReq->transId2 = tTransId2;
...@@ -206,8 +205,8 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, ...@@ -206,8 +205,8 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr,
tcKeyReq->setOperationType(tReqInfo, tOperationType); tcKeyReq->setOperationType(tReqInfo, tOperationType);
tcKeyReq->setKeyLength(tReqInfo, tTupKeyLen); tcKeyReq->setKeyLength(tReqInfo, tTupKeyLen);
// A simple read is always ignore error // A dirty read is always ignore error
abortOption = tSimpleState ? (Uint8) AO_IgnoreError : (Uint8) abortOption; abortOption = tDirtyState ? (Uint8) AO_IgnoreError : (Uint8) abortOption;
tcKeyReq->setAbortOption(tReqInfo, abortOption); tcKeyReq->setAbortOption(tReqInfo, abortOption);
m_abortOption = abortOption; m_abortOption = abortOption;
...@@ -549,8 +548,8 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal) ...@@ -549,8 +548,8 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal)
theStatus = Finished; theStatus = Finished;
theReceiver.m_received_result_length = ~0; theReceiver.m_received_result_length = ~0;
// not simple read // not dirty read
if(! (theOperationType == ReadRequest && theSimpleIndicator)) if(! (theOperationType == ReadRequest && theDirtyIndicator))
{ {
theNdbCon->OpCompleteFailure(this); theNdbCon->OpCompleteFailure(this);
return -1; return -1;
......
...@@ -283,7 +283,7 @@ NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength) ...@@ -283,7 +283,7 @@ NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
Uint32 tmp = m_received_result_length + aLength; Uint32 tmp = m_received_result_length + aLength;
m_received_result_length = tmp; m_received_result_length = tmp;
return (tmp == exp || (exp > TcKeyConf::SimpleReadBit) ? 1 : 0); return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
} }
int int
......
...@@ -237,6 +237,7 @@ NdbScanOperation::setReadLockMode(LockMode lockMode) ...@@ -237,6 +237,7 @@ NdbScanOperation::setReadLockMode(LockMode lockMode)
lockHoldMode= false; lockHoldMode= false;
readCommitted= true; readCommitted= true;
break; break;
case LM_SimpleRead:
case LM_Read: case LM_Read:
lockExcl= false; lockExcl= false;
lockHoldMode= true; lockHoldMode= true;
......
...@@ -1787,8 +1787,8 @@ from other transactions. ...@@ -1787,8 +1787,8 @@ from other transactions.
const Uint32 tAttrInfoLen = *tPtr++; const Uint32 tAttrInfoLen = *tPtr++;
if (tOp && tOp->checkMagicNumber()) { if (tOp && tOp->checkMagicNumber()) {
Uint32 done = tOp->execTCOPCONF(tAttrInfoLen); Uint32 done = tOp->execTCOPCONF(tAttrInfoLen);
if(tAttrInfoLen > TcKeyConf::SimpleReadBit){ if(tAttrInfoLen > TcKeyConf::DirtyReadBit){
Uint32 node = tAttrInfoLen & (~TcKeyConf::SimpleReadBit); Uint32 node = tAttrInfoLen & (~TcKeyConf::DirtyReadBit);
NdbNodeBitmask::set(m_db_nodes, node); NdbNodeBitmask::set(m_db_nodes, node);
if(NdbNodeBitmask::get(m_failed_db_nodes, node) && !done) if(NdbNodeBitmask::get(m_failed_db_nodes, node) && !done)
{ {
...@@ -2182,7 +2182,7 @@ NdbTransaction::report_node_failure(Uint32 id){ ...@@ -2182,7 +2182,7 @@ NdbTransaction::report_node_failure(Uint32 id){
* 4) X X * 4) X X
*/ */
NdbOperation* tmp = theFirstExecOpInList; NdbOperation* tmp = theFirstExecOpInList;
const Uint32 len = TcKeyConf::SimpleReadBit | id; const Uint32 len = TcKeyConf::DirtyReadBit | id;
Uint32 tNoComp = theNoOfOpCompleted; Uint32 tNoComp = theNoOfOpCompleted;
Uint32 tNoSent = theNoOfOpSent; Uint32 tNoSent = theNoOfOpSent;
Uint32 count = 0; Uint32 count = 0;
......
...@@ -136,31 +136,13 @@ int runPkRead(NDBT_Context* ctx, NDBT_Step* step){ ...@@ -136,31 +136,13 @@ int runPkRead(NDBT_Context* ctx, NDBT_Step* step){
int loops = ctx->getNumLoops(); int loops = ctx->getNumLoops();
int records = ctx->getNumRecords(); int records = ctx->getNumRecords();
int batchSize = ctx->getProperty("BatchSize", 1); int batchSize = ctx->getProperty("BatchSize", 1);
int lm = ctx->getProperty("LockMode", NdbOperation::LM_Read);
int i = 0; int i = 0;
HugoTransactions hugoTrans(*ctx->getTab()); HugoTransactions hugoTrans(*ctx->getTab());
while (i<loops) {
g_info << i << ": ";
if (hugoTrans.pkReadRecords(GETNDB(step), records, batchSize) != NDBT_OK){
g_info << endl;
return NDBT_FAILED;
}
i++;
}
g_info << endl;
return NDBT_OK;
}
int runPkDirtyRead(NDBT_Context* ctx, NDBT_Step* step){
int loops = ctx->getNumLoops();
int records = ctx->getNumRecords();
int batchSize = ctx->getProperty("BatchSize", 1);
int i = 0;
bool dirty = true;
HugoTransactions hugoTrans(*ctx->getTab());
while (i<loops) { while (i<loops) {
g_info << i << ": "; g_info << i << ": ";
if (hugoTrans.pkReadRecords(GETNDB(step), records, batchSize, if (hugoTrans.pkReadRecords(GETNDB(step), records, batchSize,
NdbOperation::LM_CommittedRead) != NDBT_OK){ (NdbOperation::LockMode)lm) != NDBT_OK){
g_info << endl; g_info << endl;
return NDBT_FAILED; return NDBT_FAILED;
} }
...@@ -1552,14 +1534,23 @@ TESTCASE("PkInsert", ...@@ -1552,14 +1534,23 @@ TESTCASE("PkInsert",
} }
TESTCASE("PkRead", TESTCASE("PkRead",
"Verify that we can insert, read and delete from this table using PK"){ "Verify that we can insert, read and delete from this table using PK"){
TC_PROPERTY("LockMode", NdbOperation::LM_Read);
INITIALIZER(runLoadTable); INITIALIZER(runLoadTable);
STEP(runPkRead); STEP(runPkRead);
FINALIZER(runClearTable); FINALIZER(runClearTable);
} }
TESTCASE("PkDirtyRead", TESTCASE("PkDirtyRead",
"Verify that we can insert, dirty read and delete from this table using PK"){ "Verify that we can insert, dirty read and delete from this table using PK"){
TC_PROPERTY("LockMode", NdbOperation::LM_Dirty);
INITIALIZER(runLoadTable); INITIALIZER(runLoadTable);
STEP(runPkDirtyRead); STEP(runPkRead);
FINALIZER(runClearTable);
}
TESTCASE("PkSimpleRead",
"Verify that we can insert, simple read and delete from this table using PK"){
TC_PROPERTY("LockMode", NdbOperation::LM_SimpleRead);
INITIALIZER(runLoadTable);
STEP(runPkRead);
FINALIZER(runClearTable); FINALIZER(runClearTable);
} }
TESTCASE("PkUpdate", TESTCASE("PkUpdate",
......
...@@ -63,6 +63,14 @@ max-time: 500 ...@@ -63,6 +63,14 @@ max-time: 500
cmd: testBasic cmd: testBasic
args: -n PkRead args: -n PkRead
max-time: 500
cmd: testBasic
args: -n PkSimpleRead
max-time: 500
cmd: testBasic
args: -n PkDirtyRead
max-time: 500 max-time: 500
cmd: testBasic cmd: testBasic
args: -n PkUpdate args: -n PkUpdate
......
...@@ -93,6 +93,7 @@ int HugoOperations::pkReadRecord(Ndb* pNdb, ...@@ -93,6 +93,7 @@ int HugoOperations::pkReadRecord(Ndb* pNdb,
case NdbOperation::LM_Read: case NdbOperation::LM_Read:
case NdbOperation::LM_Exclusive: case NdbOperation::LM_Exclusive:
case NdbOperation::LM_CommittedRead: case NdbOperation::LM_CommittedRead:
case NdbOperation::LM_SimpleRead:
if(idx && idx->getType() == NdbDictionary::Index::OrderedIndex && if(idx && idx->getType() == NdbDictionary::Index::OrderedIndex &&
pIndexScanOp == 0) pIndexScanOp == 0)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment