/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

#define DBTUX_SCAN_CPP
#include "Dbtux.hpp"

void
Dbtux::execACC_SCANREQ(Signal* signal)
{
  jamEntry();
  const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
  const AccScanReq* const req = &reqCopy;
  ScanOpPtr scanPtr;
  scanPtr.i = RNIL;
  do {
    // get the index
    IndexPtr indexPtr;
    c_indexPool.getPtr(indexPtr, req->tableId);
    // get the fragment
    FragPtr fragPtr;
    fragPtr.i = RNIL;
    for (unsigned i = 0; i < indexPtr.p->m_numFrags; i++) {
      jam();
      if (indexPtr.p->m_fragId[i] == req->fragmentNo << 1) {
        jam();
        c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]);
        break;
      }
    }
    ndbrequire(fragPtr.i != RNIL);
    Frag& frag = *fragPtr.p;
    // must be normal DIH/TC fragment
    TreeHead& tree = frag.m_tree;
    // check for empty fragment
    if (tree.m_root == NullTupLoc) {
      jam();
      AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
      conf->scanPtr = req->senderData;
      conf->accPtr = RNIL;
      conf->flag = AccScanConf::ZEMPTY_FRAGMENT;
      sendSignal(req->senderRef, GSN_ACC_SCANCONF,
                 signal, AccScanConf::SignalLength, JBB);
      return;
    }
    // seize from pool and link to per-fragment list
    if (! frag.m_scanList.seize(scanPtr)) {
      jam();
      break;
    }
    new (scanPtr.p) ScanOp(c_scanBoundPool);
    scanPtr.p->m_state = ScanOp::First;
    scanPtr.p->m_userPtr = req->senderData;
    scanPtr.p->m_userRef = req->senderRef;
    scanPtr.p->m_tableId = indexPtr.p->m_tableId;
    scanPtr.p->m_indexId = indexPtr.i;
    scanPtr.p->m_fragId = fragPtr.p->m_fragId;
    scanPtr.p->m_fragPtrI = fragPtr.i;
    scanPtr.p->m_transId1 = req->transId1;
    scanPtr.p->m_transId2 = req->transId2;
    scanPtr.p->m_savePointId = req->savePointId;
    scanPtr.p->m_readCommitted = AccScanReq::getReadCommittedFlag(req->requestInfo);
    scanPtr.p->m_lockMode = AccScanReq::getLockMode(req->requestInfo);
    scanPtr.p->m_keyInfo = AccScanReq::getKeyinfoFlag(req->requestInfo);
#ifdef VM_TRACE
    if (debugFlags & DebugScan) {
      debugOut << "Seize scan " << scanPtr.i << " " << *scanPtr.p << endl;
    }
#endif
    /*
     * readCommitted lockMode keyInfo
     * 1 0 0 - read committed (no lock)
     * 0 0 0 - read latest (read lock)
     * 0 1 1 - read exclusive (write lock)
     */
    // conf
    AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
    conf->scanPtr = req->senderData;
    conf->accPtr = scanPtr.i;
    conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
    sendSignal(req->senderRef, GSN_ACC_SCANCONF,
               signal, AccScanConf::SignalLength, JBB);
    return;
  } while (0);
  if (scanPtr.i != RNIL) {
    jam();
    releaseScanOp(scanPtr);
  }
  // LQH does not handle REF
  signal->theData[0] = 0x313;
  sendSignal(req->senderRef, GSN_ACC_SCANREF,
             signal, 1, JBB);
}
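
/*
 * Illustrative sketch (not part of the original file): the three legal
 * requestInfo combinations from the table in execACC_SCANREQ above, as
 * a standalone predicate.  ScanFlags and scanFlagsValid are
 * hypothetical names, not NDB API.
 *
 *   struct ScanFlags {
 *     bool readCommitted;   // no lock taken at all
 *     bool lockMode;        // false = read (shared), true = write (exclusive)
 *     bool keyInfo;         // pass tuple key to LQH for locking
 *   };
 *
 *   inline bool
 *   scanFlagsValid(const ScanFlags& f)
 *   {
 *     return (f.readCommitted && ! f.lockMode && ! f.keyInfo)    // read committed
 *         || (! f.readCommitted && ! f.lockMode && ! f.keyInfo)  // read latest
 *         || (! f.readCommitted && f.lockMode && f.keyInfo);     // read exclusive
 *   }
 */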

/*
 * Receive bounds for scan in single direct call.  The bounds can
 * arrive in any order.  Attribute ids are those of index table.
 *
 * Replace EQ by equivalent LE + GE.  Check for conflicting bounds.
 * Check that sets of lower and upper bounds are on initial sequences
 * of keys and that all but possibly last bound is non-strict.
 *
 * Finally save the sets of lower and upper bounds (i.e. start key and
 * end key).  Full bound type (< 4) is included but only the strict bit
 * is used since lower and upper have now been separated.
 */
void
Dbtux::execTUX_BOUND_INFO(Signal* signal)
{
  jamEntry();
  struct BoundInfo {
    int type;
    unsigned offset;
    unsigned size;
  };
  TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend();
  const TuxBoundInfo reqCopy = *(const TuxBoundInfo*)sig;
  const TuxBoundInfo* const req = &reqCopy;
  // get records
  ScanOp& scan = *c_scanOpPool.getPtr(req->tuxScanPtrI);
  Index& index = *c_indexPool.getPtr(scan.m_indexId);
  // collect lower and upper bounds
  BoundInfo boundInfo[2][MaxIndexAttributes];
  // largest attrId seen plus one
  Uint32 maxAttrId[2] = { 0, 0 };
  unsigned offset = 0;
  const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
  // walk through entries
  while (offset + 2 <= req->boundAiLength) {
    jam();
    const unsigned type = data[offset];
    if (type > 4) {
      jam();
      scan.m_state = ScanOp::Invalid;
      sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
      return;
    }
    const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
    const Uint32 attrId = ah->getAttributeId();
    const Uint32 dataSize = ah->getDataSize();
    if (attrId >= index.m_numAttrs) {
      jam();
      scan.m_state = ScanOp::Invalid;
      sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
      return;
    }
    for (unsigned j = 0; j <= 1; j++) {
      // check if lower/upper bit matches
      const unsigned luBit = (j << 1);
      if ((type & 0x2) != luBit && type != 4)
        continue;
      // EQ -> LE, GE
      const unsigned type2 = (type & 0x1) | luBit;
      // fill in any gap
      while (maxAttrId[j] <= attrId) {
        BoundInfo& b = boundInfo[j][maxAttrId[j]++];
        b.type = -1;
      }
      BoundInfo& b = boundInfo[j][attrId];
      if (b.type != -1) {
        // compare with previous bound
        if (b.type != (int)type2 ||
            b.size != 2 + dataSize ||
            memcmp(&data[b.offset + 2], &data[offset + 2], dataSize << 2) != 0) {
          jam();
          scan.m_state = ScanOp::Invalid;
          sig->errorCode = TuxBoundInfo::InvalidBounds;
          return;
        }
      } else {
        // enter new bound
        b.type = type2;
        b.offset = offset;
        b.size = 2 + dataSize;
      }
    }
    // jump to next
    offset += 2 + dataSize;
  }
  if (offset != req->boundAiLength) {
    jam();
    scan.m_state = ScanOp::Invalid;
    sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
    return;
  }
  for (unsigned j = 0; j <= 1; j++) {
    // save lower/upper bound in index attribute id order
    for (unsigned i = 0; i < maxAttrId[j]; i++) {
      jam();
      const BoundInfo& b = boundInfo[j][i];
      // check for gap or strict bound before last
      if (b.type == -1 || (i + 1 < maxAttrId[j] && (b.type & 0x1))) {
        jam();
        scan.m_state = ScanOp::Invalid;
        sig->errorCode = TuxBoundInfo::InvalidBounds;
        return;
      }
      bool ok = scan.m_bound[j]->append(&data[b.offset], b.size);
      if (! ok) {
        jam();
        scan.m_state = ScanOp::Invalid;
        sig->errorCode = TuxBoundInfo::OutOfBuffers;
        return;
      }
    }
    scan.m_boundCnt[j] = maxAttrId[j];
  }
  // no error
  sig->errorCode = 0;
}
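
/*
 * Illustrative sketch (not part of the original file): the bound type
 * encoding that execTUX_BOUND_INFO above decodes.  Bit 0 is the strict
 * bit, bit 1 picks the lower (0) or upper (2) set, and 4 is equality,
 * which is rewritten into a non-strict bound on each side.  The enum
 * and function are hypothetical names for what the loop does inline.
 *
 *   enum BoundType {
 *     BoundLowerNonStrict = 0,  // >=
 *     BoundLowerStrict    = 1,  // >
 *     BoundUpperNonStrict = 2,  // <=
 *     BoundUpperStrict    = 3,  // <
 *     BoundEq             = 4   // ==, becomes types 0 and 2
 *   };
 *
 *   // type2 as computed per side j (0 = lower, 1 = upper) in the loop
 *   inline unsigned
 *   normalizedType(unsigned type, unsigned j)
 *   {
 *     const unsigned luBit = (j << 1);
 *     // for EQ the strict bit (type & 0x1) is 0, giving 0 or 2
 *     return (type & 0x1) | luBit;
 *   }
 */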

void
Dbtux::execNEXT_SCANREQ(Signal* signal)
{
  jamEntry();
  const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
  const NextScanReq* const req = &reqCopy;
  ScanOpPtr scanPtr;
  scanPtr.i = req->accPtr;
  c_scanOpPool.getPtr(scanPtr);
  ScanOp& scan = *scanPtr.p;
  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  // handle unlock previous and close scan
  switch (req->scanFlag) {
  case NextScanReq::ZSCAN_NEXT:
    jam();
    break;
  case NextScanReq::ZSCAN_NEXT_COMMIT:
    jam();
  case NextScanReq::ZSCAN_COMMIT:
    jam();
    if (! scan.m_readCommitted) {
      jam();
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = AccLockReq::Unlock;
      lockReq->accOpPtr = req->accOperationPtr;
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
      jamEntry();
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
      removeAccLockOp(scan, req->accOperationPtr);
    }
    if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
      jam();
      NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
      conf->scanPtr = scan.m_userPtr;
      unsigned signalLength = 1;
      sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
                 signal, signalLength, JBB);
      return;
    }
    break;
  case NextScanReq::ZSCAN_CLOSE:
    jam();
    // unlink from tree node first to avoid state changes
    if (scan.m_scanPos.m_loc != NullTupLoc) {
      jam();
      const TupLoc loc = scan.m_scanPos.m_loc;
      NodeHandle node(frag);
      selectNode(node, loc);
      unlinkScan(node, scanPtr);
      scan.m_scanPos.m_loc = NullTupLoc;
    }
    if (scan.m_lockwait) {
      jam();
      ndbrequire(scan.m_accLockOp != RNIL);
      // use ACC_ABORTCONF to flush out any reply in job buffer
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = AccLockReq::AbortWithConf;
      lockReq->accOpPtr = scan.m_accLockOp;
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
      jamEntry();
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
      scan.m_state = ScanOp::Aborting;
      return;
    }
    if (scan.m_state == ScanOp::Locked) {
      jam();
      ndbrequire(scan.m_accLockOp != RNIL);
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = AccLockReq::Unlock;
      lockReq->accOpPtr = scan.m_accLockOp;
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
      jamEntry();
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
      scan.m_accLockOp = RNIL;
    }
    scan.m_state = ScanOp::Aborting;
    scanClose(signal, scanPtr);
    return;
  case NextScanReq::ZSCAN_NEXT_ABORT:
    jam();
  default:
    jam();
    ndbrequire(false);
    break;
  }
  // start looking for next scan result
  AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
  checkReq->accPtr = scanPtr.i;
  checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
  EXECUTE_DIRECT(DBTUX, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
  jamEntry();
}
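
/*
 * Illustrative sketch (not part of the original file): what LQH asks
 * for with each scanFlag handled by execNEXT_SCANREQ above.  Action
 * and decode are hypothetical names for a summary of the switch.
 *
 *   enum Action {
 *     FetchNext,          // ZSCAN_NEXT: continue to next entry
 *     UnlockThenFetch,    // ZSCAN_NEXT_COMMIT: unlock previous, continue
 *     UnlockThenStop,     // ZSCAN_COMMIT: unlock previous, conf and wait
 *     CloseScan           // ZSCAN_CLOSE: unlink, abort lock wait, close
 *   };
 *
 *   inline Action
 *   decode(Uint32 scanFlag)
 *   {
 *     switch (scanFlag) {
 *     case NextScanReq::ZSCAN_NEXT:        return FetchNext;
 *     case NextScanReq::ZSCAN_NEXT_COMMIT: return UnlockThenFetch;
 *     case NextScanReq::ZSCAN_COMMIT:      return UnlockThenStop;
 *     case NextScanReq::ZSCAN_CLOSE:       return CloseScan;
 *     }
 *     return CloseScan;  // ZSCAN_NEXT_ABORT is not expected here
 *   }
 */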

void
Dbtux::execACC_CHECK_SCAN(Signal* signal)
{
  jamEntry();
  const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
  const AccCheckScan* const req = &reqCopy;
  ScanOpPtr scanPtr;
  scanPtr.i = req->accPtr;
  c_scanOpPool.getPtr(scanPtr);
  ScanOp& scan = *scanPtr.p;
  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "ACC_CHECK_SCAN scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
    jam();
    signal->theData[0] = scan.m_userPtr;
    signal->theData[1] = true;
    EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
    jamEntry();
    return;   // stop
  }
  if (scan.m_lockwait) {
    jam();
    // LQH asks if we are waiting for lock and we tell it to ask again
    const TreeEnt ent = scan.m_scanEnt;
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scan.m_userPtr;
    conf->accOperationPtr = RNIL;       // no tuple returned
    conf->fragId = frag.m_fragId | ent.m_fragBit;
    unsigned signalLength = 3;
    // if TC has ordered scan close, it will be detected here
    sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
               signal, signalLength, JBB);
    return;   // stop
  }
  if (scan.m_state == ScanOp::First) {
    jam();
    // search is done only once in single range scan
    scanFirst(scanPtr);
#ifdef VM_TRACE
    if (debugFlags & DebugScan) {
      debugOut << "First scan " << scanPtr.i << " " << scan << endl;
    }
#endif
  }
  if (scan.m_state == ScanOp::Next) {
    jam();
    // look for next
    scanNext(scanPtr);
  }
  // for reading tuple key in Current or Locked state
  Data pkData = c_dataBuffer;
  unsigned pkSize = 0; // indicates not yet done
  if (scan.m_state == ScanOp::Current) {
    // found an entry to return
    jam();
    ndbrequire(scan.m_accLockOp == RNIL);
    if (! scan.m_readCommitted) {
      jam();
      const TreeEnt ent = scan.m_scanEnt;
      // read tuple key
      readTablePk(frag, ent, pkData, pkSize);
      // get read lock or exclusive lock
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo =
        scan.m_lockMode == 0 ? AccLockReq::LockShared : AccLockReq::LockExclusive;
      lockReq->accOpPtr = RNIL;
      lockReq->userPtr = scanPtr.i;
      lockReq->userRef = reference();
      lockReq->tableId = scan.m_tableId;
      lockReq->fragId = frag.m_fragId | ent.m_fragBit;
      lockReq->fragPtrI = frag.m_accTableFragPtrI[ent.m_fragBit];
      const Uint32* const buf32 = static_cast<Uint32*>(pkData);
      const Uint64* const buf64 = reinterpret_cast<const Uint64*>(buf32);
      lockReq->hashValue = md5_hash(buf64, pkSize);
      lockReq->tupAddr = getTupAddr(frag, ent);
      lockReq->transId1 = scan.m_transId1;
      lockReq->transId2 = scan.m_transId2;
      // execute
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::LockSignalLength);
      jamEntry();
      switch (lockReq->returnCode) {
      case AccLockReq::Success:
        jam();
        scan.m_state = ScanOp::Locked;
        scan.m_accLockOp = lockReq->accOpPtr;
#ifdef VM_TRACE
        if (debugFlags & DebugScan) {
          debugOut << "Lock immediate scan " << scanPtr.i << " " << scan << endl;
        }
#endif
        break;
      case AccLockReq::IsBlocked:
        jam();
        // normal lock wait
        scan.m_state = ScanOp::Blocked;
        scan.m_lockwait = true;
        scan.m_accLockOp = lockReq->accOpPtr;
#ifdef VM_TRACE
        if (debugFlags & DebugScan) {
          debugOut << "Lock wait scan " << scanPtr.i << " " << scan << endl;
        }
#endif
        // LQH will wake us up
        signal->theData[0] = scan.m_userPtr;
        signal->theData[1] = true;
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
        jamEntry();
        return;   // stop
        break;
      case AccLockReq::Refused:
        jam();
        // we cannot see deleted tuple (assert only)
        ndbassert(false);
        // skip it
        scan.m_state = ScanOp::Next;
        signal->theData[0] = scan.m_userPtr;
        signal->theData[1] = true;
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
        jamEntry();
        return;   // stop
        break;
      case AccLockReq::NoFreeOp:
        jam();
        // max ops should depend on max scans (assert only)
        ndbassert(false);
        // stay in Current state
        scan.m_state = ScanOp::Current;
        signal->theData[0] = scan.m_userPtr;
        signal->theData[1] = true;
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
        jamEntry();
        return;   // stop
        break;
      default:
        ndbrequire(false);
        break;
      }
    } else {
      scan.m_state = ScanOp::Locked;
    }
  }
  if (scan.m_state == ScanOp::Locked) {
    // we have lock or do not need one
    jam();
    // read keys if not already done (uses signal)
    const TreeEnt ent = scan.m_scanEnt;
    if (scan.m_keyInfo) {
      jam();
      if (pkSize == 0) {
        jam();
        readTablePk(frag, ent, pkData, pkSize);
      }
    }
    // conf signal
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scan.m_userPtr;
    // the lock is passed to LQH
    Uint32 accLockOp = scan.m_accLockOp;
    if (accLockOp != RNIL) {
      scan.m_accLockOp = RNIL;
      // remember it until LQH unlocks it
      addAccLockOp(scan, accLockOp);
    } else {
      ndbrequire(scan.m_readCommitted);
      // operation RNIL in LQH would signal no tuple returned
      accLockOp = (Uint32)-1;
    }
    conf->accOperationPtr = accLockOp;
    conf->fragId = frag.m_fragId | ent.m_fragBit;
    conf->localKey[0] = getTupAddr(frag, ent);
    conf->localKey[1] = 0;
    conf->localKeyLength = 1;
    unsigned signalLength = 6;
    // add key info
    if (scan.m_keyInfo) {
      jam();
      conf->keyLength = pkSize;
      // piggy-back first 4 words of key data
      for (unsigned i = 0; i < 4; i++) {
        conf->key[i] = i < pkSize ? pkData[i] : 0;
      }
      signalLength = 11;
    }
    if (! scan.m_readCommitted) {
      sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
                 signal, signalLength, JBB);
    } else {
      Uint32 blockNo = refToBlock(scan.m_userRef);
      EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
    }
    // send rest of key data
    if (scan.m_keyInfo && pkSize > 4) {
      unsigned total = 4;
      while (total < pkSize) {
        jam();
        unsigned length = pkSize - total;
        if (length > 20)
          length = 20;
        signal->theData[0] = scan.m_userPtr;
        signal->theData[1] = 0;
        signal->theData[2] = 0;
        signal->theData[3] = length;
        memcpy(&signal->theData[4], &pkData[total], length << 2);
        sendSignal(scan.m_userRef, GSN_ACC_SCAN_INFO24,
                   signal, 4 + length, JBB);
        total += length;
      }
    }
    // next time look for next entry
    scan.m_state = ScanOp::Next;
    return;
  }
  // XXX in ACC this is checked before req->checkLcpStop
  if (scan.m_state == ScanOp::Last ||
      scan.m_state == ScanOp::Invalid) {
    jam();
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scan.m_userPtr;
    conf->accOperationPtr = RNIL;
    conf->fragId = RNIL;
    unsigned signalLength = 3;
    sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
               signal, signalLength, JBB);
    return;
  }
  ndbrequire(false);
}

/*
 * Lock succeeded (after delay) in ACC.  If the lock is for current
 * entry, set state to Locked.  If the lock is for an entry we were
 * moved away from, simply unlock it.  Finally, if we are closing the
 * scan, do nothing since we have already sent an abort request.
 */
void
Dbtux::execACCKEYCONF(Signal* signal)
{
  jamEntry();
  ScanOpPtr scanPtr;
  scanPtr.i = signal->theData[0];
  c_scanOpPool.getPtr(scanPtr);
  ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "Lock obtained scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL);
  scan.m_lockwait = false;
  if (scan.m_state == ScanOp::Blocked) {
    // the lock wait was for current entry
    jam();
    scan.m_state = ScanOp::Locked;
    // LQH has the ball
    return;
  }
  if (scan.m_state != ScanOp::Aborting) {
    // we were moved, release lock
    jam();
    AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
    lockReq->returnCode = RNIL;
    lockReq->requestInfo = AccLockReq::Unlock;
    lockReq->accOpPtr = scan.m_accLockOp;
    EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
    jamEntry();
    ndbrequire(lockReq->returnCode == AccLockReq::Success);
    scan.m_accLockOp = RNIL;
    // LQH has the ball
    return;
  }
  // lose the lock
  scan.m_accLockOp = RNIL;
  // continue at ACC_ABORTCONF
}
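
/*
 * Illustrative sketch (not part of the original file): the delayed
 * lock replies (execACCKEYCONF above, execACCKEYREF below) as
 * transitions on the ScanOp state, ignoring the unlock/abort side
 * effects.  State and the two functions are hypothetical stand-ins.
 *
 *   enum State { Next, Current, Blocked, Locked, Aborting };
 *
 *   // lock granted after a wait
 *   inline State
 *   onAcckeyconf(State s)
 *   {
 *     if (s == Blocked)
 *       return Locked;    // lock was for the current entry
 *     if (s != Aborting)
 *       return s;         // we were moved away: just unlock it
 *     return Aborting;    // closing: wait for ACC_ABORTCONF
 *   }
 *
 *   // lock refused after a wait (tuple deleted ahead of us)
 *   inline State
 *   onAcckeyref(State s)
 *   {
 *     if (s == Aborting)
 *       return Aborting;  // closing: wait for ACC_ABORTCONF
 *     // position should already have moved on; Blocked is unexpected
 *     return (s == Blocked) ? Next : s;
 *   }
 */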

/*
 * Lock failed (after delay) in ACC.  Probably means somebody ahead of
 * us in lock queue deleted the tuple.
 */
void
Dbtux::execACCKEYREF(Signal* signal)
{
  jamEntry();
  ScanOpPtr scanPtr;
  scanPtr.i = signal->theData[0];
  c_scanOpPool.getPtr(scanPtr);
  ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "Lock refused scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL);
  scan.m_lockwait = false;
  if (scan.m_state != ScanOp::Aborting) {
    jam();
    // release the operation
    AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
    lockReq->returnCode = RNIL;
    lockReq->requestInfo = AccLockReq::Abort;
    lockReq->accOpPtr = scan.m_accLockOp;
    EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
    jamEntry();
    ndbrequire(lockReq->returnCode == AccLockReq::Success);
    scan.m_accLockOp = RNIL;
    // scan position should already have been moved (assert only)
    if (scan.m_state == ScanOp::Blocked) {
      jam();
      ndbassert(false);
      scan.m_state = ScanOp::Next;
    }
    // LQH has the ball
    return;
  }
  // lose the lock
  scan.m_accLockOp = RNIL;
  // continue at ACC_ABORTCONF
}

/*
 * Received when scan is closing.  This signal arrives after any
 * ACCKEYCONF or ACCKEYREF which may have been in job buffer.
 */
void
Dbtux::execACC_ABORTCONF(Signal* signal)
{
  jamEntry();
  ScanOpPtr scanPtr;
  scanPtr.i = signal->theData[0];
  c_scanOpPool.getPtr(scanPtr);
  ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "ACC_ABORTCONF scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  ndbrequire(scan.m_state == ScanOp::Aborting);
  // most likely we are still in lock wait
  if (scan.m_lockwait) {
    jam();
    scan.m_lockwait = false;
    scan.m_accLockOp = RNIL;
  }
  scanClose(signal, scanPtr);
}

/*
 * Find start position for single range scan.  If it exists, sets state
 * to Next and links the scan to the node.  The first entry is returned
 * by scanNext.
 */
void
Dbtux::scanFirst(ScanOpPtr scanPtr)
{
  ScanOp& scan = *scanPtr.p;
  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
  TreeHead& tree = frag.m_tree;
  // set up index keys for this operation
  setKeyAttrs(frag);
  // unpack lower bound into c_dataBuffer
  const ScanBound& bound = *scan.m_bound[0];
  ScanBoundIterator iter;
  bound.first(iter);
  for (unsigned j = 0; j < bound.getSize(); j++) {
    jam();
    c_dataBuffer[j] = *iter.data;
    bound.next(iter);
  }
  // search for scan start position
  TreePos treePos;
  searchToScan(frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
  if (treePos.m_loc == NullTupLoc) {
    // empty tree
    jam();
    scan.m_state = ScanOp::Last;
    return;
  }
  // set position and state
  scan.m_scanPos = treePos;
  scan.m_state = ScanOp::Next;
  // link the scan to node found
  NodeHandle node(frag);
  selectNode(node, treePos.m_loc);
  linkScan(node, scanPtr);
}
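
/*
 * Illustrative sketch (not part of the original file): how a prefix
 * bound on a composite key ends a range scan, which is what scanNext
 * below does via cmpScanBound on the unpacked upper bound.  This
 * standalone version uses hypothetical names and int attributes.
 *
 *   #include <vector>
 *
 *   // true when entry lies past an upper bound covering the first
 *   // boundCnt attributes (strict = bound came from '<', not '<=')
 *   inline bool
 *   pastUpperBound(const std::vector<int>& entry,
 *                  const std::vector<int>& bound,
 *                  unsigned boundCnt, bool strict)
 *   {
 *     for (unsigned i = 0; i < boundCnt; i++) {
 *       if (entry[i] < bound[i])
 *         return false;   // still below the bound
 *       if (entry[i] > bound[i])
 *         return true;    // past the bound
 *     }
 *     // equal on all bounded attributes: out only if the bound is strict
 *     return strict;
 *   }
 */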

/*
 * Move to next entry.  The scan is already linked to some node.  When
 * we leave, if an entry was found, it will be linked to a possibly
 * different node.  The scan has a position, and a direction which
 * tells from where we came to this position.  This is one of:
 *
 * 0 - up from left child (scan this node next)
 * 1 - up from right child (proceed to parent)
 * 2 - up from root (the scan ends)
 * 3 - left to right within node (at end proceed to right child)
 * 4 - down from parent (proceed to left child)
 *
 * If an entry was found, scan direction is 3.  Therefore tree
 * re-organizations need not worry about scan direction.
 */
void
Dbtux::scanNext(ScanOpPtr scanPtr)
{
  ScanOp& scan = *scanPtr.p;
  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "Next in scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  // cannot be moved away from tuple we have locked
  ndbrequire(scan.m_state != ScanOp::Locked);
  // set up index keys for this operation
  setKeyAttrs(frag);
  // unpack upper bound into c_dataBuffer
  const ScanBound& bound = *scan.m_bound[1];
  ScanBoundIterator iter;
  bound.first(iter);
  for (unsigned j = 0; j < bound.getSize(); j++) {
    jam();
    c_dataBuffer[j] = *iter.data;
    bound.next(iter);
  }
  // use copy of position
  TreePos pos = scan.m_scanPos;
  // get and remember original node
  NodeHandle origNode(frag);
  selectNode(origNode, pos.m_loc);
  ndbrequire(islinkScan(origNode, scanPtr));
  // current node in loop
  NodeHandle node = origNode;
  // copy of entry found
  TreeEnt ent;
  while (true) {
    jam();
    if (pos.m_dir == 2) {
      // coming up from root ends the scan
      jam();
      pos.m_loc = NullTupLoc;
      scan.m_state = ScanOp::Last;
      break;
    }
    if (node.m_loc != pos.m_loc) {
      jam();
      selectNode(node, pos.m_loc);
    }
    if (pos.m_dir == 4) {
      // coming down from parent proceed to left child
      jam();
      TupLoc loc = node.getLink(0);
      if (loc != NullTupLoc) {
        jam();
        pos.m_loc = loc;
        pos.m_dir = 4;  // unchanged
        continue;
      }
      // pretend we came from left child
      pos.m_dir = 0;
    }
    if (pos.m_dir == 0) {
      // coming up from left child scan current node
      jam();
      pos.m_pos = 0;
      pos.m_match = false;
      pos.m_dir = 3;
    }
    if (pos.m_dir == 3) {
      // within node
      jam();
      unsigned occup = node.getOccup();
      ndbrequire(occup >= 1);
      // advance position
      if (! pos.m_match)
        pos.m_match = true;
      else
        pos.m_pos++;
      if (pos.m_pos < occup) {
        jam();
        ent = node.getEnt(pos.m_pos);
        pos.m_dir = 3;  // unchanged
        // read and compare all attributes
        readKeyAttrs(frag, ent, 0, c_entryKey);
        int ret = cmpScanBound(frag, 1, c_dataBuffer, scan.m_boundCnt[1], c_entryKey);
        ndbrequire(ret != NdbSqlUtil::CmpUnknown);
        if (ret < 0) {
          jam();
          // hit upper bound of single range scan
          pos.m_loc = NullTupLoc;
          scan.m_state = ScanOp::Last;
          break;
        }
        // can we see it
        if (! scanVisible(scanPtr, ent)) {
          jam();
          continue;
        }
        // found entry
        scan.m_state = ScanOp::Current;
        break;
      }
      // after node proceed to right child
      TupLoc loc = node.getLink(1);
      if (loc != NullTupLoc) {
        jam();
        pos.m_loc = loc;
        pos.m_dir = 4;
        continue;
      }
      // pretend we came from right child
      pos.m_dir = 1;
    }
    if (pos.m_dir == 1) {
      // coming up from right child proceed to parent
      jam();
      pos.m_loc = node.getLink(2);
      pos.m_dir = node.getSide();
      continue;
    }
    ndbrequire(false);
  }
  // copy back position
  scan.m_scanPos = pos;
  // relink
  if (scan.m_state == ScanOp::Current) {
    ndbrequire(pos.m_match == true && pos.m_dir == 3);
    ndbrequire(pos.m_loc == node.m_loc);
    if (origNode.m_loc != node.m_loc) {
      jam();
      unlinkScan(origNode, scanPtr);
      linkScan(node, scanPtr);
    }
    // copy found entry
    scan.m_scanEnt = ent;
  } else if (scan.m_state == ScanOp::Last) {
    jam();
    ndbrequire(pos.m_loc == NullTupLoc);
    unlinkScan(origNode, scanPtr);
  } else {
    ndbrequire(false);
  }
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "Next out scan " << scanPtr.i << " " << scan << endl;
  }
#endif
}
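
/*
 * Illustrative sketch (not part of the original file): the direction
 * driven in-order walk of scanNext above, on a plain one-entry-per-node
 * binary tree with parent links.  Node and inorderStep are hypothetical;
 * the dir values match the comment on scanNext (0 up from left, 1 up
 * from right, 2 up from root, 3 within node, 4 down from parent).
 *
 *   struct Node {
 *     Node* left;
 *     Node* right;
 *     Node* parent;
 *     unsigned side;   // 0 if left child of parent, 1 if right child
 *   };
 *
 *   // advance one entry; returns 0 when the walk passes above the root
 *   inline Node*
 *   inorderStep(Node*& node, unsigned& dir)
 *   {
 *     while (true) {
 *       if (dir == 2)
 *         return 0;                  // came up from root: scan ends
 *       if (dir == 4) {              // down from parent: find leftmost
 *         if (node->left != 0) {
 *           node = node->left;       // dir stays 4
 *           continue;
 *         }
 *         dir = 0;                   // pretend we came up from left child
 *       }
 *       if (dir == 0) {              // up from left child: visit this node
 *         dir = 3;
 *         return node;
 *       }
 *       if (dir == 3) {              // node done: proceed to right child
 *         if (node->right != 0) {
 *           node = node->right;
 *           dir = 4;
 *           continue;
 *         }
 *         dir = 1;                   // pretend we came up from right child
 *       }
 *       if (dir == 1) {              // up from right child: go to parent
 *         dir = (node->parent == 0) ? 2 : node->side;
 *         node = node->parent;
 *         continue;
 *       }
 *     }
 *   }
 *
 * Starting at the root with dir == 4 and calling inorderStep until it
 * returns 0 visits all nodes in key order; a found entry always leaves
 * the position at dir == 3, which is why tree re-organizations need
 * not worry about scan direction.
 */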

/*
 * Check if an entry is visible to the scan.
 *
 * There is a special check to never accept same tuple twice in a row.
 * This is faster than asking TUP.  It also fixes some special cases
 * which are not analyzed or handled yet.
 */
bool
Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
{
  const ScanOp& scan = *scanPtr.p;
  const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
  Uint32 fragBit = ent.m_fragBit;
  Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[fragBit];
  Uint32 fragId = frag.m_fragId | fragBit;
  Uint32 tupAddr = getTupAddr(frag, ent);
  Uint32 tupVersion = ent.m_tupVersion;
  // check for same tuple twice in row
  if (scan.m_scanEnt.m_tupLoc == ent.m_tupLoc &&
      scan.m_scanEnt.m_fragBit == fragBit) {
    jam();
    return false;
  }
  Uint32 transId1 = scan.m_transId1;
  Uint32 transId2 = scan.m_transId2;
  Uint32 savePointId = scan.m_savePointId;
  bool ret = c_tup->tuxQueryTh(tableFragPtrI, tupAddr, tupVersion,
                               transId1, transId2, savePointId);
  jamEntry();
  return ret;
}

/*
 * Finish closing of scan and send conf.  Any lock wait has been done
 * already.
 */
void
Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
{
  ScanOp& scan = *scanPtr.p;
  ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL);
  // unlock all not unlocked by LQH
  for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
    if (scan.m_accLockOps[i] != RNIL) {
      jam();
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = AccLockReq::Abort;
      lockReq->accOpPtr = scan.m_accLockOps[i];
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
      jamEntry();
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
      scan.m_accLockOps[i] = RNIL;
    }
  }
  // send conf
  NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
  conf->scanPtr = scanPtr.p->m_userPtr;
  conf->accOperationPtr = RNIL;
  conf->fragId = RNIL;
  unsigned signalLength = 3;
  sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
             signal, signalLength, JBB);
  releaseScanOp(scanPtr);
}

void
Dbtux::addAccLockOp(ScanOp& scan, Uint32 accLockOp)
{
  ndbrequire(accLockOp != RNIL);
  Uint32* list = scan.m_accLockOps;
  bool ok = false;
  for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
    ndbrequire(list[i] != accLockOp);
    if (! ok && list[i] == RNIL) {
      list[i] = accLockOp;
      ok = true;
      // continue check for duplicates
    }
  }
  if (! ok) {
    unsigned i = scan.m_maxAccLockOps;
    if (i < MaxAccLockOps) {
      list[i] = accLockOp;
      ok = true;
      scan.m_maxAccLockOps = i + 1;
    }
  }
  ndbrequire(ok);
}

void
Dbtux::removeAccLockOp(ScanOp& scan, Uint32 accLockOp)
{
  ndbrequire(accLockOp != RNIL);
  Uint32* list = scan.m_accLockOps;
  bool ok = false;
  for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
    if (list[i] == accLockOp) {
      list[i] = RNIL;
      ok = true;
      break;
    }
  }
  ndbrequire(ok);
}

/*
 * Release allocated records.
 */
void
Dbtux::releaseScanOp(ScanOpPtr& scanPtr)
{
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "Release scan " << scanPtr.i << " " << *scanPtr.p << endl;
  }
#endif
  Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
  scanPtr.p->m_boundMin.release();
  scanPtr.p->m_boundMax.release();
  // unlink from per-fragment list and release from pool
  frag.m_scanList.release(scanPtr);
}
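
/*
 * Illustrative sketch (not part of the original file): the fixed slot
 * list maintained by addAccLockOp / removeAccLockOp above, written as
 * a standalone structure.  LockOpList, MaxOps and Free are hypothetical
 * names; Free stands in for RNIL.
 *
 *   #include <cassert>
 *
 *   template <unsigned MaxOps>
 *   struct LockOpList {
 *     static const unsigned Free = 0xffffffff;   // like RNIL
 *     unsigned maxUsed;                          // like m_maxAccLockOps
 *     unsigned ops[MaxOps];                      // Free marks empty slots
 *
 *     LockOpList() : maxUsed(0) {}
 *
 *     void add(unsigned op) {
 *       assert(op != Free);
 *       bool done = false;
 *       for (unsigned i = 0; i < maxUsed; i++) {
 *         assert(ops[i] != op);       // whole list checked for duplicates
 *         if (! done && ops[i] == Free) {
 *           ops[i] = op;              // reuse a freed slot
 *           done = true;              // but keep checking for duplicates
 *         }
 *       }
 *       if (! done) {
 *         assert(maxUsed < MaxOps);   // else no room left
 *         ops[maxUsed++] = op;        // grow the high-water mark
 *       }
 *     }
 *
 *     void remove(unsigned op) {
 *       for (unsigned i = 0; i < maxUsed; i++) {
 *         if (ops[i] == op) {
 *           ops[i] = Free;
 *           return;
 *         }
 *       }
 *       assert(false);                // op must be present
 *     }
 *   };
 */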