Commit ab9fc570 authored by unknown's avatar unknown

BUG#9749 - ndb lock upgrade - more fixes...

1) Make getNoParall into function instead of a procedure
2) Check for multiple transactions in "upgrade's" parallell queue
3) Set lock mode according to lock_owner's lockMode

NOTE: Does still not handle lock upgrade in case of aborts correctly


ndb/src/kernel/blocks/dbacc/Dbacc.hpp:
  Make getNoParall into function instead of a procedure
ndb/src/kernel/blocks/dbacc/DbaccMain.cpp:
  1) Make getNoParall into function instead of a procedure
  2) Check for multiple transactions in "upgrade's" parallell queue
  3) Set lock mode according to lock_owner's lockMode
parent 245211ab
...@@ -1022,7 +1022,7 @@ private: ...@@ -1022,7 +1022,7 @@ private:
Uint32 placeReadInLockQueue(Signal* signal); Uint32 placeReadInLockQueue(Signal* signal);
void placeSerialQueueRead(Signal* signal); void placeSerialQueueRead(Signal* signal);
void checkOnlyReadEntry(Signal* signal); void checkOnlyReadEntry(Signal* signal);
void getNoParallelTransaction(Signal* signal); Uint32 getNoParallelTransaction(const Operationrec*);
void moveLastParallelQueue(Signal* signal); void moveLastParallelQueue(Signal* signal);
void moveLastParallelQueueWrite(Signal* signal); void moveLastParallelQueueWrite(Signal* signal);
Uint32 placeWriteInLockQueue(Signal* signal); Uint32 placeWriteInLockQueue(Signal* signal);
...@@ -1265,7 +1265,6 @@ private: ...@@ -1265,7 +1265,6 @@ private:
OperationrecPtr mlpqOperPtr; OperationrecPtr mlpqOperPtr;
OperationrecPtr queOperPtr; OperationrecPtr queOperPtr;
OperationrecPtr readWriteOpPtr; OperationrecPtr readWriteOpPtr;
OperationrecPtr tgnptMainOpPtr;
Uint32 cfreeopRec; Uint32 cfreeopRec;
Uint32 coprecsize; Uint32 coprecsize;
/* --------------------------------------------------------------------------------- */ /* --------------------------------------------------------------------------------- */
...@@ -1516,7 +1515,6 @@ private: ...@@ -1516,7 +1515,6 @@ private:
Uint32 turlIndex; Uint32 turlIndex;
Uint32 tlfrTmp1; Uint32 tlfrTmp1;
Uint32 tlfrTmp2; Uint32 tlfrTmp2;
Uint32 tgnptNrTransaction;
Uint32 tudqeIndex; Uint32 tudqeIndex;
Uint32 tscanTrid1; Uint32 tscanTrid1;
Uint32 tscanTrid2; Uint32 tscanTrid2;
......
...@@ -1936,9 +1936,7 @@ void Dbacc::insertelementLab(Signal* signal) ...@@ -1936,9 +1936,7 @@ void Dbacc::insertelementLab(Signal* signal)
/* --------------------------------------------------------------------------------- */ /* --------------------------------------------------------------------------------- */
Uint32 Dbacc::placeReadInLockQueue(Signal* signal) Uint32 Dbacc::placeReadInLockQueue(Signal* signal)
{ {
tgnptMainOpPtr = queOperPtr; if (getNoParallelTransaction(queOperPtr.p) == 1) {
getNoParallelTransaction(signal);
if (tgnptNrTransaction == 1) {
if ((queOperPtr.p->transId1 == operationRecPtr.p->transId1) && if ((queOperPtr.p->transId1 == operationRecPtr.p->transId1) &&
(queOperPtr.p->transId2 == operationRecPtr.p->transId2)) { (queOperPtr.p->transId2 == operationRecPtr.p->transId2)) {
/* --------------------------------------------------------------------------------- */ /* --------------------------------------------------------------------------------- */
...@@ -2021,9 +2019,7 @@ void Dbacc::placeSerialQueueRead(Signal* signal) ...@@ -2021,9 +2019,7 @@ void Dbacc::placeSerialQueueRead(Signal* signal)
checkOnlyReadEntry(signal); checkOnlyReadEntry(signal);
return; return;
}//if }//if
tgnptMainOpPtr = readWriteOpPtr; if (getNoParallelTransaction(readWriteOpPtr.p) == 1) {
getNoParallelTransaction(signal);
if (tgnptNrTransaction == 1) {
jam(); jam();
/* --------------------------------------------------------------------------------- */ /* --------------------------------------------------------------------------------- */
/* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */ /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */
...@@ -2104,24 +2100,23 @@ void Dbacc::checkOnlyReadEntry(Signal* signal) ...@@ -2104,24 +2100,23 @@ void Dbacc::checkOnlyReadEntry(Signal* signal)
/* --------------------------------------------------------------------------------- */ /* --------------------------------------------------------------------------------- */
/* GET_NO_PARALLEL_TRANSACTION */ /* GET_NO_PARALLEL_TRANSACTION */
/* --------------------------------------------------------------------------------- */ /* --------------------------------------------------------------------------------- */
void Dbacc::getNoParallelTransaction(Signal* signal) Uint32
Dbacc::getNoParallelTransaction(const Operationrec * op)
{ {
OperationrecPtr tnptOpPtr; OperationrecPtr tmp;
tgnptNrTransaction = 1; tmp.i= op->nextParallelQue;
tnptOpPtr.i = tgnptMainOpPtr.p->nextParallelQue; Uint32 transId[2] = { op->transId1, op->transId2 };
while ((tnptOpPtr.i != RNIL) && while (tmp.i != RNIL)
(tgnptNrTransaction == 1)) { {
jam(); jam();
ptrCheckGuard(tnptOpPtr, coprecsize, operationrec); ptrCheckGuard(tmp, coprecsize, operationrec);
if ((tnptOpPtr.p->transId1 == tgnptMainOpPtr.p->transId1) && if (tmp.p->transId1 == transId[0] && tmp.p->transId2 == transId[1])
(tnptOpPtr.p->transId2 == tgnptMainOpPtr.p->transId2)) { tmp.i = tmp.p->nextParallelQue;
tnptOpPtr.i = tnptOpPtr.p->nextParallelQue; else
} else { return 2;
jam(); }
tgnptNrTransaction++; return 1;
}//if
}//while
}//Dbacc::getNoParallelTransaction() }//Dbacc::getNoParallelTransaction()
void Dbacc::moveLastParallelQueue(Signal* signal) void Dbacc::moveLastParallelQueue(Signal* signal)
...@@ -2162,9 +2157,7 @@ void Dbacc::moveLastParallelQueueWrite(Signal* signal) ...@@ -2162,9 +2157,7 @@ void Dbacc::moveLastParallelQueueWrite(Signal* signal)
/* --------------------------------------------------------------------------------- */ /* --------------------------------------------------------------------------------- */
Uint32 Dbacc::placeWriteInLockQueue(Signal* signal) Uint32 Dbacc::placeWriteInLockQueue(Signal* signal)
{ {
tgnptMainOpPtr = queOperPtr; if (!((getNoParallelTransaction(queOperPtr.p) == 1) &&
getNoParallelTransaction(signal);
if (!((tgnptNrTransaction == 1) &&
(queOperPtr.p->transId1 == operationRecPtr.p->transId1) && (queOperPtr.p->transId1 == operationRecPtr.p->transId1) &&
(queOperPtr.p->transId2 == operationRecPtr.p->transId2))) { (queOperPtr.p->transId2 == operationRecPtr.p->transId2))) {
jam(); jam();
...@@ -2215,9 +2208,7 @@ void Dbacc::placeSerialQueueWrite(Signal* signal) ...@@ -2215,9 +2208,7 @@ void Dbacc::placeSerialQueueWrite(Signal* signal)
}//if }//if
readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue; readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue;
ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec); ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec);
tgnptMainOpPtr = readWriteOpPtr; if (getNoParallelTransaction(readWriteOpPtr.p) == 1) {
getNoParallelTransaction(signal);
if (tgnptNrTransaction == 1) {
/* --------------------------------------------------------------------------------- */ /* --------------------------------------------------------------------------------- */
/* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */ /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */
/* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */ /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */
...@@ -5878,9 +5869,7 @@ Dbacc::check_lock_upgrade(Signal* signal, ...@@ -5878,9 +5869,7 @@ Dbacc::check_lock_upgrade(Signal* signal,
return; return;
} }
tgnptMainOpPtr = lock_owner; if (getNoParallelTransaction(lock_owner.p) > 1)
getNoParallelTransaction(signal);
if (tgnptNrTransaction > 1)
{ {
jam(); jam();
/** /**
...@@ -5888,6 +5877,15 @@ Dbacc::check_lock_upgrade(Signal* signal, ...@@ -5888,6 +5877,15 @@ Dbacc::check_lock_upgrade(Signal* signal,
*/ */
return; return;
} }
if (getNoParallelTransaction(next.p) > 1)
{
jam();
/**
* No lock upgrade if more than 1 transaction in next's parallell queue
*/
return;
}
OperationrecPtr tmp; OperationrecPtr tmp;
tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue; tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue;
...@@ -5901,20 +5899,19 @@ Dbacc::check_lock_upgrade(Signal* signal, ...@@ -5901,20 +5899,19 @@ Dbacc::check_lock_upgrade(Signal* signal,
// Find end of parallell que // Find end of parallell que
tmp = lock_owner; tmp = lock_owner;
tmp.p->lockMode= 1;
while(tmp.p->nextParallelQue != RNIL) while(tmp.p->nextParallelQue != RNIL)
{ {
jam(); jam();
tmp.i = tmp.p->nextParallelQue; tmp.i = tmp.p->nextParallelQue;
ptrCheckGuard(tmp, coprecsize, operationrec); ptrCheckGuard(tmp, coprecsize, operationrec);
tmp.p->lockMode= 1;
} }
next.p->prevParallelQue = tmp.i; next.p->prevParallelQue = tmp.i;
tmp.p->nextParallelQue = next.i; tmp.p->nextParallelQue = next.i;
OperationrecPtr save = operationRecPtr; OperationrecPtr save = operationRecPtr;
Uint32 lockMode = lock_owner.p->lockMode;
Uint32 TelementIsDisappeared = 0; // lock upgrade = all reads Uint32 TelementIsDisappeared = 0; // lock upgrade = all reads
Uint32 ThashValue = lock_owner.p->hashValue; Uint32 ThashValue = lock_owner.p->hashValue;
Uint32 localdata[2]; Uint32 localdata[2];
...@@ -5927,7 +5924,7 @@ Dbacc::check_lock_upgrade(Signal* signal, ...@@ -5927,7 +5924,7 @@ Dbacc::check_lock_upgrade(Signal* signal,
next.p->localdata[1] = localdata[1]; next.p->localdata[1] = localdata[1];
operationRecPtr = next; operationRecPtr = next;
ndbassert(next.p->lockMode); next.p->lockMode = lockMode;
TelementIsDisappeared = executeNextOperation(signal); TelementIsDisappeared = executeNextOperation(signal);
if (next.p->nextParallelQue != RNIL) if (next.p->nextParallelQue != RNIL)
{ {
...@@ -5941,7 +5938,7 @@ Dbacc::check_lock_upgrade(Signal* signal, ...@@ -5941,7 +5938,7 @@ Dbacc::check_lock_upgrade(Signal* signal,
} while (1); } while (1);
operationRecPtr = save; operationRecPtr = save;
} }
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment