ndb - jamify (better) DbtupDiskAlloc

parent 4d552dbb
...@@ -86,6 +86,7 @@ inline const Uint32* ALIGN_WORD(const void* ptr) ...@@ -86,6 +86,7 @@ inline const Uint32* ALIGN_WORD(const void* ptr)
// DbtupDebug.cpp 30000 // DbtupDebug.cpp 30000
// DbtupVarAlloc.cpp 32000 // DbtupVarAlloc.cpp 32000
// DbtupScan.cpp 33000 // DbtupScan.cpp 33000
// DbtupDiskAlloc.cpp 35000
//------------------------------------------------------------------ //------------------------------------------------------------------
/* /*
......
...@@ -108,8 +108,11 @@ void Dbtup::removeActiveOpList(Operationrec* const regOperPtr, ...@@ -108,8 +108,11 @@ void Dbtup::removeActiveOpList(Operationrec* const regOperPtr,
* Release copy tuple * Release copy tuple
*/ */
if(!regOperPtr->m_copy_tuple_location.isNull()) if(!regOperPtr->m_copy_tuple_location.isNull())
{
ljam();
c_undo_buffer.free_copy_tuple(&regOperPtr->m_copy_tuple_location); c_undo_buffer.free_copy_tuple(&regOperPtr->m_copy_tuple_location);
}
if (regOperPtr->op_struct.in_active_list) { if (regOperPtr->op_struct.in_active_list) {
regOperPtr->op_struct.in_active_list= false; regOperPtr->op_struct.in_active_list= false;
if (regOperPtr->nextActiveOp != RNIL) { if (regOperPtr->nextActiveOp != RNIL) {
...@@ -172,6 +175,7 @@ Dbtup::dealloc_tuple(Signal* signal, ...@@ -172,6 +175,7 @@ Dbtup::dealloc_tuple(Signal* signal,
Uint32 extra_bits = Tuple_header::FREED; Uint32 extra_bits = Tuple_header::FREED;
if (bits & Tuple_header::DISK_PART) if (bits & Tuple_header::DISK_PART)
{ {
ljam();
Local_key disk; Local_key disk;
memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk)); memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk));
PagePtr tmpptr; PagePtr tmpptr;
...@@ -184,6 +188,7 @@ Dbtup::dealloc_tuple(Signal* signal, ...@@ -184,6 +188,7 @@ Dbtup::dealloc_tuple(Signal* signal,
if (! (bits & (Tuple_header::LCP_SKIP | Tuple_header::ALLOC)) && if (! (bits & (Tuple_header::LCP_SKIP | Tuple_header::ALLOC)) &&
lcpScan_ptr_i != RNIL) lcpScan_ptr_i != RNIL)
{ {
ljam();
ScanOpPtr scanOp; ScanOpPtr scanOp;
c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i); c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
Local_key rowid = regOperPtr->m_tuple_location; Local_key rowid = regOperPtr->m_tuple_location;
...@@ -191,6 +196,7 @@ Dbtup::dealloc_tuple(Signal* signal, ...@@ -191,6 +196,7 @@ Dbtup::dealloc_tuple(Signal* signal,
rowid.m_page_no = page->frag_page_id; rowid.m_page_no = page->frag_page_id;
if (rowid > scanpos) if (rowid > scanpos)
{ {
ljam();
extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE
ptr->m_operation_ptr_i = lcp_keep_list; ptr->m_operation_ptr_i = lcp_keep_list;
regFragPtr->m_lcp_keep_list = rowid.ref(); regFragPtr->m_lcp_keep_list = rowid.ref();
...@@ -231,11 +237,13 @@ Dbtup::commit_operation(Signal* signal, ...@@ -231,11 +237,13 @@ Dbtup::commit_operation(Signal* signal,
Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize; Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
if(mm_vars == 0) if(mm_vars == 0)
{ {
ljam();
memcpy(tuple_ptr, copy, 4*fixsize); memcpy(tuple_ptr, copy, 4*fixsize);
disk_ptr= (Tuple_header*)(((Uint32*)copy)+fixsize); disk_ptr= (Tuple_header*)(((Uint32*)copy)+fixsize);
} }
else else
{ {
ljam();
/** /**
* Var_part_ref is only stored in *allocated* tuple * Var_part_ref is only stored in *allocated* tuple
* so memcpy from copy, will over write it... * so memcpy from copy, will over write it...
...@@ -260,6 +268,7 @@ Dbtup::commit_operation(Signal* signal, ...@@ -260,6 +268,7 @@ Dbtup::commit_operation(Signal* signal,
if(copy_bits & Tuple_header::MM_SHRINK) if(copy_bits & Tuple_header::MM_SHRINK)
{ {
ljam();
vpagePtrP->shrink_entry(tmp.m_page_idx, (sz + 3) >> 2); vpagePtrP->shrink_entry(tmp.m_page_idx, (sz + 3) >> 2);
update_free_page_list(regFragPtr, vpagePtr); update_free_page_list(regFragPtr, vpagePtr);
} }
...@@ -270,6 +279,7 @@ Dbtup::commit_operation(Signal* signal, ...@@ -270,6 +279,7 @@ Dbtup::commit_operation(Signal* signal,
if (regTabPtr->m_no_of_disk_attributes && if (regTabPtr->m_no_of_disk_attributes &&
(copy_bits & Tuple_header::DISK_INLINE)) (copy_bits & Tuple_header::DISK_INLINE))
{ {
ljam();
Local_key key; Local_key key;
memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key)); memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
Uint32 logfile_group_id= regFragPtr->m_logfile_group_id; Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
...@@ -280,22 +290,26 @@ Dbtup::commit_operation(Signal* signal, ...@@ -280,22 +290,26 @@ Dbtup::commit_operation(Signal* signal,
Uint32 sz, *dst; Uint32 sz, *dst;
if(copy_bits & Tuple_header::DISK_ALLOC) if(copy_bits & Tuple_header::DISK_ALLOC)
{ {
ljam();
disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci); disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci);
} }
if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0) if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
{ {
ljam();
sz= regTabPtr->m_offsets[DD].m_fix_header_size; sz= regTabPtr->m_offsets[DD].m_fix_header_size;
dst= ((Fix_page*)diskPagePtr.p)->get_ptr(key.m_page_idx, sz); dst= ((Fix_page*)diskPagePtr.p)->get_ptr(key.m_page_idx, sz);
} }
else else
{ {
ljam();
dst= ((Var_page*)diskPagePtr.p)->get_ptr(key.m_page_idx); dst= ((Var_page*)diskPagePtr.p)->get_ptr(key.m_page_idx);
sz= ((Var_page*)diskPagePtr.p)->get_entry_len(key.m_page_idx); sz= ((Var_page*)diskPagePtr.p)->get_entry_len(key.m_page_idx);
} }
if(! (copy_bits & Tuple_header::DISK_ALLOC)) if(! (copy_bits & Tuple_header::DISK_ALLOC))
{ {
ljam();
disk_page_undo_update(diskPagePtr.p, disk_page_undo_update(diskPagePtr.p,
&key, dst, sz, gci, logfile_group_id); &key, dst, sz, gci, logfile_group_id);
} }
...@@ -309,6 +323,7 @@ Dbtup::commit_operation(Signal* signal, ...@@ -309,6 +323,7 @@ Dbtup::commit_operation(Signal* signal,
if(lcpScan_ptr_i != RNIL && (bits & Tuple_header::ALLOC)) if(lcpScan_ptr_i != RNIL && (bits & Tuple_header::ALLOC))
{ {
ljam();
ScanOpPtr scanOp; ScanOpPtr scanOp;
c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i); c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
Local_key rowid = regOperPtr->m_tuple_location; Local_key rowid = regOperPtr->m_tuple_location;
...@@ -316,6 +331,7 @@ Dbtup::commit_operation(Signal* signal, ...@@ -316,6 +331,7 @@ Dbtup::commit_operation(Signal* signal,
rowid.m_page_no = pagePtr.p->frag_page_id; rowid.m_page_no = pagePtr.p->frag_page_id;
if(rowid > scanpos) if(rowid > scanpos)
{ {
ljam();
copy_bits |= Tuple_header::LCP_SKIP; copy_bits |= Tuple_header::LCP_SKIP;
} }
} }
...@@ -374,7 +390,10 @@ Dbtup::disk_page_commit_callback(Signal* signal, ...@@ -374,7 +390,10 @@ Dbtup::disk_page_commit_callback(Signal* signal,
execTUP_COMMITREQ(signal); execTUP_COMMITREQ(signal);
if(signal->theData[0] == 0) if(signal->theData[0] == 0)
{
ljam();
c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer); c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
}
} }
void void
...@@ -412,6 +431,7 @@ Dbtup::disk_page_log_buffer_callback(Signal* signal, ...@@ -412,6 +431,7 @@ Dbtup::disk_page_log_buffer_callback(Signal* signal,
void void
Dbtup::fix_commit_order(OperationrecPtr opPtr) Dbtup::fix_commit_order(OperationrecPtr opPtr)
{ {
ljam();
ndbassert(!opPtr.p->is_first_operation()); ndbassert(!opPtr.p->is_first_operation());
OperationrecPtr firstPtr = opPtr; OperationrecPtr firstPtr = opPtr;
while(firstPtr.p->prevActiveOp != RNIL) while(firstPtr.p->prevActiveOp != RNIL)
...@@ -437,7 +457,10 @@ Dbtup::fix_commit_order(OperationrecPtr opPtr) ...@@ -437,7 +457,10 @@ Dbtup::fix_commit_order(OperationrecPtr opPtr)
c_operation_pool.getPtr(seco)->prevActiveOp = opPtr.i; c_operation_pool.getPtr(seco)->prevActiveOp = opPtr.i;
c_operation_pool.getPtr(prev)->nextActiveOp = firstPtr.i; c_operation_pool.getPtr(prev)->nextActiveOp = firstPtr.i;
if(next != RNIL) if(next != RNIL)
{
ljam();
c_operation_pool.getPtr(next)->prevActiveOp = firstPtr.i; c_operation_pool.getPtr(next)->prevActiveOp = firstPtr.i;
}
} }
/* ----------------------------------------------------------------- */ /* ----------------------------------------------------------------- */
...@@ -502,6 +525,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -502,6 +525,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
bool get_page = false; bool get_page = false;
if(regOperPtr.p->op_struct.m_load_diskpage_on_commit) if(regOperPtr.p->op_struct.m_load_diskpage_on_commit)
{ {
ljam();
Page_cache_client::Request req; Page_cache_client::Request req;
ndbassert(regOperPtr.p->is_first_operation() && ndbassert(regOperPtr.p->is_first_operation() &&
regOperPtr.p->is_last_operation()); regOperPtr.p->is_last_operation());
...@@ -511,6 +535,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -511,6 +535,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
*/ */
if(!regOperPtr.p->m_copy_tuple_location.isNull()) if(!regOperPtr.p->m_copy_tuple_location.isNull())
{ {
ljam();
Tuple_header* tmp= (Tuple_header*) Tuple_header* tmp= (Tuple_header*)
c_undo_buffer.get_ptr(&regOperPtr.p->m_copy_tuple_location); c_undo_buffer.get_ptr(&regOperPtr.p->m_copy_tuple_location);
...@@ -520,23 +545,25 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -520,23 +545,25 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
if (unlikely(regOperPtr.p->op_struct.op_type == ZDELETE && if (unlikely(regOperPtr.p->op_struct.op_type == ZDELETE &&
tmp->m_header_bits & Tuple_header::DISK_ALLOC)) tmp->m_header_bits & Tuple_header::DISK_ALLOC))
{ {
jam(); ljam();
/** /**
* Insert+Delete * Insert+Delete
*/ */
regOperPtr.p->op_struct.m_load_diskpage_on_commit = 0; regOperPtr.p->op_struct.m_load_diskpage_on_commit = 0;
regOperPtr.p->op_struct.m_wait_log_buffer = 0; regOperPtr.p->op_struct.m_wait_log_buffer = 0;
disk_page_abort_prealloc(signal, regFragPtr.p, disk_page_abort_prealloc(signal, regFragPtr.p,
&req.m_page, req.m_page.m_page_idx); &req.m_page, req.m_page.m_page_idx);
c_lgman->free_log_space(regFragPtr.p->m_logfile_group_id, c_lgman->free_log_space(regFragPtr.p->m_logfile_group_id,
regOperPtr.p->m_undo_buffer_space); regOperPtr.p->m_undo_buffer_space);
ndbout_c("insert+delete"); ndbout_c("insert+delete");
goto skip_disk; ljamEntry();
goto skip_disk;
} }
} }
else else
{ {
ljam();
// initial delete // initial delete
ndbassert(regOperPtr.p->op_struct.op_type == ZDELETE); ndbassert(regOperPtr.p->op_struct.op_type == ZDELETE);
memcpy(&req.m_page, memcpy(&req.m_page,
...@@ -560,11 +587,14 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -560,11 +587,14 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
/** /**
* Timeslice * Timeslice
*/ */
ljam();
signal->theData[0] = 1; signal->theData[0] = 1;
return; return;
case -1: case -1:
ndbrequire("NOT YET IMPLEMENTED" == 0); ndbrequire("NOT YET IMPLEMENTED" == 0);
break; break;
default:
ljam();
} }
get_page = true; get_page = true;
...@@ -581,6 +611,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -581,6 +611,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
if(regOperPtr.p->op_struct.m_wait_log_buffer) if(regOperPtr.p->op_struct.m_wait_log_buffer)
{ {
ljam();
ndbassert(regOperPtr.p->is_first_operation() && ndbassert(regOperPtr.p->is_first_operation() &&
regOperPtr.p->is_last_operation()); regOperPtr.p->is_last_operation());
...@@ -592,18 +623,23 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -592,18 +623,23 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id); Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
int res= lgman.get_log_buffer(signal, sz, &cb); int res= lgman.get_log_buffer(signal, sz, &cb);
ljamEntry();
switch(res){ switch(res){
case 0: case 0:
ljam();
signal->theData[0] = 1; signal->theData[0] = 1;
return; return;
case -1: case -1:
ndbrequire("NOT YET IMPLEMENTED" == 0); ndbrequire("NOT YET IMPLEMENTED" == 0);
break; break;
default:
ljam();
} }
} }
if(!tuple_ptr) if(!tuple_ptr)
{ {
ljam();
tuple_ptr = (Tuple_header*) tuple_ptr = (Tuple_header*)
get_ptr(&page, &regOperPtr.p->m_tuple_location,regTabPtr.p); get_ptr(&page, &regOperPtr.p->m_tuple_location,regTabPtr.p);
} }
...@@ -612,6 +648,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -612,6 +648,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
if(get_tuple_state(regOperPtr.p) == TUPLE_PREPARED) if(get_tuple_state(regOperPtr.p) == TUPLE_PREPARED)
{ {
ljam();
/** /**
* Execute all tux triggers at first commit * Execute all tux triggers at first commit
* since previous tuple is otherwise removed... * since previous tuple is otherwise removed...
...@@ -637,6 +674,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -637,6 +674,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
if(regOperPtr.p->is_last_operation()) if(regOperPtr.p->is_last_operation())
{ {
ljam();
/** /**
* Perform "real" commit * Perform "real" commit
*/ */
...@@ -647,12 +685,14 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -647,12 +685,14 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
if(regOperPtr.p->op_struct.op_type != ZDELETE) if(regOperPtr.p->op_struct.op_type != ZDELETE)
{ {
ljam();
commit_operation(signal, gci, tuple_ptr, page, commit_operation(signal, gci, tuple_ptr, page,
regOperPtr.p, regFragPtr.p, regTabPtr.p); regOperPtr.p, regFragPtr.p, regTabPtr.p);
removeActiveOpList(regOperPtr.p, tuple_ptr); removeActiveOpList(regOperPtr.p, tuple_ptr);
} }
else else
{ {
ljam();
removeActiveOpList(regOperPtr.p, tuple_ptr); removeActiveOpList(regOperPtr.p, tuple_ptr);
if (get_page) if (get_page)
ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART); ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
...@@ -662,6 +702,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -662,6 +702,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
} }
else else
{ {
ljam();
removeActiveOpList(regOperPtr.p, tuple_ptr); removeActiveOpList(regOperPtr.p, tuple_ptr);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment