Commit 6f24d1eb authored by mronstrom@mysql.com's avatar mronstrom@mysql.com

Two level data access of operation i-value insted of huge fixed

size array.
Added a number of subroutines for this
parent 99996ff3
......@@ -532,8 +532,9 @@ public:
SCAN = 1,
COPY = 2
};
UintR scan_acc_op_ptr[MAX_PARALLEL_OP_PER_SCAN];
UintR scan_acc_op_ptr[32];
Uint32 scan_acc_index;
Uint32 scan_acc_attr_recs;
UintR scanApiOpPtr;
UintR scanLocalref[2];
Uint32 scan_batch_len;
......@@ -2226,6 +2227,7 @@ private:
void release_acc_ptr_list(ScanRecord*);
Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32);
void set_acc_ptr_in_scan_record(ScanRecord*, Uint32, Uint32);
void get_acc_ptr(ScanRecord*, Uint32*, Uint32);
void removeTable(Uint32 tableId);
void sendLCP_COMPLETE_REP(Signal* signal, Uint32 lcpId);
......@@ -2392,6 +2394,8 @@ private:
int saveTupattrbuf(Signal* signal, Uint32* dataPtr, Uint32 length);
void seizeAddfragrec(Signal* signal);
void seizeAttrinbuf(Signal* signal);
Uint32 seize_attrinbuf();
Uint32 release_attrinbuf(Uint32);
void seizeFragmentrec(Signal* signal);
void seizePageRef(Signal* signal);
void seizeTcrec();
......@@ -2595,13 +2599,14 @@ private:
UintR cfirstfreeAddfragrec;
UintR caddfragrecFileSize;
#define ZATTRINBUF_FILE_SIZE 10000 // 1.25 MByte
#define ZATTRINBUF_FILE_SIZE 12288 // 1.5 MByte
#define ZINBUF_DATA_LEN 24 /* POSITION OF 'DATA LENGHT'-VARIABLE. */
#define ZINBUF_NEXT 25 /* POSITION OF 'NEXT'-VARIABLE. */
Attrbuf *attrbuf;
AttrbufPtr attrinbufptr;
UintR cfirstfreeAttrinbuf;
UintR cattrinbufFileSize;
Uint32 c_no_attrinbuf_recs;
#define ZDATABUF_FILE_SIZE 10000 // 200 kByte
Databuf *databuf;
......
......@@ -27,6 +27,7 @@ void Dblqh::initData()
{
caddfragrecFileSize = ZADDFRAGREC_FILE_SIZE;
cattrinbufFileSize = ZATTRINBUF_FILE_SIZE;
c_no_attrinbuf_recs= ZATTRINBUF_FILE_SIZE;
cdatabufFileSize = ZDATABUF_FILE_SIZE;
cfragrecFileSize = 0;
cgcprecFileSize = ZGCPREC_FILE_SIZE;
......
......@@ -3086,10 +3086,9 @@ void Dblqh::seizeAttrinbuf(Signal* signal)
Attrbuf *regAttrbuf = attrbuf;
Uint32 tattrinbufFileSize = cattrinbufFileSize;
regAttrinbufptr.i = cfirstfreeAttrinbuf;
regAttrinbufptr.i = seize_attrinbuf();
tmpAttrinbufptr.i = tcConnectptr.p->lastAttrinbuf;
ptrCheckGuard(regAttrinbufptr, tattrinbufFileSize, regAttrbuf);
Uint32 nextFirst = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
tcConnectptr.p->lastAttrinbuf = regAttrinbufptr.i;
regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN] = 0;
if (tmpAttrinbufptr.i == RNIL) {
......@@ -3101,7 +3100,6 @@ void Dblqh::seizeAttrinbuf(Signal* signal)
tmpAttrinbufptr.p->attrbuf[ZINBUF_NEXT] = regAttrinbufptr.i;
}//if
regAttrinbufptr.p->attrbuf[ZINBUF_NEXT] = RNIL;
cfirstfreeAttrinbuf = nextFirst;
attrinbufptr = regAttrinbufptr;
}//Dblqh::seizeAttrinbuf()
......@@ -4698,11 +4696,7 @@ void Dblqh::releaseOprec(Signal* signal)
* ####################################################################### */
while (regAttrinbufptr.i != RNIL) {
jam();
ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
Tmpbuf = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
regAttrinbufptr.p->attrbuf[ZINBUF_NEXT] = cfirstfreeAttrinbuf;
cfirstfreeAttrinbuf = regAttrinbufptr.i;
regAttrinbufptr.i = Tmpbuf;
regAttrinbufptr.i= release_attrinbuf(regAttrinbufptr.i);
}//while
regTcPtr->firstAttrinbuf = RNIL;
regTcPtr->lastAttrinbuf = RNIL;
......@@ -7221,6 +7215,18 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
bool
Dblqh::seize_acc_ptr_list(ScanRecord* scanP, Uint32 batch_size)
{
Uint32 i, attr_buf_recs;
if (batch_size > 1) {
attr_buf_recs= (batch_size + 30) / 32;
if (c_no_attrinbuf_recs < attr_buf_recs) {
jam();
return false;
}
for (i= 1; i <= attr_buf_recs; i++) {
scanP->scan_acc_op_ptr[i]= seize_attrinbuf();
}
}
scanP->scan_acc_attr_recs= attr_buf_recs;
scanP->scan_acc_index = 0;
return true;
}
......@@ -7228,31 +7234,89 @@ Dblqh::seize_acc_ptr_list(ScanRecord* scanP, Uint32 batch_size)
void
Dblqh::release_acc_ptr_list(ScanRecord* scanP)
{
Uint32 i, attr_buf_recs;
attr_buf_recs= scanP->scan_acc_attr_recs;
for (i= 1; i < attr_buf_recs; i++) {
release_attrinbuf(scanP->scan_acc_op_ptr[i]);
}
scanP->scan_acc_attr_recs= 0;
scanP->scan_acc_index = 0;
}
Uint32
Dblqh::seize_attrinbuf()
{
AttrbufPtr regAttrPtr;
Uint32 ret_attr_buf;
ndbrequire(c_no_attrinbuf_recs > 0);
c_no_attrinbuf_recs--;
ret_attr_buf= cfirstfreeAttrinbuf;
regAttrPtr.i= ret_attr_buf;
ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
cfirstfreeAttrinbuf= regAttrPtr.p->attrbuf[ZINBUF_NEXT];
return ret_attr_buf;
}
Uint32
Dblqh::release_attrinbuf(Uint32 attr_buf_i)
{
Uint32 next_buf;
AttrbufPtr regAttrPtr;
c_no_attrinbuf_recs++;
regAttrPtr.i= attr_buf_i;
ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
next_buf= regAttrPtr.p->attrbuf[ZINBUF_NEXT];
regAttrPtr.p->attrbuf[ZINBUF_NEXT]= cfirstfreeAttrinbuf;
cfirstfreeAttrinbuf= regAttrPtr.i;
return next_buf;
}
void
Dblqh::init_acc_ptr_list(ScanRecord* scanP)
{
scanP->scan_acc_index = 0;
}
inline
void
Dblqh::get_acc_ptr(ScanRecord* scanP, Uint32 *acc_ptr, Uint32 index)
{
if (index == 0) {
jam();
acc_ptr= &scanP->scan_acc_op_ptr[0];
} else {
Uint32 attr_buf_index, attr_buf_rec;
AttrbufPtr regAttrPtr;
jam();
attr_buf_rec= (index + 30) / 32;
attr_buf_index= (index - 1) & 31;
regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
acc_ptr= &regAttrPtr.p->attrbuf[attr_buf_index];
}
}
Uint32
Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, Uint32 index)
{
Uint32 *acc_ptr;
Uint32 attr_buf_rec, attr_buf_index;
ndbrequire((index < MAX_PARALLEL_OP_PER_SCAN) &&
index < scanP->scan_acc_index);
return scanP->scan_acc_op_ptr[index];
get_acc_ptr(scanP, acc_ptr, index);
return *acc_ptr;
}
void
Dblqh::set_acc_ptr_in_scan_record(ScanRecord* scanP,
Uint32 index, Uint32 acc)
{
Uint32 *acc_ptr;
ndbrequire((index == 0 || scanP->scan_acc_index == index) &&
(index < MAX_PARALLEL_OP_PER_SCAN));
scanP->scan_acc_index= index + 1;
scanP->scan_acc_op_ptr[index]= acc;
get_acc_ptr(scanP, acc_ptr, index);
*acc_ptr= acc;
}
/* -------------------------------------------------------------------------
......@@ -16339,6 +16403,8 @@ void Dblqh::initialiseScanrec(Signal* signal)
scanptr.p->scanTcWaiting = ZFALSE;
scanptr.p->nextHash = RNIL;
scanptr.p->prevHash = RNIL;
scanptr.p->scan_acc_index= 0;
scanptr.p->scan_acc_attr_recs= 0;
}
tmp.release();
}//Dblqh::initialiseScanrec()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment