Commit b3222660 authored by unknown

ndb - bug#30975 (recommit to 51-telco-gca)

    - only update extent pages *after* the flush of the real page has completed (see the sketch below)
    - sync both create and drop of table into the undo log (for disk tables)
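
The core of the first point is an ordering rule between data pages and extent pages: the extent page may only record a state that the data page already has on disk. Below is a minimal, hypothetical illustration of that two-phase callback; the names Page, ExtentDirectory and on_page_unmap are stand-ins, not the real NDB classes. The buffer manager calls back once before pageout (when == 0), which only detaches the page from the dirty list, and once after the write has completed (when == 1), which is the only point where extent free bits are updated.

```cpp
// Hypothetical, self-contained sketch of the ordering this commit enforces.
#include <cstdint>
#include <iostream>
#include <vector>

struct Page {
  uint32_t page_no;
  uint32_t free_space;
  bool     on_dirty_list = true;
};

struct ExtentDirectory {
  std::vector<uint32_t> committed_bits;        // committed free bits per page, as the extent page stores them
  void update_page_free_bits(uint32_t page_no, uint32_t bits) {
    committed_bits[page_no] = bits;            // only legal once the data page itself is on disk
  }
};

static uint32_t calc_free_bits(uint32_t free_space) {
  return free_space / 1024;                    // toy mapping, stands in for calc_page_free_bits()
}

// when == 0: called before pageout, when == 1: called after the write has completed
void on_page_unmap(uint32_t when, Page& page, ExtentDirectory& dir) {
  if (when == 0) {
    // Before pageout: detach from the dirty list, but do NOT touch the
    // extent page - the data page is not durable yet.
    page.on_dirty_list = false;
  } else {
    // After pageout: the data page is on disk, so the extent page may now
    // be brought up to date without breaking the ordering between the two.
    dir.update_page_free_bits(page.page_no, calc_free_bits(page.free_space));
  }
}

int main() {
  ExtentDirectory dir{std::vector<uint32_t>(4, 0)};
  Page p{2, 3072};
  on_page_unmap(0, p, dir);                    // issued when the write is queued
  std::cout << "before write: bits = " << dir.committed_bits[2] << "\n";  // still 0
  on_page_unmap(1, p, dir);                    // issued from the write-completion path
  std::cout << "after write:  bits = " << dir.committed_bits[2] << "\n";  // now 3
}
```

In the actual diff, pgman issues the when == 0 call from process_cleanup/process_lcp before pageout() and the when == 1 call from fswriteconf(), and only the latter path in Dbtup::disk_page_unmap_callback calls tsman.update_page_free_bits().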


storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  inform TUP which LCP to restore each fragment to
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  1) inform TUP which LCP to restore each fragment to
  2) inform TUP both before/after a page has been written
storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp:
  1) inform TUP which LCP to restore each fragment to
  2) inform TUP both before/after a page has been written
storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp:
  log both create and drop of table (illustrated in the sketch after this file list)
storage/ndb/src/kernel/blocks/lgman.cpp:
  let TUP know about all LCPs
storage/ndb/src/kernel/blocks/pgman.cpp:
  add "when" argument to disk_page_unmap_callback so that TUP gets informed both before and after page writeout
    so that extent pages can be updated only *after* page has been written
storage/ndb/src/kernel/blocks/tsman.cpp:
  remove lsn from update page free bits
  use wal for page vs extent relation
storage/ndb/src/kernel/blocks/tsman.hpp:
  remove lsn from update page free bits
  use wal for page vs extent relation
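
The second point logs both create and drop of disk tables into the undo log and syncs it, falling back to a warning (rather than ndbrequire(false)) when log space or a log buffer cannot be obtained, as the DbtupMeta.cpp hunks below show. A minimal, self-contained sketch of that flow follows, with hypothetical stand-in types (LogfileGroup, UndoDropRecord) rather than the real lgman/Logfile_client interfaces.

```cpp
// Hypothetical sketch of the drop-table undo-logging flow with non-fatal fallback.
#include <cstdint>
#include <iostream>
#include <optional>

struct UndoDropRecord { uint32_t table_id; uint32_t type_and_len; };

struct LogfileGroup {
  uint64_t free_words = 8;                     // pretend the undo log is nearly full
  bool alloc_log_space(uint32_t words) {
    if (words > free_words) return false;
    free_words -= words;
    return true;
  }
  void free_log_space(uint32_t words) { free_words += words; }
  std::optional<uint64_t> add_entry(const UndoDropRecord& rec) {
    (void)rec;                                 // a real log would copy the record into its buffer
    return 42;                                 // toy LSN of the written record
  }
};

// Returns true if a DROP undo record was written; false means "warn and
// continue", mirroring the non-fatal handling introduced by the commit.
bool log_drop_table(LogfileGroup& lg, uint32_t table_id) {
  const uint32_t words = sizeof(UndoDropRecord) / 4;
  if (!lg.alloc_log_space(words)) {
    std::cerr << "warning: failed to alloc log space for drop of table " << table_id << "\n";
    return false;
  }
  UndoDropRecord rec{table_id, (1u << 16) | words};   // type in the high half, length in the low half
  if (auto lsn = lg.add_entry(rec)) {
    std::cout << "drop of table " << table_id << " logged at lsn " << *lsn << "\n";
    return true;                               // the caller would now sync the log up to *lsn
  }
  std::cerr << "warning: failed to get log buffer for drop of table " << table_id << "\n";
  lg.free_log_space(words);
  return false;
}

int main() { LogfileGroup lg; return log_drop_table(lg, 7) ? 0 : 1; }
```

In the real code, success is followed by a log sync and only then by the rest of the drop; on failure the new code emits a warningEvent and continues via the done: label instead of crashing.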
parent 8625e942
......@@ -14042,11 +14042,16 @@ void Dblqh::execSTART_FRAGREQ(Signal* signal)
fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;
}
c_tup->disk_restart_mark_no_lcp(tabptr.i, fragId);
c_tup->disk_restart_lcp_id(tabptr.i, fragId, RNIL);
jamEntry();
return;
}//if
}
else
{
jam();
c_tup->disk_restart_lcp_id(tabptr.i, fragId, lcpId);
jamEntry();
}
c_lcpId = (c_lcpId == 0 ? lcpId : c_lcpId);
c_lcpId = (c_lcpId < lcpId ? c_lcpId : lcpId);
......
......@@ -624,7 +624,8 @@ struct Fragrecord {
DLList<ScanOp>::Head m_scanList;
enum { UC_LCP = 1, UC_CREATE = 2 };
enum { UC_LCP = 1, UC_CREATE = 2, UC_SET_LCP = 3 };
Uint32 m_restore_lcp_id;
Uint32 m_undo_complete;
Uint32 m_tablespace_id;
Uint32 m_logfile_group_id;
......@@ -2748,7 +2749,7 @@ private:
public:
int disk_page_load_hook(Uint32 page_id);
void disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count);
void disk_page_unmap_callback(Uint32 when, Uint32 page, Uint32 dirty_count);
int disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
const Local_key* key, Uint32 pages);
......@@ -2769,11 +2770,11 @@ public:
Local_key m_key;
};
void disk_restart_mark_no_lcp(Uint32 table, Uint32 frag);
void disk_restart_lcp_id(Uint32 table, Uint32 frag, Uint32 lcpId);
private:
void disk_restart_undo_next(Signal*);
void disk_restart_undo_lcp(Uint32, Uint32, Uint32 flag);
void disk_restart_undo_lcp(Uint32, Uint32, Uint32 flag, Uint32 lcpId);
void disk_restart_undo_callback(Signal* signal, Uint32, Uint32);
void disk_restart_undo_alloc(Apply_undo*);
void disk_restart_undo_update(Apply_undo*);
......
......@@ -907,8 +907,10 @@ Dbtup::disk_page_set_dirty(PagePtr pagePtr)
}
void
Dbtup::disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count)
Dbtup::disk_page_unmap_callback(Uint32 when,
Uint32 page_id, Uint32 dirty_count)
{
jamEntry();
Ptr<GlobalPage> gpage;
m_global_page_pool.getPtr(gpage, page_id);
PagePtr pagePtr;
......@@ -922,17 +924,9 @@ Dbtup::disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count)
{
return ;
}
Local_key key;
key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no;
Uint32 idx = pagePtr.p->list_index;
ndbassert((idx & 0x8000) == 0);
Uint32 idx = pagePtr.p->list_index;
if (DBG_DISK)
ndbout << "disk_page_unmap_callback " << key << endl;
Ptr<Tablerec> tabPtr;
tabPtr.i= pagePtr.p->m_table_id;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
......@@ -942,26 +936,83 @@ Dbtup::disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count)
Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
if (dirty_count == 0)
if (when == 0)
{
/**
* Before pageout
*/
jam();
if (DBG_DISK)
{
Local_key key;
key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no;
ndbout << "disk_page_unmap_callback(before) " << key
<< " cnt: " << dirty_count << " " << (idx & ~0x8000) << endl;
}
ndbassert((idx & 0x8000) == 0);
ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
list.remove(pagePtr);
if (dirty_count == 0)
{
jam();
pagePtr.p->list_index = idx | 0x8000;
Local_key key;
key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no;
Uint32 free = pagePtr.p->free_space;
Uint32 used = pagePtr.p->uncommitted_used_space;
ddassert(free >= used);
ddassert(alloc.calc_page_free_bits(free - used) == idx);
Tablespace_client tsman(0, c_tsman,
fragPtr.p->fragTableId,
fragPtr.p->fragmentId,
fragPtr.p->m_tablespace_id);
tsman.unmap_page(&key, idx);
jamEntry();
}
}
else if (when == 1)
{
Uint32 free = pagePtr.p->free_space;
Uint32 used = pagePtr.p->uncommitted_used_space;
ddassert(free >= used);
ddassert(alloc.calc_page_free_bits(free - used) == idx);
/**
* After page out
*/
jam();
Local_key key;
key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no;
Uint32 real_free = pagePtr.p->free_space;
if (DBG_DISK)
{
ndbout << "disk_page_unmap_callback(after) " << key
<< " cnt: " << dirty_count << " " << (idx & ~0x8000) << endl;
}
Tablespace_client tsman(0, c_tsman,
fragPtr.p->fragTableId,
fragPtr.p->fragmentId,
fragPtr.p->m_tablespace_id);
tsman.unmap_page(&key, idx);
jamEntry();
pagePtr.p->list_index = idx | 0x8000;
if (DBG_DISK && alloc.calc_page_free_bits(real_free) != (idx & ~0x8000))
{
ndbout << key
<< " calc: " << alloc.calc_page_free_bits(real_free)
<< " idx: " << (idx & ~0x8000)
<< endl;
}
tsman.update_page_free_bits(&key, alloc.calc_page_free_bits(real_free));
}
ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
list.remove(pagePtr);
}
void
......@@ -992,20 +1043,6 @@ Dbtup::disk_page_alloc(Signal* signal,
lsn= disk_page_undo_alloc(pagePtr.p, key, sz, gci, logfile_group_id);
}
Uint32 new_free = pagePtr.p->free_space;
Uint32 new_bits= alloc.calc_page_free_bits(new_free);
if (old_bits != new_bits)
{
Tablespace_client tsman(signal, c_tsman,
fragPtrP->fragTableId,
fragPtrP->fragmentId,
fragPtrP->m_tablespace_id);
tsman.update_page_free_bits(key, new_bits, lsn);
jamEntry();
}
}
void
......@@ -1049,17 +1086,6 @@ Dbtup::disk_page_free(Signal *signal,
Uint32 new_free = pagePtr.p->free_space;
Uint32 new_bits = alloc.calc_page_free_bits(new_free);
if (old_bits != new_bits)
{
Tablespace_client tsman(signal, c_tsman,
fragPtrP->fragTableId,
fragPtrP->fragmentId,
fragPtrP->m_tablespace_id);
tsman.update_page_free_bits(key, new_bits, lsn);
jamEntry();
}
Uint32 ext = pagePtr.p->m_extent_info_ptr;
Uint32 used = pagePtr.p->uncommitted_used_space;
Uint32 old_idx = pagePtr.p->list_index;
......@@ -1345,15 +1371,23 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
case File_formats::Undofile::UNDO_LCP_FIRST:
case File_formats::Undofile::UNDO_LCP:
{
jam();
ndbrequire(len == 3);
Uint32 lcp = ptr[0];
Uint32 tableId = ptr[1] >> 16;
Uint32 fragId = ptr[1] & 0xFFFF;
disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_LCP);
disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_LCP, lcp);
disk_restart_undo_next(signal);
if (DBG_UNDO)
{
ndbout_c("UNDO LCP %u (%u, %u)", lcp, tableId, fragId);
}
return;
}
case File_formats::Undofile::UNDO_TUP_ALLOC:
{
jam();
Disk_undo::Alloc* rec= (Disk_undo::Alloc*)ptr;
preq.m_page.m_page_no = rec->m_page_no;
preq.m_page.m_file_no = rec->m_file_no_page_idx >> 16;
......@@ -1362,6 +1396,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
}
case File_formats::Undofile::UNDO_TUP_UPDATE:
{
jam();
Disk_undo::Update* rec= (Disk_undo::Update*)ptr;
preq.m_page.m_page_no = rec->m_page_no;
preq.m_page.m_file_no = rec->m_file_no_page_idx >> 16;
......@@ -1370,6 +1405,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
}
case File_formats::Undofile::UNDO_TUP_FREE:
{
jam();
Disk_undo::Free* rec= (Disk_undo::Free*)ptr;
preq.m_page.m_page_no = rec->m_page_no;
preq.m_page.m_file_no = rec->m_file_no_page_idx >> 16;
......@@ -1381,6 +1417,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
*
*/
{
jam();
Disk_undo::Create* rec= (Disk_undo::Create*)ptr;
Ptr<Tablerec> tabPtr;
tabPtr.i= rec->m_table;
......@@ -1388,12 +1425,34 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
if (tabPtr.p->fragrec[i] != RNIL)
disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i],
Fragrecord::UC_CREATE);
Fragrecord::UC_CREATE, 0);
disk_restart_undo_next(signal);
if (DBG_UNDO)
{
ndbout_c("UNDO CREATE (%u)", tabPtr.i);
}
return;
}
case File_formats::Undofile::UNDO_TUP_DROP:
{
jam();
Disk_undo::Drop* rec = (Disk_undo::Drop*)ptr;
Ptr<Tablerec> tabPtr;
tabPtr.i= rec->m_table;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
if (tabPtr.p->fragrec[i] != RNIL)
disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i],
Fragrecord::UC_CREATE, 0);
disk_restart_undo_next(signal);
if (DBG_UNDO)
{
ndbout_c("UNDO DROP (%u)", tabPtr.i);
}
return;
}
case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
jam();
case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
......@@ -1402,6 +1461,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
return;
case File_formats::Undofile::UNDO_END:
jam();
f_undo_done = true;
return;
default:
......@@ -1435,14 +1495,32 @@ Dbtup::disk_restart_undo_next(Signal* signal)
}
void
Dbtup::disk_restart_mark_no_lcp(Uint32 tableId, Uint32 fragId)
Dbtup::disk_restart_lcp_id(Uint32 tableId, Uint32 fragId, Uint32 lcpId)
{
jamEntry();
disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_CREATE);
if (lcpId == RNIL)
{
disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_CREATE, 0);
if (DBG_UNDO)
{
ndbout_c("mark_no_lcp (%u, %u)", tableId, fragId);
}
}
else
{
disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_SET_LCP, lcpId);
if (DBG_UNDO)
{
ndbout_c("mark_no_lcp (%u, %u)", tableId, fragId);
}
}
}
void
Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId, Uint32 flag)
Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId, Uint32 flag,
Uint32 lcpId)
{
Ptr<Tablerec> tabPtr;
tabPtr.i= tableId;
......@@ -1450,11 +1528,43 @@ Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId, Uint32 flag)
if (tabPtr.p->tableStatus == DEFINED)
{
jam();
FragrecordPtr fragPtr;
getFragmentrec(fragPtr, fragId, tabPtr.p);
if (!fragPtr.isNull())
{
fragPtr.p->m_undo_complete |= flag;
jam();
switch(flag){
case Fragrecord::UC_CREATE:
jam();
fragPtr.p->m_undo_complete |= flag;
return;
case Fragrecord::UC_LCP:
jam();
if (fragPtr.p->m_undo_complete == 0 &&
fragPtr.p->m_restore_lcp_id == lcpId)
{
jam();
fragPtr.p->m_undo_complete |= flag;
if (DBG_UNDO)
ndbout_c("table: %u fragment: %u lcp: %u -> done",
tableId, fragId, lcpId);
}
return;
case Fragrecord::UC_SET_LCP:
{
jam();
if (DBG_UNDO)
ndbout_c("table: %u fragment: %u restore to lcp: %u",
tableId, fragId, lcpId);
ndbrequire(fragPtr.p->m_undo_complete == 0);
ndbrequire(fragPtr.p->m_restore_lcp_id == RNIL);
fragPtr.p->m_restore_lcp_id = lcpId;
return;
}
}
jamLine(flag);
ndbrequire(false);
}
}
}
......@@ -1478,6 +1588,7 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
pagePtr.p->nextList != RNIL ||
pagePtr.p->prevList != RNIL)
{
jam();
update = true;
pagePtr.p->list_index |= 0x8000;
pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
......@@ -1488,6 +1599,9 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
if (tableId >= cnoOfTablerec)
{
jam();
if (DBG_UNDO)
ndbout_c("UNDO table> %u", tableId);
disk_restart_undo_next(signal);
return;
}
......@@ -1496,6 +1610,9 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
if (undo->m_table_ptr.p->tableStatus != DEFINED)
{
jam();
if (DBG_UNDO)
ndbout_c("UNDO !defined (%u) ", tableId);
disk_restart_undo_next(signal);
return;
}
......@@ -1503,19 +1620,25 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
getFragmentrec(undo->m_fragment_ptr, fragId, undo->m_table_ptr.p);
if(undo->m_fragment_ptr.isNull())
{
jam();
if (DBG_UNDO)
ndbout_c("UNDO fragment null %u/%u", tableId, fragId);
disk_restart_undo_next(signal);
return;
}
if (undo->m_fragment_ptr.p->m_undo_complete)
{
jam();
if (DBG_UNDO)
ndbout_c("UNDO undo complete %u/%u", tableId, fragId);
disk_restart_undo_next(signal);
return;
}
Local_key key;
key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no;
Local_key key = undo->m_key;
// key.m_page_no = pagePtr.p->m_page_no;
// key.m_file_no = pagePtr.p->m_file_no;
Uint64 lsn = 0;
lsn += pagePtr.p->m_page_header.m_page_lsn_hi; lsn <<= 32;
......@@ -1525,6 +1648,7 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
if (undo->m_lsn <= lsn)
{
jam();
if (DBG_UNDO)
{
ndbout << "apply: " << undo->m_lsn << "(" << lsn << " )"
......@@ -1539,12 +1663,15 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
*/
switch(undo->m_type){
case File_formats::Undofile::UNDO_TUP_ALLOC:
jam();
disk_restart_undo_alloc(undo);
break;
case File_formats::Undofile::UNDO_TUP_UPDATE:
jam();
disk_restart_undo_update(undo);
break;
case File_formats::Undofile::UNDO_TUP_FREE:
jam();
disk_restart_undo_free(undo);
break;
default:
......@@ -1559,14 +1686,17 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
m_pgman.update_lsn(undo->m_key, lsn);
jamEntry();
disk_restart_undo_page_bits(signal, undo);
}
else if (DBG_UNDO)
{
jam();
ndbout << "ignore: " << undo->m_lsn << "(" << lsn << " )"
<< key << " type: " << undo->m_type << endl;
<< key << " type: " << undo->m_type
<< " tab: " << tableId << endl;
}
disk_restart_undo_page_bits(signal, undo);
disk_restart_undo_next(signal);
}
......@@ -1641,16 +1771,12 @@ Dbtup::disk_restart_undo_page_bits(Signal* signal, Apply_undo* undo)
Uint32 new_bits = alloc.calc_page_free_bits(free);
pageP->list_index = 0x8000 | new_bits;
Uint64 lsn = 0;
lsn += pageP->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += pageP->m_page_header.m_page_lsn_lo;
Tablespace_client tsman(signal, c_tsman,
fragPtrP->fragTableId,
fragPtrP->fragmentId,
fragPtrP->m_tablespace_id);
tsman.restart_undo_page_free_bits(&undo->m_key, new_bits, undo->m_lsn, lsn);
tsman.restart_undo_page_free_bits(&undo->m_key, new_bits);
jamEntry();
}
......@@ -1687,6 +1813,7 @@ Dbtup::disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
if (alloc.m_curr_extent_info_ptr_i != RNIL)
{
jam();
Ptr<Extent_info> old;
c_extent_pool.getPtr(old, alloc.m_curr_extent_info_ptr_i);
ndbassert(old.p->m_free_matrix_pos == RNIL);
......@@ -1713,6 +1840,7 @@ void
Dbtup::disk_restart_page_bits(Uint32 tableId, Uint32 fragId,
const Local_key*, Uint32 bits)
{
jam();
TablerecPtr tabPtr;
FragrecordPtr fragPtr;
tabPtr.i = tableId;
......
......@@ -146,6 +146,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
regFragPtr.p->m_lcp_scan_op = RNIL;
regFragPtr.p->m_lcp_keep_list = RNIL;
regFragPtr.p->m_var_page_chunks = RNIL;
regFragPtr.p->m_restore_lcp_id = RNIL;
if (ERROR_INSERTED(4007) && regTabPtr.p->fragid[0] == fragId ||
ERROR_INSERTED(4008) && regTabPtr.p->fragid[1] == fragId) {
......@@ -673,11 +674,11 @@ Dbtup::undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused)
switch(ret){
case 0:
return;
case -1:
warningEvent("Failed to sync log for create of table: %u", regTabPtr.i);
default:
ndbout_c("ret: %d", ret);
ndbrequire(false);
execute(signal, req.m_callback, regFragPtr.p->m_logfile_group_id);
}
}
void
......@@ -958,8 +959,6 @@ void Dbtup::releaseFragment(Signal* signal, Uint32 tableId,
return;
}
#if NOT_YET_UNDO_DROP_TABLE
#error "This code is complete, but I prefer not to enable it until I need it"
if (logfile_group_id != RNIL)
{
Callback cb;
......@@ -968,7 +967,14 @@ void Dbtup::releaseFragment(Signal* signal, Uint32 tableId,
safe_cast(&Dbtup::drop_table_log_buffer_callback);
Uint32 sz= sizeof(Disk_undo::Drop) >> 2;
int r0 = c_lgman->alloc_log_space(logfile_group_id, sz);
if (r0)
{
jam();
warningEvent("Failed to alloc log space for drop table: %u",
tabPtr.i);
goto done;
}
Logfile_client lgman(this, c_lgman, logfile_group_id);
int res= lgman.get_log_buffer(signal, sz, &cb);
switch(res){
......@@ -976,15 +982,18 @@ void Dbtup::releaseFragment(Signal* signal, Uint32 tableId,
ljam();
return;
case -1:
ndbrequire("NOT YET IMPLEMENTED" == 0);
warningEvent("Failed to get log buffer for drop table: %u",
tabPtr.i);
c_lgman->free_log_space(logfile_group_id, sz);
goto done;
break;
default:
execute(signal, cb, logfile_group_id);
return;
}
}
#endif
done:
drop_table_logsync_callback(signal, tabPtr.i, RNIL);
}
......@@ -1163,9 +1172,10 @@ Dbtup::drop_table_log_buffer_callback(Signal* signal, Uint32 tablePtrI,
switch(ret){
case 0:
return;
case -1:
warningEvent("Failed to syn log for drop of table: %u", tablePtrI);
default:
ndbout_c("ret: %d", ret);
ndbrequire(false);
execute(signal, req.m_callback, logfile_group_id);
}
}
......
......@@ -2684,8 +2684,16 @@ Lgman::execute_undo_record(Signal* signal)
Uint32 lcp = * (ptr - len + 1);
if(m_latest_lcp && lcp > m_latest_lcp)
{
// Just ignore
break;
if (0)
{
const Uint32 * base = ptr - len + 1;
Uint32 lcp = base[0];
Uint32 tableId = base[1] >> 16;
Uint32 fragId = base[1] & 0xFFFF;
ndbout_c("NOT! ignoring lcp: %u tab: %u frag: %u",
lcp, tableId, fragId);
}
}
if(m_latest_lcp == 0 ||
......
......@@ -500,6 +500,11 @@ Pgman::release_page_entry(Ptr<Page_entry>& ptr)
if (! (state & Page_entry::LOCKED))
ndbrequire(! (state & Page_entry::REQUEST));
if (ptr.p->m_copy_page_i != RNIL)
{
m_global_page_pool.release(ptr.p->m_copy_page_i);
}
set_page_state(ptr, 0);
m_page_hashlist.remove(ptr);
......@@ -1151,7 +1156,8 @@ Pgman::process_cleanup(Signal* signal)
#ifdef VM_TRACE
debugOut << "PGMAN: " << ptr << " : process_cleanup" << endl;
#endif
c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i,
c_tup->disk_page_unmap_callback(0,
ptr.p->m_real_page_i,
ptr.p->m_dirty_count);
pageout(signal, ptr);
max_count--;
......@@ -1189,6 +1195,11 @@ Pgman::move_cleanup_ptr(Ptr<Page_entry> ptr)
void
Pgman::execLCP_FRAG_ORD(Signal* signal)
{
if (ERROR_INSERTED(11008))
{
ndbout_c("Ignore LCP_FRAG_ORD");
return;
}
LcpFragOrd* ord = (LcpFragOrd*)signal->getDataPtr();
ndbrequire(ord->lcpId >= m_last_lcp_complete + 1 || m_last_lcp_complete == 0);
m_last_lcp = ord->lcpId;
......@@ -1205,6 +1216,12 @@ Pgman::execLCP_FRAG_ORD(Signal* signal)
void
Pgman::execEND_LCP_REQ(Signal* signal)
{
if (ERROR_INSERTED(11008))
{
ndbout_c("Ignore END_LCP");
return;
}
EndLcpReq* req = (EndLcpReq*)signal->getDataPtr();
m_end_lcp_req = *req;
......@@ -1283,7 +1300,8 @@ Pgman::process_lcp(Signal* signal)
{
DBG_LCP(" pageout()" << endl);
ptr.p->m_state |= Page_entry::LCP;
c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i,
c_tup->disk_page_unmap_callback(0,
ptr.p->m_real_page_i,
ptr.p->m_dirty_count);
pageout(signal, ptr);
}
......@@ -1505,6 +1523,10 @@ Pgman::fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
Page_state state = ptr.p->m_state;
ndbrequire(state & Page_entry::PAGEOUT);
c_tup->disk_page_unmap_callback(1,
ptr.p->m_real_page_i,
ptr.p->m_dirty_count);
state &= ~ Page_entry::PAGEOUT;
state &= ~ Page_entry::EMPTY;
state &= ~ Page_entry::DIRTY;
......@@ -1758,7 +1780,7 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
#endif
state |= Page_entry::REQUEST;
if (only_request && req_flags & Page_request::EMPTY_PAGE)
if (only_request && (req_flags & Page_request::EMPTY_PAGE))
{
state |= Page_entry::EMPTY;
}
......@@ -2420,7 +2442,8 @@ Pgman::execDUMP_STATE_ORD(Signal* signal)
if (pl_hash.find(ptr, key))
{
ndbout << "pageout " << ptr << endl;
c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i,
c_tup->disk_page_unmap_callback(0,
ptr.p->m_real_page_i,
ptr.p->m_dirty_count);
pageout(signal, ptr);
}
......@@ -2476,6 +2499,11 @@ Pgman::execDUMP_STATE_ORD(Signal* signal)
{
SET_ERROR_INSERT_VALUE(11007);
}
if (signal->theData[0] == 11008)
{
SET_ERROR_INSERT_VALUE(11008);
}
}
// page cache client
......
......@@ -302,7 +302,7 @@ Tsman::execDUMP_STATE_ORD(Signal* signal){
Uint32 new_bits = curr_bits ^ rand();
Local_key key = chunks[chunk].start_page;
key.m_page_no += page;
ndbrequire(update_page_free_bits(signal, &key, new_bits, 0) == 0);
ndbrequire(update_page_free_bits(signal, &key, new_bits) == 0);
}
}
}
......@@ -369,6 +369,20 @@ Tsman::execCREATE_FILEGROUP_REQ(Signal* signal){
CreateFilegroupImplRef::SignalLength, JBB);
}
NdbOut&
operator<<(NdbOut& out, const File_formats::Datafile::Extent_header & obj)
{
out << "table: " << obj.m_table
<< " fragment: " << obj.m_fragment_id << " ";
for(Uint32 i = 0; i<32; i++)
{
char t[2];
BaseString::snprintf(t, sizeof(t), "%x", obj.get_free_bits(i));
out << t;
}
return out;
}
void
Tsman::execDROP_FILEGROUP_REQ(Signal* signal){
jamEntry();
......@@ -1582,8 +1596,7 @@ Tsman::execFREE_EXTENT_REQ(Signal* signal)
int
Tsman::update_page_free_bits(Signal* signal,
Local_key *key,
unsigned committed_bits,
Uint64 lsn)
unsigned committed_bits)
{
jamEntry();
......@@ -1618,6 +1631,18 @@ Tsman::update_page_free_bits(Signal* signal,
File_formats::Datafile::Extent_header* header =
page->get_header(val.m_extent_no, val.m_extent_size);
if (header->m_table == RNIL)
{
ndbout << "update page free bits page: " << *key
<< " " << *header << endl;
}
if (0)
{
ndbout << "update page free bits page(" << committed_bits << ") "
<< *key << " " << *header << endl;
}
ndbrequire(header->m_table != RNIL);
Uint32 page_no_in_extent = calc_page_no_in_extent(key->m_page_no, &val);
......@@ -1629,7 +1654,7 @@ Tsman::update_page_free_bits(Signal* signal,
Uint32 src = header->get_free_bits(page_no_in_extent) & UNCOMMITTED_MASK;
header->update_free_bits(page_no_in_extent, src | committed_bits);
m_page_cache_client.update_lsn(preq.m_page, lsn);
m_page_cache_client.update_lsn(preq.m_page, 0);
return 0;
}
......@@ -1717,6 +1742,11 @@ Tsman::unmap_page(Signal* signal, Local_key *key, Uint32 uncommitted_bits)
File_formats::Datafile::Extent_header* header =
page->get_header(val.m_extent_no, val.m_extent_size);
if (header->m_table == RNIL)
{
ndbout << "trying to unmap page: " << *key
<< " " << *header << endl;
}
ndbrequire(header->m_table != RNIL);
Uint32 page_no_in_extent = calc_page_no_in_extent(key->m_page_no, &val);
......@@ -1738,9 +1768,7 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
Uint32 tableId,
Uint32 fragId,
Local_key *key,
unsigned bits,
Uint64 undo_lsn,
Uint64 page_lsn)
unsigned bits)
{
jamEntry();
......@@ -1774,21 +1802,7 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
(File_formats::Datafile::Extent_page*)ptr_p;
File_formats::Datafile::Extent_header* header =
page->get_header(val.m_extent_no, val.m_extent_size);
Uint64 lsn = 0;
lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += page->m_page_header.m_page_lsn_lo;
if (undo_lsn > lsn && undo_lsn > page_lsn)
{
if (DBG_UNDO)
ndbout << "tsman: ignore " << undo_lsn << "(" << lsn << ", "
<< page_lsn << ") "
<< *key << " "
<< " -> " << bits << endl;
return 0;
}
if (header->m_table == RNIL)
{
if (DBG_UNDO)
......@@ -1807,7 +1821,7 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
*/
if (DBG_UNDO)
{
ndbout << "tsman: apply " << undo_lsn << "(" << lsn << ") "
ndbout << "tsman: apply "
<< *key << " " << (src & COMMITTED_MASK)
<< " -> " << bits << endl;
}
......@@ -1855,7 +1869,7 @@ Tsman::execALLOC_PAGE_REQ(Signal* signal)
/**
* Handling of unmapped extent header pages is not implemented
*/
int flags = 0;
int flags = Page_cache_client::DIRTY_REQ;
int real_page_id;
Uint32 page_no;
Uint32 src_bits;
......
......@@ -209,12 +209,12 @@ private:
void load_extent_page_callback(Signal*, Uint32, Uint32);
void create_file_ref(Signal*, Ptr<Tablespace>, Ptr<Datafile>,
Uint32,Uint32,Uint32);
int update_page_free_bits(Signal*, Local_key*, unsigned committed_bits,
Uint64 lsn);
int update_page_free_bits(Signal*, Local_key*, unsigned committed_bits);
int get_page_free_bits(Signal*, Local_key*, unsigned*, unsigned*);
int unmap_page(Signal*, Local_key*, unsigned uncommitted_bits);
int restart_undo_page_free_bits(Signal*, Uint32, Uint32, Local_key*,
unsigned committed_bits, Uint64, Uint64);
unsigned committed_bits);
int alloc_extent(Signal* signal, Uint32 tablespace, Local_key* key);
int alloc_page_from_extent(Signal*, Uint32, Local_key*, Uint32 bits);
......@@ -320,7 +320,7 @@ public:
/**
* Update page free bits
*/
int update_page_free_bits(Local_key*, unsigned bits, Uint64 lsn);
int update_page_free_bits(Local_key*, unsigned bits);
/**
* Get page free bits
......@@ -336,8 +336,7 @@ public:
/**
* Undo handling of page bits
*/
int restart_undo_page_free_bits(Local_key*, unsigned bits,
Uint64 lsn, Uint64 page_lsn);
int restart_undo_page_free_bits(Local_key*, unsigned bits);
/**
* Get tablespace info
......@@ -417,10 +416,9 @@ Tablespace_client::free_extent(Local_key* key, Uint64 lsn)
inline
int
Tablespace_client::update_page_free_bits(Local_key *key,
unsigned committed_bits,
Uint64 lsn)
unsigned committed_bits)
{
return m_tsman->update_page_free_bits(m_signal, key, committed_bits, lsn);
return m_tsman->update_page_free_bits(m_signal, key, committed_bits);
}
inline
......@@ -442,17 +440,13 @@ Tablespace_client::unmap_page(Local_key *key, unsigned uncommitted_bits)
inline
int
Tablespace_client::restart_undo_page_free_bits(Local_key* key,
unsigned committed_bits,
Uint64 lsn,
Uint64 page_lsn)
unsigned committed_bits)
{
return m_tsman->restart_undo_page_free_bits(m_signal,
m_table_id,
m_fragment_id,
key,
committed_bits,
lsn,
page_lsn);
committed_bits);
}
#endif