/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "FastScheduler.hpp" #include "RefConvert.hpp" #include "Emulator.hpp" #include "VMSignal.hpp" #include <Error.hpp> #include <SignalLoggerManager.hpp> #include <BlockNumbers.h> #include <GlobalSignalNumbers.h> #include <signaldata/EventReport.hpp> #include "LongSignal.hpp" #include <NdbTick.h> #define MIN_NUMBER_OF_SIG_PER_DO_JOB 64 #define MAX_NUMBER_OF_SIG_PER_DO_JOB 2048 #define EXTRA_SIGNALS_PER_DO_JOB 32 FastScheduler::FastScheduler() { // These constants work for sun only, but they should be initated from // Emulator.C as soon as VMTime has been initiated. theJobBuffers[0].newBuffer(JBASIZE); theJobBuffers[1].newBuffer(JBBSIZE); theJobBuffers[2].newBuffer(JBCSIZE); theJobBuffers[3].newBuffer(JBDSIZE); clear(); } FastScheduler::~FastScheduler() { } void FastScheduler::clear() { int i; // Make sure the restart signals are not sent too early // the prio is set back in 'main' using the 'ready' method. globalData.highestAvailablePrio = LEVEL_IDLE; globalData.sendPackedActivated = 0; globalData.activateSendPacked = 0; for (i = 0; i < JB_LEVELS; i++){ theJobBuffers[i].clear(); } globalData.JobCounter = 0; globalData.JobLap = 0; globalData.loopMax = 32; globalData.VMSignals[0].header.theSignalId = 0; theDoJobTotalCounter = 0; theDoJobCallCounter = 0; } void FastScheduler::activateSendPacked() { globalData.sendPackedActivated = 1; globalData.activateSendPacked = 0; globalData.loopMax = 2048; }//FastScheduler::activateSendPacked() //------------------------------------------------------------------------ // sendPacked is executed at the end of the loop. // To ensure that we don't send any messages before executing all local // packed signals we do another turn in the loop (unless we have already // executed too many signals in the loop). //------------------------------------------------------------------------ void FastScheduler::doJob() { Uint32 loopCount = 0; Uint32 TminLoops = getBOccupancy() + EXTRA_SIGNALS_PER_DO_JOB; Uint32 TloopMax = (Uint32)globalData.loopMax; if (TminLoops < TloopMax) { TloopMax = TminLoops; }//if if (TloopMax < MIN_NUMBER_OF_SIG_PER_DO_JOB) { TloopMax = MIN_NUMBER_OF_SIG_PER_DO_JOB; }//if register Signal* signal = getVMSignals(); register Uint32 tHighPrio= globalData.highestAvailablePrio; do{ while ((tHighPrio < LEVEL_IDLE) && (loopCount < TloopMax)) { // signal->garbage_register(); // To ensure we find bugs quickly register Uint32 gsnbnr = theJobBuffers[tHighPrio].retrieve(signal); register BlockNumber reg_bnr = gsnbnr & 0xFFF; register GlobalSignalNumber reg_gsn = gsnbnr >> 16; globalData.incrementWatchDogCounter(1); if (reg_bnr > 0) { Uint32 tJobCounter = globalData.JobCounter; Uint32 tJobLap = globalData.JobLap; SimulatedBlock* b = globalData.getBlock(reg_bnr); theJobPriority[tJobCounter] = (Uint8)tHighPrio; globalData.JobCounter = (tJobCounter + 1) & 4095; globalData.JobLap = tJobLap + 1; #ifdef VM_TRACE_TIME Uint32 us1, us2; Uint64 ms1, ms2; NdbTick_CurrentMicrosecond(&ms1, &us1); b->m_currentGsn = reg_gsn; #endif getSections(signal->header.m_noOfSections, signal->m_sectionPtr); #ifdef VM_TRACE { if (globalData.testOn) { signal->header.theVerId_signalNumber = reg_gsn; signal->header.theReceiversBlockNumber = reg_bnr; globalSignalLoggers.executeSignal(signal->header, tHighPrio, &signal->theData[0], globalData.ownId, signal->m_sectionPtr, signal->header.m_noOfSections); }//if } #endif b->executeFunction(reg_gsn, signal); releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr); signal->header.m_noOfSections = 0; #ifdef VM_TRACE_TIME NdbTick_CurrentMicrosecond(&ms2, &us2); Uint64 diff = ms2; diff -= ms1; diff *= 1000000; diff += us2; diff -= us1; b->addTime(reg_gsn, diff); #endif tHighPrio = globalData.highestAvailablePrio; } else { tHighPrio++; globalData.highestAvailablePrio = tHighPrio; }//if loopCount++; }//while sendPacked(); tHighPrio = globalData.highestAvailablePrio; if(getBOccupancy() > MAX_OCCUPANCY) { if(loopCount != TloopMax) abort(); assert( loopCount == TloopMax ); TloopMax += 512; } } while ((getBOccupancy() > MAX_OCCUPANCY) || ((loopCount < TloopMax) && (tHighPrio < LEVEL_IDLE))); theDoJobCallCounter ++; theDoJobTotalCounter += loopCount; if (theDoJobCallCounter == 8192) { reportDoJobStatistics(theDoJobTotalCounter >> 13); theDoJobCallCounter = 0; theDoJobTotalCounter = 0; }//if }//FastScheduler::doJob() void FastScheduler::sendPacked() { if (globalData.sendPackedActivated == 1) { SimulatedBlock* b_lqh = globalData.getBlock(DBLQH); SimulatedBlock* b_tc = globalData.getBlock(DBTC); SimulatedBlock* b_tup = globalData.getBlock(DBTUP); Signal* signal = getVMSignals(); b_lqh->executeFunction(GSN_SEND_PACKED, signal); b_tc->executeFunction(GSN_SEND_PACKED, signal); b_tup->executeFunction(GSN_SEND_PACKED, signal); return; } else if (globalData.activateSendPacked == 0) { return; } else { activateSendPacked(); }//if return; }//FastScheduler::sendPacked() Uint32 APZJobBuffer::retrieve(Signal* signal) { Uint32 tOccupancy = theOccupancy; Uint32 myRPtr = rPtr; BufferEntry& buf = buffer[myRPtr]; Uint32 gsnbnr; Uint32 cond = (++myRPtr == bufSize) - 1; Uint32 tRecBlockNo = buf.header.theReceiversBlockNumber; if (tOccupancy != 0) { if (tRecBlockNo != 0) { // Transform protocol to signal. rPtr = myRPtr & cond; theOccupancy = tOccupancy - 1; gsnbnr = buf.header.theVerId_signalNumber << 16 | tRecBlockNo; Uint32 tSignalId = globalData.theSignalId; Uint32 tLength = buf.header.theLength; Uint32 tFirstData = buf.theDataRegister[0]; signal->header = buf.header; // Recall our signal Id for restart purposes buf.header.theSignalId = tSignalId; globalData.theSignalId = tSignalId + 1; Uint32* tDataRegPtr = &buf.theDataRegister[0]; Uint32* tSigDataPtr = signal->getDataPtrSend(); *tSigDataPtr = tFirstData; tDataRegPtr++; tSigDataPtr++; Uint32 tLengthCopied = 1; while (tLengthCopied < tLength) { Uint32 tData0 = tDataRegPtr[0]; Uint32 tData1 = tDataRegPtr[1]; Uint32 tData2 = tDataRegPtr[2]; Uint32 tData3 = tDataRegPtr[3]; tDataRegPtr += 4; tLengthCopied += 4; tSigDataPtr[0] = tData0; tSigDataPtr[1] = tData1; tSigDataPtr[2] = tData2; tSigDataPtr[3] = tData3; tSigDataPtr += 4; }//while /** * Copy sections references (copy all without if-statements) */ tDataRegPtr = &buf.theDataRegister[tLength]; SegmentedSectionPtr * tSecPtr = &signal->m_sectionPtr[0]; Uint32 tData0 = tDataRegPtr[0]; Uint32 tData1 = tDataRegPtr[1]; Uint32 tData2 = tDataRegPtr[2]; tSecPtr[0].i = tData0; tSecPtr[1].i = tData1; tSecPtr[2].i = tData2; //--------------------------------------------------------- // Prefetch of buffer[rPtr] is done here. We prefetch for // read both the first cache line and the next 64 byte // entry //--------------------------------------------------------- PREFETCH((void*)&buffer[rPtr]); PREFETCH((void*)(((char*)&buffer[rPtr]) + 64)); return gsnbnr; } else { bnr_error(); return 0; // Will never come here, simply to keep GCC happy. }//if } else { //------------------------------------------------------------ // The Job Buffer was empty, signal this by return zero. //------------------------------------------------------------ return 0; }//if }//APZJobBuffer::retrieve() void APZJobBuffer::signal2buffer(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn, BufferEntry& buf) { Uint32 tSignalId = globalData.theSignalId; Uint32 tFirstData = signal->theData[0]; Uint32 tLength = signal->header.theLength; Uint32 tSigId = buf.header.theSignalId; buf.header = signal->header; buf.header.theVerId_signalNumber = gsn; buf.header.theReceiversBlockNumber = bnr; buf.header.theSendersSignalId = tSignalId - 1; buf.header.theSignalId = tSigId; buf.theDataRegister[0] = tFirstData; Uint32 tLengthCopied = 1; Uint32* tSigDataPtr = &signal->theData[1]; Uint32* tDataRegPtr = &buf.theDataRegister[1]; while (tLengthCopied < tLength) { Uint32 tData0 = tSigDataPtr[0]; Uint32 tData1 = tSigDataPtr[1]; Uint32 tData2 = tSigDataPtr[2]; Uint32 tData3 = tSigDataPtr[3]; tLengthCopied += 4; tSigDataPtr += 4; tDataRegPtr[0] = tData0; tDataRegPtr[1] = tData1; tDataRegPtr[2] = tData2; tDataRegPtr[3] = tData3; tDataRegPtr += 4; }//while /** * Copy sections references (copy all without if-statements) */ tDataRegPtr = &buf.theDataRegister[tLength]; SegmentedSectionPtr * tSecPtr = &signal->m_sectionPtr[0]; Uint32 tData0 = tSecPtr[0].i; Uint32 tData1 = tSecPtr[1].i; Uint32 tData2 = tSecPtr[2].i; tDataRegPtr[0] = tData0; tDataRegPtr[1] = tData1; tDataRegPtr[2] = tData2; }//APZJobBuffer::signal2buffer() void APZJobBuffer::insert(const SignalHeader * const sh, const Uint32 * const theData, const Uint32 secPtrI[3]){ Uint32 tOccupancy = theOccupancy + 1; Uint32 myWPtr = wPtr; register BufferEntry& buf = buffer[myWPtr]; if (tOccupancy < bufSize) { Uint32 cond = (++myWPtr == bufSize) - 1; wPtr = myWPtr & cond; theOccupancy = tOccupancy; buf.header = * sh; const Uint32 len = buf.header.theLength; memcpy(buf.theDataRegister, theData, 4 * len); memcpy(&buf.theDataRegister[len], &secPtrI[0], 4 * 3); //--------------------------------------------------------- // Prefetch of buffer[wPtr] is done here. We prefetch for // write both the first cache line and the next 64 byte // entry //--------------------------------------------------------- WRITEHINT((void*)&buffer[wPtr]); WRITEHINT((void*)(((char*)&buffer[wPtr]) + 64)); } else { jbuf_error(); }//if } APZJobBuffer::APZJobBuffer() : bufSize(0), buffer(NULL), memRef(NULL) { clear(); } APZJobBuffer::~APZJobBuffer() { delete [] buffer; } void APZJobBuffer::newBuffer(int size) { buffer = new BufferEntry[size + 1]; // +1 to support "overrrun" if(buffer){ #ifndef NDB_PURIFY ::memset(buffer, 0, (size * sizeof(BufferEntry))); #endif bufSize = size; } else bufSize = 0; } void APZJobBuffer::clear() { rPtr = 0; wPtr = 0; theOccupancy = 0; } /** * Function prototype for print_restart * * Defined later in this file */ void print_restart(FILE * output, Signal* signal, Uint32 aLevel); void FastScheduler::dumpSignalMemory(FILE * output) { Signal signal; Uint32 ReadPtr[5]; Uint32 tJob; Uint32 tLastJob; fprintf(output, "\n"); if (globalData.JobLap > 4095) { if (globalData.JobCounter != 0) tJob = globalData.JobCounter - 1; else tJob = 4095; tLastJob = globalData.JobCounter; } else { if (globalData.JobCounter == 0) return; // No signals sent else { tJob = globalData.JobCounter - 1; tLastJob = 4095; } } ReadPtr[0] = theJobBuffers[0].getReadPtr(); ReadPtr[1] = theJobBuffers[1].getReadPtr(); ReadPtr[2] = theJobBuffers[2].getReadPtr(); ReadPtr[3] = theJobBuffers[3].getReadPtr(); do { unsigned char tLevel = theJobPriority[tJob]; globalData.incrementWatchDogCounter(4); if (ReadPtr[tLevel] == 0) ReadPtr[tLevel] = theJobBuffers[tLevel].getBufSize() - 1; else ReadPtr[tLevel]--; theJobBuffers[tLevel].retrieveDump(&signal, ReadPtr[tLevel]); print_restart(output, &signal, tLevel); if (tJob == 0) tJob = 4095; else tJob--; } while (tJob != tLastJob); fflush(output); } void FastScheduler::prio_level_error() { ERROR_SET(ecError, ERROR_WRONG_PRIO_LEVEL, "Wrong Priority Level", "FastScheduler.C"); } void jbuf_error() { ERROR_SET(ecError, BLOCK_ERROR_JBUFCONGESTION, "Job Buffer Full", "APZJobBuffer.C"); } void bnr_error() { ERROR_SET(ecError, BLOCK_ERROR_BNR_ZERO, "Block Number Zero", "FastScheduler.C"); } void print_restart(FILE * output, Signal* signal, Uint32 aLevel) { fprintf(output, "--------------- Signal ----------------\n"); SignalLoggerManager::printSignalHeader(output, signal->header, aLevel, globalData.ownId, true); SignalLoggerManager::printSignalData (output, signal->header, &signal->theData[0]); } /** * This method used to be a Cmvmi member function * but is now a "ordinary" function" * * See TransporterCallback.cpp for explanation */ void FastScheduler::reportDoJobStatistics(Uint32 tMeanLoopCount) { Signal signal; memset(&signal.header, 0, sizeof(signal.header)); signal.theData[0] = NDB_LE_JobStatistic; signal.theData[1] = tMeanLoopCount; memset(&signal.header, 0, sizeof(SignalHeader)); signal.header.theLength = 2; signal.header.theSendersSignalId = 0; signal.header.theSendersBlockRef = numberToRef(0, 0); execute(&signal, JBA, CMVMI, GSN_EVENT_REP); }