/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#define DBTUP_C
#include "Dbtup.hpp"
#include <RefConvert.hpp>
#include <ndb_limits.h>
#include <pc.hpp>
#include <signaldata/TupCommit.hpp>

#define ljam() { jamLine(5000 + __LINE__); }
#define ljamEntry() { jamEntryLine(5000 + __LINE__); }

//--------------------------------------------------------------------
// TUP_WRITELOG_REQ
//   theData[0] : index of an operation record
//   theData[1] : GCI to forward
//
// Starting at the given operation we follow nextActiveOp to the end
// of the active operation list and then walk back along prevActiveOp.
// For every operation visited on the way back one LQH_WRITELOG_REQ
// (userpointer, gci) is executed directly in the block found in the
// operation's userblockref. The walk ends after the operation that
// has no prevActiveOp.
//--------------------------------------------------------------------
void Dbtup::execTUP_WRITELOG_REQ(Signal* signal)
{
  jamEntry();
  OperationrecPtr opPtr;
  opPtr.i = signal->theData[0];
  const Uint32 gci = signal->theData[1];
  ptrCheckGuard(opPtr, cnoOfOprec, operationrec);

  // Step 1: advance to the far end of the nextActiveOp chain.
  for (;;) {
    if (opPtr.p->nextActiveOp == RNIL) {
      break;
    }
    ljam();
    opPtr.i = opPtr.p->nextActiveOp;
    ptrCheckGuard(opPtr, cnoOfOprec, operationrec);
  }

  // Step 2: walk back, one direct signal per operation.
  for (;;) {
    Uint32 receiverBlock = refToBlock(opPtr.p->userblockref);
    ndbrequire(opPtr.p->transstate == STARTED);
    // The signal words are filled in again for every operation since
    // the same signal object is reused for each direct execution.
    signal->theData[0] = opPtr.p->userpointer;
    signal->theData[1] = gci;
    if (opPtr.p->prevActiveOp != RNIL) {
      ljam();
      EXECUTE_DIRECT(receiverBlock, GSN_LQH_WRITELOG_REQ, signal, 2);
      jamEntry();
      // The link is read after the direct call, not before it.
      opPtr.i = opPtr.p->prevActiveOp;
      ptrCheckGuard(opPtr, cnoOfOprec, operationrec);
      continue;
    }
    // No predecessor: this is the last operation to report.
    ljam();
    EXECUTE_DIRECT(receiverBlock, GSN_LQH_WRITELOG_REQ, signal, 2);
    return;
  }
}//Dbtup::execTUP_WRITELOG_REQ()

//--------------------------------------------------------------------
// TUP_DEALLOCREQ: free the tuple header addressed by the signal.
//   theData[0] : fragment id
//   theData[1] : table record index
//   theData[2] : fragment page id
//   theData[3] : page index (must be even)
//
// When undo logging is needed for the page, an undo record carrying
// the tuple header contents is written before the header is freed.
//--------------------------------------------------------------------
void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
{
  jamEntry();

  // Unpack the request.
  Uint32 fragId = signal->theData[0];
  TablerecPtr tablePtr;
  tablePtr.i = signal->theData[1];
  Uint32 fragPageId = signal->theData[2];
  Uint32 pageIndex = signal->theData[3];

  // Resolve table, fragment and page records.
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
  FragrecordPtr fragmentPtr;
  getFragmentrec(fragmentPtr, fragId, tablePtr.p);
  ndbrequire(fragmentPtr.p != NULL);

  PagePtr tuplePagePtr;
  tuplePagePtr.i = getRealpid(fragmentPtr.p, fragPageId);
  ptrCheckGuard(tuplePagePtr, cnoOfPage, page);

  // Half of the (even) page index selects the tuple header slot that
  // follows the page header.
  ndbrequire((pageIndex & 1) == 0);
  Uint32 tupleOffset =
    ZPAGE_HEADER_SIZE + (tablePtr.p->tupheadsize * (pageIndex >> 1));

//---------------------------------------------------
/* --- Deallocate a tuple as requested by ACC  --- */
//---------------------------------------------------
  if (isUndoLoggingNeeded(fragmentPtr.p, fragPageId)) {
    ljam();
    cprAddUndoLogRecord(signal,
                        ZLCPR_TYPE_INSERT_TH,
                        fragPageId,
                        pageIndex,
                        tablePtr.i,
                        fragId,
                        fragmentPtr.p->checkpointVersion);
    cprAddData(signal,
               fragmentPtr.p,
               tuplePagePtr.i,
               tablePtr.p->tupheadsize,
               tupleOffset);
  }//if

  freeTh(fragmentPtr.p,
         tablePtr.p,
         signal,
         tuplePagePtr.p,
         tupleOffset);
}

/* ---------------------------------------------------------------- */
/* ------------ PERFORM A COMMIT ON AN UPDATE OPERATION  ---------- */
/* ---------------------------------------------------------------- */
// Releases the copy tuple referenced by the operation record
// (realPageIdC / pageOffsetC), if there is one, and resets the copy
// references in the operation record. Nothing is done when the
// operation holds no copy tuple (realPageIdC == RNIL).
//
// signal      : signal object handed on to the undo log and free calls
// regOperPtr  : operation whose copy tuple is released
// regFragPtr  : fragment record of the operation
// regTabPtr   : table record of the operation
void Dbtup::commitUpdate(Signal* signal,
                         Operationrec*  const regOperPtr,
                         Fragrecord* const regFragPtr,
                         Tablerec* const regTabPtr)
{
  if (regOperPtr->realPageIdC == RNIL) {
    // No copy tuple attached to this operation.
    return;
  }//if

  if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageIdC)) {
/* ------------------------------------------------------------------------ */
/* IF THE COPY WAS CREATED WITHIN THIS CHECKPOINT WE ONLY HAVE              */
/* TO LOG THE CREATION OF THE COPY. IF HOWEVER IT WAS CREATED BEFORE        */
/* THIS CHECKPOINT, WE HAVE TO SAVE THE DATA AS WELL.                       */
/* ------------------------------------------------------------------------ */
    if (!regOperPtr->undoLogged) {
      ljam();
      // Undo record followed by the contents of the copy tuple.
      cprAddUndoLogRecord(signal,
                          ZLCPR_TYPE_INSERT_TH,
                          regOperPtr->fragPageIdC,
                          regOperPtr->pageIndexC,
                          regOperPtr->tableRef,
                          regOperPtr->fragId,
                          regFragPtr->checkpointVersion);
      cprAddData(signal,
                 regFragPtr,
                 regOperPtr->realPageIdC,
                 regTabPtr->tupheadsize,
                 regOperPtr->pageOffsetC);
    } else {
      ljam();
      // Undo record only, no tuple data.
      cprAddUndoLogRecord(signal,
                          ZLCPR_TYPE_INSERT_TH_NO_DATA,
                          regOperPtr->fragPageIdC,
                          regOperPtr->pageIndexC,
                          regOperPtr->tableRef,
                          regOperPtr->fragId,
                          regFragPtr->checkpointVersion);
    }//if
  }//if

  // Hand the copy tuple header back to the free list.
  PagePtr copyTuplePage;
  copyTuplePage.i = regOperPtr->realPageIdC;
  ptrCheckGuard(copyTuplePage, cnoOfPage, page);
  freeTh(regFragPtr,
         regTabPtr,
         signal,
         copyTuplePage.p,
         (Uint32)regOperPtr->pageOffsetC);

  // The operation no longer refers to any copy tuple.
  regOperPtr->realPageIdC = RNIL;
  regOperPtr->fragPageIdC = RNIL;
  regOperPtr->pageOffsetC = ZNIL;
  regOperPtr->pageIndexC = ZNIL;
}//Dbtup::commitUpdate()

//--------------------------------------------------------------------
// Commit of an operation that is alone in the active operation list
// of its tuple. Fires the detached triggers, unlinks the operation
// from the active list and then finishes according to optype:
//   ZUPDATE : release the copy tuple, then update the GCI word
//   ZINSERT : update the GCI word
//   ZDELETE : nothing more to do here
// The GCI word is only updated when the table has GCPIndicator set.
// Any other optype fails the ndbrequire.
//--------------------------------------------------------------------
void
Dbtup::commitSimple(Signal* signal,
                    Operationrec* const regOperPtr,
                    Fragrecord* const regFragPtr,
                    Tablerec* const regTabPtr)
{
  // Publish the records in the block-level pointers before the
  // trigger code runs.
  operPtr.p = regOperPtr;
  fragptr.p = regFragPtr;
  tabptr.p = regTabPtr;

  // Checking detached triggers
  checkDetachedTriggers(signal, regOperPtr, regTabPtr);

  removeActiveOpList(regOperPtr);

  if (regOperPtr->optype == ZUPDATE) {
    ljam();
    commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
  } else if (regOperPtr->optype == ZINSERT) {
    ljam();
  } else {
    ndbrequire(regOperPtr->optype == ZDELETE);
    return;
  }//if

  // Common tail for update and insert.
  if (regTabPtr->GCPIndicator) {
    updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
  }//if
}//Dbtup::commitSimple()

//--------------------------------------------------------------------
// Unlink the operation from the doubly linked active operation list
// of its tuple. Does nothing unless inActiveOpList is ZTRUE.
//
// When the operation has no predecessor, the list head is the page
// word at the operation's pageOffset, and that word is redirected to
// the successor. Afterwards both links of the operation are RNIL.
//--------------------------------------------------------------------
void Dbtup::removeActiveOpList(Operationrec*  const regOperPtr)
{
  if (regOperPtr->inActiveOpList != ZTRUE) {
    return;
  }//if
  regOperPtr->inActiveOpList = ZFALSE;

  const Uint32 prevOpI = regOperPtr->prevActiveOp;
  const Uint32 nextOpI = regOperPtr->nextActiveOp;

  // Make whatever pointed at us point at our successor instead.
  if (prevOpI == RNIL) {
    ljam();
    PagePtr tuplePagePtr;
    tuplePagePtr.i = regOperPtr->realPageId;
    ptrCheckGuard(tuplePagePtr, cnoOfPage, page);
    ndbrequire(regOperPtr->pageOffset < ZWORDS_ON_PAGE);
    tuplePagePtr.p->pageWord[regOperPtr->pageOffset] = nextOpI;
  } else {
    ljam();
    OperationrecPtr prevOpPtr;
    prevOpPtr.i = prevOpI;
    ptrCheckGuard(prevOpPtr, cnoOfOprec, operationrec);
    prevOpPtr.p->nextActiveOp = nextOpI;
  }//if

  // Make the successor, if any, point back at our predecessor.
  if (nextOpI != RNIL) {
    ljam();
    OperationrecPtr nextOpPtr;
    nextOpPtr.i = nextOpI;
    ptrCheckGuard(nextOpPtr, cnoOfOprec, operationrec);
    nextOpPtr.p->prevActiveOp = prevOpI;
  }//if

  regOperPtr->prevActiveOp = RNIL;
  regOperPtr->nextActiveOp = RNIL;
}//Dbtup::removeActiveOpList()

/* ---------------------------------------------------------------- */
/* INITIALIZATION OF ONE CONNECTION RECORD TO PREPARE FOR NEXT OP.  */
/* ---------------------------------------------------------------- */
// Resets the operation record to IDLE and, when the record is linked
// into the fragment's list of used operation records (inFragList is
// ZTRUE), removes it from that doubly linked list, maintaining
// firstusedOprec / lastusedOprec on the fragment record.
void Dbtup::initOpConnection(Operationrec* regOperPtr,
                             Fragrecord * fragPtrP)
{
  const Uint32 wasInFragList = regOperPtr->inFragList;
  regOperPtr->transstate = IDLE;
  regOperPtr->currentAttrinbufLen = 0;
/*----------------------------------------------------------------- */
/*       TO ENSURE THAT WE HAVE SUCCESSFUL ABORTS OF FOLLOWING      */
/*       OPERATIONS WHICH NEVER STARTED WE SET THE OPTYPE TO READ.  */
/*----------------------------------------------------------------- */
  regOperPtr->optype = ZREAD;

  if (wasInFragList != ZTRUE) {
    return;
  }//if

/*----------------------------------------------------------------- */
/*       REMOVE IT FROM THE DOUBLY LINKED LIST ON THE FRAGMENT      */
/*----------------------------------------------------------------- */
  OperationrecPtr prevInFrag;
  OperationrecPtr nextInFrag;
  prevInFrag.i = regOperPtr->prevOprecInList;
  nextInFrag.i = regOperPtr->nextOprecInList;
  regOperPtr->inFragList = ZFALSE;

  // Forward link: either the fragment's head or the predecessor.
  if (prevInFrag.i != RNIL) {
    ljam();
    ptrCheckGuard(prevInFrag, cnoOfOprec, operationrec);
    prevInFrag.p->nextOprecInList = nextInFrag.i;
  } else {
    ljam();
    fragPtrP->firstusedOprec = nextInFrag.i;
  }//if

  // Backward link: either the fragment's tail or the successor.
  if (nextInFrag.i != RNIL) {
    ptrCheckGuard(nextInFrag, cnoOfOprec, operationrec);
    nextInFrag.p->prevOprecInList = prevInFrag.i;
  } else {
    fragPtrP->lastusedOprec = prevInFrag.i;
  }//if

  regOperPtr->prevOprecInList = RNIL;
  regOperPtr->nextOprecInList = RNIL;
}//Dbtup::initOpConnection()

/* ----------------------------------------------------------------- */
/* --------------- COMMIT THIS PART OF A TRANSACTION --------------- */
/* ----------------------------------------------------------------- */
// TUP_COMMITREQ (TupCommitReq): commits the operation given by opPtr.
// The gci and hashValue of the request are stored in the operation
// record, TUX commit triggers are executed when the table has any,
// and then one of three commit paths is taken depending on the
// operation's tupleState and its active list links. Finally the
// operation record is reinitialised for the next operation.
void Dbtup::execTUP_COMMITREQ(Signal* signal) 
{
  TupCommitReq * const commitReq = (TupCommitReq *)signal->getDataPtr();

  ljamEntry();

  // Locate the operation and record the commit parameters in it.
  OperationrecPtr opPtr;
  opPtr.i = commitReq->opPtr;
  ptrCheckGuard(opPtr, cnoOfOprec, operationrec);

  ndbrequire(opPtr.p->transstate == STARTED);
  opPtr.p->gci = commitReq->gci;
  opPtr.p->hashValue = commitReq->hashValue;

  // Locate the fragment and table the operation belongs to.
  FragrecordPtr fragmentPtr;
  fragmentPtr.i = opPtr.p->fragmentPtr;
  ptrCheckGuard(fragmentPtr, cnoOfFragrec, fragrecord);

  TablerecPtr tablePtr;
  tablePtr.i = opPtr.p->tableRef;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  if (!tablePtr.p->tuxCustomTriggers.isEmpty()) {
    ljam();
    executeTuxCommitTriggers(signal, opPtr.p, tablePtr.p);
  }//if

  if (opPtr.p->tupleState != NO_OTHER_OP) {
    ljam();
/* ---------------------------------------------------------- */
// The tuple has already been committed through another
// operation; only release any copy tuples.
/* ---------------------------------------------------------- */
    ndbrequire(opPtr.p->tupleState == TO_BE_COMMITTED);
    commitUpdate(signal, opPtr.p, fragmentPtr.p, tablePtr.p);
    removeActiveOpList(opPtr.p);
  } else if ((opPtr.p->prevActiveOp == RNIL) &&
             (opPtr.p->nextActiveOp == RNIL)) {
    ljam();
/* ---------------------------------------------------------- */
// We handle the simple case separately as an optimisation
/* ---------------------------------------------------------- */
    commitSimple(signal, opPtr.p, fragmentPtr.p, tablePtr.p);
  } else {
/* ---------------------------------------------------------- */
// This is the first commit message of this record in this
// transaction. We will commit this record completely for this
// transaction. If there are other operations they will be
// responsible to release their own resources. Also commit of
// a delete is postponed until the last operation is committed
// on the tuple.
//
// As part of this commitRecord we will also handle detached
// triggers and release of resources for this operation.
/* ---------------------------------------------------------- */
    ljam();
    commitRecord(signal, opPtr.p, fragmentPtr.p, tablePtr.p);
    removeActiveOpList(opPtr.p);
  }//if

  initOpConnection(opPtr.p, fragmentPtr.p);
}//execTUP_COMMITREQ()

//--------------------------------------------------------------------
// Store the operation's gci in the GCI word of its tuple header. The
// word is located at pageOffset + tupGCPIndex on the operation's
// page. When undo logging is needed for the page, the previous value
// of the word is written to the undo log first. The tuple checksum is
// recomputed afterwards when the table has checksumIndicator set.
//--------------------------------------------------------------------
void
Dbtup::updateGcpId(Signal* signal,
                   Operationrec* const regOperPtr,
                   Fragrecord* const regFragPtr,
                   Tablerec* const regTabPtr)
{
  ljam();
//--------------------------------------------------------------------
// Open question left by the original author (RONM): is this code
// safe for UNDO logging? Not sure currently.
//--------------------------------------------------------------------
  PagePtr tuplePagePtr;
  tuplePagePtr.i = regOperPtr->realPageId;
  ptrCheckGuard(tuplePagePtr, cnoOfPage, page);

  // Position of the GCI word; it must lie on the page and inside
  // the tuple header.
  const Uint32 gciWordPos = regOperPtr->pageOffset + regTabPtr->tupGCPIndex;
  ndbrequire((gciWordPos < ZWORDS_ON_PAGE) &&
             (regTabPtr->tupGCPIndex < regTabPtr->tupheadsize));

  if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageId)) {
    ljam();
    // Save the value that is about to be overwritten.
    Uint32 oldGci = tuplePagePtr.p->pageWord[gciWordPos];
    cprAddUndoLogRecord(signal,
                        ZLCPR_TYPE_UPDATE_GCI,
                        regOperPtr->fragPageId,
                        regOperPtr->pageIndex,
                        regOperPtr->tableRef,
                        regOperPtr->fragId,
                        regFragPtr->checkpointVersion);
    cprAddGCIUpdate(signal, oldGci, regFragPtr);
  }//if

  tuplePagePtr.p->pageWord[gciWordPos] = regOperPtr->gci;

  if (regTabPtr->checksumIndicator) {
    ljam();
    setChecksum(tuplePagePtr.p,
                regOperPtr->pageOffset,
                regTabPtr->tupheadsize);
  }//if
}//Dbtup::updateGcpId()

//--------------------------------------------------------------------
// Commit a tuple on which there is more than one active operation.
// Called from execTUP_COMMITREQ for the first commit that arrives for
// the tuple (tupleState == NO_OTHER_OP and the operation has a
// neighbour in the active operation list).
//
// setTupleStatesSetOpType() marks every operation in the list as
// TO_BE_COMMITTED and reports the combined operation type, which
// decides how the detached triggers are fired:
//   ZINSERT_DELETE : no triggers, only release the copy tuple
//   ZINSERT        : triggers fired on regOperPtr as an insert
//   ZUPDATE        : triggers fired on the before-value operation
//                    with the accumulated change mask
//   ZDELETE        : triggers fired on the before-value operation
//                    with its tuple reference temporarily pointing
//                    at the copy tuple
// Fields that are patched for the trigger call are restored after it,
// except gci on the before-value operation which keeps the new value.
//
// Afterwards (except for ZINSERT_DELETE, which returns early) the copy
// tuple of regOperPtr is released and the GCI word is updated when
// the table has GCPIndicator set.
//
// signal      : signal object passed on to triggers and logging
// regOperPtr  : the operation being committed
// regFragPtr  : its fragment record
// regTabPtr   : its table record
//--------------------------------------------------------------------
void
Dbtup::commitRecord(Signal* signal,
                    Operationrec* const regOperPtr,
                    Fragrecord* const regFragPtr,
                    Tablerec* const regTabPtr)
{
  Uint32 opType;
  OperationrecPtr firstOpPtr;
  PagePtr pagePtr;

  pagePtr.i = regOperPtr->realPageId;
  ptrCheckGuard(pagePtr, cnoOfPage, page);

  // Sets opType and firstOpPtr (the operation at the end of the
  // nextActiveOp chain) as output parameters.
  setTupleStatesSetOpType(regOperPtr, pagePtr.p, opType, firstOpPtr);

  // Publish fragment and table in the block-level pointers before any
  // trigger code runs.
  fragptr.p = regFragPtr;
  tabptr.p = regTabPtr;

  if (opType == ZINSERT_DELETE) {
    ljam();
//--------------------------------------------------------------------
// We started by inserting the tuple and ended by deleting. Seen from
// transactions point of view no changes were made.
//--------------------------------------------------------------------
    commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
    return;
  } else if (opType == ZINSERT) {
    ljam();
//--------------------------------------------------------------------
// We started by inserting whereafter we made several changes to the
// tuple that could include updates, deletes and new inserts. The final
// state of the tuple is the original tuple. This is reached from this
// operation. We change the optype on this operation to ZINSERT to
// ensure proper operation of the detached trigger.
// We restore the optype after executing triggers although not really
// needed.
//--------------------------------------------------------------------
    Uint32 saveOpType = regOperPtr->optype;
    regOperPtr->optype = ZINSERT;
    operPtr.p = regOperPtr;

    checkDetachedTriggers(signal,
                          regOperPtr,
                          regTabPtr);

    regOperPtr->optype = saveOpType;
  } else if (opType == ZUPDATE) {
    ljam();
//--------------------------------------------------------------------
// We want to use the first operation which contains a copy tuple
// reference. This operation contains the before value of this record
// for this transaction. Then this operation is used for executing
// triggers with optype set to update.
//--------------------------------------------------------------------
    OperationrecPtr befOpPtr;
    findBeforeValueOperation(befOpPtr, firstOpPtr);

    Uint32 saveOpType = befOpPtr.p->optype;
    Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
    Bitmask<MAXNROFATTRIBUTESINWORDS> saveAttributeMask;

    // Combined change mask of the operations in the active list.
    calculateChangeMask(pagePtr.p,
                        regTabPtr,
                        befOpPtr.p->pageOffset,
                        attributeMask);

    // Swap in the combined mask for the duration of the trigger call;
    // the operation's own mask is kept in saveAttributeMask.
    saveAttributeMask.clear();
    saveAttributeMask.bitOR(befOpPtr.p->changeMask);
    befOpPtr.p->changeMask.clear();
    befOpPtr.p->changeMask.bitOR(attributeMask);
    befOpPtr.p->gci = regOperPtr->gci;
    
    befOpPtr.p->optype = opType;
    operPtr.p = befOpPtr.p;
    checkDetachedTriggers(signal,
                          befOpPtr.p,
                          regTabPtr);

    // Restore the operation's own change mask and optype.
    befOpPtr.p->changeMask.clear();
    befOpPtr.p->changeMask.bitOR(saveAttributeMask);

    befOpPtr.p->optype = saveOpType;
  } else if (opType == ZDELETE) {
    ljam();
//--------------------------------------------------------------------
// We want to use the first operation which contains a copy tuple.
// We benefit from the fact that we know that it cannot be a simple
// delete and it cannot be an insert followed by a delete. Thus there
// must either be an update or a insert following a delete. In both
// cases we will find a before value in a copy tuple.
//
// An added complexity is that the trigger handling assumes that the
// before value is located in the original tuple so we have to move the
// copy tuple reference to the original tuple reference and afterwards
// restore it again.
//--------------------------------------------------------------------
    OperationrecPtr befOpPtr;
    findBeforeValueOperation(befOpPtr, firstOpPtr);
    Uint32 saveOpType = befOpPtr.p->optype;

    // Remember the original tuple reference ...
    Uint32 realPageId = befOpPtr.p->realPageId;
    Uint32 pageOffset = befOpPtr.p->pageOffset;
    Uint32 fragPageId = befOpPtr.p->fragPageId;
    Uint32 pageIndex  = befOpPtr.p->pageIndex;

    // ... and let it point at the copy tuple while triggers run.
    befOpPtr.p->realPageId = befOpPtr.p->realPageIdC;
    befOpPtr.p->pageOffset = befOpPtr.p->pageOffsetC;
    befOpPtr.p->fragPageId = befOpPtr.p->fragPageIdC;
    befOpPtr.p->pageIndex  = befOpPtr.p->pageIndexC;
    befOpPtr.p->gci = regOperPtr->gci;

    befOpPtr.p->optype = opType;
    operPtr.p = befOpPtr.p;
    checkDetachedTriggers(signal,
                          befOpPtr.p,
                          regTabPtr);

    // Put back the original tuple reference and optype.
    befOpPtr.p->realPageId = realPageId;
    befOpPtr.p->pageOffset = pageOffset;
    befOpPtr.p->fragPageId = fragPageId;
    befOpPtr.p->pageIndex  = pageIndex;
    befOpPtr.p->optype     = saveOpType;
  } else {
    // setTupleStatesSetOpType() only produces the four types above.
    ndbrequire(false);
  }//if

  // Release this operation's copy tuple and stamp the tuple with the
  // commit GCI when the table keeps one.
  commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
  if (regTabPtr->GCPIndicator) {
    updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
  }//if
}//Dbtup::commitRecord()

//--------------------------------------------------------------------
// Walk the active operation list of the tuple that regOperPtr works
// on, mark every operation in it as TO_BE_COMMITTED and work out the
// combined operation type of the list.
//
// The list head is the operation index stored in the page word at
// regOperPtr->pageOffset; the list is followed through nextActiveOp.
//
// regOperPtr  : operation being committed (supplies pageOffset)
// pagePtr     : page holding the tuple
// opType      : out, combined operation type:
//                 head is ZDELETE, tail is ZINSERT -> ZINSERT_DELETE
//                 head is ZDELETE, otherwise       -> ZDELETE
//                 head not ZDELETE, tail ZINSERT   -> ZINSERT
//                 otherwise                        -> ZUPDATE
// firstOpPtr  : out, the operation at the end of the nextActiveOp
//               chain (presumably the earliest operation on the
//               tuple, as the name suggests)
//--------------------------------------------------------------------
void
Dbtup::setTupleStatesSetOpType(Operationrec* const regOperPtr,
                               Page* const pagePtr,
                               Uint32& opType,
                               OperationrecPtr& firstOpPtr)
{
  OperationrecPtr loopOpPtr;

  ndbrequire(regOperPtr->pageOffset < ZWORDS_ON_PAGE);
  loopOpPtr.i = pagePtr->pageWord[regOperPtr->pageOffset];
  ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);

  // Preliminary type taken from the operation at the head of the list.
  if (loopOpPtr.p->optype == ZDELETE) {
    ljam();
    opType = ZDELETE;
  } else {
    ljam();
    opType = ZUPDATE;
  }//if

  // Mark all operations. Each record is guarded exactly once: the
  // head above, every following record right after stepping to it.
  for (;;) {
    ljam();
    firstOpPtr = loopOpPtr;
    loopOpPtr.p->tupleState = TO_BE_COMMITTED;
    if (loopOpPtr.p->nextActiveOp == RNIL) {
      break;
    }//if
    loopOpPtr.i = loopOpPtr.p->nextActiveOp;
    ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
  }//for

  // Refine the type when the operation at the end of the chain is an
  // insert.
  if (opType == ZDELETE) {
    ljam();
    if (firstOpPtr.p->optype == ZINSERT) {
      ljam();
      opType = ZINSERT_DELETE;
    }//if
  } else {
    ljam();
    if (firstOpPtr.p->optype == ZINSERT) {
      ljam();
      opType = ZINSERT;
    }//if
  }//if
}//Dbtup::setTupleStatesSetOpType()

//--------------------------------------------------------------------
// Find the operation that holds a copy tuple reference
// (realPageIdC != RNIL), starting at firstOpPtr.
//
// befOpPtr    : out, firstOpPtr itself when it has a copy tuple,
//               otherwise its prevActiveOp neighbour, which is then
//               required to have one
// firstOpPtr  : operation to start from
//--------------------------------------------------------------------
void Dbtup::findBeforeValueOperation(OperationrecPtr& befOpPtr,
                                     OperationrecPtr firstOpPtr)
{
  befOpPtr = firstOpPtr;
  if (befOpPtr.p->realPageIdC == RNIL) {
    ljam();
    // No copy tuple here; step one operation back along prevActiveOp.
    befOpPtr.i = befOpPtr.p->prevActiveOp;
    ptrCheckGuard(befOpPtr, cnoOfOprec, operationrec);
    ndbrequire(befOpPtr.p->realPageIdC != RNIL);
    return;
  }//if
  ljam();
}//Dbtup::findBeforeValueOperation()

//--------------------------------------------------------------------
// Build the combined attribute change mask for the active operation
// list whose head index is stored in the page word at pageOffset.
//
// The list is followed through nextActiveOp:
//   ZUPDATE : the operation's changeMask is OR:ed into the result
//   ZINSERT : all bits are set and the walk stops
//   ZDELETE : contributes nothing
// Any other optype fails the ndbrequire.
//
// pagePtr        : page holding the tuple
// regTabPtr      : table record (not used by this function)
// pageOffset     : page word position of the list head
// attributeMask  : out, cleared first and then filled in
//--------------------------------------------------------------------
void
Dbtup::calculateChangeMask(Page* const pagePtr,
                           Tablerec* const regTabPtr,
                           Uint32 pageOffset,
                           Bitmask<MAXNROFATTRIBUTESINWORDS>& attributeMask)
{
  attributeMask.clear();
  ndbrequire(pageOffset < ZWORDS_ON_PAGE);

  OperationrecPtr opPtr;
  opPtr.i = pagePtr->pageWord[pageOffset];
  for (;;) {
    ptrCheckGuard(opPtr, cnoOfOprec, operationrec);
    if (opPtr.p->optype == ZINSERT) {
      ljam();
      // An insert touches every attribute; nothing more to collect.
      attributeMask.set();
      return;
    }//if
    if (opPtr.p->optype == ZUPDATE) {
      ljam();
      attributeMask.bitOR(opPtr.p->changeMask);
    } else {
      ndbrequire(opPtr.p->optype == ZDELETE);
    }//if
    opPtr.i = opPtr.p->nextActiveOp;
    if (opPtr.i == RNIL) {
      break;
    }//if
  }//for
}//Dbtup::calculateChangeMask()