Commit 9bbafed2 authored by unknown's avatar unknown

Merge perch.ndb.mysql.com:/home/jonas/src/50-work

into  perch.ndb.mysql.com:/home/jonas/src/51-work


storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp:
  Auto merged
storage/ndb/include/ndbapi/NdbTransaction.hpp:
  merge
storage/ndb/src/ndbapi/NdbScanOperation.cpp:
  merge
storage/ndb/src/ndbapi/NdbTransaction.cpp:
  merge
parents 410b81c2 6032e445
......@@ -658,8 +658,11 @@ private:
// Release all cursor operations in connection
void releaseOps(NdbOperation*);
void releaseScanOperations(NdbIndexScanOperation*);
void releaseScanOperation(NdbIndexScanOperation*);
bool releaseScanOperation(NdbIndexScanOperation** listhead,
NdbIndexScanOperation** listtail,
NdbIndexScanOperation* op);
void releaseExecutedScanOperation(NdbIndexScanOperation*);
// Set the transaction identity of the transaction
void setTransactionId(Uint64 aTransactionId);
......
......@@ -10015,73 +10015,84 @@ void Dbdih::startNextChkpt(Signal* signal)
nodePtr.i = replicaPtr.p->procNode;
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
if (replicaPtr.p->lcpOngoingFlag &&
replicaPtr.p->lcpIdStarted < lcpId) {
jam();
//-------------------------------------------------------------------
// We have found a replica on a node that performs local checkpoint
// that is alive and that have not yet been started.
//-------------------------------------------------------------------
if (nodePtr.p->noOfStartedChkpt < 2) {
jam();
/**
* Send LCP_FRAG_ORD to LQH
*/
/**
* Mark the replica so with lcpIdStarted == true
*/
replicaPtr.p->lcpIdStarted = lcpId;
Uint32 i = nodePtr.p->noOfStartedChkpt;
nodePtr.p->startedChkpt[i].tableId = tabPtr.i;
nodePtr.p->startedChkpt[i].fragId = curr.fragmentId;
nodePtr.p->startedChkpt[i].replicaPtr = replicaPtr.i;
nodePtr.p->noOfStartedChkpt = i + 1;
sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
} else if (nodePtr.p->noOfQueuedChkpt < 2) {
jam();
/**
* Put LCP_FRAG_ORD "in queue"
*/
/**
* Mark the replica so with lcpIdStarted == true
*/
replicaPtr.p->lcpIdStarted = lcpId;
if (c_lcpState.m_participatingLQH.get(nodePtr.i))
{
if (replicaPtr.p->lcpOngoingFlag &&
replicaPtr.p->lcpIdStarted < lcpId)
{
jam();
//-------------------------------------------------------------------
// We have found a replica on a node that performs local checkpoint
// that is alive and that have not yet been started.
//-------------------------------------------------------------------
Uint32 i = nodePtr.p->noOfQueuedChkpt;
nodePtr.p->queuedChkpt[i].tableId = tabPtr.i;
nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
nodePtr.p->noOfQueuedChkpt = i + 1;
} else {
jam();
if (nodePtr.p->noOfStartedChkpt < 2)
{
jam();
/**
* Send LCP_FRAG_ORD to LQH
*/
/**
* Mark the replica so with lcpIdStarted == true
*/
replicaPtr.p->lcpIdStarted = lcpId;
if(save){
Uint32 i = nodePtr.p->noOfStartedChkpt;
nodePtr.p->startedChkpt[i].tableId = tabPtr.i;
nodePtr.p->startedChkpt[i].fragId = curr.fragmentId;
nodePtr.p->startedChkpt[i].replicaPtr = replicaPtr.i;
nodePtr.p->noOfStartedChkpt = i + 1;
sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
}
else if (nodePtr.p->noOfQueuedChkpt < 2)
{
jam();
/**
* Stop increasing value on first that was "full"
* Put LCP_FRAG_ORD "in queue"
*/
c_lcpState.currentFragment = curr;
save = false;
}
busyNodes.set(nodePtr.i);
if(busyNodes.count() == lcpNodes){
/**
* There were no possibility to start the local checkpoint
* and it was not possible to queue it up. In this case we
* stop the start of local checkpoints until the nodes with a
* backlog have performed more checkpoints. We will return and
* will not continue the process of starting any more checkpoints.
* Mark the replica so with lcpIdStarted == true
*/
return;
replicaPtr.p->lcpIdStarted = lcpId;
Uint32 i = nodePtr.p->noOfQueuedChkpt;
nodePtr.p->queuedChkpt[i].tableId = tabPtr.i;
nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
nodePtr.p->noOfQueuedChkpt = i + 1;
}
else
{
jam();
if(save)
{
/**
* Stop increasing value on first that was "full"
*/
c_lcpState.currentFragment = curr;
save = false;
}
busyNodes.set(nodePtr.i);
if(busyNodes.count() == lcpNodes)
{
/**
* There were no possibility to start the local checkpoint
* and it was not possible to queue it up. In this case we
* stop the start of local checkpoints until the nodes with a
* backlog have performed more checkpoints. We will return and
* will not continue the process of starting any more checkpoints.
*/
return;
}//if
}//if
}//if
}
}//while
}
}//while
}
curr.fragmentId++;
if (curr.fragmentId >= tabPtr.p->totalfragments) {
jam();
......
......@@ -694,9 +694,27 @@ void NdbScanOperation::close(bool forceSend, bool releaseOp)
theNdbCon = NULL;
m_transConnection = NULL;
if (releaseOp && tTransCon) {
if (tTransCon)
{
NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;
tTransCon->releaseScanOperation(tOp);
bool ret = true;
if (theStatus != WaitResponse)
{
/**
* Not executed yet
*/
ret =
tTransCon->releaseScanOperation(&tTransCon->m_theFirstScanOperation,
&tTransCon->m_theLastScanOperation,
tOp);
}
else if (releaseOp)
{
ret = tTransCon->releaseScanOperation(&tTransCon->m_firstExecutedScanOp,
0, tOp);
}
assert(ret);
}
tCon->theScanningOp = 0;
......
......@@ -979,50 +979,59 @@ Remark: Release scan op when hupp'ed trans closed (save memory)
void
NdbTransaction::releaseScanOperation(NdbIndexScanOperation* cursorOp)
{
DBUG_ENTER("NdbTransaction::releaseScanOperation");
DBUG_ENTER("NdbTransaction::releaseExecutedScanOperation");
DBUG_PRINT("enter", ("this=0x%x op=0x%x", (UintPtr)this, (UintPtr)cursorOp));
releaseScanOperation(&m_firstExecutedScanOp, 0, cursorOp);
DBUG_VOID_RETURN;
}//NdbTransaction::releaseExecutedScanOperation()
// here is one reason to make op lists doubly linked
if (cursorOp->m_executed)
bool
NdbTransaction::releaseScanOperation(NdbIndexScanOperation** listhead,
NdbIndexScanOperation** listtail,
NdbIndexScanOperation* op)
{
if (* listhead == op)
{
if (m_firstExecutedScanOp == cursorOp) {
m_firstExecutedScanOp = (NdbIndexScanOperation*)cursorOp->theNext;
cursorOp->release();
theNdb->releaseScanOperation(cursorOp);
} else if (m_firstExecutedScanOp != NULL) {
NdbIndexScanOperation* tOp = m_firstExecutedScanOp;
while (tOp->theNext != NULL) {
if (tOp->theNext == cursorOp) {
tOp->theNext = cursorOp->theNext;
cursorOp->release();
theNdb->releaseScanOperation(cursorOp);
break;
}
tOp = (NdbIndexScanOperation*)tOp->theNext;
}
* listhead = (NdbIndexScanOperation*)op->theNext;
if (listtail && *listtail == op)
{
assert(* listhead == 0);
* listtail = 0;
}
}
else
{
if (m_theFirstScanOperation == cursorOp) {
m_theFirstScanOperation = (NdbIndexScanOperation*)cursorOp->theNext;
cursorOp->release();
theNdb->releaseScanOperation(cursorOp);
} else if (m_theFirstScanOperation != NULL) {
NdbIndexScanOperation* tOp = m_theFirstScanOperation;
while (tOp->theNext != NULL) {
if (tOp->theNext == cursorOp) {
tOp->theNext = cursorOp->theNext;
cursorOp->release();
theNdb->releaseScanOperation(cursorOp);
break;
}
tOp = (NdbIndexScanOperation*)tOp->theNext;
NdbIndexScanOperation* tmp = * listhead;
while (tmp != NULL)
{
if (tmp->theNext == op)
{
tmp->theNext = (NdbIndexScanOperation*)op->theNext;
if (listtail && *listtail == op)
{
assert(op->theNext == 0);
*listtail = tmp;
}
break;
}
tmp = (NdbIndexScanOperation*)tmp->theNext;
}
if (tmp == NULL)
op = NULL;
}
DBUG_VOID_RETURN;
}//NdbTransaction::releaseScanOperation()
if (op != NULL)
{
op->release();
theNdb->releaseScanOperation(op);
return true;
}
return false;
}
/*****************************************************************************
NdbOperation* getNdbOperation(const char* aTableName);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment