Commit db693ec9 authored by unknown's avatar unknown

ndb - bug#30975 (recommit to 51-telco-gca)

    - only update extent pages *after* flush of real page has been done
    - sync both create/drop of table into undolog (for disk tables)


storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  inform TUP which LCP to restore each fragment to
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  1) inform TUP which LCP to restore each fragment to
  2) inform TUP both before/after a page has been written
storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp:
  1) inform TUP which LCP to restore each fragment to
  2) inform TUP both before/after a page has been written
storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp:
  log both create/drop table
storage/ndb/src/kernel/blocks/lgman.cpp:
  let TUP know about all LCPs
storage/ndb/src/kernel/blocks/pgman.cpp:
  add "when" argument to disk_page_unmap_callback so that TUP gets informed both before and after page writeout
    so that extent pages can be updated only *after* page has been written
storage/ndb/src/kernel/blocks/tsman.cpp:
  remove lsn from update page free bits
  use wal for page vs extent relation
storage/ndb/src/kernel/blocks/tsman.hpp:
  remove lsn from update page free bits
  use wal for page vs extent relation
parent c127ccf9
......@@ -14042,11 +14042,16 @@ void Dblqh::execSTART_FRAGREQ(Signal* signal)
fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;
}
c_tup->disk_restart_mark_no_lcp(tabptr.i, fragId);
c_tup->disk_restart_lcp_id(tabptr.i, fragId, RNIL);
jamEntry();
return;
}//if
}
else
{
jam();
c_tup->disk_restart_lcp_id(tabptr.i, fragId, lcpId);
jamEntry();
}
c_lcpId = (c_lcpId == 0 ? lcpId : c_lcpId);
c_lcpId = (c_lcpId < lcpId ? c_lcpId : lcpId);
......
......@@ -624,7 +624,8 @@ struct Fragrecord {
DLList<ScanOp>::Head m_scanList;
enum { UC_LCP = 1, UC_CREATE = 2 };
enum { UC_LCP = 1, UC_CREATE = 2, UC_SET_LCP = 3 };
Uint32 m_restore_lcp_id;
Uint32 m_undo_complete;
Uint32 m_tablespace_id;
Uint32 m_logfile_group_id;
......@@ -2748,7 +2749,7 @@ private:
public:
int disk_page_load_hook(Uint32 page_id);
void disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count);
void disk_page_unmap_callback(Uint32 when, Uint32 page, Uint32 dirty_count);
int disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
const Local_key* key, Uint32 pages);
......@@ -2769,11 +2770,11 @@ public:
Local_key m_key;
};
void disk_restart_mark_no_lcp(Uint32 table, Uint32 frag);
void disk_restart_lcp_id(Uint32 table, Uint32 frag, Uint32 lcpId);
private:
void disk_restart_undo_next(Signal*);
void disk_restart_undo_lcp(Uint32, Uint32, Uint32 flag);
void disk_restart_undo_lcp(Uint32, Uint32, Uint32 flag, Uint32 lcpId);
void disk_restart_undo_callback(Signal* signal, Uint32, Uint32);
void disk_restart_undo_alloc(Apply_undo*);
void disk_restart_undo_update(Apply_undo*);
......
......@@ -146,6 +146,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
regFragPtr.p->m_lcp_scan_op = RNIL;
regFragPtr.p->m_lcp_keep_list = RNIL;
regFragPtr.p->m_var_page_chunks = RNIL;
regFragPtr.p->m_restore_lcp_id = RNIL;
if (ERROR_INSERTED(4007) && regTabPtr.p->fragid[0] == fragId ||
ERROR_INSERTED(4008) && regTabPtr.p->fragid[1] == fragId) {
......@@ -673,11 +674,11 @@ Dbtup::undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused)
switch(ret){
case 0:
return;
case -1:
warningEvent("Failed to sync log for create of table: %u", regTabPtr.i);
default:
ndbout_c("ret: %d", ret);
ndbrequire(false);
execute(signal, req.m_callback, regFragPtr.p->m_logfile_group_id);
}
}
void
......@@ -958,8 +959,6 @@ void Dbtup::releaseFragment(Signal* signal, Uint32 tableId,
return;
}
#if NOT_YET_UNDO_DROP_TABLE
#error "This code is complete, but I prefer not to enable it until I need it"
if (logfile_group_id != RNIL)
{
Callback cb;
......@@ -968,7 +967,14 @@ void Dbtup::releaseFragment(Signal* signal, Uint32 tableId,
safe_cast(&Dbtup::drop_table_log_buffer_callback);
Uint32 sz= sizeof(Disk_undo::Drop) >> 2;
int r0 = c_lgman->alloc_log_space(logfile_group_id, sz);
if (r0)
{
jam();
warningEvent("Failed to alloc log space for drop table: %u",
tabPtr.i);
goto done;
}
Logfile_client lgman(this, c_lgman, logfile_group_id);
int res= lgman.get_log_buffer(signal, sz, &cb);
switch(res){
......@@ -976,15 +982,18 @@ void Dbtup::releaseFragment(Signal* signal, Uint32 tableId,
ljam();
return;
case -1:
ndbrequire("NOT YET IMPLEMENTED" == 0);
warningEvent("Failed to get log buffer for drop table: %u",
tabPtr.i);
c_lgman->free_log_space(logfile_group_id, sz);
goto done;
break;
default:
execute(signal, cb, logfile_group_id);
return;
}
}
#endif
done:
drop_table_logsync_callback(signal, tabPtr.i, RNIL);
}
......@@ -1163,9 +1172,10 @@ Dbtup::drop_table_log_buffer_callback(Signal* signal, Uint32 tablePtrI,
switch(ret){
case 0:
return;
case -1:
warningEvent("Failed to syn log for drop of table: %u", tablePtrI);
default:
ndbout_c("ret: %d", ret);
ndbrequire(false);
execute(signal, req.m_callback, logfile_group_id);
}
}
......
......@@ -2684,8 +2684,16 @@ Lgman::execute_undo_record(Signal* signal)
Uint32 lcp = * (ptr - len + 1);
if(m_latest_lcp && lcp > m_latest_lcp)
{
// Just ignore
break;
if (0)
{
const Uint32 * base = ptr - len + 1;
Uint32 lcp = base[0];
Uint32 tableId = base[1] >> 16;
Uint32 fragId = base[1] & 0xFFFF;
ndbout_c("NOT! ignoring lcp: %u tab: %u frag: %u",
lcp, tableId, fragId);
}
}
if(m_latest_lcp == 0 ||
......
......@@ -500,6 +500,11 @@ Pgman::release_page_entry(Ptr<Page_entry>& ptr)
if (! (state & Page_entry::LOCKED))
ndbrequire(! (state & Page_entry::REQUEST));
if (ptr.p->m_copy_page_i != RNIL)
{
m_global_page_pool.release(ptr.p->m_copy_page_i);
}
set_page_state(ptr, 0);
m_page_hashlist.remove(ptr);
......@@ -1151,7 +1156,8 @@ Pgman::process_cleanup(Signal* signal)
#ifdef VM_TRACE
debugOut << "PGMAN: " << ptr << " : process_cleanup" << endl;
#endif
c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i,
c_tup->disk_page_unmap_callback(0,
ptr.p->m_real_page_i,
ptr.p->m_dirty_count);
pageout(signal, ptr);
max_count--;
......@@ -1189,6 +1195,11 @@ Pgman::move_cleanup_ptr(Ptr<Page_entry> ptr)
void
Pgman::execLCP_FRAG_ORD(Signal* signal)
{
if (ERROR_INSERTED(11008))
{
ndbout_c("Ignore LCP_FRAG_ORD");
return;
}
LcpFragOrd* ord = (LcpFragOrd*)signal->getDataPtr();
ndbrequire(ord->lcpId >= m_last_lcp_complete + 1 || m_last_lcp_complete == 0);
m_last_lcp = ord->lcpId;
......@@ -1205,6 +1216,12 @@ Pgman::execLCP_FRAG_ORD(Signal* signal)
void
Pgman::execEND_LCP_REQ(Signal* signal)
{
if (ERROR_INSERTED(11008))
{
ndbout_c("Ignore END_LCP");
return;
}
EndLcpReq* req = (EndLcpReq*)signal->getDataPtr();
m_end_lcp_req = *req;
......@@ -1283,7 +1300,8 @@ Pgman::process_lcp(Signal* signal)
{
DBG_LCP(" pageout()" << endl);
ptr.p->m_state |= Page_entry::LCP;
c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i,
c_tup->disk_page_unmap_callback(0,
ptr.p->m_real_page_i,
ptr.p->m_dirty_count);
pageout(signal, ptr);
}
......@@ -1505,6 +1523,10 @@ Pgman::fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
Page_state state = ptr.p->m_state;
ndbrequire(state & Page_entry::PAGEOUT);
c_tup->disk_page_unmap_callback(1,
ptr.p->m_real_page_i,
ptr.p->m_dirty_count);
state &= ~ Page_entry::PAGEOUT;
state &= ~ Page_entry::EMPTY;
state &= ~ Page_entry::DIRTY;
......@@ -1758,7 +1780,7 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
#endif
state |= Page_entry::REQUEST;
if (only_request && req_flags & Page_request::EMPTY_PAGE)
if (only_request && (req_flags & Page_request::EMPTY_PAGE))
{
state |= Page_entry::EMPTY;
}
......@@ -2420,7 +2442,8 @@ Pgman::execDUMP_STATE_ORD(Signal* signal)
if (pl_hash.find(ptr, key))
{
ndbout << "pageout " << ptr << endl;
c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i,
c_tup->disk_page_unmap_callback(0,
ptr.p->m_real_page_i,
ptr.p->m_dirty_count);
pageout(signal, ptr);
}
......@@ -2476,6 +2499,11 @@ Pgman::execDUMP_STATE_ORD(Signal* signal)
{
SET_ERROR_INSERT_VALUE(11007);
}
if (signal->theData[0] == 11008)
{
SET_ERROR_INSERT_VALUE(11008);
}
}
// page cache client
......
......@@ -302,7 +302,7 @@ Tsman::execDUMP_STATE_ORD(Signal* signal){
Uint32 new_bits = curr_bits ^ rand();
Local_key key = chunks[chunk].start_page;
key.m_page_no += page;
ndbrequire(update_page_free_bits(signal, &key, new_bits, 0) == 0);
ndbrequire(update_page_free_bits(signal, &key, new_bits) == 0);
}
}
}
......@@ -369,6 +369,20 @@ Tsman::execCREATE_FILEGROUP_REQ(Signal* signal){
CreateFilegroupImplRef::SignalLength, JBB);
}
NdbOut&
operator<<(NdbOut& out, const File_formats::Datafile::Extent_header & obj)
{
out << "table: " << obj.m_table
<< " fragment: " << obj.m_fragment_id << " ";
for(Uint32 i = 0; i<32; i++)
{
char t[2];
BaseString::snprintf(t, sizeof(t), "%x", obj.get_free_bits(i));
out << t;
}
return out;
}
void
Tsman::execDROP_FILEGROUP_REQ(Signal* signal){
jamEntry();
......@@ -1582,8 +1596,7 @@ Tsman::execFREE_EXTENT_REQ(Signal* signal)
int
Tsman::update_page_free_bits(Signal* signal,
Local_key *key,
unsigned committed_bits,
Uint64 lsn)
unsigned committed_bits)
{
jamEntry();
......@@ -1618,6 +1631,18 @@ Tsman::update_page_free_bits(Signal* signal,
File_formats::Datafile::Extent_header* header =
page->get_header(val.m_extent_no, val.m_extent_size);
if (header->m_table == RNIL)
{
ndbout << "update page free bits page: " << *key
<< " " << *header << endl;
}
if (0)
{
ndbout << "update page free bits page(" << committed_bits << ") "
<< *key << " " << *header << endl;
}
ndbrequire(header->m_table != RNIL);
Uint32 page_no_in_extent = calc_page_no_in_extent(key->m_page_no, &val);
......@@ -1629,7 +1654,7 @@ Tsman::update_page_free_bits(Signal* signal,
Uint32 src = header->get_free_bits(page_no_in_extent) & UNCOMMITTED_MASK;
header->update_free_bits(page_no_in_extent, src | committed_bits);
m_page_cache_client.update_lsn(preq.m_page, lsn);
m_page_cache_client.update_lsn(preq.m_page, 0);
return 0;
}
......@@ -1717,6 +1742,11 @@ Tsman::unmap_page(Signal* signal, Local_key *key, Uint32 uncommitted_bits)
File_formats::Datafile::Extent_header* header =
page->get_header(val.m_extent_no, val.m_extent_size);
if (header->m_table == RNIL)
{
ndbout << "trying to unmap page: " << *key
<< " " << *header << endl;
}
ndbrequire(header->m_table != RNIL);
Uint32 page_no_in_extent = calc_page_no_in_extent(key->m_page_no, &val);
......@@ -1738,9 +1768,7 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
Uint32 tableId,
Uint32 fragId,
Local_key *key,
unsigned bits,
Uint64 undo_lsn,
Uint64 page_lsn)
unsigned bits)
{
jamEntry();
......@@ -1774,21 +1802,7 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
(File_formats::Datafile::Extent_page*)ptr_p;
File_formats::Datafile::Extent_header* header =
page->get_header(val.m_extent_no, val.m_extent_size);
Uint64 lsn = 0;
lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += page->m_page_header.m_page_lsn_lo;
if (undo_lsn > lsn && undo_lsn > page_lsn)
{
if (DBG_UNDO)
ndbout << "tsman: ignore " << undo_lsn << "(" << lsn << ", "
<< page_lsn << ") "
<< *key << " "
<< " -> " << bits << endl;
return 0;
}
if (header->m_table == RNIL)
{
if (DBG_UNDO)
......@@ -1807,7 +1821,7 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
*/
if (DBG_UNDO)
{
ndbout << "tsman: apply " << undo_lsn << "(" << lsn << ") "
ndbout << "tsman: apply "
<< *key << " " << (src & COMMITTED_MASK)
<< " -> " << bits << endl;
}
......@@ -1855,7 +1869,7 @@ Tsman::execALLOC_PAGE_REQ(Signal* signal)
/**
* Handling of unmapped extent header pages is not implemented
*/
int flags = 0;
int flags = Page_cache_client::DIRTY_REQ;
int real_page_id;
Uint32 page_no;
Uint32 src_bits;
......
......@@ -209,12 +209,12 @@ private:
void load_extent_page_callback(Signal*, Uint32, Uint32);
void create_file_ref(Signal*, Ptr<Tablespace>, Ptr<Datafile>,
Uint32,Uint32,Uint32);
int update_page_free_bits(Signal*, Local_key*, unsigned committed_bits,
Uint64 lsn);
int update_page_free_bits(Signal*, Local_key*, unsigned committed_bits);
int get_page_free_bits(Signal*, Local_key*, unsigned*, unsigned*);
int unmap_page(Signal*, Local_key*, unsigned uncommitted_bits);
int restart_undo_page_free_bits(Signal*, Uint32, Uint32, Local_key*,
unsigned committed_bits, Uint64, Uint64);
unsigned committed_bits);
int alloc_extent(Signal* signal, Uint32 tablespace, Local_key* key);
int alloc_page_from_extent(Signal*, Uint32, Local_key*, Uint32 bits);
......@@ -320,7 +320,7 @@ public:
/**
* Update page free bits
*/
int update_page_free_bits(Local_key*, unsigned bits, Uint64 lsn);
int update_page_free_bits(Local_key*, unsigned bits);
/**
* Get page free bits
......@@ -336,8 +336,7 @@ public:
/**
* Undo handling of page bits
*/
int restart_undo_page_free_bits(Local_key*, unsigned bits,
Uint64 lsn, Uint64 page_lsn);
int restart_undo_page_free_bits(Local_key*, unsigned bits);
/**
* Get tablespace info
......@@ -417,10 +416,9 @@ Tablespace_client::free_extent(Local_key* key, Uint64 lsn)
inline
int
Tablespace_client::update_page_free_bits(Local_key *key,
unsigned committed_bits,
Uint64 lsn)
unsigned committed_bits)
{
return m_tsman->update_page_free_bits(m_signal, key, committed_bits, lsn);
return m_tsman->update_page_free_bits(m_signal, key, committed_bits);
}
inline
......@@ -442,17 +440,13 @@ Tablespace_client::unmap_page(Local_key *key, unsigned uncommitted_bits)
inline
int
Tablespace_client::restart_undo_page_free_bits(Local_key* key,
unsigned committed_bits,
Uint64 lsn,
Uint64 page_lsn)
unsigned committed_bits)
{
return m_tsman->restart_undo_page_free_bits(m_signal,
m_table_id,
m_fragment_id,
key,
committed_bits,
lsn,
page_lsn);
committed_bits);
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment