ndb -

  fix scan bugs introduced by acc modifications
  add more error testcases
parent 6c307b33
......@@ -533,7 +533,6 @@ struct Operationrec {
,OP_STATE_WAITING = 0x00000
,OP_STATE_RUNNING = 0x10000
,OP_STATE_EXECUTED = 0x30000
,OP_STATE_RUNNING_ABORT = 0x20000
,OP_EXECUTED_DIRTY_READ = 0x3050F
,OP_INITIAL = ~(Uint32)0
......
......@@ -1181,6 +1181,7 @@ void Dbacc::execACCKEYREQ(Signal* signal)
void
Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI)
{
jamEntry();
OperationrecPtr lastOp;
lastOp.i = opPtrI;
ptrCheckGuard(lastOp, coprecsize, operationrec);
......@@ -1200,9 +1201,6 @@ Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI)
startNext(signal, lastOp);
return;
}
else if (opstate == Operationrec::OP_STATE_RUNNING_ABORT)
{
}
else
{
}
......@@ -1240,15 +1238,14 @@ Dbacc::startNext(Signal* signal, OperationrecPtr lastOp)
{
jam();
ptrCheckGuard(loPtr, coprecsize, operationrec);
nextOp.i = loPtr.p->nextSerialQue;
}
else
{
jam();
nextOp.i = loPtr.i;
loPtr = lastOp;
}
nextOp.i = loPtr.p->nextSerialQue;
ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
if (nextOp.i == RNIL)
......@@ -1411,6 +1408,9 @@ Dbacc::startNext(Signal* signal, OperationrecPtr lastOp)
else
{
jam();
fragrecptr.i = nextOp.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
sendAcckeyconf(signal);
sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF,
signal, 6, JBA);
......@@ -1680,8 +1680,7 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr)
bool running = false;
{
Uint32 opstate = loPtr.p->m_op_bits & Operationrec::OP_STATE_MASK;
if (opstate == Operationrec::OP_STATE_RUNNING ||
opstate == Operationrec::OP_STATE_RUNNING_ABORT)
if (opstate == Operationrec::OP_STATE_RUNNING)
running = true;
else
{
......@@ -1719,8 +1718,7 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr)
}
else
{
if (opstate == Operationrec::OP_STATE_RUNNING ||
opstate == Operationrec::OP_STATE_RUNNING_ABORT)
if (opstate == Operationrec::OP_STATE_RUNNING)
running = true;
else
vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED);
......@@ -1830,8 +1828,6 @@ operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr)
out << " RUNNING "; break;
case Dbacc::Operationrec::OP_STATE_EXECUTED:
out << " EXECUTED "; break;
case Dbacc::Operationrec::OP_STATE_RUNNING_ABORT:
out << " RUNNIG_ABORT "; break;
case Dbacc::Operationrec::OP_STATE_IDLE:
out << " IDLE "; break;
default:
......@@ -1857,7 +1853,6 @@ operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr)
,OP_STATE_WAITING = 0x0000
,OP_STATE_RUNNING = 0x1000
,OP_STATE_EXECUTED = 0x3000
,OP_STATE_RUNNING_ABORT = 0x2000
};
*/
if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER)
......@@ -3950,6 +3945,7 @@ void Dbacc::checkoverfreelist(Signal* signal)
void
Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr)
{
jam();
OperationrecPtr nextP;
OperationrecPtr prevP;
OperationrecPtr loPtr;
......@@ -3992,13 +3988,21 @@ Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr)
else
{
jam();
/**
* P0 - P1
*
* Abort P1, check start next
*/
ndbassert(prevP.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
prevP.p->m_lo_last_parallel_op_ptr_i = RNIL;
startNext(signal, prevP);
validate_lock_queue(prevP);
return;
}
/**
* Abort P1/P2
*/
if (opbits & Operationrec::OP_LOCK_MODE)
{
Uint32 nextbits = nextP.p->m_op_bits;
......@@ -4024,12 +4028,23 @@ Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr)
/**
* Abort P1, P2
*/
if (opstate == Operationrec::OP_STATE_RUNNING)
{
jam();
startNext(signal, prevP);
validate_lock_queue(prevP);
return;
}
ndbassert(opstate == Operationrec::OP_STATE_EXECUTED ||
opstate == Operationrec::OP_STATE_WAITING);
/**
* Scan to last of run queue
*/
while (nextP.p->nextParallelQue != RNIL)
{
jam();
nextP.i = nextP.p->nextParallelQue;
ptrCheckGuard(nextP, coprecsize, operationrec);
}
......@@ -4049,6 +4064,7 @@ Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr)
void
Dbacc::abortSerieQueueOperation(Signal* signal, OperationrecPtr opPtr)
{
jam();
OperationrecPtr prevS, nextS;
OperationrecPtr prevP, nextP;
OperationrecPtr loPtr;
......@@ -4620,7 +4636,25 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit)
* Aborting an operation can *always* lead to lock upgrade
*/
action = CHECK_LOCK_UPGRADE;
Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
if (opstate != Operationrec::OP_STATE_EXECUTED)
{
ndbassert(opstate == Operationrec::OP_STATE_RUNNING);
if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED)
{
jam();
report_dealloc(signal, opPtr.p);
newOwner.p->localdata[0] = ~0;
}
else
{
jam();
newOwner.p->localdata[0] = opPtr.p->localdata[0];
newOwner.p->localdata[1] = opPtr.p->localdata[1];
}
action = START_NEW;
}
/**
* Update ACC_LOCK_MODE
*/
......
......@@ -2604,11 +2604,6 @@ void Dblqh::execTUPKEYREF(Signal* signal)
ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP ||
regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT);
}
else if (getNodeState().startLevel == NodeState::SL_STARTED)
{
if (terrorCode == 899)
ndbout << "899: " << regTcPtr->m_row_id << endl;
}
switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::WAIT_TUP:
......@@ -9095,6 +9090,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
if (accOpPtr != (Uint32)-1)
{
c_acc->execACCKEY_ORD(signal, accOpPtr);
jamEntry();
}
else
{
......@@ -9419,7 +9415,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
const Uint32 readCommitted = ScanFragReq::getReadCommittedFlag(reqinfo);
const Uint32 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo);
const Uint32 descending = ScanFragReq::getDescendingFlag(reqinfo);
const Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo);
Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo);
const Uint32 attrLen = ScanFragReq::getAttrLen(reqinfo);
const Uint32 scanPrio = ScanFragReq::getScanPrio(reqinfo);
......@@ -9458,7 +9454,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
scanptr.p->m_last_row = 0;
scanptr.p->scanStoredProcId = RNIL;
scanptr.p->copyPtr = RNIL;
if (max_rows == 0 || (max_bytes > 0 && max_rows > max_bytes)){
jam();
return ScanFragRef::ZWRONG_BATCH_SIZE;
......@@ -9479,8 +9475,10 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
* !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11
* idx uses from MAX_PARALLEL_SCANS_PER_FRAG - MAX = 12-42)
*/
Uint32 start = (rangeScan || tupScan ? MAX_PARALLEL_SCANS_PER_FRAG : 1 );
Uint32 stop = (rangeScan || tupScan ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1);
tupScan = 0; // Make sure that close tup scan does not start acc scan incorrectly
Uint32 start = (rangeScan || tupScan) ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ;
Uint32 stop = (rangeScan || tupScan) ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG :
MAX_PARALLEL_SCANS_PER_FRAG - 1;
stop += start;
Uint32 free = tFragPtr.p->m_scanNumberMask.find(start);
......
......@@ -156,7 +156,7 @@ Dbtup::execNEXT_SCANREQ(Signal* signal)
conf->scanPtr = scan.m_userPtr;
unsigned signalLength = 1;
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
signal, signalLength, JBB);
return;
}
break;
......@@ -171,7 +171,7 @@ Dbtup::execNEXT_SCANREQ(Signal* signal)
lockReq->requestInfo = AccLockReq::AbortWithConf;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
signal, AccLockReq::UndoSignalLength);
signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_state = ScanOp::Aborting;
......@@ -182,10 +182,10 @@ Dbtup::execNEXT_SCANREQ(Signal* signal)
ndbrequire(scan.m_accLockOp != RNIL);
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
signal, AccLockReq::UndoSignalLength);
signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL;
......@@ -433,7 +433,7 @@ Dbtup::execACCKEYCONF(Signal* signal)
jam();
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
......
......@@ -321,7 +321,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
conf->scanPtr = scan.m_userPtr;
unsigned signalLength = 1;
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
signal, signalLength, JBB);
return;
}
break;
......@@ -344,7 +344,8 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::AbortWithConf;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_state = ScanOp::Aborting;
......@@ -355,9 +356,10 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
ndbrequire(scan.m_accLockOp != RNIL);
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL;
......@@ -612,7 +614,7 @@ Dbtux::execACCKEYCONF(Signal* signal)
jam();
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
......
......@@ -108,6 +108,8 @@ public:
NDBT_ResultRow& get_row(Uint32 idx) { return *rows[idx];}
int execute_async(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError);
int execute_async_prepare(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError);
int wait_async(Ndb*, int timeout = -1);
protected:
......
......@@ -1090,11 +1090,6 @@ runMassiveRollback4(NDBT_Context* ctx, NDBT_Step* step){
ok = false;
break;
}
if (hugoOps.execute_NoCommit(pNdb) != 0)
{
ok = false;
break;
}
}
hugoOps.execute_Rollback(pNdb);
CHECK(hugoOps.closeTransaction(pNdb) == 0);
......@@ -1199,6 +1194,61 @@ runTupErrors(NDBT_Context* ctx, NDBT_Step* step){
return NDBT_OK;
}
int
runInsertError(NDBT_Context* ctx, NDBT_Step* step){
int result = NDBT_OK;
HugoOperations hugoOp1(*ctx->getTab());
HugoOperations hugoOp2(*ctx->getTab());
Ndb* pNdb = GETNDB(step);
NdbRestarter restarter;
restarter.insertErrorInAllNodes(4017);
const Uint32 LOOPS = 10;
for (Uint32 i = 0; i<LOOPS; i++)
{
CHECK(hugoOp1.startTransaction(pNdb) == 0);
CHECK(hugoOp1.pkInsertRecord(pNdb, 1) == 0);
CHECK(hugoOp2.startTransaction(pNdb) == 0);
CHECK(hugoOp2.pkReadRecord(pNdb, 1, 1) == 0);
CHECK(hugoOp1.execute_async_prepare(pNdb, NdbTransaction::Commit) == 0);
CHECK(hugoOp2.execute_async_prepare(pNdb, NdbTransaction::Commit) == 0);
hugoOp1.wait_async(pNdb);
hugoOp2.wait_async(pNdb);
CHECK(hugoOp1.closeTransaction(pNdb) == 0);
CHECK(hugoOp2.closeTransaction(pNdb) == 0);
}
restarter.insertErrorInAllNodes(0);
return result;
}
int
runInsertError2(NDBT_Context* ctx, NDBT_Step* step){
int result = NDBT_OK;
HugoOperations hugoOp1(*ctx->getTab());
Ndb* pNdb = GETNDB(step);
NdbRestarter restarter;
restarter.insertErrorInAllNodes(4017);
const Uint32 LOOPS = 1;
for (Uint32 i = 0; i<LOOPS; i++)
{
CHECK(hugoOp1.startTransaction(pNdb) == 0);
CHECK(hugoOp1.pkInsertRecord(pNdb, 1) == 0);
CHECK(hugoOp1.pkDeleteRecord(pNdb, 1) == 0);
CHECK(hugoOp1.execute_NoCommit(pNdb) == 0);
CHECK(hugoOp1.closeTransaction(pNdb) == 0);
}
restarter.insertErrorInAllNodes(0);
}
NDBT_TESTSUITE(testBasic);
TESTCASE("PkInsert",
"Verify that we can insert and delete from this table using PK"
......@@ -1449,16 +1499,16 @@ TESTCASE("MassiveTransaction",
INITIALIZER(runLoadTable2);
FINALIZER(runClearTable2);
}
TESTCASE("Fill",
"Verify what happens when we fill the db" ){
INITIALIZER(runFillTable);
INITIALIZER(runPkRead);
FINALIZER(runClearTable2);
}
TESTCASE("TupError",
"Verify what happens when we fill the db" ){
INITIALIZER(runTupErrors);
}
TESTCASE("InsertError", "" ){
INITIALIZER(runInsertError);
}
TESTCASE("InsertError2", "" ){
INITIALIZER(runInsertError2);
}
NDBT_TESTSUITE_END(testBasic);
#if 0
......@@ -1469,6 +1519,12 @@ TESTCASE("ReadConsistency",
STEP(runReadOne);
FINALIZER(runClearTable2);
}
TESTCASE("Fill",
"Verify what happens when we fill the db" ){
INITIALIZER(runFillTable);
INITIALIZER(runPkRead);
FINALIZER(runClearTable2);
}
#endif
int main(int argc, const char** argv){
......
......@@ -286,15 +286,26 @@ int runRandScanRead(NDBT_Context* ctx, NDBT_Step* step){
int records = ctx->getNumRecords();
int parallelism = ctx->getProperty("Parallelism", 240);
int abort = ctx->getProperty("AbortProb", 5);
int tupscan = ctx->getProperty("TupScan", (Uint32)0);
int i = 0;
HugoTransactions hugoTrans(*ctx->getTab());
while (i<loops && !ctx->isTestStopped()) {
g_info << i << ": ";
NdbOperation::LockMode lm = (NdbOperation::LockMode)(rand() % 3);
int scan_flags = 0;
if (tupscan == 1)
scan_flags |= NdbScanOperation::SF_TupScan;
else if (tupscan == 2 && ((rand() & 0x800)))
{
scan_flags |= NdbScanOperation::SF_TupScan;
}
if (hugoTrans.scanReadRecords(GETNDB(step),
records, abort, parallelism,
lm) != 0){
lm,
scan_flags) != 0){
return NDBT_FAILED;
}
i++;
......@@ -1320,6 +1331,16 @@ TESTCASE("ScanRead488",
STEPS(runRandScanRead, 70);
FINALIZER(runClearTable);
}
TESTCASE("ScanRead488T",
"Verify scan requirement: It's only possible to have 11 concurrent "\
"scans per fragment running in Ndb kernel at the same time. "\
"When this limit is exceeded the scan will be aborted with errorcode "\
"488."){
TC_PROPERTY("TupScan", 1);
INITIALIZER(runLoadTable);
STEPS(runRandScanRead, 70);
FINALIZER(runClearTable);
}
TESTCASE("ScanRead488O",
"Verify scan requirement: It's only possible to have 11 concurrent "\
"scans per fragment running in Ndb kernel at the same time. "\
......@@ -1336,6 +1357,7 @@ TESTCASE("ScanRead488_Mixed",
"scans per fragment running in Ndb kernel at the same time. "\
"When this limit is exceeded the scan will be aborted with errorcode "\
"488."){
TC_PROPERTY("TupScan", 2);
INITIALIZER(createOrderedPkIndex);
INITIALIZER(runLoadTable);
STEPS(runRandScanRead, 50);
......
......@@ -219,6 +219,14 @@ max-time: 500
cmd: testBasic
args: -n TupError
max-time: 500
cmd: testBasic
args: -n InsertError T1
max-time: 500
cmd: testBasic
args: -n InsertError2 T1
max-time: 500
cmd: testTimeout
args: T1
......@@ -273,6 +281,10 @@ max-time: 500
cmd: testScan
args: -n ScanRead488O -l 10 T6 D1 D2
max-time: 1000
cmd: testScan
args: -n ScanRead488T -l 10 T6 D1 D2
max-time: 1000
cmd: testScan
args: -n ScanRead488_Mixed -l 10 T6 D1 D2
......
......@@ -471,16 +471,33 @@ HugoOperations::execute_async(Ndb* pNdb, NdbTransaction::ExecType et,
return NDBT_OK;
}
int
HugoOperations::execute_async_prepare(Ndb* pNdb, NdbTransaction::ExecType et,
NdbTransaction::AbortOption eao){
m_async_reply= 0;
pTrans->executeAsynchPrepare(et,
HugoOperations_async_callback,
this,
eao);
return NDBT_OK;
}
int
HugoOperations::wait_async(Ndb* pNdb, int timeout)
{
pNdb->pollNdb(1000);
if(m_async_reply)
volatile int * wait = &m_async_reply;
while (!* wait)
{
if(m_async_return)
ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl;
return m_async_return;
pNdb->sendPollNdb(1000);
if(* wait)
{
if(m_async_return)
ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl;
return m_async_return;
}
}
ndbout_c("wait returned nothing...");
return -1;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment