Commit 642ff311 authored by unknown's avatar unknown

ndb - bug#21948 & bug#17605

  fix alloc/free extent in undo log
  allow extent to be reused once a lcp is finished (instead of when next lcp starts)


storage/ndb/include/kernel/signaldata/Extent.hpp:
  Add lsn to alloc extent
storage/ndb/src/kernel/blocks/diskpage.hpp:
  Add (unused) undo entries for drop table, and alloc/free extent
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Notify TSMAN of both start and stop of LCP
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  1) Add unused undo entries for drop table, alloc/free extent
  2) handle create_table better (correct?) in undo log
  3) fix some typos/style
storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp:
  1) Add unused undo entries for drop table, alloc/free extent
  2) handle create_table better (correct?) in undo log
  3) fix some typos/style
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp:
  fix style
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp:
  fix typo/style
storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp:
  Make sure regFragPtr.p->m_logfile_group_id = RNIL is applicable
storage/ndb/src/kernel/blocks/lgman.cpp:
  Add m_logfile_group_id to log callback
storage/ndb/src/kernel/blocks/print_file.cpp:
  Add (unused) undo entries for drop table, and alloc/free extent
storage/ndb/src/kernel/blocks/tsman.cpp:
  1) change so that LCP limit on resuse of extent is only for duration of lcp
  2) refactor so lookup_extent is put into subroutine
storage/ndb/src/kernel/blocks/tsman.hpp:
  refactor so lookup_extent is put into subroutine
parent 9648c0d1
...@@ -74,6 +74,8 @@ struct FreeExtentReq { ...@@ -74,6 +74,8 @@ struct FreeExtentReq {
Local_key key; Local_key key;
Uint32 table_id; Uint32 table_id;
Uint32 tablespace_id; Uint32 tablespace_id;
Uint32 lsn_hi;
Uint32 lsn_lo;
} request; } request;
struct struct
{ {
......
...@@ -11276,7 +11276,7 @@ void Dblqh::execLCP_PREPARE_REF(Signal* signal) ...@@ -11276,7 +11276,7 @@ void Dblqh::execLCP_PREPARE_REF(Signal* signal)
/** /**
* First fragment mean that last LCP is complete :-) * First fragment mean that last LCP is complete :-)
*/ */
EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length()); EXECUTE_DIRECT(TSMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
jamEntry(); jamEntry();
} }
...@@ -11327,7 +11327,7 @@ void Dblqh::execLCP_PREPARE_CONF(Signal* signal) ...@@ -11327,7 +11327,7 @@ void Dblqh::execLCP_PREPARE_CONF(Signal* signal)
/** /**
* First fragment mean that last LCP is complete :-) * First fragment mean that last LCP is complete :-)
*/ */
EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length()); EXECUTE_DIRECT(TSMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
jamEntry(); jamEntry();
} }
...@@ -11611,6 +11611,9 @@ void Dblqh::completeLcpRoundLab(Signal* signal, Uint32 lcpId) ...@@ -11611,6 +11611,9 @@ void Dblqh::completeLcpRoundLab(Signal* signal, Uint32 lcpId)
sendSignal(LGMAN_REF, GSN_END_LCP_REQ, signal, sendSignal(LGMAN_REF, GSN_END_LCP_REQ, signal,
EndLcpReq::SignalLength, JBB); EndLcpReq::SignalLength, JBB);
EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, EndLcpReq::SignalLength);
jamEntry();
lcpPtr.i = 0; lcpPtr.i = 0;
ptrAss(lcpPtr, lcpRecord); ptrAss(lcpPtr, lcpRecord);
lcpPtr.p->m_outstanding = 3; lcpPtr.p->m_outstanding = 3;
......
...@@ -625,7 +625,8 @@ struct Fragrecord { ...@@ -625,7 +625,8 @@ struct Fragrecord {
DLList<ScanOp>::Head m_scanList; DLList<ScanOp>::Head m_scanList;
bool m_undo_complete; enum { UC_LCP = 1, UC_CREATE = 2 };
Uint32 m_undo_complete;
Uint32 m_tablespace_id; Uint32 m_tablespace_id;
Uint32 m_logfile_group_id; Uint32 m_logfile_group_id;
Disk_alloc_info m_disk_alloc_info; Disk_alloc_info m_disk_alloc_info;
...@@ -989,6 +990,9 @@ ArrayPool<TupTriggerData> c_triggerPool; ...@@ -989,6 +990,9 @@ ArrayPool<TupTriggerData> c_triggerPool;
,UNDO_UPDATE = File_formats::Undofile::UNDO_TUP_UPDATE ,UNDO_UPDATE = File_formats::Undofile::UNDO_TUP_UPDATE
,UNDO_FREE = File_formats::Undofile::UNDO_TUP_FREE ,UNDO_FREE = File_formats::Undofile::UNDO_TUP_FREE
,UNDO_CREATE = File_formats::Undofile::UNDO_TUP_CREATE ,UNDO_CREATE = File_formats::Undofile::UNDO_TUP_CREATE
,UNDO_DROP = File_formats::Undofile::UNDO_TUP_DROP
,UNDO_ALLOC_EXTENT = File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT
,UNDO_FREE_EXTENT = File_formats::Undofile::UNDO_TUP_FREE_EXTENT
}; };
struct Alloc struct Alloc
...@@ -1021,6 +1025,30 @@ ArrayPool<TupTriggerData> c_triggerPool; ...@@ -1021,6 +1025,30 @@ ArrayPool<TupTriggerData> c_triggerPool;
Uint32 m_table; Uint32 m_table;
Uint32 m_type_length; // 16 bit type, 16 bit length Uint32 m_type_length; // 16 bit type, 16 bit length
}; };
struct Drop
{
Uint32 m_table;
Uint32 m_type_length; // 16 bit type, 16 bit length
};
struct AllocExtent
{
Uint32 m_table;
Uint32 m_fragment;
Uint32 m_page_no;
Uint32 m_file_no;
Uint32 m_type_length;
};
struct FreeExtent
{
Uint32 m_table;
Uint32 m_fragment;
Uint32 m_page_no;
Uint32 m_file_no;
Uint32 m_type_length;
};
}; };
Extent_info_pool c_extent_pool; Extent_info_pool c_extent_pool;
...@@ -1420,7 +1448,7 @@ public: ...@@ -1420,7 +1448,7 @@ public:
int nr_delete(Signal*, Uint32, Uint32 fragPtr, const Local_key*, Uint32 gci); int nr_delete(Signal*, Uint32, Uint32 fragPtr, const Local_key*, Uint32 gci);
void nr_delete_page_callback(Signal*, Uint32 op, Uint32 page); void nr_delete_page_callback(Signal*, Uint32 op, Uint32 page);
void nr_delete_logbuffer_callback(Signal*, Uint32 op, Uint32 page); void nr_delete_log_buffer_callback(Signal*, Uint32 op, Uint32 page);
private: private:
BLOCK_DEFINES(Dbtup); BLOCK_DEFINES(Dbtup);
...@@ -2345,9 +2373,10 @@ private: ...@@ -2345,9 +2373,10 @@ private:
Uint32 fragId); Uint32 fragId);
void releaseFragment(Signal* signal, Uint32 tableId); void releaseFragment(Signal* signal, Uint32 tableId, Uint32);
void drop_fragment_free_var_pages(Signal*); void drop_fragment_free_var_pages(Signal*);
void drop_fragment_free_exent(Signal*, TablerecPtr, FragrecordPtr, Uint32); void drop_fragment_free_extent(Signal*, TablerecPtr, FragrecordPtr, Uint32);
void drop_fragment_free_extent_log_buffer_callback(Signal*, Uint32, Uint32);
void drop_fragment_unmap_pages(Signal*, TablerecPtr, FragrecordPtr, Uint32); void drop_fragment_unmap_pages(Signal*, TablerecPtr, FragrecordPtr, Uint32);
void drop_fragment_unmap_page_callback(Signal* signal, Uint32, Uint32); void drop_fragment_unmap_page_callback(Signal* signal, Uint32, Uint32);
...@@ -2632,6 +2661,9 @@ private: ...@@ -2632,6 +2661,9 @@ private:
void disk_page_log_buffer_callback(Signal*, Uint32 opPtrI, Uint32); void disk_page_log_buffer_callback(Signal*, Uint32 opPtrI, Uint32);
void disk_page_alloc_extent_log_buffer_callback(Signal*, Uint32, Uint32);
void disk_page_free_extent_log_buffer_callback(Signal*, Uint32, Uint32);
Uint64 disk_page_undo_alloc(Page*, const Local_key*, Uint64 disk_page_undo_alloc(Page*, const Local_key*,
Uint32 sz, Uint32 gci, Uint32 logfile_group_id); Uint32 sz, Uint32 gci, Uint32 logfile_group_id);
...@@ -2646,6 +2678,9 @@ private: ...@@ -2646,6 +2678,9 @@ private:
void undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused); void undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused);
void undo_createtable_logsync_callback(Signal* signal, Uint32, Uint32); void undo_createtable_logsync_callback(Signal* signal, Uint32, Uint32);
void drop_table_log_buffer_callback(Signal*, Uint32, Uint32);
void drop_table_logsync_callback(Signal*, Uint32, Uint32);
void disk_page_set_dirty(Ptr<Page>); void disk_page_set_dirty(Ptr<Page>);
void restart_setup_page(Disk_alloc_info&, Ptr<Page>); void restart_setup_page(Disk_alloc_info&, Ptr<Page>);
void update_extent_pos(Disk_alloc_info&, Ptr<Extent_info>); void update_extent_pos(Disk_alloc_info&, Ptr<Extent_info>);
...@@ -2679,7 +2714,7 @@ public: ...@@ -2679,7 +2714,7 @@ public:
private: private:
void disk_restart_undo_next(Signal*); void disk_restart_undo_next(Signal*);
void disk_restart_undo_lcp(Uint32, Uint32); void disk_restart_undo_lcp(Uint32, Uint32, Uint32 flag);
void disk_restart_undo_callback(Signal* signal, Uint32, Uint32); void disk_restart_undo_callback(Signal* signal, Uint32, Uint32);
void disk_restart_undo_alloc(Apply_undo*); void disk_restart_undo_alloc(Apply_undo*);
void disk_restart_undo_update(Apply_undo*); void disk_restart_undo_update(Apply_undo*);
......
...@@ -68,6 +68,26 @@ operator<<(NdbOut& out, const Ptr<Dbtup::Extent_info> & ptr) ...@@ -68,6 +68,26 @@ operator<<(NdbOut& out, const Ptr<Dbtup::Extent_info> & ptr)
return out; return out;
} }
#if NOT_YET_FREE_EXTENT
static
inline
bool
check_free(const Dbtup::Extent_info* extP)
{
Uint32 res = 0;
for (Uint32 i = 1; i<MAX_FREE_LIST; i++)
res += extP->m_free_page_count[i];
return res;
}
#error "Code for deallocting extents when they get empty"
#error "This code is not yet complete"
#endif
#if NOT_YET_UNDO_ALLOC_EXTENT
#error "This is needed for deallocting extents when they get empty"
#error "This code is not complete yet"
#endif
void void
Dbtup::dump_disk_alloc(Dbtup::Disk_alloc_info & alloc) Dbtup::dump_disk_alloc(Dbtup::Disk_alloc_info & alloc)
{ {
...@@ -441,10 +461,26 @@ Dbtup::disk_page_prealloc(Signal* signal, ...@@ -441,10 +461,26 @@ Dbtup::disk_page_prealloc(Signal* signal,
/** /**
* We need to alloc an extent * We need to alloc an extent
*/ */
#if NOT_YET_UNDO_ALLOC_EXTENT
Uint32 logfile_group_id = fragPtr.p->m_logfile_group_id;
err = c_lgman->alloc_log_space(logfile_group_id,
sizeof(Disk_undo::AllocExtent)>>2);
if(unlikely(err))
{
return -err;
}
#endif
if (!c_extent_pool.seize(ext)) if (!c_extent_pool.seize(ext))
{ {
jam();
//XXX //XXX
err= 2; err= 2;
#if NOT_YET_UNDO_ALLOC_EXTENT
c_lgman->free_log_space(logfile_group_id,
sizeof(Disk_undo::AllocExtent)>>2);
#endif
c_page_request_pool.release(req); c_page_request_pool.release(req);
ndbout_c("no free extent info"); ndbout_c("no free extent info");
return -err; return -err;
...@@ -452,12 +488,44 @@ Dbtup::disk_page_prealloc(Signal* signal, ...@@ -452,12 +488,44 @@ Dbtup::disk_page_prealloc(Signal* signal,
if ((err= tsman.alloc_extent(&ext.p->m_key)) < 0) if ((err= tsman.alloc_extent(&ext.p->m_key)) < 0)
{ {
jam();
#if NOT_YET_UNDO_ALLOC_EXTENT
c_lgman->free_log_space(logfile_group_id,
sizeof(Disk_undo::AllocExtent)>>2);
#endif
c_extent_pool.release(ext); c_extent_pool.release(ext);
c_page_request_pool.release(req); c_page_request_pool.release(req);
return err; return err;
} }
int pages= err; int pages= err;
#if NOT_YET_UNDO_ALLOC_EXTENT
{
/**
* Do something here
*/
{
Callback cb;
cb.m_callbackData= ext.i;
cb.m_callbackFunction =
safe_cast(&Dbtup::disk_page_alloc_extent_log_buffer_callback);
Uint32 sz= sizeof(Disk_undo::AllocExtent)>>2;
Logfile_client lgman(this, c_lgman, logfile_group_id);
int res= lgman.get_log_buffer(signal, sz, &cb);
switch(res){
case 0:
break;
case -1:
ndbrequire("NOT YET IMPLEMENTED" == 0);
break;
default:
execute(signal, cb, res);
}
}
}
#endif
ndbout << "allocated " << pages << " pages: " << ext.p->m_key << endl; ndbout << "allocated " << pages << " pages: " << ext.p->m_key << endl;
ext.p->m_first_page_no = ext.p->m_key.m_page_no; ext.p->m_first_page_no = ext.p->m_key.m_page_no;
bzero(ext.p->m_free_page_count, sizeof(ext.p->m_free_page_count)); bzero(ext.p->m_free_page_count, sizeof(ext.p->m_free_page_count));
...@@ -1007,6 +1075,12 @@ Dbtup::disk_page_free(Signal *signal, ...@@ -1007,6 +1075,12 @@ Dbtup::disk_page_free(Signal *signal,
extentPtr.p->m_free_space += sz; extentPtr.p->m_free_space += sz;
update_extent_pos(alloc, extentPtr); update_extent_pos(alloc, extentPtr);
#if NOT_YET_FREE_EXTENT
if (check_free(extentPtr.p) == 0)
{
ndbout_c("free: extent is free");
}
#endif
} }
void void
...@@ -1104,13 +1178,55 @@ Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal, ...@@ -1104,13 +1178,55 @@ Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal,
extentPtr.p->m_free_space += sz; extentPtr.p->m_free_space += sz;
update_extent_pos(alloc, extentPtr); update_extent_pos(alloc, extentPtr);
#if NOT_YET_FREE_EXTENT
if (check_free(extentPtr.p) == 0)
{
ndbout_c("abort: extent is free");
}
#endif
}
#if NOT_YET_UNDO_ALLOC_EXTENT
void
Dbtup::disk_page_alloc_extent_log_buffer_callback(Signal* signal,
Uint32 extentPtrI,
Uint32 unused)
{
Ptr<Extent_info> extentPtr;
c_extent_pool.getPtr(extentPtr, extentPtrI);
Local_key key = extentPtr.p->m_key;
Tablespace_client2 tsman(signal, c_tsman, &key);
Ptr<Tablerec> tabPtr;
tabPtr.i= tsman.m_table_id;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
Ptr<Fragrecord> fragPtr;
getFragmentrec(fragPtr, tsman.m_fragment_id, tabPtr.p);
Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
Disk_undo::AllocExtent alloc;
alloc.m_table = tabPtr.i;
alloc.m_fragment = tsman.m_fragment_id;
alloc.m_page_no = key.m_page_no;
alloc.m_file_no = key.m_file_no;
alloc.m_type_length = (Disk_undo::UNDO_ALLOC_EXTENT<<16)|(sizeof(alloc)>> 2);
Logfile_client::Change c[1] = {{ &alloc, sizeof(alloc) >> 2 } };
Uint64 lsn= lgman.add_entry(c, 1);
tsman.update_lsn(&key, lsn);
} }
#endif
Uint64 Uint64
Dbtup::disk_page_undo_alloc(Page* page, const Local_key* key, Dbtup::disk_page_undo_alloc(Page* page, const Local_key* key,
Uint32 sz, Uint32 gci, Uint32 logfile_group_id) Uint32 sz, Uint32 gci, Uint32 logfile_group_id)
{ {
Logfile_client lsman(this, c_lgman, logfile_group_id); Logfile_client lgman(this, c_lgman, logfile_group_id);
Disk_undo::Alloc alloc; Disk_undo::Alloc alloc;
alloc.m_type_length= (Disk_undo::UNDO_ALLOC << 16) | (sizeof(alloc) >> 2); alloc.m_type_length= (Disk_undo::UNDO_ALLOC << 16) | (sizeof(alloc) >> 2);
...@@ -1119,7 +1235,7 @@ Dbtup::disk_page_undo_alloc(Page* page, const Local_key* key, ...@@ -1119,7 +1235,7 @@ Dbtup::disk_page_undo_alloc(Page* page, const Local_key* key,
Logfile_client::Change c[1] = {{ &alloc, sizeof(alloc) >> 2 } }; Logfile_client::Change c[1] = {{ &alloc, sizeof(alloc) >> 2 } };
Uint64 lsn= lsman.add_entry(c, 1); Uint64 lsn= lgman.add_entry(c, 1);
m_pgman.update_lsn(* key, lsn); m_pgman.update_lsn(* key, lsn);
return lsn; return lsn;
...@@ -1130,7 +1246,7 @@ Dbtup::disk_page_undo_update(Page* page, const Local_key* key, ...@@ -1130,7 +1246,7 @@ Dbtup::disk_page_undo_update(Page* page, const Local_key* key,
const Uint32* src, Uint32 sz, const Uint32* src, Uint32 sz,
Uint32 gci, Uint32 logfile_group_id) Uint32 gci, Uint32 logfile_group_id)
{ {
Logfile_client lsman(this, c_lgman, logfile_group_id); Logfile_client lgman(this, c_lgman, logfile_group_id);
Disk_undo::Update update; Disk_undo::Update update;
update.m_page_no = key->m_page_no; update.m_page_no = key->m_page_no;
...@@ -1148,7 +1264,7 @@ Dbtup::disk_page_undo_update(Page* page, const Local_key* key, ...@@ -1148,7 +1264,7 @@ Dbtup::disk_page_undo_update(Page* page, const Local_key* key,
ndbassert(4*(3 + sz + 1) == (sizeof(update) + 4*sz - 4)); ndbassert(4*(3 + sz + 1) == (sizeof(update) + 4*sz - 4));
Uint64 lsn= lsman.add_entry(c, 3); Uint64 lsn= lgman.add_entry(c, 3);
m_pgman.update_lsn(* key, lsn); m_pgman.update_lsn(* key, lsn);
return lsn; return lsn;
...@@ -1159,7 +1275,7 @@ Dbtup::disk_page_undo_free(Page* page, const Local_key* key, ...@@ -1159,7 +1275,7 @@ Dbtup::disk_page_undo_free(Page* page, const Local_key* key,
const Uint32* src, Uint32 sz, const Uint32* src, Uint32 sz,
Uint32 gci, Uint32 logfile_group_id) Uint32 gci, Uint32 logfile_group_id)
{ {
Logfile_client lsman(this, c_lgman, logfile_group_id); Logfile_client lgman(this, c_lgman, logfile_group_id);
Disk_undo::Free free; Disk_undo::Free free;
free.m_page_no = key->m_page_no; free.m_page_no = key->m_page_no;
...@@ -1177,7 +1293,7 @@ Dbtup::disk_page_undo_free(Page* page, const Local_key* key, ...@@ -1177,7 +1293,7 @@ Dbtup::disk_page_undo_free(Page* page, const Local_key* key,
ndbassert(4*(3 + sz + 1) == (sizeof(free) + 4*sz - 4)); ndbassert(4*(3 + sz + 1) == (sizeof(free) + 4*sz - 4));
Uint64 lsn= lsman.add_entry(c, 3); Uint64 lsn= lgman.add_entry(c, 3);
m_pgman.update_lsn(* key, lsn); m_pgman.update_lsn(* key, lsn);
return lsn; return lsn;
...@@ -1207,7 +1323,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn, ...@@ -1207,7 +1323,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
ndbrequire(len == 3); ndbrequire(len == 3);
Uint32 tableId = ptr[1] >> 16; Uint32 tableId = ptr[1] >> 16;
Uint32 fragId = ptr[1] & 0xFFFF; Uint32 fragId = ptr[1] & 0xFFFF;
disk_restart_undo_lcp(tableId, fragId); disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_LCP);
disk_restart_undo_next(signal); disk_restart_undo_next(signal);
return; return;
} }
...@@ -1246,10 +1362,20 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn, ...@@ -1246,10 +1362,20 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec); ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++) for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
if (tabPtr.p->fragrec[i] != RNIL) if (tabPtr.p->fragrec[i] != RNIL)
disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i]); disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i],
Fragrecord::UC_CREATE);
disk_restart_undo_next(signal); disk_restart_undo_next(signal);
return; return;
} }
case File_formats::Undofile::UNDO_TUP_DROP:
jam();
case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
jam();
case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
jam();
disk_restart_undo_next(signal);
return;
case File_formats::Undofile::UNDO_END: case File_formats::Undofile::UNDO_END:
f_undo_done = true; f_undo_done = true;
return; return;
...@@ -1283,7 +1409,7 @@ Dbtup::disk_restart_undo_next(Signal* signal) ...@@ -1283,7 +1409,7 @@ Dbtup::disk_restart_undo_next(Signal* signal)
} }
void void
Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId) Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId, Uint32 flag)
{ {
Ptr<Tablerec> tabPtr; Ptr<Tablerec> tabPtr;
tabPtr.i= tableId; tabPtr.i= tableId;
...@@ -1295,7 +1421,7 @@ Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId) ...@@ -1295,7 +1421,7 @@ Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId)
getFragmentrec(fragPtr, fragId, tabPtr.p); getFragmentrec(fragPtr, fragId, tabPtr.p);
if (!fragPtr.isNull()) if (!fragPtr.isNull())
{ {
fragPtr.p->m_undo_complete = true; fragPtr.p->m_undo_complete |= flag;
} }
} }
} }
...@@ -1502,6 +1628,12 @@ Dbtup::disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId, ...@@ -1502,6 +1628,12 @@ Dbtup::disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
if (tabPtr.p->tableStatus == DEFINED) if (tabPtr.p->tableStatus == DEFINED)
{ {
getFragmentrec(fragPtr, fragId, tabPtr.p); getFragmentrec(fragPtr, fragId, tabPtr.p);
if (fragPtr.p->m_undo_complete & Fragrecord::UC_CREATE)
{
jam();
return -1;
}
if (!fragPtr.isNull()) if (!fragPtr.isNull())
{ {
Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info; Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
......
...@@ -3129,7 +3129,7 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData, ...@@ -3129,7 +3129,7 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData,
disk_page_set_dirty(disk_page); disk_page_set_dirty(disk_page);
preq.m_callback.m_callbackFunction = preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::nr_delete_logbuffer_callback); safe_cast(&Dbtup::nr_delete_log_buffer_callback);
Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id); Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
res= lgman.get_log_buffer(signal, sz, &preq.m_callback); res= lgman.get_log_buffer(signal, sz, &preq.m_callback);
switch(res){ switch(res){
...@@ -3182,7 +3182,7 @@ Dbtup::nr_delete_page_callback(Signal* signal, ...@@ -3182,7 +3182,7 @@ Dbtup::nr_delete_page_callback(Signal* signal,
Callback cb; Callback cb;
cb.m_callbackData = userpointer; cb.m_callbackData = userpointer;
cb.m_callbackFunction = cb.m_callbackFunction =
safe_cast(&Dbtup::nr_delete_logbuffer_callback); safe_cast(&Dbtup::nr_delete_log_buffer_callback);
Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id); Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
int res= lgman.get_log_buffer(signal, sz, &cb); int res= lgman.get_log_buffer(signal, sz, &cb);
switch(res){ switch(res){
...@@ -3202,7 +3202,7 @@ Dbtup::nr_delete_page_callback(Signal* signal, ...@@ -3202,7 +3202,7 @@ Dbtup::nr_delete_page_callback(Signal* signal,
} }
void void
Dbtup::nr_delete_logbuffer_callback(Signal* signal, Dbtup::nr_delete_log_buffer_callback(Signal* signal,
Uint32 userpointer, Uint32 userpointer,
Uint32 unused) Uint32 unused)
{ {
......
...@@ -164,7 +164,7 @@ void Dbtup::execCONTINUEB(Signal* signal) ...@@ -164,7 +164,7 @@ void Dbtup::execCONTINUEB(Signal* signal)
break; break;
case ZREL_FRAG: case ZREL_FRAG:
ljam(); ljam();
releaseFragment(signal, dataPtr); releaseFragment(signal, dataPtr, signal->theData[2]);
break; break;
case ZREPORT_MEMORY_USAGE:{ case ZREPORT_MEMORY_USAGE:{
ljam(); ljam();
...@@ -223,7 +223,7 @@ void Dbtup::execCONTINUEB(Signal* signal) ...@@ -223,7 +223,7 @@ void Dbtup::execCONTINUEB(Signal* signal)
fragPtr.i= signal->theData[2]; fragPtr.i= signal->theData[2];
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec); ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
drop_fragment_free_exent(signal, tabPtr, fragPtr, signal->theData[3]); drop_fragment_free_extent(signal, tabPtr, fragPtr, signal->theData[3]);
return; return;
} }
case ZUNMAP_PAGES: case ZUNMAP_PAGES:
......
...@@ -309,6 +309,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ...@@ -309,6 +309,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
if(lastAttr) if(lastAttr)
{ {
jam();
/** /**
* Init Disk_alloc_info * Init Disk_alloc_info
*/ */
...@@ -320,6 +321,11 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ...@@ -320,6 +321,11 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
ndbrequire(tsman.get_tablespace_info(&rep) == 0); ndbrequire(tsman.get_tablespace_info(&rep) == 0);
regFragPtr.p->m_logfile_group_id= rep.tablespace.logfile_group_id; regFragPtr.p->m_logfile_group_id= rep.tablespace.logfile_group_id;
} }
else
{
jam();
regFragPtr.p->m_logfile_group_id = RNIL;
}
new (&regFragPtr.p->m_disk_alloc_info) new (&regFragPtr.p->m_disk_alloc_info)
Disk_alloc_info(regTabPtr.p, rep.tablespace.extent_size); Disk_alloc_info(regTabPtr.p, rep.tablespace.extent_size);
releaseFragoperrec(fragOperPtr); releaseFragoperrec(fragOperPtr);
...@@ -565,6 +571,11 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ...@@ -565,6 +571,11 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
ndbrequire(tsman.get_tablespace_info(&rep) == 0); ndbrequire(tsman.get_tablespace_info(&rep) == 0);
regFragPtr.p->m_logfile_group_id= rep.tablespace.logfile_group_id; regFragPtr.p->m_logfile_group_id= rep.tablespace.logfile_group_id;
} }
else
{
jam();
regFragPtr.p->m_logfile_group_id = RNIL;
}
new (&regFragPtr.p->m_disk_alloc_info) new (&regFragPtr.p->m_disk_alloc_info)
Disk_alloc_info(regTabPtr.p, rep.tablespace.extent_size); Disk_alloc_info(regTabPtr.p, rep.tablespace.extent_size);
...@@ -597,7 +608,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ...@@ -597,7 +608,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
ndbrequire("NOT YET IMPLEMENTED" == 0); ndbrequire("NOT YET IMPLEMENTED" == 0);
break; break;
} }
execute(signal, cb, 0); execute(signal, cb, regFragPtr.p->m_logfile_group_id);
return; return;
} }
} }
...@@ -874,7 +885,8 @@ Dbtup::execDROP_TAB_REQ(Signal* signal) ...@@ -874,7 +885,8 @@ Dbtup::execDROP_TAB_REQ(Signal* signal)
signal->theData[0]= ZREL_FRAG; signal->theData[0]= ZREL_FRAG;
signal->theData[1]= tabPtr.i; signal->theData[1]= tabPtr.i;
sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB); signal->theData[2]= RNIL;
sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
} }
void Dbtup::releaseTabDescr(Tablerec* const regTabPtr) void Dbtup::releaseTabDescr(Tablerec* const regTabPtr)
...@@ -902,7 +914,8 @@ void Dbtup::releaseTabDescr(Tablerec* const regTabPtr) ...@@ -902,7 +914,8 @@ void Dbtup::releaseTabDescr(Tablerec* const regTabPtr)
} }
} }
void Dbtup::releaseFragment(Signal* signal, Uint32 tableId) void Dbtup::releaseFragment(Signal* signal, Uint32 tableId,
Uint32 logfile_group_id)
{ {
TablerecPtr tabPtr; TablerecPtr tabPtr;
tabPtr.i= tableId; tabPtr.i= tableId;
...@@ -930,15 +943,34 @@ void Dbtup::releaseFragment(Signal* signal, Uint32 tableId) ...@@ -930,15 +943,34 @@ void Dbtup::releaseFragment(Signal* signal, Uint32 tableId)
return; return;
} }
DropTabConf * const dropConf= (DropTabConf *)signal->getDataPtrSend(); #if NOT_YET_UNDO_DROP_TABLE
dropConf->senderRef= reference(); #error "This code is complete, but I prefer not to enable it until I need it"
dropConf->senderData= tabPtr.p->m_dropTable.tabUserPtr; if (logfile_group_id != RNIL)
dropConf->tableId= tabPtr.i; {
sendSignal(tabPtr.p->m_dropTable.tabUserRef, GSN_DROP_TAB_CONF, Callback cb;
signal, DropTabConf::SignalLength, JBB); cb.m_callbackData= tabPtr.i;
cb.m_callbackFunction =
safe_cast(&Dbtup::drop_table_log_buffer_callback);
Uint32 sz= sizeof(Disk_undo::Drop) >> 2;
int r0 = c_lgman->alloc_log_space(logfile_group_id, sz);
releaseTabDescr(tabPtr.p); Logfile_client lgman(this, c_lgman, logfile_group_id);
initTab(tabPtr.p); int res= lgman.get_log_buffer(signal, sz, &cb);
switch(res){
case 0:
ljam();
return;
case -1:
ndbrequire("NOT YET IMPLEMENTED" == 0);
break;
default:
execute(signal, cb, logfile_group_id);
return;
}
}
#endif
drop_table_logsync_callback(signal, tabPtr.i, RNIL);
} }
void void
...@@ -965,7 +997,7 @@ Dbtup::drop_fragment_unmap_pages(Signal *signal, ...@@ -965,7 +997,7 @@ Dbtup::drop_fragment_unmap_pages(Signal *signal,
alloc_info.m_curr_extent_info_ptr_i= RNIL; alloc_info.m_curr_extent_info_ptr_i= RNIL;
} }
drop_fragment_free_exent(signal, tabPtr, fragPtr, 0); drop_fragment_free_extent(signal, tabPtr, fragPtr, 0);
return; return;
} }
...@@ -998,7 +1030,7 @@ Dbtup::drop_fragment_unmap_pages(Signal *signal, ...@@ -998,7 +1030,7 @@ Dbtup::drop_fragment_unmap_pages(Signal *signal,
} }
return; return;
} }
drop_fragment_free_exent(signal, tabPtr, fragPtr, 0); drop_fragment_free_extent(signal, tabPtr, fragPtr, 0);
} }
void void
...@@ -1031,7 +1063,7 @@ Dbtup::drop_fragment_unmap_page_callback(Signal* signal, ...@@ -1031,7 +1063,7 @@ Dbtup::drop_fragment_unmap_page_callback(Signal* signal,
} }
void void
Dbtup::drop_fragment_free_exent(Signal *signal, Dbtup::drop_fragment_free_extent(Signal *signal,
TablerecPtr tabPtr, TablerecPtr tabPtr,
FragrecordPtr fragPtr, FragrecordPtr fragPtr,
Uint32 pos) Uint32 pos)
...@@ -1040,6 +1072,125 @@ Dbtup::drop_fragment_free_exent(Signal *signal, ...@@ -1040,6 +1072,125 @@ Dbtup::drop_fragment_free_exent(Signal *signal,
{ {
Disk_alloc_info& alloc_info= fragPtr.p->m_disk_alloc_info; Disk_alloc_info& alloc_info= fragPtr.p->m_disk_alloc_info;
for(; pos<EXTENT_SEARCH_MATRIX_SIZE; pos++) for(; pos<EXTENT_SEARCH_MATRIX_SIZE; pos++)
{
if(!alloc_info.m_free_extents[pos].isEmpty())
{
jam();
Callback cb;
cb.m_callbackData= fragPtr.i;
cb.m_callbackFunction =
safe_cast(&Dbtup::drop_fragment_free_extent_log_buffer_callback);
#if NOT_YET_UNDO_FREE_EXTENT
Uint32 sz= sizeof(Disk_undo::FreeExtent) >> 2;
int r0 = c_lgman->alloc_log_space(fragPtr.p->m_logfile_group_id, sz);
Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
int res= lgman.get_log_buffer(signal, sz, &cb);
switch(res){
case 0:
ljam();
return;
case -1:
ndbrequire("NOT YET IMPLEMENTED" == 0);
break;
default:
execute(signal, cb, fragPtr.p->m_logfile_group_id);
return;
}
#else
execute(signal, cb, fragPtr.p->m_logfile_group_id);
return;
#endif
}
}
ArrayPool<Page> *cheat_pool= (ArrayPool<Page>*)&m_global_page_pool;
for(pos= 0; pos<MAX_FREE_LIST; pos++)
{
ndbrequire(alloc_info.m_page_requests[pos].isEmpty());
LocalDLList<Page> list(* cheat_pool, alloc_info.m_dirty_pages[pos]);
list.remove();
}
}
signal->theData[0] = ZFREE_VAR_PAGES;
signal->theData[1] = tabPtr.i;
signal->theData[2] = fragPtr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
}
void
Dbtup::drop_table_log_buffer_callback(Signal* signal, Uint32 tablePtrI,
Uint32 logfile_group_id)
{
TablerecPtr tabPtr;
tabPtr.i = tablePtrI;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
ndbrequire(tabPtr.p->m_no_of_disk_attributes);
Disk_undo::Drop drop;
drop.m_table = tabPtr.i;
drop.m_type_length =
(Disk_undo::UNDO_DROP << 16) | (sizeof(drop) >> 2);
Logfile_client lgman(this, c_lgman, logfile_group_id);
Logfile_client::Change c[1] = {{ &drop, sizeof(drop) >> 2 } };
Uint64 lsn = lgman.add_entry(c, 1);
Logfile_client::Request req;
req.m_callback.m_callbackData= tablePtrI;
req.m_callback.m_callbackFunction =
safe_cast(&Dbtup::drop_table_logsync_callback);
int ret = lgman.sync_lsn(signal, lsn, &req, 0);
switch(ret){
case 0:
return;
default:
ndbout_c("ret: %d", ret);
ndbrequire(false);
}
}
void
Dbtup::drop_table_logsync_callback(Signal* signal,
Uint32 tabPtrI,
Uint32 logfile_group_id)
{
TablerecPtr tabPtr;
tabPtr.i = tabPtrI;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
DropTabConf * const dropConf= (DropTabConf *)signal->getDataPtrSend();
dropConf->senderRef= reference();
dropConf->senderData= tabPtr.p->m_dropTable.tabUserPtr;
dropConf->tableId= tabPtr.i;
sendSignal(tabPtr.p->m_dropTable.tabUserRef, GSN_DROP_TAB_CONF,
signal, DropTabConf::SignalLength, JBB);
releaseTabDescr(tabPtr.p);
initTab(tabPtr.p);
}
void
Dbtup::drop_fragment_free_extent_log_buffer_callback(Signal* signal,
Uint32 fragPtrI,
Uint32 unused)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tabPtr;
tabPtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
ndbrequire(tabPtr.p->m_no_of_disk_attributes);
Disk_alloc_info& alloc_info= fragPtr.p->m_disk_alloc_info;
for(Uint32 pos = 0; pos<EXTENT_SEARCH_MATRIX_SIZE; pos++)
{ {
if(!alloc_info.m_free_extents[pos].isEmpty()) if(!alloc_info.m_free_extents[pos].isEmpty())
{ {
...@@ -1049,11 +1200,29 @@ Dbtup::drop_fragment_free_exent(Signal *signal, ...@@ -1049,11 +1200,29 @@ Dbtup::drop_fragment_free_exent(Signal *signal,
Ptr<Extent_info> ext_ptr; Ptr<Extent_info> ext_ptr;
list.first(ext_ptr); list.first(ext_ptr);
#if NOT_YET_UNDO_FREE_EXTENT
#error "This code is complete"
#error "but not needed until we do dealloc of empty extents"
Disk_undo::FreeExtent free;
free.m_table = tabPtr.i;
free.m_fragment = fragPtr.p->fragmentId;
free.m_file_no = ext_ptr.p->m_key.m_file_no;
free.m_page_no = ext_ptr.p->m_key.m_page_no;
free.m_type_length =
(Disk_undo::UNDO_FREE_EXTENT << 16) | (sizeof(free) >> 2);
Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
Logfile_client::Change c[1] = {{ &free, sizeof(free) >> 2 } };
Uint64 lsn = lgman.add_entry(c, 1);
#else
Uint64 lsn = 0;
#endif
Tablespace_client tsman(signal, c_tsman, tabPtr.i, Tablespace_client tsman(signal, c_tsman, tabPtr.i,
fragPtr.p->fragmentId, fragPtr.p->fragmentId,
fragPtr.p->m_tablespace_id); fragPtr.p->m_tablespace_id);
tsman.free_extent(&ext_ptr.p->m_key); tsman.free_extent(&ext_ptr.p->m_key, lsn);
c_extent_hash.remove(ext_ptr); c_extent_hash.remove(ext_ptr);
list.release(ext_ptr); list.release(ext_ptr);
...@@ -1065,20 +1234,7 @@ Dbtup::drop_fragment_free_exent(Signal *signal, ...@@ -1065,20 +1234,7 @@ Dbtup::drop_fragment_free_exent(Signal *signal,
return; return;
} }
} }
ndbrequire(false);
ArrayPool<Page> *cheat_pool= (ArrayPool<Page>*)&m_global_page_pool;
for(pos= 0; pos<MAX_FREE_LIST; pos++)
{
ndbrequire(alloc_info.m_page_requests[pos].isEmpty());
LocalDLList<Page> list(* cheat_pool, alloc_info.m_dirty_pages[pos]);
list.remove();
}
}
signal->theData[0] = ZFREE_VAR_PAGES;
signal->theData[1] = tabPtr.i;
signal->theData[2] = fragPtr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
} }
void void
...@@ -1112,7 +1268,7 @@ Dbtup::drop_fragment_free_var_pages(Signal* signal) ...@@ -1112,7 +1268,7 @@ Dbtup::drop_fragment_free_var_pages(Signal* signal)
sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB); sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
return; return;
} }
Uint32 logfile_group_id = fragPtr.p->m_logfile_group_id ;
releaseFragPages(fragPtr.p); releaseFragPages(fragPtr.p);
Uint32 i; Uint32 i;
for(i= 0; i<MAX_FRAG_PER_NODE; i++) for(i= 0; i<MAX_FRAG_PER_NODE; i++)
...@@ -1126,7 +1282,8 @@ Dbtup::drop_fragment_free_var_pages(Signal* signal) ...@@ -1126,7 +1282,8 @@ Dbtup::drop_fragment_free_var_pages(Signal* signal)
signal->theData[0]= ZREL_FRAG; signal->theData[0]= ZREL_FRAG;
signal->theData[1]= tabPtr.i; signal->theData[1]= tabPtr.i;
sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB); signal->theData[2]= logfile_group_id;
sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
return; return;
} }
......
...@@ -154,6 +154,9 @@ struct File_formats ...@@ -154,6 +154,9 @@ struct File_formats
,UNDO_TUP_UPDATE = 4 ,UNDO_TUP_UPDATE = 4
,UNDO_TUP_FREE = 5 ,UNDO_TUP_FREE = 5
,UNDO_TUP_CREATE = 6 ,UNDO_TUP_CREATE = 6
,UNDO_TUP_DROP = 7
,UNDO_TUP_ALLOC_EXTENT = 8
,UNDO_TUP_FREE_EXTENT = 9
,UNDO_END = 0x7FFF ,UNDO_END = 0x7FFF
,UNDO_NEXT_LSN = 0x8000 ,UNDO_NEXT_LSN = 0x8000
......
...@@ -1160,13 +1160,14 @@ Lgman::process_log_sync_waiters(Signal* signal, Ptr<Logfile_group> ptr) ...@@ -1160,13 +1160,14 @@ Lgman::process_log_sync_waiters(Signal* signal, Ptr<Logfile_group> ptr)
bool removed= false; bool removed= false;
Ptr<Log_waiter> waiter; Ptr<Log_waiter> waiter;
list.first(waiter); list.first(waiter);
Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
if(waiter.p->m_sync_lsn <= ptr.p->m_last_synced_lsn) if(waiter.p->m_sync_lsn <= ptr.p->m_last_synced_lsn)
{ {
removed= true; removed= true;
Uint32 block = waiter.p->m_block; Uint32 block = waiter.p->m_block;
SimulatedBlock* b = globalData.getBlock(block); SimulatedBlock* b = globalData.getBlock(block);
b->execute(signal, waiter.p->m_callback, 0); b->execute(signal, waiter.p->m_callback, logfile_group_id);
list.releaseFirst(waiter); list.releaseFirst(waiter);
} }
...@@ -1522,12 +1523,13 @@ Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr) ...@@ -1522,12 +1523,13 @@ Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr)
bool removed= false; bool removed= false;
Ptr<Log_waiter> waiter; Ptr<Log_waiter> waiter;
list.first(waiter); list.first(waiter);
Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
if(waiter.p->m_size + 2*File_formats::UNDO_PAGE_WORDS < free_buffer) if(waiter.p->m_size + 2*File_formats::UNDO_PAGE_WORDS < free_buffer)
{ {
removed= true; removed= true;
Uint32 block = waiter.p->m_block; Uint32 block = waiter.p->m_block;
SimulatedBlock* b = globalData.getBlock(block); SimulatedBlock* b = globalData.getBlock(block);
b->execute(signal, waiter.p->m_callback, 0); b->execute(signal, waiter.p->m_callback, logfile_group_id);
list.releaseFirst(waiter); list.releaseFirst(waiter);
} }
...@@ -2699,6 +2701,9 @@ Lgman::execute_undo_record(Signal* signal) ...@@ -2699,6 +2701,9 @@ Lgman::execute_undo_record(Signal* signal)
case File_formats::Undofile::UNDO_TUP_UPDATE: case File_formats::Undofile::UNDO_TUP_UPDATE:
case File_formats::Undofile::UNDO_TUP_FREE: case File_formats::Undofile::UNDO_TUP_FREE:
case File_formats::Undofile::UNDO_TUP_CREATE: case File_formats::Undofile::UNDO_TUP_CREATE:
case File_formats::Undofile::UNDO_TUP_DROP:
case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
tup->disk_restart_undo(signal, lsn, mask, ptr - len + 1, len); tup->disk_restart_undo(signal, lsn, mask, ptr - len + 1, len);
return; return;
default: default:
......
...@@ -304,6 +304,8 @@ print_undo_page(int count, void* ptr, Uint32 sz){ ...@@ -304,6 +304,8 @@ print_undo_page(int count, void* ptr, Uint32 sz){
case File_formats::Undofile::UNDO_LCP: case File_formats::Undofile::UNDO_LCP:
printf("[ %lld LCP %d tab: %d frag: %d ]", lsn, printf("[ %lld LCP %d tab: %d frag: %d ]", lsn,
src[0], src[1] >> 16, src[1] & 0xFFFF); src[0], src[1] >> 16, src[1] & 0xFFFF);
if(g_verbosity <= 3)
printf("\n");
break; break;
case File_formats::Undofile::UNDO_TUP_ALLOC: case File_formats::Undofile::UNDO_TUP_ALLOC:
if(g_verbosity > 3) if(g_verbosity > 3)
...@@ -340,6 +342,48 @@ print_undo_page(int count, void* ptr, Uint32 sz){ ...@@ -340,6 +342,48 @@ print_undo_page(int count, void* ptr, Uint32 sz){
req->m_gci); req->m_gci);
} }
break; break;
case File_formats::Undofile::UNDO_TUP_CREATE:
{
Dbtup::Disk_undo::Create *req = (Dbtup::Disk_undo::Create*)src;
printf("[ %lld Create %d ]", lsn, req->m_table);
if(g_verbosity <= 3)
printf("\n");
break;
}
case File_formats::Undofile::UNDO_TUP_DROP:
{
Dbtup::Disk_undo::Drop *req = (Dbtup::Disk_undo::Drop*)src;
printf("[ %lld Drop %d ]", lsn, req->m_table);
if(g_verbosity <= 3)
printf("\n");
break;
}
case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
{
Dbtup::Disk_undo::AllocExtent *req = (Dbtup::Disk_undo::AllocExtent*)src;
printf("[ %lld AllocExtent tab: %d frag: %d file: %d page: %d ]",
lsn,
req->m_table,
req->m_fragment,
req->m_file_no,
req->m_page_no);
if(g_verbosity <= 3)
printf("\n");
break;
}
case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
{
Dbtup::Disk_undo::FreeExtent *req = (Dbtup::Disk_undo::FreeExtent*)src;
printf("[ %lld FreeExtent tab: %d frag: %d file: %d page: %d ]",
lsn,
req->m_table,
req->m_fragment,
req->m_file_no,
req->m_page_no);
if(g_verbosity <= 3)
printf("\n");
break;
}
default: default:
ndbout_c("[ Unknown type %d len: %d, pos: %d ]", type, len, pos); ndbout_c("[ Unknown type %d len: %d, pos: %d ]", type, len, pos);
if(!(len && type)) if(!(len && type))
......
This diff is collapsed.
...@@ -63,6 +63,7 @@ protected: ...@@ -63,6 +63,7 @@ protected:
void execALLOC_PAGE_REQ(Signal* signal); void execALLOC_PAGE_REQ(Signal* signal);
void execLCP_FRAG_ORD(Signal*);
void execEND_LCP_REQ(Signal*); void execEND_LCP_REQ(Signal*);
void end_lcp(Signal*, Uint32 tablespace, Uint32 list, Uint32 file); void end_lcp(Signal*, Uint32 tablespace, Uint32 list, Uint32 file);
...@@ -108,6 +109,7 @@ public: ...@@ -108,6 +109,7 @@ public:
Uint32 m_offset_data_pages; // 1(zero) + extent header pages Uint32 m_offset_data_pages; // 1(zero) + extent header pages
Uint32 m_data_pages; Uint32 m_data_pages;
Uint32 m_used_extent_cnt; Uint32 m_used_extent_cnt;
Uint32 m_extent_headers_per_extent_page;
} m_online; } m_online;
struct { struct {
Uint32 m_senderData; Uint32 m_senderData;
...@@ -196,6 +198,7 @@ private: ...@@ -196,6 +198,7 @@ private:
Datafile_pool m_file_pool; Datafile_pool m_file_pool;
Tablespace_pool m_tablespace_pool; Tablespace_pool m_tablespace_pool;
bool m_lcp_ongoing;
Datafile_hash m_file_hash; Datafile_hash m_file_hash;
Tablespace_list m_tablespace_list; Tablespace_list m_tablespace_list;
Tablespace_hash m_tablespace_hash; Tablespace_hash m_tablespace_hash;
...@@ -226,15 +229,52 @@ private: ...@@ -226,15 +229,52 @@ private:
void release_extent_pages(Signal* signal, Ptr<Datafile> ptr); void release_extent_pages(Signal* signal, Ptr<Datafile> ptr);
void release_extent_pages_callback(Signal*, Uint32, Uint32); void release_extent_pages_callback(Signal*, Uint32, Uint32);
struct req
{
Uint32 m_extent_pages;
Uint32 m_extent_size;
Uint32 m_extent_no; // on extent page
Uint32 m_extent_page_no;
};
struct req lookup_extent(Uint32 page_no, const Datafile*) const;
Uint32 calc_page_no_in_extent(Uint32 page_no, const struct req* val) const;
}; };
inline
Tsman::req
Tsman::lookup_extent(Uint32 page_no, const Datafile * filePtrP) const
{
struct req val;
val.m_extent_size = filePtrP->m_extent_size;
val.m_extent_pages = filePtrP->m_online.m_offset_data_pages;
Uint32 per_page = filePtrP->m_online.m_extent_headers_per_extent_page;
Uint32 extent =
(page_no - val.m_extent_pages) / val.m_extent_size + per_page;
val.m_extent_page_no = extent / per_page;
val.m_extent_no = extent % per_page;
return val;
}
inline
Uint32
Tsman::calc_page_no_in_extent(Uint32 page_no, const Tsman::req* val) const
{
return (page_no - val->m_extent_pages) % val->m_extent_size;
}
class Tablespace_client class Tablespace_client
{ {
public:
Tsman * m_tsman; Tsman * m_tsman;
Signal* m_signal; Signal* m_signal;
Uint32 m_table_id; Uint32 m_table_id;
Uint32 m_fragment_id; Uint32 m_fragment_id;
Uint32 m_tablespace_id; Uint32 m_tablespace_id;
public: public:
Tablespace_client(Signal* signal, Tsman* tsman, Tablespace_client(Signal* signal, Tsman* tsman,
Uint32 table, Uint32 fragment, Uint32 tablespaceId) { Uint32 table, Uint32 fragment, Uint32 tablespaceId) {
...@@ -245,6 +285,8 @@ public: ...@@ -245,6 +285,8 @@ public:
m_tablespace_id= tablespaceId; m_tablespace_id= tablespaceId;
} }
Tablespace_client(Signal* signal, Tsman* tsman, Local_key* key);
/** /**
* Return >0 if success, no of pages in extent, sets key * Return >0 if success, no of pages in extent, sets key
* <0 if failure, -error code * <0 if failure, -error code
...@@ -274,7 +316,7 @@ public: ...@@ -274,7 +316,7 @@ public:
/** /**
* Free extent * Free extent
*/ */
int free_extent(Local_key* key); int free_extent(Local_key* key, Uint64 lsn);
/** /**
* Update page free bits * Update page free bits
...@@ -307,6 +349,11 @@ public: ...@@ -307,6 +349,11 @@ public:
* <0 - on error * <0 - on error
*/ */
int get_tablespace_info(CreateFilegroupImplReq* rep); int get_tablespace_info(CreateFilegroupImplReq* rep);
/**
* Update lsn of page corresponing to key
*/
int update_lsn(Local_key* key, Uint64 lsn);
}; };
#include <signaldata/Extent.hpp> #include <signaldata/Extent.hpp>
...@@ -351,12 +398,14 @@ Tablespace_client::alloc_page_from_extent(Local_key* key, Uint32 bits) ...@@ -351,12 +398,14 @@ Tablespace_client::alloc_page_from_extent(Local_key* key, Uint32 bits)
inline inline
int int
Tablespace_client::free_extent(Local_key* key) Tablespace_client::free_extent(Local_key* key, Uint64 lsn)
{ {
FreeExtentReq* req = (FreeExtentReq*)m_signal->theData; FreeExtentReq* req = (FreeExtentReq*)m_signal->theData;
req->request.key = *key; req->request.key = *key;
req->request.table_id = m_table_id; req->request.table_id = m_table_id;
req->request.tablespace_id = m_tablespace_id; req->request.tablespace_id = m_tablespace_id;
req->request.lsn_hi = (Uint32)(lsn >> 32);
req->request.lsn_lo = (Uint32)(lsn & 0xFFFFFFFF);
m_tsman->execFREE_EXTENT_REQ(m_signal); m_tsman->execFREE_EXTENT_REQ(m_signal);
if(req->reply.errorCode == 0){ if(req->reply.errorCode == 0){
...@@ -407,5 +456,4 @@ Tablespace_client::restart_undo_page_free_bits(Local_key* key, ...@@ -407,5 +456,4 @@ Tablespace_client::restart_undo_page_free_bits(Local_key* key,
page_lsn); page_lsn);
} }
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment