From 73360e558228c8d8d1869d54e62f46b0f77a324f Mon Sep 17 00:00:00 2001
From: "mronstrom@mysql.com" <>
Date: Tue, 20 Jul 2004 00:23:49 +0200
Subject: [PATCH] Bug #4479 Ensures that the node doesn't crash by overflowing
 the UNDO log buffer at local checkpoints. Inserts a real-time break after 512
 operations and when low on UNDO log buffer.

---
 ndb/src/kernel/blocks/dbacc/Dbacc.hpp     |   2 +
 ndb/src/kernel/blocks/dbacc/DbaccMain.cpp | 106 ++++++++++++++++------
 2 files changed, 78 insertions(+), 30 deletions(-)

diff --git a/ndb/src/kernel/blocks/dbacc/Dbacc.hpp b/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
index 6ba2d083e58..5185e91caac 100644
--- a/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
+++ b/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
@@ -218,6 +218,7 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: "
 #define ZREL_FRAG 6
 #define ZREL_DIR 7
 #define ZREPORT_MEMORY_USAGE 8
+#define ZLCP_OP_WRITE_RT_BREAK 9
 
 /* ------------------------------------------------------------------------- */
 /* ERROR CODES                                                               */
@@ -1190,6 +1191,7 @@ private:
   void zpagesize_error(const char* where);
 
   void reportMemoryUsage(Signal* signal, int gth);
+  void lcp_write_op_to_undolog(Signal* signal);
 
 
   // Initialisation
diff --git a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
index 933ee2cf8e1..ccc1acdd273 100644
--- a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
+++ b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
@@ -46,13 +46,17 @@ Dbacc::remainingUndoPages(){
   ndbrequire(HeadPage>=TailPage);
 
   Uint32 UsedPages = HeadPage - TailPage;
-  Uint32 Remaining = cundopagesize - UsedPages;
+  Int32 Remaining = cundopagesize - UsedPages;
 
   // There can not be more than cundopagesize remaining
-  ndbrequire(Remaining<=cundopagesize);
-
+  if (Remaining <= 0){
+    // No more undolog, crash node
+    progError(__LINE__,
+	      ERR_NO_MORE_UNDOLOG,
+	      "There are more than 1Mbyte undolog writes outstanding");
+  }
   return Remaining;
-}//Dbacc::remainingUndoPages()
+}
 
 void
 Dbacc::updateLastUndoPageIdWritten(Signal* signal, Uint32 aNewValue){
@@ -193,6 +197,17 @@ void Dbacc::execCONTINUEB(Signal* signal)
     return;
   }
 
+  case ZLCP_OP_WRITE_RT_BREAK:
+  {
+    operationRecPtr.i= signal->theData[1];
+    fragrecptr.i= signal->theData[2];
+    lcpConnectptr.i= signal->theData[3];
+    ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
+    ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
+    ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
+    lcp_write_op_to_undolog(signal);
+    return;
+  }
   default:
     ndbrequire(false);
     break;
@@ -7697,32 +7712,70 @@ void Dbacc::execACC_LCPREQ(Signal* signal)
   fragrecptr.p->lcpMaxOverDirIndex = fragrecptr.p->lastOverIndex;
   fragrecptr.p->createLcp = ZTRUE;
   operationRecPtr.i = fragrecptr.p->lockOwnersList;
-  while (operationRecPtr.i != RNIL) {
+  lcp_write_op_to_undolog(signal);
+}
+
+void
+Dbacc::lcp_write_op_to_undolog(Signal* signal)
+{
+  bool delay_continueb= false;
+  Uint32 i, j;
+  for (i= 0; i < 16; i++) {
     jam();
-    ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
+    if (remainingUndoPages() <= ZMIN_UNDO_PAGES_AT_COMMIT) {
+      jam();
+      delay_continueb= true;
+      break;
+    }
+    for (j= 0; j < 32; j++) {
+      if (operationRecPtr.i == RNIL) {
+        jam();
+        break;
+      }
+      jam();
+      ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
 
-    if ((operationRecPtr.p->operation == ZINSERT) ||
-	(operationRecPtr.p->elementIsDisappeared == ZTRUE)){
+      if ((operationRecPtr.p->operation == ZINSERT) ||
+          (operationRecPtr.p->elementIsDisappeared == ZTRUE)){
       /*******************************************************************
        * Only log inserts and elements that are marked as dissapeared.
        * All other operations update the element header and that is handled
        * when pages are written to disk
        ********************************************************************/
-      undopageptr.i = (cundoposition>>ZUNDOPAGEINDEXBITS) & (cundopagesize-1);
-      ptrAss(undopageptr, undopage);
-      theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
-      tundoindex = theadundoindex + ZUNDOHEADSIZE;
-
-      writeUndoOpInfo(signal);/* THE INFORMATION ABOUT ELEMENT HEADER, STORED*/
-                              /* IN OP REC, IS WRITTEN AT UNDO PAGES */
-      cundoElemIndex = 0;/* DEFAULT VALUE USED BY WRITE_UNDO_HEADER SUBROTINE */
-      writeUndoHeader(signal, RNIL, UndoHeader::ZOP_INFO); /* WRITE THE HEAD OF THE UNDO ELEMENT */
-      checkUndoPages(signal);	/* SEND UNDO PAGE TO DISK WHEN A GROUP OF  */
-                                /* UNDO PAGES,CURRENTLY 8, IS FILLED */
-    }//if
+        undopageptr.i = (cundoposition>>ZUNDOPAGEINDEXBITS) & (cundopagesize-1);
+        ptrAss(undopageptr, undopage);
+        theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
+        tundoindex = theadundoindex + ZUNDOHEADSIZE;
 
-    operationRecPtr.i = operationRecPtr.p->nextLockOwnerOp;
-  }//while
+        writeUndoOpInfo(signal);/* THE INFORMATION ABOUT ELEMENT HEADER, STORED*/
+                                /* IN OP REC, IS WRITTEN AT UNDO PAGES */
+        cundoElemIndex = 0;/* DEFAULT VALUE USED BY WRITE_UNDO_HEADER SUBROTINE */
+        writeUndoHeader(signal, RNIL, UndoHeader::ZOP_INFO); /* WRITE THE HEAD OF THE UNDO ELEMENT */
+        checkUndoPages(signal);	/* SEND UNDO PAGE TO DISK WHEN A GROUP OF  */
+                                /* UNDO PAGES,CURRENTLY 8, IS FILLED */
+      }
+      operationRecPtr.i = operationRecPtr.p->nextLockOwnerOp;
+    }
+    if (operationRecPtr.i == RNIL) {
+      jam();
+      break;
+    }
+  }
+  if (operationRecPtr.i != RNIL) {
+    jam();
+    signal->theData[0]= ZLCP_OP_WRITE_RT_BREAK;
+    signal->theData[1]= operationRecPtr.i;
+    signal->theData[2]= fragrecptr.i;
+    signal->theData[3]= lcpConnectptr.i;
+    if (delay_continueb) {
+      jam();
+      sendSignalWithDelay(cownBlockref, GSN_CONTINUEB, signal, 10, 4);
+    } else {
+      jam();
+      sendSignal(cownBlockref, GSN_CONTINUEB, signal, 4, JBB);
+    }
+    return;
+  }
 
   signal->theData[0] = fragrecptr.p->lcpLqhPtr;
   sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPSTARTED, 
@@ -7735,8 +7788,7 @@ void Dbacc::execACC_LCPREQ(Signal* signal)
   signal->theData[0] = lcpConnectptr.i;
   signal->theData[1] = fragrecptr.i;
   sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB);
-  return;
-}//Dbacc::execACC_LCPREQ()
+}
 
 /* ******************--------------------------------------------------------------- */
 /* ACC_SAVE_PAGES           A GROUP OF PAGES IS ALLOCATED. THE PAGES AND OVERFLOW    */
@@ -8595,12 +8647,6 @@ void Dbacc::checkUndoPages(Signal* signal)
    * RECORDS IN
    */
   Uint16 nextUndoPageId = tundoPageId + 1;
-  if (nextUndoPageId > (clastUndoPageIdWritten + cundopagesize)){
-    // No more undolog, crash node
-    progError(__LINE__,
-	      ERR_NO_MORE_UNDOLOG,
-	      "There are more than 1Mbyte undolog writes outstanding");
-  }
   updateUndoPositionPage(signal, nextUndoPageId << ZUNDOPAGEINDEXBITS);
 
   if ((tundoPageId & (ZWRITE_UNDOPAGESIZE - 1)) == (ZWRITE_UNDOPAGESIZE - 1)) {
-- 
2.30.9