DbtuxNode.cpp 14.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#define DBTUX_NODE_CPP
#include "Dbtux.hpp"

/*
unknown's avatar
unknown committed
21
 * Allocate index node in TUP.
22
 */
unknown's avatar
unknown committed
23 24
int
Dbtux::allocNode(Signal* signal, NodeHandle& node)
25
{
unknown's avatar
unknown committed
26
  Frag& frag = node.m_frag;
27 28 29
  Uint32 pageId = NullTupLoc.m_pageId;
  Uint32 pageOffset = NullTupLoc.m_pageOffset;
  Uint32* node32 = 0;
unknown's avatar
unknown committed
30 31
  int errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
  if (errorCode == 0) {
32
    jam();
unknown's avatar
unknown committed
33 34 35 36
    node.m_loc = TupLoc(pageId, pageOffset);
    node.m_node = reinterpret_cast<TreeNode*>(node32);
    node.m_acc = AccNone;
    ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
37
  }
unknown's avatar
unknown committed
38
  return errorCode;
39 40 41
}

/*
unknown's avatar
unknown committed
42
 * Access more of the node.
43 44
 */
void
unknown's avatar
unknown committed
45
Dbtux::accessNode(Signal* signal, NodeHandle& node, AccSize acc)
46
{
unknown's avatar
unknown committed
47 48 49 50 51
  ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
  if (node.m_acc >= acc)
    return;
  // XXX could do prefetch
  node.m_acc = acc;
52 53 54
}

/*
unknown's avatar
unknown committed
55
 * Set handle to point to existing node.
56 57
 */
void
unknown's avatar
unknown committed
58
Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc)
59
{
unknown's avatar
unknown committed
60 61 62 63 64 65 66 67 68 69 70
  Frag& frag = node.m_frag;
  ndbrequire(loc != NullTupLoc);
  Uint32 pageId = loc.m_pageId;
  Uint32 pageOffset = loc.m_pageOffset;
  Uint32* node32 = 0;
  c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
  node.m_loc = loc;
  node.m_node = reinterpret_cast<TreeNode*>(node32);
  node.m_acc = AccNone;
  ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
  accessNode(signal, node, acc);
71 72 73
}

/*
unknown's avatar
unknown committed
74
 * Set handle to point to new node.  Uses the pre-allocated node.
75 76
 */
void
unknown's avatar
unknown committed
77
Dbtux::insertNode(Signal* signal, NodeHandle& node, AccSize acc)
78
{
unknown's avatar
unknown committed
79 80 81 82 83 84 85 86 87 88 89 90
  Frag& frag = node.m_frag;
  TupLoc loc = frag.m_freeLoc;
  frag.m_freeLoc = NullTupLoc;
  selectNode(signal, node, loc, acc);
  new (node.m_node) TreeNode();
#ifdef VM_TRACE
  TreeHead& tree = frag.m_tree;
  memset(tree.getPref(node.m_node, 0), 0xa2, tree.m_prefSize << 2);
  memset(tree.getPref(node.m_node, 1), 0xa2, tree.m_prefSize << 2);
  TreeEnt* entList = tree.getEntList(node.m_node);
  memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
91 92 93
}

/*
94
 * Delete existing node.
95 96
 */
void
unknown's avatar
unknown committed
97
Dbtux::deleteNode(Signal* signal, NodeHandle& node)
98
{
unknown's avatar
unknown committed
99 100 101 102 103 104
  Frag& frag = node.m_frag;
  ndbrequire(node.getOccup() == 0);
  TupLoc loc = node.m_loc;
  Uint32 pageId = loc.m_pageId;
  Uint32 pageOffset = loc.m_pageOffset;
  Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node);
105 106
  c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
  // invalidate handle and storage
unknown's avatar
unknown committed
107 108
  node.m_loc = NullTupLoc;
  node.m_node = 0;
109 110 111 112 113 114
}

/*
 * Set prefix.
 */
void
115
Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i)
116
{
117
  Frag& frag = node.m_frag;
118 119 120
  TreeHead& tree = frag.m_tree;
  ReadPar readPar;
  ndbrequire(i <= 1);
121
  readPar.m_ent = node.getMinMax(i);
122 123 124 125 126 127 128 129 130 131 132
  readPar.m_first = 0;
  readPar.m_count = frag.m_numAttrs;
  // leave in signal data
  readPar.m_data = 0;
  // XXX implement max words to read
  tupReadAttrs(signal, frag, readPar);
  // copy whatever fits
  CopyPar copyPar;
  copyPar.m_items = readPar.m_count;
  copyPar.m_headers = true;
  copyPar.m_maxwords = tree.m_prefSize;
133
  Data pref = node.getPref(i);
134 135 136
  copyAttrs(pref, readPar.m_data, copyPar);
}

137
// node operations
138 139 140 141 142 143 144 145 146 147 148

/*
 * Add entry at position.  Move entries greater than or equal to the old
 * one (if any) to the right.
 *
 *            X
 *            v
 *      A B C D E _ _  =>  A B C X D E _
 *      0 1 2 3 4 5 6      0 1 2 3 4 5 6
 */
void
149
Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent)
150
{
151 152 153
  Frag& frag = node.m_frag;
  TreeHead& tree = frag.m_tree;
  const unsigned occup = node.getOccup();
154 155 156
  ndbrequire(occup < tree.m_maxOccup && pos <= occup);
  // fix scans
  ScanOpPtr scanPtr;
157
  scanPtr.i = node.getNodeScan();
158 159
  while (scanPtr.i != RNIL) {
    jam();
160
    c_scanOpPool.getPtr(scanPtr);
161
    TreePos& scanPos = scanPtr.p->m_scanPos;
162
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
163 164 165
    if (scanPos.m_pos >= pos) {
      jam();
#ifdef VM_TRACE
166 167 168
      if (debugFlags & DebugScan) {
        debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At pushUp pos=" << pos << " " << node << endl;
169 170 171 172 173 174 175
      }
#endif
      scanPos.m_pos++;
    }
    scanPtr.i = scanPtr.p->m_nodeScan;
  }
  // fix node
176
  TreeEnt* const entList = tree.getEntList(node.m_node);
177 178 179 180 181 182 183 184
  entList[occup] = entList[0];
  TreeEnt* const tmpList = entList + 1;
  for (unsigned i = occup; i > pos; i--) {
    jam();
    tmpList[i] = tmpList[i - 1];
  }
  tmpList[pos] = ent;
  entList[0] = entList[occup + 1];
185
  node.setOccup(occup + 1);
186 187
  // fix prefixes
  if (occup == 0 || pos == 0)
188
    setNodePref(signal, node, 0);
189
  if (occup == 0 || pos == occup)
190
    setNodePref(signal, node, 1);
191 192 193 194
}

/*
 * Remove and return entry at position.  Move entries greater than the
195
 * removed one to the left.  This is the opposite of nodePushUp.
196 197 198 199 200 201 202
 *
 *                               D
 *            ^                  ^
 *      A B C D E F _  =>  A B C E F _ _
 *      0 1 2 3 4 5 6      0 1 2 3 4 5 6
 */
void
203
Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
204
{
205 206 207
  Frag& frag = node.m_frag;
  TreeHead& tree = frag.m_tree;
  const unsigned occup = node.getOccup();
208 209 210
  ndbrequire(occup <= tree.m_maxOccup && pos < occup);
  ScanOpPtr scanPtr;
  // move scans whose entry disappears
211
  scanPtr.i = node.getNodeScan();
212 213
  while (scanPtr.i != RNIL) {
    jam();
214
    c_scanOpPool.getPtr(scanPtr);
215
    TreePos& scanPos = scanPtr.p->m_scanPos;
216
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
217 218 219 220
    const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
    if (scanPos.m_pos == pos) {
      jam();
#ifdef VM_TRACE
221 222 223
      if (debugFlags & DebugScan) {
        debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At popDown pos=" << pos << " " << node << endl;
224 225
      }
#endif
226
      scanNext(signal, scanPtr);
227 228 229 230
    }
    scanPtr.i = nextPtrI;
  }
  // fix other scans
231
  scanPtr.i = node.getNodeScan();
232 233
  while (scanPtr.i != RNIL) {
    jam();
234
    c_scanOpPool.getPtr(scanPtr);
235
    TreePos& scanPos = scanPtr.p->m_scanPos;
236
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
237 238 239 240
    ndbrequire(scanPos.m_pos != pos);
    if (scanPos.m_pos > pos) {
      jam();
#ifdef VM_TRACE
241 242 243
      if (debugFlags & DebugScan) {
        debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At popDown pos=" << pos << " " << node << endl;
244 245 246 247 248 249 250
      }
#endif
      scanPos.m_pos--;
    }
    scanPtr.i = scanPtr.p->m_nodeScan;
  }
  // fix node
251
  TreeEnt* const entList = tree.getEntList(node.m_node);
252 253 254 255 256 257 258 259
  entList[occup] = entList[0];
  TreeEnt* const tmpList = entList + 1;
  ent = tmpList[pos];
  for (unsigned i = pos; i < occup - 1; i++) {
    jam();
    tmpList[i] = tmpList[i + 1];
  }
  entList[0] = entList[occup - 1];
260
  node.setOccup(occup - 1);
261 262
  // fix prefixes
  if (occup != 1 && pos == 0)
263
    setNodePref(signal, node, 0);
264
  if (occup != 1 && pos == occup - 1)
265
    setNodePref(signal, node, 1);
266 267 268 269 270 271 272 273 274 275 276 277
}

/*
 * Add entry at existing position.  Move entries less than or equal to
 * the old one to the left.  Remove and return old min entry.
 *
 *            X            A
 *      ^     v            ^
 *      A B C D E _ _  =>  B C D X E _ _
 *      0 1 2 3 4 5 6      0 1 2 3 4 5 6
 */
void
278
Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
279
{
280 281 282
  Frag& frag = node.m_frag;
  TreeHead& tree = frag.m_tree;
  const unsigned occup = node.getOccup();
283 284 285
  ndbrequire(occup <= tree.m_maxOccup && pos < occup);
  ScanOpPtr scanPtr;
  // move scans whose entry disappears
286
  scanPtr.i = node.getNodeScan();
287 288
  while (scanPtr.i != RNIL) {
    jam();
289
    c_scanOpPool.getPtr(scanPtr);
290
    TreePos& scanPos = scanPtr.p->m_scanPos;
291
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
292 293 294 295
    const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
    if (scanPos.m_pos == 0) {
      jam();
#ifdef VM_TRACE
296 297 298
      if (debugFlags & DebugScan) {
        debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At pushDown pos=" << pos << " " << node << endl;
299 300 301
      }
#endif
      // here we may miss a valid entry "X"  XXX known bug
302
      scanNext(signal, scanPtr);
303 304 305 306
    }
    scanPtr.i = nextPtrI;
  }
  // fix other scans
307
  scanPtr.i = node.getNodeScan();
308 309
  while (scanPtr.i != RNIL) {
    jam();
310
    c_scanOpPool.getPtr(scanPtr);
311
    TreePos& scanPos = scanPtr.p->m_scanPos;
312
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
313 314 315 316
    ndbrequire(scanPos.m_pos != 0);
    if (scanPos.m_pos <= pos) {
      jam();
#ifdef VM_TRACE
317 318 319
      if (debugFlags & DebugScan) {
        debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At pushDown pos=" << pos << " " << node << endl;
320 321 322 323 324 325 326
      }
#endif
      scanPos.m_pos--;
    }
    scanPtr.i = scanPtr.p->m_nodeScan;
  }
  // fix node
327
  TreeEnt* const entList = tree.getEntList(node.m_node);
328 329 330 331 332 333 334 335 336
  entList[occup] = entList[0];
  TreeEnt* const tmpList = entList + 1;
  TreeEnt oldMin = tmpList[0];
  for (unsigned i = 0; i < pos; i++) {
    jam();
    tmpList[i] = tmpList[i + 1];
  }
  tmpList[pos] = ent;
  ent = oldMin;
337 338
  entList[0] = entList[occup];
  // fix prefixes
339
  if (true)
340
    setNodePref(signal, node, 0);
341
  if (occup == 1 || pos == occup - 1)
342
    setNodePref(signal, node, 1);
343 344 345 346 347
}

/*
 * Remove and return entry at position.  Move entries less than the
 * removed one to the right.  Replace min entry by the input entry.
348
 * This is the opposite of nodePushDown.
349 350 351 352 353 354 355
 *
 *      X                        D
 *      v     ^                  ^
 *      A B C D E _ _  =>  X A B C E _ _
 *      0 1 2 3 4 5 6      0 1 2 3 4 5 6
 */
void
356
Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
357
{
358 359 360
  Frag& frag = node.m_frag;
  TreeHead& tree = frag.m_tree;
  const unsigned occup = node.getOccup();
361 362 363
  ndbrequire(occup <= tree.m_maxOccup && pos < occup);
  ScanOpPtr scanPtr;
  // move scans whose entry disappears
364
  scanPtr.i = node.getNodeScan();
365 366
  while (scanPtr.i != RNIL) {
    jam();
367
    c_scanOpPool.getPtr(scanPtr);
368
    TreePos& scanPos = scanPtr.p->m_scanPos;
369
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
370 371 372 373
    const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
    if (scanPos.m_pos == pos) {
      jam();
#ifdef VM_TRACE
374 375 376
      if (debugFlags & DebugScan) {
        debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At popUp pos=" << pos << " " << node << endl;
377 378 379
      }
#endif
      // here we may miss a valid entry "X"  XXX known bug
380
      scanNext(signal, scanPtr);
381 382 383 384
    }
    scanPtr.i = nextPtrI;
  }
  // fix other scans
385
  scanPtr.i = node.getNodeScan();
386 387
  while (scanPtr.i != RNIL) {
    jam();
388
    c_scanOpPool.getPtr(scanPtr);
389
    TreePos& scanPos = scanPtr.p->m_scanPos;
390
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
391 392 393 394
    ndbrequire(scanPos.m_pos != pos);
    if (scanPos.m_pos < pos) {
      jam();
#ifdef VM_TRACE
395 396 397
      if (debugFlags & DebugScan) {
        debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At popUp pos=" << pos << " " << node << endl;
398 399 400 401 402 403 404
      }
#endif
      scanPos.m_pos++;
    }
    scanPtr.i = scanPtr.p->m_nodeScan;
  }
  // fix node
405
  TreeEnt* const entList = tree.getEntList(node.m_node);
406 407 408 409 410 411 412 413 414
  entList[occup] = entList[0];
  TreeEnt* const tmpList = entList + 1;
  TreeEnt newMin = ent;
  ent = tmpList[pos];
  for (unsigned i = pos; i > 0; i--) {
    jam();
    tmpList[i] = tmpList[i - 1];
  }
  tmpList[0] = newMin;
415 416
  entList[0] = entList[occup];
  // fix prefixes
417
  if (true)
418
    setNodePref(signal, node, 0);
419
  if (occup == 1 || pos == occup - 1)
420
    setNodePref(signal, node, 1);
421 422 423 424 425 426 427
}

/*
 * Move all possible entries from another node before the min (i=0) or
 * after the max (i=1).  XXX can be optimized
 */
void
428
Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i)
429
{
430 431
  Frag& frag = dstNode.m_frag;
  TreeHead& tree = frag.m_tree;
432
  ndbrequire(i <= 1);
433
  while (dstNode.getOccup() < tree.m_maxOccup && srcNode.getOccup() != 0) {
434
    TreeEnt ent;
435 436
    nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent);
    nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent);
437 438 439 440 441 442 443 444
  }
}

/*
 * Link scan to the list under the node.  The list is single-linked and
 * ordering does not matter.
 */
void
445
Dbtux::linkScan(NodeHandle& node, ScanOpPtr scanPtr)
446 447
{
#ifdef VM_TRACE
448 449 450
  if (debugFlags & DebugScan) {
    debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl;
    debugOut << "To node " << node << endl;
451 452
  }
#endif
453 454 455
  ndbrequire(! islinkScan(node, scanPtr) && scanPtr.p->m_nodeScan == RNIL);
  scanPtr.p->m_nodeScan = node.getNodeScan();
  node.setNodeScan(scanPtr.i);
456 457 458 459 460 461
}

/*
 * Unlink a scan from the list under the node.
 */
void
462
Dbtux::unlinkScan(NodeHandle& node, ScanOpPtr scanPtr)
463 464
{
#ifdef VM_TRACE
465 466 467
  if (debugFlags & DebugScan) {
    debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl;
    debugOut << "From node " << node << endl;
468 469
  }
#endif
470 471 472
  ScanOpPtr currPtr;
  currPtr.i = node.getNodeScan();
  ScanOpPtr prevPtr;
473 474 475
  prevPtr.i = RNIL;
  while (true) {
    jam();
476
    c_scanOpPool.getPtr(currPtr);
477 478 479 480
    Uint32 nextPtrI = currPtr.p->m_nodeScan;
    if (currPtr.i == scanPtr.i) {
      jam();
      if (prevPtr.i == RNIL) {
481
        node.setNodeScan(nextPtrI);
482 483 484 485 486 487
      } else {
        jam();
        prevPtr.p->m_nodeScan = nextPtrI;
      }
      scanPtr.p->m_nodeScan = RNIL;
      // check for duplicates
488
      ndbrequire(! islinkScan(node, scanPtr));
489 490 491 492 493 494 495 496 497 498 499
      return;
    }
    prevPtr = currPtr;
    currPtr.i = nextPtrI;
  }
}

/*
 * Check if a scan is linked to this node.  Only for ndbrequire.
 */
bool
500
Dbtux::islinkScan(NodeHandle& node, ScanOpPtr scanPtr)
501
{
502 503
  ScanOpPtr currPtr;
  currPtr.i = node.getNodeScan();
504 505
  while (currPtr.i != RNIL) {
    jam();
506
    c_scanOpPool.getPtr(currPtr);
507 508 509 510 511 512 513 514 515 516
    if (currPtr.i == scanPtr.i) {
      jam();
      return true;
    }
    currPtr.i = currPtr.p->m_nodeScan;
  }
  return false;
}

void
517
Dbtux::NodeHandle::progError(int line, int cause, const char* file)
518
{
519
  ErrorReporter::handleAssert("Dbtux::NodeHandle: assert failed", file, line);
520
}