diff --git a/ndb/src/kernel/blocks/dbacc/Dbacc.hpp b/ndb/src/kernel/blocks/dbacc/Dbacc.hpp index 64b947b5462743b08c30eaaf9de1a43213ea8988..aaa4aca7b003598b72d01a2c0d7165cf463b2004 100644 --- a/ndb/src/kernel/blocks/dbacc/Dbacc.hpp +++ b/ndb/src/kernel/blocks/dbacc/Dbacc.hpp @@ -1100,6 +1100,8 @@ private: Uint32 executeNextOperation(Signal* signal); void releaselock(Signal* signal); void takeOutFragWaitQue(Signal* signal); + void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner, + OperationrecPtr release_op); void allocOverflowPage(Signal* signal); bool getrootfragmentrec(Signal* signal, RootfragmentrecPtr&, Uint32 fragId); void insertLockOwnersList(Signal* signal, const OperationrecPtr&); diff --git a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 17c5a31cbedf05d2d79fbc16e93196c9d2a56cd9..28956de198cffbc81736ae338f8c7a27ece5c65e 100644 --- a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -5802,9 +5802,148 @@ void Dbacc::commitOperation(Signal* signal) ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec); tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue; }//if - }//if + + /** + * Check possible lock upgrade + * 1) Find lock owner + * 2) Count transactions in parallel que + * 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner) + * upgrade next serial + */ + if(operationRecPtr.p->lockMode) + { + jam(); + /** + * Committing a non shared operation can't lead to lock upgrade + */ + return; + } + + OperationrecPtr lock_owner; + lock_owner.i = operationRecPtr.p->prevParallelQue; + ptrCheckGuard(lock_owner, coprecsize, operationrec); + Uint32 transid[2] = { lock_owner.p->transId1, + lock_owner.p->transId2 }; + + + while(lock_owner.p->prevParallelQue != RNIL) + { + lock_owner.i = lock_owner.p->prevParallelQue; + ptrCheckGuard(lock_owner, coprecsize, operationrec); + + if(lock_owner.p->transId1 != transid[0] || + lock_owner.p->transId2 != transid[1]) + { + jam(); + /** + * If more than 1 trans in lock queue -> no lock upgrade + */ + return; + } + } + + check_lock_upgrade(signal, lock_owner, operationRecPtr); + } }//Dbacc::commitOperation() +void +Dbacc::check_lock_upgrade(Signal* signal, + OperationrecPtr lock_owner, + OperationrecPtr release_op) +{ + if((lock_owner.p->transId1 == release_op.p->transId1 && + lock_owner.p->transId2 == release_op.p->transId2) || + release_op.p->lockMode || + lock_owner.p->nextSerialQue == RNIL) + { + jam(); + /** + * No lock upgrade if same trans or lock owner has no serial queue + * or releasing non shared op + */ + return; + } + + OperationrecPtr next; + next.i = lock_owner.p->nextSerialQue; + ptrCheckGuard(next, coprecsize, operationrec); + + if(lock_owner.p->transId1 != next.p->transId1 || + lock_owner.p->transId2 != next.p->transId2) + { + jam(); + /** + * No lock upgrad if !same trans in serial queue + */ + return; + } + + tgnptMainOpPtr = lock_owner; + getNoParallelTransaction(signal); + if (tgnptNrTransaction > 1) + { + jam(); + /** + * No lock upgrade if more than 1 transaction in parallell queue + */ + return; + } + + OperationrecPtr tmp; + tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue; + if(tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + ndbassert(tmp.p->prevSerialQue == next.i); + tmp.p->prevSerialQue = lock_owner.i; + } + next.p->nextSerialQue = next.p->prevSerialQue = RNIL; + + // Find end of parallell que + tmp = lock_owner; + tmp.p->lockMode= 1; + while(tmp.p->nextParallelQue != RNIL) + { + jam(); + tmp.i = tmp.p->nextParallelQue; + ptrCheckGuard(tmp, coprecsize, operationrec); + tmp.p->lockMode= 1; + } + + next.p->prevParallelQue = tmp.i; + tmp.p->nextParallelQue = next.i; + + OperationrecPtr save = operationRecPtr; + + Uint32 TelementIsDisappeared = 0; // lock upgrade = all reads + Uint32 ThashValue = lock_owner.p->hashValue; + Uint32 localdata[2]; + localdata[0] = lock_owner.p->localdata[0]; + localdata[1] = lock_owner.p->localdata[1]; + do { + next.p->elementIsDisappeared = TelementIsDisappeared; + next.p->hashValue = ThashValue; + next.p->localdata[0] = localdata[0]; + next.p->localdata[1] = localdata[1]; + + operationRecPtr = next; + ndbassert(next.p->lockMode); + TelementIsDisappeared = executeNextOperation(signal); + if (next.p->nextParallelQue != RNIL) + { + jam(); + next.i = next.p->nextParallelQue; + ptrCheckGuard(next, coprecsize, operationrec); + } else { + jam(); + break; + }//if + } while (1); + + operationRecPtr = save; + +} + /* ------------------------------------------------------------------------- */ /* RELEASELOCK */ /* RESETS LOCK OF AN ELEMENT. */ @@ -5841,6 +5980,8 @@ void Dbacc::releaselock(Signal* signal) ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec); trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i; }//if + + check_lock_upgrade(signal, copyInOperPtr, operationRecPtr); /* --------------------------------------------------------------------------------- */ /* SINCE THERE ARE STILL ITEMS IN THE PARALLEL QUEUE WE NEED NOT WORRY ABOUT */ /* STARTING QUEUED OPERATIONS. THUS WE CAN END HERE. */ diff --git a/ndb/test/ndbapi/testOperations.cpp b/ndb/test/ndbapi/testOperations.cpp index 9f1d5ee1191451acceb3f994e147857cff079c1d..773511a04759266024354a72f5fdab6cd1491a60 100644 --- a/ndb/test/ndbapi/testOperations.cpp +++ b/ndb/test/ndbapi/testOperations.cpp @@ -547,21 +547,64 @@ runLockUpgrade1(NDBT_Context* ctx, NDBT_Step* step){ do { CHECK(hugoOps.startTransaction(pNdb) == 0); - CHECK(hugoOps.pkReadRecord(pNdb, 0, 1, NdbOperation::LM_Read) == 0); - CHECK(hugoOps.execute_NoCommit(pNdb) == 0); + if(ctx->getProperty("LOCK_UPGRADE", 1) == 1) + { + CHECK(hugoOps.pkReadRecord(pNdb, 0, 1, NdbOperation::LM_Read) == 0); + CHECK(hugoOps.execute_NoCommit(pNdb) == 0); - ctx->setProperty("READ_DONE", 1); - ctx->broadcast(); - ndbout_c("wait 2"); - ctx->getPropertyWait("READ_DONE", 2); - ndbout_c("wait 2 - done"); + ctx->setProperty("READ_DONE", 1); + ctx->broadcast(); + ndbout_c("wait 2"); + ctx->getPropertyWait("READ_DONE", 2); + ndbout_c("wait 2 - done"); + } + else + { + ctx->setProperty("READ_DONE", 1); + ctx->broadcast(); + ctx->getPropertyWait("READ_DONE", 2); + ndbout_c("wait 2 - done"); + CHECK(hugoOps.pkReadRecord(pNdb, 0, 1, NdbOperation::LM_Read) == 0); + CHECK(hugoOps.execute_NoCommit(pNdb) == 0); + } + if(ctx->getProperty("LU_OP", o_INS) == o_INS) + { + CHECK(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0); + CHECK(hugoOps.pkInsertRecord(pNdb, 0, 1, 2) == 0); + } + else if(ctx->getProperty("LU_OP", o_UPD) == o_UPD) + { + CHECK(hugoOps.pkUpdateRecord(pNdb, 0, 1, 2) == 0); + } + else + { + CHECK(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0); + } ctx->setProperty("READ_DONE", 3); ctx->broadcast(); ndbout_c("before update"); - CHECK(hugoOps.pkUpdateRecord(pNdb, 0, 1, 2) == 0); ndbout_c("wait update"); - CHECK(hugoOps.execute_NoCommit(pNdb) == 0); - CHECK(hugoOps.closeTransaction(pNdb)); + CHECK(hugoOps.execute_Commit(pNdb) == 0); + CHECK(hugoOps.closeTransaction(pNdb) == 0); + + CHECK(hugoOps.startTransaction(pNdb) == 0); + CHECK(hugoOps.pkReadRecord(pNdb, 0, 1) == 0); + int res= hugoOps.execute_Commit(pNdb); + if(ctx->getProperty("LU_OP", o_INS) == o_INS) + { + CHECK(res == 0); + CHECK(hugoOps.verifyUpdatesValue(2) == 0); + } + else if(ctx->getProperty("LU_OP", o_UPD) == o_UPD) + { + CHECK(res == 0); + CHECK(hugoOps.verifyUpdatesValue(2) == 0); + } + else + { + CHECK(res == 626); + } + } while(0); return result; @@ -592,10 +635,10 @@ runLockUpgrade2(NDBT_Context* ctx, NDBT_Step* step){ ndbout_c("wait 3 - done"); NdbSleep_MilliSleep(200); - CHECK(hugoOps.execute_Commit(pNdb) == 0); + CHECK(hugoOps.execute_Commit(pNdb) == 0); } while(0); - return NDBT_FAILED; + return result; } int @@ -607,11 +650,16 @@ main(int argc, const char** argv){ NDBT_TestSuite ts("testOperations"); + for(Uint32 i = 0; i <6; i++) { BaseString name("bug_9749"); + name.appfmt("_%d", i); NDBT_TestCaseImpl1 *pt = new NDBT_TestCaseImpl1(&ts, name.c_str(), ""); + pt->setProperty("LOCK_UPGRADE", 1 + (i & 1)); + pt->setProperty("LU_OP", 1 + (i >> 1)); + pt->addInitializer(new NDBT_Initializer(pt, "runClearTable", runClearTable));