Commit fc08f82b authored by unknown's avatar unknown

WL#3072 - Maria recovery

Unit test for recovery: runs ma_test1 and ma_test2 (both only with
INSERTs and DELETEs; UPDATEs disabled as not handled by recovery)
then moves the tables elswhere; recreates tables from the log, and
compares and fails if there is a difference. Passes now.
Most of maria_read_log.c moved to ma_recovery.c, as it will be re-used
for recovery-from-ha_maria.
Bugfixes of applying of REDO_INSERT, REDO_PURGE_ROW.
Applying of REDO_PURGE_BLOCKS, REDO_DELETE_ALL, REDO_DROP_TABLE,
UNDO_ROW_INSERT (in REDO phase only, i.e. just doing records++),
UNDO_ROW_DELETE, UNDO_ROW_PURGE.
Code cleanups.
Monty: please look for "QQ". Sanja: please look for "Sanja".
Future tasks: recovery of the bitmap (easy), recovery of the state
(make it idempotent), more REDOs (Monty to work on
REDO_UPDATE?), UNDO phase...
Pushing this cset as it looks safe, contains test and bugfixes which
will help Monty implement applying of REDO_UPDATE.


sql/handler.cc:
  typo
storage/maria/Makefile.am:
  Adding ma_test_recovery (which ma_test_all invokes, and which can
  also be run alone). Most of maria_read_log.c moved to ma_recovery.c
storage/maria/ha_maria.cc:
  comments
storage/maria/ma_bitmap.c:
  fixing comments. 2 -> sizeof(maria_bitmap_marker).
  Bitmap-related part of _ma_initialize_datafile() moves in bitmap module.
  Now putting the "bm" signature when creating the first bitmap page
  (it used to happen only at next open, but that
  caused an annoying difference when testing Recovery if the original
  run didn't open the table, and it looks more
  logical like this: it goes to disk only with its signature correct);
  see the "QQ" comment towards the _ma_initialize_data_file() call
  in ma_create.c for more).
  When reading a bitmap page, verify its signature (happens when normally
  using the table or when CHECKing it; not when REPAIRing it).
storage/maria/ma_blockrec.c:
  * no need to sync the data file if table is not transactional
  * Comments, code cleanup (log-related data moved to log-related code
  block, int5store->page_store).
  * Store the table's short id into LOGREC_UNDO_ROW_PURGE, like we
  do for other records (though this record will soon be replaced
  with a CLR).
  * If "page" is 1 it means the page which extends from byte
  page*block_size+1 to (page+1)*block_size (byte number 1 being
  the first byte of the file). The last byte of the file is
  data_file_length (same convention).
  A new page needs to be created if the last byte of the page is
  beyond the last byte of the file, i.e.
   (page+1)*block_size+1 > data_file_length, so we correct the test
  (bug found when testing log applying for ma_test1 -M -T --skip-update).
  * update the page's LSN when removing a row from it during
  execution of a REDO_PURGE_ROW record (bug found when testing log
  applying for ma_test1 -M -T --skip-update).
  * applying of REDO_PURGE_BLOCKs (limited to a one-page range for now).
storage/maria/ma_blockrec.h:
  new functions. maria_bitmap_marker does not need to be exported.
storage/maria/ma_close.c:
  we can always flush the table's state when closing the last instance
  of the table. And it is needed for maria_read_log (as it does
  not use maria_lock_database()).
storage/maria/ma_control_file.c:
  when in Recovery, some assertions should not be used.
storage/maria/ma_control_file.h:
  double-inclusion safe
storage/maria/ma_create.c:
  during recovery, don't log records. Comments.
  Moving the creation of the first bitmap page to ma_bitmap.c
storage/maria/ma_delete_table.c:
  during recovery, don't log records. Log the end-zero of the dropped
  table's name, so that recovery can use the string in place without
  extending it to fit an end zero.
storage/maria/ma_loghandler.c:
  * inwrite_rec_hook also needs access to the MARIA_SHARE, like
  prewrite_rec_hook. This will be needed to update
  share->records_diff (in the upcoming patch "recovery of the state").
  * LOG_DESC::record_ends_group changed to an enum.
  * LOG_DESC for LOGREC_REDO_PURGE_BLOCKS and LOGREC_UNDO_ROW_PURGE
  corrected
  * Sanja please see the @todo LOG BUG
  * avoiding DBUG_RETURN(func()) as it gives confusing debug traces.
storage/maria/ma_loghandler.h:
  - log write hooks called while the log's lock is held (inwrite_rec_hook)
  now need the MARIA_SHARE, like prewrite_rec_hook already had
  - instead of a bool saying if this record's type ends groups or not,
  we refine: it may not end a group, it may end a group, or it may
  be a group in itself. Imagine that we had a physical write failure
  to a table before we log the UNDO, we still end up in
  external_lock(F_UNLCK) and then we log a COMMIT: we don't want
  to consider this COMMIT as ending the group of REDOs (don't want
  to execute those REDOs during Recovery), that's why we say "COMMIT
  is a group in itself, it aborts any previous group". This also
  gives one more sanity check in maria_read_log.
storage/maria/ma_recovery.c:
  New Recovery code, replacing the old pseudocode.
  Most of maria_read_log moved here.
  Call-able from ha_maria, but not enabled yet.
  Compared to the previous version of maria_read_log, some bugs have
  been fixed, debugging output can go to stdout or a disk file (for now
  it's useful for me, later it can be changed), execution of
  REDO_DROP_TABLE, REDO_DELETE_ALL, REDO_PURGE_BLOCKS has been added. Duplicate code
  has been factored into functions. We abort an unfinished group
  of records if we see a record which is a group in itself (like COMMIT).
  No need for maria_panic() after a bug (which caused tables to not
  be closed) was fixed; if there is yet another bug I prefer to see it.
  When opening a table for Recovery, set data_file_length
  and key_file_length to their real physical value (these are the
  easiest state members to restore :). Warn us if the last page
  was truncated (but Recovery handles it).
  MARIA_SHARE::state::state::records is now partly recovered (not
  idempotent, but works if recreating tables from scracth).
  When applying a REDO to a page, stamp it with the UNDO's LSN
  (current_group_end_lsn), not with the REDO's LSN; it makes
  the table more identical to the original table (easier to compare
  the two tables in the end).
  Big thing missing: some types of REDOs are not handled,
  and the UNDO phase does not exist (missing functions to execute UNDOs
  to actually rollback). So for now tests are only inserting/deleting
  a few 100 rows, closing the table and seeing if the log is applied ok;
  it works. UPDATE not handled.
storage/maria/ma_recovery.h:
  new functions: ma_recover() for recovery from inside ha_maria;
  _ma_apply_log() for maria_read_log (ma_recover() calls _ma_apply_log()).
  Btw, we need to not use the word "recover" for REPAIR/maria_chk anymore.
storage/maria/ma_rename.c:
  don't write log records during recovery
storage/maria/ma_test2.c:
  - fail if maria_info() or other subtests find some wrong information
  - new option -g to skip updates.
  - init the translog before creating the table, so that log applying
  can work.
  - in "#if 0" you'll see some fixed bugs (will be removed).
storage/maria/ma_test_all.sh:
  cleanup files. Test log applying.
storage/maria/maria_read_log.c:
  most of the logic moves to ma_recovery.c to be shared between
  maria_read_log and recovery-from-inside-mysqld.
  See ma_recovery.c for additional changes made to the moved code.
storage/maria/ma_test_recovery:
  unit test for Recovery. Tests insert and delete,
  REDO_UPDATE not yet coded.
  Script is called from ma_test_all. Can run standalone.
parent 97a41052
......@@ -2788,7 +2788,7 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache,
int ha_init_pagecache(const char *name, PAGECACHE *pagecache)
{
DBUG_ENTER("ha_init_key_cache");
DBUG_ENTER("ha_init_pagecache");
if (!pagecache->inited)
{
......
......@@ -30,8 +30,8 @@ DEFS = @DEFS@
# "." is needed first because tests in unittest need libmaria
SUBDIRS = . unittest
EXTRA_DIST = ma_test_all.sh ma_test_all.res ma_ft_stem.c CMakeLists.txt plug.in
pkgdata_DATA = ma_test_all ma_test_all.res
EXTRA_DIST = ma_test_all.sh ma_test_all.res ma_ft_stem.c CMakeLists.txt plug.in ma_test_recovery
pkgdata_DATA = ma_test_all ma_test_all.res ma_test_recovery
pkglib_LIBRARIES = libmaria.a
bin_PROGRAMS = maria_chk maria_pack maria_ftdump maria_read_log
maria_chk_DEPENDENCIES= $(LIBRARIES)
......@@ -61,7 +61,7 @@ noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h \
ma_ft_eval.h trnman.h lockman.h tablockman.h \
ma_control_file.h ha_maria.h ma_blockrec.h \
ma_loghandler.h ma_loghandler_lsn.h ma_pagecache.h \
ma_commit.h
ma_recovery.h ma_commit.h
ma_test1_DEPENDENCIES= $(LIBRARIES)
ma_test1_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \
$(top_builddir)/storage/myisam/libmyisam.a \
......@@ -120,7 +120,7 @@ libmaria_a_SOURCES = ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c \
ma_rt_index.c ma_rt_key.c ma_rt_mbr.c ma_rt_split.c \
ma_sp_key.c ma_control_file.c ma_loghandler.c \
ma_pagecache.c ma_pagecaches.c \
ma_commit.c
ma_recovery.c ma_commit.c
CLEANFILES = test?.MA? FT?.MA? isam.log ma_test_all ma_rt_test.MA? sp_test.MA?
SUFFIXES = .sh
......
......@@ -37,6 +37,15 @@
#define trans_register_ha(A, B, C) do { /* nothing */ } while(0)
#endif
/**
@todo For now there is no way for a user to set a different value of
maria_recover_options, i.e. auto-check-and-repair is always disabled.
We could enable it. As the auto-repair is initiated when opened from the
SQL layer (open_unireg_entry(), check_and_repair()), it does not happen
when Maria's Recovery internally opens the table to apply log records to
it, which is good. It would happen only after Recovery, if the table is
still corrupted.
*/
ulong maria_recover_options= HA_RECOVER_NONE;
static handlerton *maria_hton;
......@@ -1877,6 +1886,10 @@ int ha_maria::external_lock(THD *thd, int lock_type)
corresponding unlock (they just stay locked and are later dropped while
locked); if a tmp table was transactional, "SELECT FROM non_tmp, tmp"
would never commit as its "locked_tables" count would stay 1.
When Maria has has_transactions()==TRUE, open_temporary_table()
(sql_base.cc) will use TRANSACTIONAL_TMP_TABLE and thus the
external_lock(F_UNLCK) will happen and we can then allow the user to
create transactional temporary tables.
*/
if (!file->s->base.born_transactional)
goto skip_transaction;
......
......@@ -130,6 +130,7 @@
#define FULL_HEAD_PAGE 4
#define FULL_TAIL_PAGE 7
/** all bitmap pages end with this 2-byte signature */
uchar maria_bitmap_marker[2]= {(uchar) 'b',(uchar) 'm'};
static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
......@@ -244,7 +245,7 @@ my_bool _ma_bitmap_end(MARIA_SHARE *share)
/*
Flush bitmap to disk
Send updated bitmap to the page cache
SYNOPSIS
_ma_flush_bitmap()
......@@ -286,7 +287,7 @@ my_bool _ma_flush_bitmap(MARIA_SHARE *share)
share Share handler
NOTES
This is called on ma_delete_all (truncate data file).
This is called on maria_delete_all_rows (truncate data file).
*/
void _ma_bitmap_delete_all(MARIA_SHARE *share)
......@@ -294,8 +295,9 @@ void _ma_bitmap_delete_all(MARIA_SHARE *share)
MARIA_FILE_BITMAP *bitmap= &share->bitmap;
if (bitmap->map) /* Not in create */
{
bzero(bitmap->map, share->block_size);
memcpy(bitmap->map + share->block_size - 2, maria_bitmap_marker, 2);
bzero(bitmap->map, bitmap->block_size);
memcpy(bitmap->map + bitmap->block_size - sizeof(maria_bitmap_marker),
maria_bitmap_marker, sizeof(maria_bitmap_marker));
bitmap->changed= 1;
bitmap->page= 0;
bitmap->used_size= bitmap->total_size;
......@@ -497,6 +499,10 @@ static void _ma_print_bitmap(MARIA_FILE_BITMAP *bitmap)
TODO
Update 'bitmap->used_size' to real size of used bitmap
NOTE
We don't always have share->bitmap.bitmap_lock here
(when called from_ma_check_bitmap_data() for example).
RETURN
0 ok
1 error (Error writing old bitmap or reading bitmap page)
......@@ -516,7 +522,8 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
{
share->state.state.data_file_length= position + bitmap->block_size;
bzero(bitmap->map, bitmap->block_size);
memcpy(bitmap->map + share->block_size - 2, maria_bitmap_marker, 2);
memcpy(bitmap->map + bitmap->block_size - sizeof(maria_bitmap_marker),
maria_bitmap_marker, sizeof(maria_bitmap_marker));
bitmap->used_size= 0;
#ifndef DBUG_OFF
memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size);
......@@ -525,11 +532,14 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
}
bitmap->used_size= bitmap->total_size;
DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size);
res= pagecache_read(share->pagecache,
(PAGECACHE_FILE*)&bitmap->file, page, 0,
(byte*) bitmap->map,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0) == 0;
res= (pagecache_read(share->pagecache,
(PAGECACHE_FILE*)&bitmap->file, page, 0,
(byte*) bitmap->map,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0) == NULL) |
memcmp(bitmap->map + bitmap->block_size -
sizeof(maria_bitmap_marker),
maria_bitmap_marker, sizeof(maria_bitmap_marker));
#ifndef DBUG_OFF
if (!res)
memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size);
......@@ -1630,9 +1640,16 @@ static my_bool set_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
bitmap->changed= 1;
DBUG_EXECUTE("bitmap", _ma_print_bitmap(bitmap););
if (fill_pattern != 3 && fill_pattern != 7 &&
bitmap_page < info->s->state.first_bitmap_with_space)
info->s->state.first_bitmap_with_space= bitmap_page;
if (fill_pattern != 3 && fill_pattern != 7)
set_if_smaller(info->s->state.first_bitmap_with_space, bitmap_page);
/*
Note that if the condition above is false (page is full), and all pages of
this bitmap are now full, and that bitmap page was
first_bitmap_with_space, we don't modify first_bitmap_with_space, indeed
its value still tells us where to start our search for a bitmap with space
(which is for sure after this full one).
That does mean that first_bitmap_with_space is only a lower bound.
*/
DBUG_RETURN(0);
}
......@@ -1747,8 +1764,7 @@ my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
tmp= (1 << bit_count) - 1;
*data&= ~tmp;
}
if (bitmap_page < info->s->state.first_bitmap_with_space)
info->s->state.first_bitmap_with_space= bitmap_page;
set_if_smaller(info->s->state.first_bitmap_with_space, bitmap_page);
bitmap->changed= 1;
DBUG_EXECUTE("bitmap", _ma_print_bitmap(bitmap););
DBUG_RETURN(0);
......@@ -2014,3 +2030,28 @@ my_bool _ma_check_if_right_bitmap_type(MARIA_HA *info,
DBUG_ASSERT(0);
return 1;
}
/**
@brief create the first bitmap page of a freshly created data file
@param share table's share
@return Operation status
@retval 0 OK
@retval !=0 Error
*/
int _ma_bitmap_create_first(MARIA_SHARE *share)
{
uint block_size= share->bitmap.block_size;
File file= share->bitmap.file.file;
if (my_chsize(file, block_size, 0, MYF(MY_WME)) ||
my_pwrite(file, maria_bitmap_marker, sizeof(maria_bitmap_marker),
block_size - sizeof(maria_bitmap_marker),
MYF(MY_NABP | MY_WME)))
return 1;
share->state.state.data_file_length= block_size;
_ma_bitmap_delete_all(share);
return 0;
}
......@@ -398,7 +398,8 @@ my_bool _ma_once_end_block_record(MARIA_SHARE *share)
File must be synced as it is going out of the maria_open_list and so
becoming unknown to Checkpoint.
*/
if (my_sync(share->bitmap.file.file, MYF(MY_WME)) ||
if ((share->now_transactional &&
my_sync(share->bitmap.file.file, MYF(MY_WME))) ||
my_close(share->bitmap.file.file, MYF(MY_WME)))
res= 1;
/*
......@@ -1455,9 +1456,6 @@ static my_bool free_full_pages(MARIA_HA *info, MARIA_ROW *row)
static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
{
uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
ROW_EXTENT_SIZE];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
my_bool res= 0;
if (pagecache_delete_pages(info->s->pagecache, &info->dfile,
......@@ -1467,12 +1465,16 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
if (info->s->now_transactional)
{
LSN lsn;
/** @todo unify log_data's shape with delete_head_or_tail() */
uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
ROW_EXTENT_SIZE];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
DBUG_ASSERT(info->trn->rec_lsn);
pagerange_store(log_data + FILEID_STORE_SIZE, 1);
int5store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE,
page_store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE,
page);
int2store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + 5,
count);
int2store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
PAGE_STORE_SIZE, count);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
......@@ -1967,8 +1969,8 @@ static my_bool write_block_record(MARIA_HA *info,
((last_head_block - head_block) - 2) * ROW_EXTENT_SIZE;
}
DBUG_ASSERT(uint2korr(extent_data+5) & TAIL_BIT);
int5store(extent_data, head_tail_block->page);
int2store(extent_data + 5, head_tail_block->page_count);
page_store(extent_data, head_tail_block->page);
int2store(extent_data + PAGE_STORE_SIZE, head_tail_block->page_count);
}
}
else
......@@ -2225,7 +2227,11 @@ static my_bool write_block_record(MARIA_HA *info,
and this hook will mark the table corrupted.
Maybe hook should be stored in the pagecache's block structure, or in a
hash "file->maria_ha*".
*/
@todo RECOVERY we should distinguish below between log write error and
table write error. The former should stop Maria immediately, the latter
should mark the table corrupted.
*/
/* Unpin all pinned pages to not cause problems for disk cache */
_ma_unpin_all_pages(info, 0);
......@@ -2340,7 +2346,7 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info)
{
LSN lsn;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE];
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE];
/*
Write UNDO record
......@@ -2351,16 +2357,28 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info)
*/
/**
@todo RECOVERY BUG
We will soon change that: we will here execute the UNDO records
generated while we were trying to write the row; this will log some
CLRs which will replace this LOGREC_UNDO_PURGE.
We do need the code above (delete_head_or_tail() etc) for
non-transactional tables.
For transactional tables we can either also use it or execute the
UNDO_INSERT. If we crash before this
_ma_write_abort_block_record(), Recovery will do the work of this
function by executing UNDO_INSERT.
For transactional tables, we will remove this LOGREC_UNDO_PURGE and
replace it with a LOGREC_CLR_END: we should go back the UNDO chain
until we reach the UNDO which inserted the row into the data file, and
use its previous_undo_lsn.
Same logic for when we remove inserted keys (in case of error in
maria_write(): we come to the present function only after removing the
inserted keys... as long as we unpin the key pages only after writing
the CLR_END, this would be recovery-safe...).
*/
lsn_store(log_data, info->trn->undo_lsn);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, LOGREC_UNDO_ROW_PURGE,
info->trn, NULL, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array, NULL))
info->trn, info->s, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data + LSN_STORE_SIZE))
res= 1;
}
_ma_unpin_all_pages(info, info->trn->undo_lsn);
......@@ -2390,6 +2408,7 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
ulonglong page;
struct st_row_pos_info row_pos;
MARIA_SHARE *share= info->s;
my_bool res;
DBUG_ENTER("_ma_update_block_record");
DBUG_PRINT("enter", ("rowid: %lu", (long) record_pos));
......@@ -2486,8 +2505,8 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
row_pos.dir= dir;
row_pos.data= buff + uint2korr(dir);
row_pos.length= head_length;
DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks, 1,
&row_pos));
res= write_block_record(info, oldrec, record, new_row, blocks, 1, &row_pos);
DBUG_RETURN(res);
err:
_ma_unpin_all_pages(info, 0);
......@@ -2609,7 +2628,7 @@ static my_bool delete_head_or_tail(MARIA_HA *info,
res= delete_dir_entry(buff, block_size, record_number, &empty_space);
if (res < 0)
DBUG_RETURN(1);
if (res == 0)
if (res == 0) /* after our deletion, page is still not empty */
{
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
......@@ -2638,14 +2657,13 @@ static my_bool delete_head_or_tail(MARIA_HA *info,
PAGECACHE_WRITE_DELAY, &page_link.link))
DBUG_RETURN(1);
}
else
else /* page is now empty */
{
uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
if (info->s->now_transactional)
{
uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
pagerange_store(log_data + FILEID_STORE_SIZE, 1);
page_store(log_data+ FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE, page);
pagerange_store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
......@@ -2850,7 +2868,7 @@ static void init_extent(MARIA_EXTENT_CURSOR *extent, byte *extent_info,
uint page_count;
extent->extent= extent_info;
extent->extent_count= extents;
extent->page= uint5korr(extent_info); /* First extent */
extent->page= page_korr(extent_info); /* First extent */
page_count= uint2korr(extent_info + ROW_EXTENT_PAGE_SIZE);
extent->page_count= page_count & ~TAIL_BIT;
extent->tail= page_count & TAIL_BIT;
......@@ -2890,7 +2908,7 @@ static byte *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent,
if (!--extent->extent_count)
goto crashed;
extent->extent+= ROW_EXTENT_SIZE;
extent->page= uint5korr(extent->extent);
extent->page= page_korr(extent->extent);
page_count= uint2korr(extent->extent+ROW_EXTENT_PAGE_SIZE);
if (!page_count)
goto crashed;
......@@ -4124,15 +4142,21 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint block_size= share->block_size;
uint rec_offset;
byte *buff= info->keyread_buff, *dir;
DBUG_ENTER("_ma_apply_redo_insert_row_head");
DBUG_ENTER("_ma_apply_redo_insert_row_head_or_tail");
info->keyread_buff_used= 1;
page= page_korr(header);
rownr= dirpos_korr(header+PAGE_STORE_SIZE);
if (page * info->s->block_size > info->state->data_file_length)
if (((page + 1) * info->s->block_size) > info->state->data_file_length)
{
/* New page at end of file */
/*
New page at end of file. Note that the test above is also positive if
data_file_length is not a multiple of block_size (system crashed while
writing the last page): in this case we just extend the last page and
fill it entirely with zeroes, then the REDO will put correct data on
it.
*/
DBUG_ASSERT(rownr == 0);
if (rownr != 0)
goto err;
......@@ -4142,7 +4166,7 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE;
/* Update that file is extended */
info->state->data_file_length= page * info->s->block_size;
info->state->data_file_length= (page + 1) * info->s->block_size;
}
else
{
......@@ -4295,8 +4319,6 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
lsn LSN to put on page
page_type HEAD_PAGE or TAIL_PAGE
header Header (without FILEID)
data Data to be put on page
data_length Length of data
NOTES
This function is very similar to delete_head_or_tail()
......@@ -4341,6 +4363,7 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
if (delete_dir_entry(buff, block_size, record_number, &empty_space) < 0)
DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
lsn_store(buff, lsn);
if (pagecache_write(share->pagecache,
&info->dfile, page, 0,
buff, PAGECACHE_PLAIN_PAGE,
......@@ -4355,3 +4378,91 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
DBUG_RETURN(0);
}
/**
@brief Apply LOGREC_REDO_PURGE_BLOCKS
@param info Maria handler
@param header Header (without FILEID)
@note It marks the page free in the bitmap, and sets the directory's count
to 0.
@return Operation status
@retval 0 OK
@retval !=0 Error
*/
uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
LSN lsn, const byte *header)
{
MARIA_SHARE *share= info->s;
ulonglong page;
uint page_range;
uint res;
byte *buff= info->keyread_buff;
uint block_size= share->block_size;
DBUG_ENTER("_ma_apply_redo_purge_blocks");
info->keyread_buff_used= 1;
page_range= pagerange_korr(header);
/* works only for a one-page range for now */
DBUG_ASSERT(page_range == 1); // for now
header+= PAGERANGE_STORE_SIZE;
page= page_korr(header);
header+= PAGE_STORE_SIZE;
page_range= pagerange_korr(header);
DBUG_ASSERT(page_range == 1); // for now
if (!(buff= pagecache_read(share->pagecache,
&info->dfile,
page, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
DBUG_RETURN(my_errno);
if (lsn_korr(buff) >= lsn)
{
/* Already applied */
goto mark_free_in_bitmap;
}
buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
/*
Strictly speaking, we don't need to zero the last directory entry of this
page; setting the directory's count to zero is enough (it makes the last
directory entry invisible, irrelevant).
But as the "runtime" code (delete_head_or_tail()) called
delete_dir_entry() which zeroed the entry, if we don't do it here, we get
a difference between runtime and log-applying. Irrelevant, but it's
time-consuming to differentiate irrelevant differences from relevant
ones. So we remove the difference by zeroing the entry.
*/
{
uint rownr= ((uint) ((uchar *) buff)[DIR_COUNT_OFFSET]) - 1;
byte *dir= (buff + block_size - DIR_ENTRY_SIZE * rownr -
DIR_ENTRY_SIZE - PAGE_SUFFIX_SIZE);
dir[0]= dir[1]= 0; /* Delete entry */
}
buff[DIR_COUNT_OFFSET]= 0;
lsn_store(buff, lsn);
if (pagecache_write(share->pagecache,
&info->dfile, page, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_WRITE_DELAY, 0))
DBUG_RETURN(my_errno);
mark_free_in_bitmap:
/** @todo leave bitmap lock to the bitmap code... */
pthread_mutex_lock(&share->bitmap.bitmap_lock);
res= _ma_reset_full_page_bits(info, &share->bitmap, page, 1);
pthread_mutex_unlock(&share->bitmap.bitmap_lock);
DBUG_RETURN(res);
}
......@@ -105,8 +105,6 @@ enum en_page_type { UNALLOCATED_PAGE, HEAD_PAGE, TAIL_PAGE, BLOB_PAGE, MAX_PAGE_
/* Don't allocate memory for too many row extents on the stack */
#define ROW_EXTENTS_ON_STACK 32
extern uchar maria_bitmap_marker[2];
/* Functions to convert MARIA_RECORD_POS to/from page:offset */
static inline MARIA_RECORD_POS ma_recordpos(ulonglong page, uint dir_entry)
......@@ -178,6 +176,7 @@ my_bool _ma_check_if_right_bitmap_type(MARIA_HA *info,
ulonglong page,
uint *bitmap_pattern);
void _ma_bitmap_delete_all(MARIA_SHARE *share);
int _ma_bitmap_create_first(MARIA_SHARE *share);
uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint page_type,
const byte *header,
......@@ -186,3 +185,5 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint page_type,
const byte *header);
uint _ma_apply_redo_purge_blocks(MARIA_HA *info, LSN lsn,
const byte *header);
......@@ -87,7 +87,7 @@ int maria_close(register MARIA_HA *info)
may be using the file at this point
IF using --external-locking, which does not apply to Maria.
*/
if (share->mode != O_RDONLY && maria_is_crashed(info))
if (share->mode != O_RDONLY)
_ma_state_info_write(share->kfile.file, &share->state, 1);
if (my_close(share->kfile.file, MYF(0)))
error= my_errno;
......
......@@ -51,6 +51,8 @@ uint32 last_logno= FILENO_IMPOSSIBLE;
it is called at startup.
*/
my_bool maria_multi_threaded= FALSE;
/** @brief if currently doing a recovery */
my_bool maria_in_recovery= FALSE;
/*
Control file is less then 512 bytes (a disk sector),
......
......@@ -18,6 +18,9 @@
First version written by Guilhem Bichot on 2006-04-27.
*/
#ifndef _ma_control_file_h
#define _ma_control_file_h
#define CONTROL_FILE_BASE_NAME "maria_log_control"
/* Here is the interface of this module */
......@@ -33,7 +36,7 @@ extern LSN last_checkpoint_lsn;
*/
extern uint32 last_logno;
extern my_bool maria_multi_threaded;
extern my_bool maria_multi_threaded, maria_in_recovery;
typedef enum enum_control_file_error {
CONTROL_FILE_OK= 0,
......@@ -74,3 +77,4 @@ int ma_control_file_end();
#ifdef __cplusplus
}
#endif
#endif
......@@ -677,7 +677,7 @@ int maria_create(const char *name, enum data_file_type datafile_type,
/* max_data_file_length and max_key_file_length are recalculated on open */
if (tmp_table)
share.base.max_data_file_length= (my_off_t) ci->data_file_length;
else if (ci->transactional && translog_inited)
else if (ci->transactional && translog_inited && !maria_in_recovery)
{
/*
we have checked translog_inited above, because maria_chk may call us
......@@ -940,23 +940,31 @@ int maria_create(const char *name, enum data_file_type datafile_type,
for (i= TRANSLOG_INTERNAL_PARTS;
i < (sizeof(log_array)/sizeof(log_array[0])); i++)
total_rec_length+= log_array[i].length;
/*
For this record to be of any use for Recovery, we need the upper
MySQL layer to be crash-safe, which it is not now (that would require
work using the ddl_log of sql/sql_table.cc); when it is, we should
reconsider the moment of writing this log record (before or after op,
under THR_LOCK_maria or not...), how to use it in Recovery.
For now this record can serve when we apply logs to a backup,
so we sync it. This happens before the data file is created. If the data
file was created before, and we crashed before writing the log record,
at restart the table may be used, so we would not have a trustable
history in the log (impossible to apply this log to a backup). The way
we do it, if we crash before writing the log record then there is no
data file and the table cannot be used.
Note that in case of TRUNCATE TABLE we also come here.
When in CREATE/TRUNCATE (or DROP or RENAME or REPAIR) we have not called
external_lock(), so have no TRN. It does not matter, as all these
operations are non-transactional and sync their files.
/**
For this record to be of any use for Recovery, we need the upper
MySQL layer to be crash-safe, which it is not now (that would require
work using the ddl_log of sql/sql_table.cc); when it is, we should
reconsider the moment of writing this log record (before or after op,
under THR_LOCK_maria or not...), how to use it in Recovery.
For now this record can serve when we apply logs to a backup,
so we sync it. This happens before the data file is created. If the
data file was created before, and we crashed before writing the log
record, at restart the table may be used, so we would not have a
trustable history in the log (impossible to apply this log to a
backup). The way we do it, if we crash before writing the log record
then there is no data file and the table cannot be used.
@todo Note that in case of TRUNCATE TABLE we also come here; for
Recovery to be able to finish TRUNCATE TABLE, instead of leaving a
half-truncated table, we should log the record at start of
maria_create(); for that we shouldn't write to the index file but to a
buffer (DYNAMIC_STRING), put the buffer into the record, then put the
buffer into the index file (so, change _ma_keydef_write() etc). That
would also enable Recovery to finish a CREATE TABLE. The final result
would be that we would be able to finish what the SQL layer has asked
for: it would be atomic.
When in CREATE/TRUNCATE (or DROP or RENAME or REPAIR) we have not
called external_lock(), so have no TRN. It does not matter, as all
these operations are non-transactional and sync their files.
*/
if (unlikely(translog_write_record(&share.state.create_rename_lsn,
LOGREC_REDO_CREATE_TABLE,
......@@ -1016,6 +1024,20 @@ int maria_create(const char *name, enum data_file_type datafile_type,
goto err;
errpos=3;
/*
QQ: this sets data_file_length from 0 to 8192, but we wrote the state
already to the index file (because:
- log record is built from index header so state must be written before
log record
- data file must be created after log record, so that "missing log
record" implies "unusable table").
Thus, we below create a 8192-byte data file, but its recorded size is 0,
so next time we read the bitmap (a maria_write() for example) we'll
overwrite the bitmap we just created below.
It's not very efficient. Though there is no bug.
Why do we absolutely want to create a 8192-byte page for a freshly
created, empty table? Why don't we leave the data file empty?
*/
if (_ma_initialize_data_file(&share, dfile))
goto err;
}
......@@ -1159,11 +1181,14 @@ int _ma_initialize_data_file(MARIA_SHARE *share, File dfile)
{
if (share->data_file_type == BLOCK_RECORD)
{
if (my_chsize(dfile, share->base.block_size, 0, MYF(MY_WME)))
return 1;
share->state.state.data_file_length= share->base.block_size;
_ma_bitmap_delete_all(share);
share->bitmap.block_size= share->base.block_size;
share->bitmap.file.file = dfile;
return _ma_bitmap_create_first(share);
}
/*
So, in BLOCK_RECORD, a freshly created datafile is one page long; while in
other formats it is 0-byte long.
*/
return 0;
}
......
......@@ -64,7 +64,8 @@ int maria_delete_table(const char *name)
raid_type= info->s->base.raid_type;
raid_chunks= info->s->base.raid_chunks;
#endif
sync_dir= (info->s->now_transactional && !info->s->temporary) ?
sync_dir= (info->s->now_transactional && !info->s->temporary &&
!maria_in_recovery) ?
MY_SYNC_DIR : 0;
maria_close(info);
}
......@@ -85,7 +86,7 @@ int maria_delete_table(const char *name)
LSN lsn;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char *)name;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= strlen(name);
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= strlen(name) + 1;
if (unlikely(translog_write_record(&lsn, LOGREC_REDO_DROP_TABLE,
&dummy_transaction_object, NULL,
log_array[TRANSLOG_INTERNAL_PARTS +
......
......@@ -181,10 +181,10 @@ static MARIA_SHARE **id_to_share= NULL;
static my_atomic_rwlock_t LOCK_id_to_share;
static my_bool write_hook_for_redo(enum translog_record_type type,
TRN *trn, LSN *lsn,
TRN *trn, MARIA_SHARE *share, LSN *lsn,
struct st_translog_parts *parts);
static my_bool write_hook_for_undo(enum translog_record_type type,
TRN *trn, LSN *lsn,
TRN *trn, MARIA_SHARE *share, LSN *lsn,
struct st_translog_parts *parts);
/*
......@@ -197,27 +197,27 @@ LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES];
static LOG_DESC INIT_LOGREC_FIXED_RECORD_0LSN_EXAMPLE=
{LOGRECTYPE_FIXEDLENGTH, 6, 6, NULL, NULL, NULL, 0,
"fixed0example", FALSE, NULL, NULL};
"fixed0example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, NULL, NULL, 0,
"variable0example", FALSE, NULL, NULL};
"variable0example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_FIXED_RECORD_1LSN_EXAMPLE=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 7, 7, NULL, NULL, NULL, 1,
"fixed1example", FALSE, NULL, NULL};
"fixed1example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_VARIABLE_RECORD_1LSN_EXAMPLE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 12, NULL, NULL, NULL, 1,
"variable1example", FALSE, NULL, NULL};
"variable1example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_FIXED_RECORD_2LSN_EXAMPLE=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 23, 23, NULL, NULL, NULL, 2,
"fixed2example", FALSE, NULL, NULL};
"fixed2example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_VARIABLE_RECORD_2LSN_EXAMPLE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 19, NULL, NULL, NULL, 2,
"variable2example", FALSE, NULL, NULL};
"variable2example", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
void example_loghandler_init()
......@@ -239,157 +239,172 @@ void example_loghandler_init()
static LOG_DESC INIT_LOGREC_RESERVED_FOR_CHUNKS23=
{LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0,
"reserved", FALSE, NULL, NULL };
"reserved", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL };
static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_HEAD=
{LOGRECTYPE_VARIABLE_LENGTH, 0,
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL,
write_hook_for_redo, NULL, 0,
"redo_insert_row_head", FALSE, NULL, NULL};
"redo_insert_row_head", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_TAIL=
{LOGRECTYPE_VARIABLE_LENGTH, 0,
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL,
write_hook_for_redo, NULL, 0,
"redo_insert_row_tail", FALSE, NULL, NULL};
"redo_insert_row_tail", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_BLOB=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 8, NULL, write_hook_for_redo, NULL, 0,
"redo_insert_row_blob", FALSE, NULL, NULL};
"redo_insert_row_blob", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
/*QQQ:TODO:header???*/
static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_BLOBS=
{LOGRECTYPE_VARIABLE_LENGTH, 0, FILEID_STORE_SIZE, NULL,
write_hook_for_redo, NULL, 0,
"redo_insert_row_blobs", FALSE, NULL, NULL};
"redo_insert_row_blobs", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_PURGE_ROW_HEAD=
{LOGRECTYPE_FIXEDLENGTH,
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
NULL, write_hook_for_redo, NULL, 0,
"redo_purge_row_head", FALSE, NULL, NULL};
"redo_purge_row_head", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_PURGE_ROW_TAIL=
{LOGRECTYPE_FIXEDLENGTH,
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
NULL, write_hook_for_redo, NULL, 0,
"redo_purge_row_tail", FALSE, NULL, NULL};
"redo_purge_row_tail", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
/* QQQ: TODO: variable and fixed size??? */
static LOG_DESC INIT_LOGREC_REDO_PURGE_BLOCKS=
{LOGRECTYPE_VARIABLE_LENGTH,
0,
FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE,
FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE,
FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE,
NULL, write_hook_for_redo, NULL, 0,
"redo_purge_blocks", FALSE, NULL, NULL};
"redo_purge_blocks", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_DELETE_ROW=
{LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, write_hook_for_redo, NULL, 0,
"redo_delete_row", FALSE, NULL, NULL};
"redo_delete_row", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_UPDATE_ROW_HEAD=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, write_hook_for_redo, NULL, 0,
"redo_update_row_head", FALSE, NULL, NULL};
"redo_update_row_head", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_INDEX=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, write_hook_for_redo, NULL, 0,
"redo_index", FALSE, NULL, NULL};
"redo_index", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_UNDELETE_ROW=
{LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, write_hook_for_redo, NULL, 0,
"redo_undelete_row", FALSE, NULL, NULL};
"redo_undelete_row", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_CLR_END=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, write_hook_for_redo, NULL, 1,
"clr_end", TRUE, NULL, NULL};
"clr_end", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_PURGE_END=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1,
"purge_end", TRUE, NULL, NULL};
"purge_end", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_ROW_INSERT=
{LOGRECTYPE_FIXEDLENGTH,
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
NULL, write_hook_for_undo, NULL, 0,
"undo_row_insert", TRUE, NULL, NULL};
"undo_row_insert", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_ROW_DELETE=
{LOGRECTYPE_VARIABLE_LENGTH, 0,
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
NULL, write_hook_for_undo, NULL, 0,
"undo_row_delete", TRUE, NULL, NULL};
"undo_row_delete", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_ROW_UPDATE=
{LOGRECTYPE_VARIABLE_LENGTH, 0,
LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
NULL, write_hook_for_undo, NULL, 1,
"undo_row_update", TRUE, NULL, NULL};
"undo_row_update", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_ROW_PURGE=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, LSN_STORE_SIZE, LSN_STORE_SIZE,
NULL, NULL, NULL, 1,
"undo_row_purge", TRUE, NULL, NULL};
{LOGRECTYPE_PSEUDOFIXEDLENGTH, LSN_STORE_SIZE + FILEID_STORE_SIZE,
LSN_STORE_SIZE + FILEID_STORE_SIZE,
NULL, write_hook_for_undo, NULL, 1,
"undo_row_purge", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_KEY_INSERT=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 10, NULL, write_hook_for_undo, NULL, 1,
"undo_key_insert", TRUE, NULL, NULL};
"undo_key_insert", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_UNDO_KEY_DELETE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 15, NULL, write_hook_for_undo, NULL, 0,
"undo_key_delete", TRUE, NULL, NULL}; // QQ: why not compressed?
"undo_key_delete", LOGREC_LAST_IN_GROUP, NULL, NULL}; // QQ: why not compressed?
static LOG_DESC INIT_LOGREC_PREPARE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0,
"prepare", TRUE, NULL, NULL};
"prepare", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
static LOG_DESC INIT_LOGREC_PREPARE_WITH_UNDO_PURGE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 5, NULL, NULL, NULL, 1,
"prepare_with_undo_purge", TRUE, NULL, NULL};
"prepare_with_undo_purge", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
static LOG_DESC INIT_LOGREC_COMMIT=
{LOGRECTYPE_FIXEDLENGTH, 0, 0, NULL, NULL, NULL, 0,
"commit", TRUE, NULL, NULL};
{LOGRECTYPE_FIXEDLENGTH, 0, 0, NULL,
NULL, NULL, 0, "commit", LOGREC_IS_GROUP_ITSELF, NULL,
NULL};
static LOG_DESC INIT_LOGREC_COMMIT_WITH_UNDO_PURGE=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1,
"commit_with_undo_purge", TRUE, NULL, NULL};
"commit_with_undo_purge", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
static LOG_DESC INIT_LOGREC_CHECKPOINT=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0,
"checkpoint", TRUE, NULL, NULL};
"checkpoint", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_CREATE_TABLE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 1 + 2, NULL, NULL, NULL, 0,
"redo_create_table", TRUE, NULL, NULL};
"redo_create_table", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_RENAME_TABLE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0,
"redo_rename_table", TRUE, NULL, NULL};
"redo_rename_table", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
/**
@todo LOG BUG
the "1" below is a hack to overcome a bug in the log handler where a 0-byte
header is considered a read failure:
translog_read_record() calls translog_init_reader_data() which calls
translog_read_record_header_scan() which calls
translog_read_record_header_from_buffer() which calls
translog_variable_length_header() which returns 0 (normal);
translog_init_reader_data() considers this 0 as a problem,
and thus translog_read_record() fails.
*/
static LOG_DESC INIT_LOGREC_REDO_DROP_TABLE=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0,
"redo_drop_table", TRUE, NULL, NULL};
{LOGRECTYPE_VARIABLE_LENGTH, 0, 1, NULL, NULL, NULL, 0,
"redo_drop_table", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_DELETE_ALL=
{LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE, FILEID_STORE_SIZE,
NULL, write_hook_for_redo, NULL, 0,
"redo_delete_all", TRUE, NULL, NULL};
"redo_delete_all", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_REPAIR_TABLE=
{LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE + 4, FILEID_STORE_SIZE + 4,
NULL, NULL, NULL, 0,
"redo_repair_table", TRUE, NULL, NULL};
"redo_repair_table", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
static LOG_DESC INIT_LOGREC_FILE_ID=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 2, NULL, NULL, NULL, 0,
"file_id", TRUE, NULL, NULL};
"file_id", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
static LOG_DESC INIT_LOGREC_LONG_TRANSACTION_ID=
{LOGRECTYPE_FIXEDLENGTH, 6, 6, NULL, NULL, NULL, 0,
"long_transaction_id", TRUE, NULL, NULL};
"long_transaction_id", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
const myf log_write_flags= MY_WME | MY_NABP | MY_WAIT_IF_FULL;
......@@ -3045,6 +3060,7 @@ static translog_size_t translog_get_current_group_size()
static my_bool
translog_write_variable_record_1group(LSN *lsn,
enum translog_record_type type,
MARIA_SHARE *share,
SHORT_TRANSACTION_ID short_trid,
struct st_translog_parts *parts,
struct st_translog_buffer
......@@ -3062,7 +3078,8 @@ translog_write_variable_record_1group(LSN *lsn,
*lsn= horizon= log_descriptor.horizon;
if (log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook)(type, trn, lsn, parts))
(*log_record_type_descriptor[type].inwrite_hook)(type, trn, share,
lsn, parts))
{
translog_unlock();
DBUG_RETURN(1);
......@@ -3199,6 +3216,7 @@ translog_write_variable_record_1group(LSN *lsn,
static my_bool
translog_write_variable_record_1chunk(LSN *lsn,
enum translog_record_type type,
MARIA_SHARE *share,
SHORT_TRANSACTION_ID short_trid,
struct st_translog_parts *parts,
struct st_translog_buffer
......@@ -3214,7 +3232,7 @@ translog_write_variable_record_1chunk(LSN *lsn,
*lsn= log_descriptor.horizon;
if (log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook)(type, trn,
(*log_record_type_descriptor[type].inwrite_hook)(type, trn, share,
lsn, parts))
{
translog_unlock();
......@@ -3567,6 +3585,7 @@ static my_bool translog_relative_LSN_encode(struct st_translog_parts *parts,
static my_bool
translog_write_variable_record_mgroup(LSN *lsn,
enum translog_record_type type,
MARIA_SHARE *share,
SHORT_TRANSACTION_ID short_trid,
struct st_translog_parts *parts,
struct st_translog_buffer
......@@ -3909,7 +3928,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
first_chunk0= 0;
*lsn= horizon;
if (log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook) (type, trn,
(*log_record_type_descriptor[type].inwrite_hook) (type, trn, share,
lsn, parts))
goto err;
}
......@@ -3995,6 +4014,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
static my_bool translog_write_variable_record(LSN *lsn,
enum translog_record_type type,
MARIA_SHARE *share,
SHORT_TRANSACTION_ID short_trid,
struct st_translog_parts *parts,
TRN *trn)
......@@ -4007,6 +4027,7 @@ static my_bool translog_write_variable_record(LSN *lsn,
/* Max number of such LSNs per record is 2 */
byte compressed_LSNs[MAX_NUMBER_OF_LSNS_PER_RECORD *
COMPRESSED_LSN_MAX_STORE_SIZE];
my_bool res;
DBUG_ENTER("translog_write_variable_record");
translog_lock();
......@@ -4071,9 +4092,11 @@ static my_bool translog_write_variable_record(LSN *lsn,
if (page_rest >= parts->record_length + header_length1)
{
/* following function makes translog_unlock(); */
DBUG_RETURN(translog_write_variable_record_1chunk(lsn, type, short_trid,
parts, buffer_to_flush,
header_length1, trn));
res= translog_write_variable_record_1chunk(lsn, type, share,
short_trid,
parts, buffer_to_flush,
header_length1, trn);
DBUG_RETURN(res);
}
buffer_rest= translog_get_current_group_size();
......@@ -4081,15 +4104,19 @@ static my_bool translog_write_variable_record(LSN *lsn,
if (buffer_rest >= parts->record_length + header_length1 - page_rest)
{
/* following function makes translog_unlock(); */
DBUG_RETURN(translog_write_variable_record_1group(lsn, type, short_trid,
parts, buffer_to_flush,
header_length1, trn));
res= translog_write_variable_record_1group(lsn, type, share,
short_trid,
parts, buffer_to_flush,
header_length1, trn);
DBUG_RETURN(res);
}
/* following function makes translog_unlock(); */
DBUG_RETURN(translog_write_variable_record_mgroup(lsn, type, short_trid,
parts, buffer_to_flush,
header_length1,
buffer_rest, trn));
res= translog_write_variable_record_mgroup(lsn, type, share,
short_trid,
parts, buffer_to_flush,
header_length1,
buffer_rest, trn);
DBUG_RETURN(res);
}
......@@ -4112,6 +4139,7 @@ static my_bool translog_write_variable_record(LSN *lsn,
static my_bool translog_write_fixed_record(LSN *lsn,
enum translog_record_type type,
MARIA_SHARE *share,
SHORT_TRANSACTION_ID short_trid,
struct st_translog_parts *parts,
TRN *trn)
......@@ -4164,7 +4192,7 @@ static my_bool translog_write_fixed_record(LSN *lsn,
*lsn= log_descriptor.horizon;
if (log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook) (type, trn,
(*log_record_type_descriptor[type].inwrite_hook) (type, trn, share,
lsn, parts))
{
rc= 1;
......@@ -4363,11 +4391,13 @@ my_bool translog_write_record(LSN *lsn,
{
switch (log_record_type_descriptor[type].class) {
case LOGRECTYPE_VARIABLE_LENGTH:
rc= translog_write_variable_record(lsn, type, short_trid, &parts, trn);
rc= translog_write_variable_record(lsn, type, share,
short_trid, &parts, trn);
break;
case LOGRECTYPE_PSEUDOFIXEDLENGTH:
case LOGRECTYPE_FIXEDLENGTH:
rc= translog_write_fixed_record(lsn, type, short_trid, &parts, trn);
rc= translog_write_fixed_record(lsn, type, share,
short_trid, &parts, trn);
break;
case LOGRECTYPE_NOT_ALLOWED:
default:
......@@ -4927,6 +4957,7 @@ translog_read_record_header_from_buffer(byte *page,
TRANSLOG_HEADER_BUFFER *buff,
TRANSLOG_SCANNER_DATA *scanner)
{
translog_size_t res;
DBUG_ENTER("translog_read_record_header_from_buffer");
DBUG_ASSERT((page[page_offset] & TRANSLOG_CHUNK_TYPE) ==
TRANSLOG_CHUNK_LSN ||
......@@ -4941,15 +4972,18 @@ translog_read_record_header_from_buffer(byte *page,
/* Read required bytes from the header and call hook */
switch (log_record_type_descriptor[buff->type].class) {
case LOGRECTYPE_VARIABLE_LENGTH:
DBUG_RETURN(translog_variable_length_header(page, page_offset, buff,
scanner));
res= translog_variable_length_header(page, page_offset, buff,
scanner);
break;
case LOGRECTYPE_PSEUDOFIXEDLENGTH:
case LOGRECTYPE_FIXEDLENGTH:
DBUG_RETURN(translog_fixed_length_header(page, page_offset, buff));
res= translog_fixed_length_header(page, page_offset, buff);
break;
default:
DBUG_ASSERT(0);
res= 0;
}
DBUG_RETURN(0); /* purecov: deadcode */
DBUG_RETURN(res);
}
......@@ -4979,7 +5013,7 @@ translog_size_t translog_read_record_header(LSN lsn,
TRANSLOG_HEADER_BUFFER *buff)
{
byte buffer[TRANSLOG_PAGE_SIZE], *page;
translog_size_t page_offset= LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE;
translog_size_t res, page_offset= LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE;
TRANSLOG_ADDRESS addr;
TRANSLOG_VALIDATOR_DATA data;
DBUG_ENTER("translog_read_record_header");
......@@ -4993,11 +5027,9 @@ translog_size_t translog_read_record_header(LSN lsn,
data.was_recovered= 0;
addr= lsn;
addr-= page_offset; /* offset decreasing */
if (!(page= translog_get_page(&data, buffer)))
DBUG_RETURN(0);
DBUG_RETURN(translog_read_record_header_from_buffer(page, page_offset,
buff, 0));
res= (!(page= translog_get_page(&data, buffer))) ? 0 :
translog_read_record_header_from_buffer(page, page_offset, buff, 0);
DBUG_RETURN(res);
}
......@@ -5030,6 +5062,7 @@ translog_read_record_header_scan(TRANSLOG_SCANNER_DATA
TRANSLOG_HEADER_BUFFER *buff,
my_bool move_scanner)
{
translog_size_t res;
DBUG_ENTER("translog_read_record_header_scan");
DBUG_PRINT("enter", ("Scanner: Cur: (%lu,0x%lx) Hrz: (%lu,0x%lx) "
"Lst: (%lu,0x%lx) Offset: %u(%x) fixed %d",
......@@ -5044,11 +5077,12 @@ translog_read_record_header_scan(TRANSLOG_SCANNER_DATA
buff->groups_no= 0;
buff->lsn= scanner->page_addr;
buff->lsn+= scanner->page_offset; /* offset increasing */
DBUG_RETURN(translog_read_record_header_from_buffer(scanner->page,
scanner->page_offset,
buff,
(move_scanner ?
scanner : 0)));
res= translog_read_record_header_from_buffer(scanner->page,
scanner->page_offset,
buff,
(move_scanner ?
scanner : 0));
DBUG_RETURN(res);
}
......@@ -5083,7 +5117,7 @@ translog_size_t translog_read_next_record_header(TRANSLOG_SCANNER_DATA
TRANSLOG_HEADER_BUFFER *buff)
{
uint8 chunk_type;
translog_size_t res;
buff->groups_no= 0; /* to be sure that we will free it right */
DBUG_ENTER("translog_read_next_record_header");
......@@ -5114,9 +5148,11 @@ translog_size_t translog_read_next_record_header(TRANSLOG_SCANNER_DATA
/* Last record was read */
buff->lsn= LSN_IMPOSSIBLE;
/* Return 'end of log' marker */
DBUG_RETURN(TRANSLOG_RECORD_HEADER_MAX_SIZE + 1);
res= TRANSLOG_RECORD_HEADER_MAX_SIZE + 1;
}
DBUG_RETURN(translog_read_record_header_scan(scanner, buff, 0));
else
res= translog_read_record_header_scan(scanner, buff, 0);
DBUG_RETURN(res);
}
......@@ -5610,7 +5646,9 @@ my_bool translog_flush(LSN lsn)
static my_bool write_hook_for_redo(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, LSN *lsn,
TRN *trn, MARIA_SHARE *share
__attribute__ ((unused)),
LSN *lsn,
struct st_translog_parts *parts
__attribute__ ((unused)))
{
......@@ -5646,7 +5684,9 @@ static my_bool write_hook_for_redo(enum translog_record_type type
static my_bool write_hook_for_undo(enum translog_record_type type
__attribute__ ((unused)),
TRN *trn, LSN *lsn,
TRN *trn, MARIA_SHARE *share
__attribute__ ((unused)),
LSN *lsn,
struct st_translog_parts *parts
__attribute__ ((unused)))
{
......
......@@ -289,7 +289,7 @@ typedef my_bool(*prewrite_rec_hook) (enum translog_record_type type,
struct st_translog_parts *parts);
typedef my_bool(*inwrite_rec_hook) (enum translog_record_type type,
TRN *trn,
TRN *trn, struct st_maria_share *share,
LSN *lsn,
struct st_translog_parts *parts);
......@@ -309,6 +309,11 @@ enum record_class
/* C++ can't bear that a variable's name is "class" */
#ifndef __cplusplus
enum enum_record_in_group {
LOGREC_NOT_LAST_IN_GROUP= 0, LOGREC_LAST_IN_GROUP, LOGREC_IS_GROUP_ITSELF
};
/*
Descriptor of log record type
Note: Don't reorder because of constructs later...
......@@ -338,7 +343,7 @@ typedef struct st_log_record_type_descriptor
/* the rest is for maria_read_log & Recovery */
/** @brief for debug error messages or "maria_read_log" command-line tool */
const char *name;
my_bool record_ends_group;
enum enum_record_in_group record_in_group;
/* a function to execute when we see the record during the REDO phase */
int (*record_execute_in_redo_phase)(const TRANSLOG_HEADER_BUFFER *);
/* a function to execute when we see the record during the UNDO phase */
......
/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
/* Copyright (C) 2006, 2007 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -16,180 +16,1097 @@
/*
WL#3072 Maria recovery
First version written by Guilhem Bichot on 2006-04-27.
Does not compile yet.
*/
/* Here is the implementation of this module */
#include "page_cache.h"
#include "least_recently_dirtied.h"
#include "transaction.h"
#include "share.h"
#include "log.h"
typedef struct st_record_type_properties {
/* used for debug error messages or "maria_read_log" command-line tool: */
char *name,
my_bool record_ends_group;
/* a function to execute when we see the record during the REDO phase */
int (*record_execute_in_redo_phase)(RECORD *); /* param will be record header instead later */
/* a function to execute when we see the record during the UNDO phase */
int (*record_execute_in_undo_phase)(RECORD *); /* param will be record header instead later */
} RECORD_TYPE_PROPERTIES;
int no_op(RECORD *) {return 0};
RECORD_TYPE_PROPERTIES all_record_type_properties[]=
#include "maria_def.h"
#include "ma_recovery.h"
#include "ma_blockrec.h"
struct TRN_FOR_RECOVERY
{
/* listed here in the order of the "log records type" enumeration */
{"REDO_INSERT_HEAD", FALSE, redo_insert_head_execute_in_redo_phase, no_op},
...,
{"UNDO_INSERT" , TRUE , undo_insert_execute_in_redo_phase, undo_insert_execute_in_undo_phase},
{"COMMIT", , TRUE , commit_execute_in_redo_phase, no_op},
...
LSN group_start_lsn, undo_lsn;
TrID long_trid;
};
int redo_insert_head_execute_in_redo_phase(RECORD *record)
{
/* write the data to the proper page */
}
/* Variables used by all functions of this module. Ok as single-threaded */
static struct TRN_FOR_RECOVERY *all_active_trans;
static MARIA_HA **all_tables;
static LSN current_group_end_lsn;
FILE *tracef; /**< trace file for debugging */
int undo_insert_execute_in_redo_phase(RECORD *record)
{
trans_table[short_trans_id].undo_lsn= record.lsn;
/* don't restore the old version of the row */
}
#define prototype_exec_hook(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
prototype_exec_hook(LONG_TRANSACTION_ID);
#ifdef MARIA_CHECKPOINT
prototype_exec_hook(CHECKPOINT);
#endif
prototype_exec_hook(REDO_CREATE_TABLE);
prototype_exec_hook(REDO_DROP_TABLE);
prototype_exec_hook(FILE_ID);
prototype_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_exec_hook(REDO_PURGE_ROW_HEAD);
prototype_exec_hook(REDO_PURGE_ROW_TAIL);
prototype_exec_hook(REDO_PURGE_BLOCKS);
prototype_exec_hook(REDO_DELETE_ALL);
prototype_exec_hook(UNDO_ROW_INSERT);
prototype_exec_hook(UNDO_ROW_DELETE);
prototype_exec_hook(UNDO_ROW_PURGE);
prototype_exec_hook(COMMIT);
static int end_of_redo_phase();
static void display_record_position(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec,
uint number);
static int display_and_apply_record(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec);
static MARIA_HA *get_MARIA_HA_from_REDO_record(const
TRANSLOG_HEADER_BUFFER *rec);
static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
TRANSLOG_HEADER_BUFFER *rec);
static int close_recovered_table(MARIA_HA *info);
int undo_insert_execute_in_undo_phase(RECORD *record)
{
/* restore the old version of the row */
trans_table[short_trans_id].undo_lsn= record.prev_undo_lsn;
}
int commit_execute_in_redo_phase(RECORD *record)
/** @brief global [out] buffer for translog_read_record(); never shrinks */
static LEX_STRING log_record_buffer;
#define enlarge_buffer(rec) \
if (log_record_buffer.length < rec->record_length) \
{ \
log_record_buffer.length= rec->record_length; \
log_record_buffer.str= my_realloc(log_record_buffer.str, \
rec->record_length, MYF(MY_WME)); \
}
#define ALERT_USER() DBUG_ASSERT(0)
/**
@brief Recovers from the last checkpoint
*/
int maria_recover()
{
trans_table[short_trans_id].state= COMMITTED;
my_bool res= TRUE;
LSN from_lsn;
FILE *trace_file;
DBUG_ENTER("maria_recover");
DBUG_ASSERT(!maria_in_recovery);
maria_in_recovery= TRUE;
if (last_checkpoint_lsn == LSN_IMPOSSIBLE)
from_lsn= first_lsn_in_log();
else
{
DBUG_ASSERT(0); /* not yet implemented */
/**
@todo read the checkpoint record, fill structures
and use the minimum of checkpoint_start_lsn, rec_lsn of trns, rec_lsn
of dirty pages.
*/
//from_lsn= something;
}
/*
and that's all: the delete/update handler should not be woken up! as there
may be REDO for purge further in the log.
mysqld has not yet initialized any page cache. Let's create a dedicated
one for recovery.
*/
if ((trace_file= fopen("maria_recovery.trace", "w")))
{
fprintf(trace_file, "TRACE of the last MARIA recovery from mysqld\n");
res= (init_pagecache(maria_pagecache,
/** @todo what size? */
1024*1024,
0, 0,
maria_block_size) == 0) ||
maria_apply_log(from_lsn, TRUE, trace_file);
end_pagecache(maria_pagecache, TRUE);
if (!res)
fprintf(trace_file, "SUCCESS\n");
fclose(trace_file);
}
/**
@todo take checkpoint if log applying did some work.
Be sure to not checkpoint if no work.
*/
maria_in_recovery= FALSE;
DBUG_RETURN(res);
}
#define record_ends_group(R) \
all_record_type_properties[(R)->type].record_ends_group)
#define execute_log_record_in_redo_phase(R) \
all_record_type_properties[(R).type].record_execute_in_redo_phase(R)
/**
@brief Displays and/or applies the log
@param lsn LSN from which log reading/applying should start
@param apply if log records should be applied or not
@param trace_file trace file where progress/debug messages will go
@todo This trace_file thing is primitive; soon we will make it similar to
ma_check_print_warning() etc, and a successful recovery does not need to
create a trace file. But for debugging now it is useful.
@return Operation status
@retval 0 OK
@retval !=0 Error
*/
int recovery()
int maria_apply_log(LSN lsn, my_bool apply, FILE *trace_file)
{
control_file_create_or_open();
/*
init log handler: tell it that we are going to do large reads of the
log, sequential and backward. Log handler could decide to alloc a big
read-only IO_CACHE for this, or use its usual page cache.
*/
int error= 0;
DBUG_ENTER("maria_apply_log");
/* read checkpoint log record from log handler */
RECORD *checkpoint_record= log_read_record(last_checkpoint_lsn_at_start);
DBUG_ASSERT(!maria_multi_threaded);
all_active_trans= (struct TRN_FOR_RECOVERY *)
my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct TRN_FOR_RECOVERY),
MYF(MY_ZEROFILL));
all_tables= (MARIA_HA **)my_malloc((SHARE_ID_MAX + 1) * sizeof(MARIA_HA *),
MYF(MY_ZEROFILL));
if (!all_active_trans || !all_tables)
goto err;
/* parse this record, build structs (dirty_pages, transactions table, file_map) */
/*
read log records (note: sometimes only the header is needed, for ex during
REDO phase only the header of UNDO is needed, not the 4G blob in the
variable-length part, so I could use that; however for PREPARE (which is a
variable-length record) I'll need to read the full record in the REDO
phase):
*/
tracef= trace_file;
/* install hooks for execution */
#define install_exec_hook(R) \
log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
exec_LOGREC_ ## R;
install_exec_hook(LONG_TRANSACTION_ID);
#ifdef MARIA_CHECKPOINT
install_exec_hook(CHECKPOINT);
#endif
install_exec_hook(REDO_CREATE_TABLE);
install_exec_hook(REDO_DROP_TABLE);
install_exec_hook(FILE_ID);
install_exec_hook(REDO_INSERT_ROW_HEAD);
install_exec_hook(REDO_INSERT_ROW_TAIL);
install_exec_hook(REDO_PURGE_ROW_HEAD);
install_exec_hook(REDO_PURGE_ROW_TAIL);
install_exec_hook(REDO_PURGE_BLOCKS);
install_exec_hook(REDO_DELETE_ALL);
install_exec_hook(UNDO_ROW_INSERT);
install_exec_hook(UNDO_ROW_DELETE);
install_exec_hook(UNDO_ROW_PURGE);
install_exec_hook(COMMIT);
/**** REDO PHASE *****/
current_group_end_lsn= LSN_IMPOSSIBLE;
record= log_read_record(min(rec_lsn, ...)); /* later, read only header */
TRANSLOG_HEADER_BUFFER rec;
struct st_translog_scanner_data scanner;
uint i= 1;
/*
if log handler knows the end LSN of the log, we could print here how many
MB of log we have to read (to give an idea of the time), and print
progress notes.
*/
translog_size_t len= translog_read_record_header(lsn, &rec);
/** @todo translog_read_record_header() should be fixed for 0-byte headers */
if (len == 0) /* means error, but apparently EOF too */
{
fprintf(tracef, "empty log\n");
goto end;
}
while (record != NULL)
if (translog_init_scanner(lsn, 1, &scanner))
{
fprintf(tracef, "Scanner init failed\n");
goto err;
}
for (;;i++)
{
uint16 sid= rec.short_trid;
const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
display_record_position(log_desc, &rec, i);
/*
A complete group is a set of log records with an "end mark" record
(e.g. a set of REDOs for an operation, terminated by an UNDO for this
operation); if there is no "end mark" record the group is incomplete
and won't be executed.
*/
if (record_ends_group(record)
if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
(log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
{
if (trans_table[record.short_trans_id].group_start_lsn != 0)
if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
{
/*
There is a complete group for this transaction, containing more than
this event.
We're going to read recently read log records:
for this log_read_record() to be efficient (not touch the disk),
log handler could cache recently read pages
(can just use an IO_CACHE of 10 MB to read the log, or the normal
log handler page cache).
Without it only OS file cache will help.
*/
record2=
log_read_record(trans_table[record.short_trans_id].group_start_lsn);
do
if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
{
/*
can happen if the transaction got a table write error, then
unlocked tables thus wrote a COMMIT record.
*/
fprintf(tracef, "\nDiscarding unfinished group before this record\n");
ALERT_USER();
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
else
{
if (record2.short_trans_id == record.short_trans_id)
execute_log_record_in_redo_phase(record2); /* it's in our group */
record2= log_read_next_record();
/*
There is a complete group for this transaction, containing more
than this event.
*/
fprintf(tracef, " ends a group:\n");
struct st_translog_scanner_data scanner2;
TRANSLOG_HEADER_BUFFER rec2;
len=
translog_read_record_header(all_active_trans[sid].group_start_lsn, &rec2);
if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1))
{
fprintf(tracef, "Cannot find record where it should be\n");
goto err;
}
if (translog_init_scanner(rec2.lsn, 1, &scanner2))
{
fprintf(tracef, "Scanner2 init failed\n");
goto err;
}
current_group_end_lsn= rec.lsn;
do
{
if (rec2.short_trid == sid) /* it's in our group */
{
const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
display_record_position(log_desc2, &rec2, 0);
if (apply && display_and_apply_record(log_desc2, &rec2))
goto err;
}
len= translog_read_next_record_header(&scanner2, &rec2);
if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1))
{
fprintf(tracef, "Cannot find record where it should be\n");
goto err;
}
}
while (rec2.lsn < rec.lsn);
translog_free_record_header(&rec2);
/* group finished */
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
display_record_position(log_desc, &rec, 0);
}
while (record2.lsn < record.lsn);
trans_table[record.short_trans_id].group_start_lsn= 0; /* group finished */
}
execute_log_record_in_redo_phase(record);
if (apply && display_and_apply_record(log_desc, &rec))
goto err;
}
else /* record does not end group */
{
/* just record the fact, can't know if can execute yet */
if (trans_table[short_trans_id].group_start_lsn == 0) /* group not yet started */
trans_table[short_trans_id].group_start_lsn= record.lsn;
if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
{
/* group not yet started */
all_active_trans[sid].group_start_lsn= rec.lsn;
}
}
len= translog_read_next_record_header(&scanner, &rec);
if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1))
{
fprintf(tracef, "EOF on the log\n");
break;
}
}
translog_free_record_header(&rec);
/*
So we have applied all REDOs.
We may now have unfinished transactions.
I don't think it's this program's job to roll them back:
to roll back and at the same time stay idempotent, it needs to write log
records (without CLRs, 2nd rollback would hit the effects of first
rollback and fail). But this standalone tool is not allowed to write to
the server's transaction log. So we do not roll back anything.
In the real Recovery code, or the code to do "recover after online
backup", yes we will roll back.
*/
if (end_of_redo_phase())
goto err;
goto end;
err:
error= 1;
fprintf(tracef, "Recovery of tables with transaction logs FAILED\n");
end:
my_free((gptr)all_tables, MYF(MY_ALLOW_ZERO_PTR));
my_free((gptr)all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
my_free(log_record_buffer.str, MYF(MY_ALLOW_ZERO_PTR));
log_record_buffer.str= NULL;
log_record_buffer.length= 0;
DBUG_RETURN(error);
}
/* very basic info about the record's header */
static void display_record_position(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec,
uint number)
{
/*
if number==0, we're going over records which we had already seen and which
form a group, so we indent below the group's end record
*/
fprintf(tracef, "%sRec#%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u) len %lu\n",
number ? "" : " ", number,
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn),
rec->short_trid, log_desc->name, rec->type,
(ulong)rec->record_length);
}
static int display_and_apply_record(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec)
{
int error;
if (log_desc->record_execute_in_redo_phase == NULL)
{
/* die on all not-yet-handled records :) */
DBUG_ASSERT("one more hook" == "to write");
return 1;
}
if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
fprintf(tracef, "Got error when executing record\n");
return error;
}
prototype_exec_hook(LONG_TRANSACTION_ID)
{
uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid;
/* abort group of this trn (must be of before a crash) */
LSN gslsn= all_active_trans[sid].group_start_lsn;
char llbuf[22];
if (gslsn != LSN_IMPOSSIBLE)
{
fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
if (long_trid != 0)
{
LSN ulsn= all_active_trans[sid].undo_lsn;
if (ulsn != LSN_IMPOSSIBLE)
{
llstr(long_trid, llbuf);
fprintf(tracef, "Found an old transaction long_trid %s short_trid %u"
" with same short id as this new transaction, and has neither"
" committed nor rollback (undo_lsn: (%lu,0x%lx))\n", llbuf,
sid, (ulong) LSN_FILE_NO(ulsn), (ulong) LSN_OFFSET(ulsn));
goto err;
}
}
long_trid= uint6korr(rec->header);
all_active_trans[sid].long_trid= long_trid;
llstr(long_trid, llbuf);
fprintf(tracef, "Transaction long_trid %s short_trid %u starts\n", llbuf, sid);
goto end;
err:
ALERT_USER();
return 1;
end:
return 0;
}
#ifdef MARIA_CHECKPOINT
prototype_exec_hook(CHECKPOINT)
{
/* the only checkpoint we care about was found via control file, ignore */
return 0;
}
#endif
prototype_exec_hook(REDO_CREATE_TABLE)
{
File dfile= -1, kfile= -1;
char *linkname_ptr, filename[FN_REFLEN];
char *name, *ptr;
myf create_flag;
uint flags;
int error= 1, create_mode= O_RDWR | O_TRUNC;
MARIA_HA *info= NULL;
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
fprintf(tracef, "Failed to read record\n");
goto end;
}
name= log_record_buffer.str;
fprintf(tracef, "Table '%s'", name);
/* we try hard to get create_rename_lsn, to avoid mistakes if possible */
info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
if (info)
{
MARIA_SHARE *share= info->s;
/* check that we're not already using it */
DBUG_ASSERT(share->reopen == 1);
DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
if (!share->base.born_transactional)
{
/*
could be that transactional table was later dropped, and a non-trans
one was renamed to its name, thus create_rename_lsn is 0 and should
not be trusted.
*/
fprintf(tracef, ", is not transactional\n");
ALERT_USER();
error= 0;
goto end;
}
if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than record",
(ulong) LSN_FILE_NO(rec->lsn),
(ulong) LSN_OFFSET(rec->lsn));
error= 0;
goto end;
}
if (maria_is_crashed(info))
{
fprintf(tracef, ", is crashed, overwriting it");
ALERT_USER();
}
maria_close(info);
info= NULL;
}
/* if does not exist, is older, or its header is corrupted, overwrite it */
// TODO symlinks
ptr= name + strlen(name) + 1;
if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0))
fprintf(tracef, ", we will only touch index file");
fn_format(filename, name, "", MARIA_NAME_IEXT,
(MY_UNPACK_FILENAME |
(flags & HA_DONT_TOUCH_DATA) ? MY_RETURN_REAL_PATH : 0) |
MY_APPEND_EXT);
linkname_ptr= NULL;
create_flag= MY_DELETE_OLD;
fprintf(tracef, ", creating as '%s'", filename);
if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
MYF(MY_WME|create_flag))) < 0)
{
fprintf(tracef, "Failed to create index file\n");
goto end;
}
ptr++;
uint kfile_size_before_extension= uint2korr(ptr);
ptr+= 2;
uint keystart= uint2korr(ptr);
ptr+= 2;
/* set create_rename_lsn (for maria_read_log to be idempotent) */
lsn_store(ptr + sizeof(info->s->state.header) + 2, rec->lsn);
/* we also set is_of_lsn, like maria_create() does */
lsn_store(ptr + sizeof(info->s->state.header) + 2 + LSN_STORE_SIZE,
rec->lsn);
if (my_pwrite(kfile, ptr,
kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) ||
my_chsize(kfile, keystart, 0, MYF(MY_WME)))
{
fprintf(tracef, "Failed to write to index file\n");
goto end;
}
if (!(flags & HA_DONT_TOUCH_DATA))
{
fn_format(filename,name,"", MARIA_NAME_DEXT,
MY_UNPACK_FILENAME | MY_APPEND_EXT);
linkname_ptr= NULL;
create_flag=MY_DELETE_OLD;
if (((dfile=
my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
MYF(MY_WME | create_flag))) < 0) ||
my_close(dfile, MYF(MY_WME)))
{
fprintf(tracef, "Failed to create data file\n");
goto end;
}
/*
Later we can optimize: instead of "execute_log_record(record2)", do
copy_record_into_exec_buffer(record2):
this will just copy record into a multi-record (10 MB?) memory buffer,
and when buffer is full, will do sorting of REDOs per
page id and execute them.
This sorting will enable us to do more sequential reads of the
data/index pages.
Note that updating bitmap pages (when we have executed a REDO for a page
we update its bitmap page) may break the sequential read of pages,
so maybe we should read and cache bitmap pages in the beginning.
Or ok the sequence will be broken, but quickly all bitmap pages will be
in memory and so the sequence will not be broken anymore.
Sorting could even determine, based on physical device of files
("st_dev" in stat()), that some files should be should be taken by
different threads, if we want to do parallism.
we now have an empty data file. To be able to
_ma_initialize_data_file() we need some pieces of the share to be
correctly filled. So we just open the table (fortunately, an empty
data file does not preclude this).
*/
if (((info= maria_open(name, O_RDONLY, 0)) == NULL) ||
_ma_initialize_data_file(info->s, info->dfile.file))
{
fprintf(tracef, "Failed to open new table or write to data file\n");
goto end;
}
}
error= 0;
end:
fprintf(tracef, "\n");
if (kfile >= 0)
error|= my_close(kfile, MYF(MY_WME));
if (info != NULL)
error|= maria_close(info);
return error;
}
prototype_exec_hook(REDO_DROP_TABLE)
{
char *name;
int error= 1;
MARIA_HA *info= NULL;
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
fprintf(tracef, "Failed to read record\n");
goto end;
}
name= log_record_buffer.str;
fprintf(tracef, "Table '%s'", name);
info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
if (info)
{
MARIA_SHARE *share= info->s;
/*
We may have open instances on this table. But it does not matter, the
maria_extra() below will take care of them.
*/
if (!share->base.born_transactional)
{
fprintf(tracef, ", is not transactional\n");
ALERT_USER();
error= 0;
goto end;
}
if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than record",
(ulong) LSN_FILE_NO(rec->lsn),
(ulong) LSN_OFFSET(rec->lsn));
error= 0;
goto end;
}
if (maria_is_crashed(info))
{
fprintf(tracef, ", is crashed, dropping it");
ALERT_USER();
}
/*
Here's how to read a complete variable-length record if needed:
<sanja> read the header, allocate buffer of record length, read whole
record.
This maria_extra() call serves to signal that old open instances of
this table should not be used anymore, and (only on Windows) to close
open files so they can be deleted
*/
record= log_read_next_record();
if (maria_extra(info, HA_EXTRA_PREPARE_FOR_DELETE, NULL) ||
maria_close(info))
goto end;
info= NULL;
}
/* if does not exist, is older, or its header is corrupted, drop it */
fprintf(tracef, ", dropping '%s'", name);
if (maria_delete_table(name))
{
fprintf(tracef, "Failed to drop table\n");
goto end;
}
error= 0;
end:
fprintf(tracef, "\n");
if (info != NULL)
error|= maria_close(info);
return error;
}
prototype_exec_hook(FILE_ID)
{
uint16 sid;
int error= 1;
char *name, *buff;
MARIA_HA *info= NULL;
MARIA_SHARE *share;
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
fprintf(tracef, "Failed to read record\n");
goto end;
}
buff= log_record_buffer.str;
sid= fileid_korr(buff);
name= buff + FILEID_STORE_SIZE;
info= all_tables[sid];
if (info != NULL)
{
all_tables[sid]= NULL;
if (close_recovered_table(info))
{
fprintf(tracef, "Failed to close table\n");
goto end;
}
}
fprintf(tracef, "Table '%s', id %u", name, sid);
info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
if (info == NULL)
{
fprintf(tracef, ", is absent (must have been dropped later?)"
" or its header is so corrupted that we cannot open it;"
" we skip it\n");
error= 0;
goto end;
}
if (maria_is_crashed(info))
{
fprintf(tracef, "Table is crashed, can't apply log records to it\n");
goto end;
/*
we should make an exception for REDO_REPAIR_TABLE records: if we want to
execute them, we should not reject the crashed table here.
*/
}
share= info->s;
/* check that we're not already using it */
DBUG_ASSERT(share->reopen == 1);
DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
if (!share->base.born_transactional)
{
fprintf(tracef, ", is not transactional\n");
ALERT_USER();
error= 0;
goto end;
}
all_tables[sid]= info;
/* don't log any records for this work */
_ma_tmp_disable_logging_for_table(share);
/* execution of some REDO records relies on data_file_length */
my_off_t dfile_len= my_seek(info->dfile.file, 0, SEEK_END, MYF(MY_WME));
my_off_t kfile_len= my_seek(info->s->kfile.file, 0, SEEK_END, MYF(MY_WME));
if ((dfile_len == MY_FILEPOS_ERROR) ||
(kfile_len == MY_FILEPOS_ERROR))
{
fprintf(tracef, ", length unknown\n");
goto end;
}
share->state.state.data_file_length= dfile_len;
share->state.state.key_file_length= kfile_len;
if ((dfile_len == 0) || ((dfile_len % share->block_size) > 0))
{
fprintf(tracef, ", has too short last page\n");
/* Recovery will fix this, no error */
ALERT_USER();
}
fprintf(tracef, ", opened\n");
error= 0;
end:
if (error && info != NULL)
error|= maria_close(info);
return error;
}
prototype_exec_hook(REDO_INSERT_ROW_HEAD)
{
int error= 1;
byte *buff= NULL;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
goto end;
/*
If REDO's LSN is > page's LSN (read from disk), we are going to modify the
page and change its LSN. The normal runtime code stores the UNDO's LSN
into the page. Here storing the REDO's LSN (rec->lsn) would work
(we are not writing to the log here, so don't have to "flush up to UNDO's
LSN"). But in a test scenario where we do updates at runtime, then remove
tables, apply the log and check that this results in the same table as at
runtime, putting the same LSN as runtime had done will decrease
differences. So we use the UNDO's LSN which is current_group_end_lsn.
*/
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
fprintf(tracef, "Failed to read record\n");
goto end;
}
buff= log_record_buffer.str;
if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
HEAD_PAGE,
buff + FILEID_STORE_SIZE,
buff +
FILEID_STORE_SIZE +
PAGE_STORE_SIZE +
DIRPOS_STORE_SIZE,
rec->record_length -
(FILEID_STORE_SIZE +
PAGE_STORE_SIZE +
DIRPOS_STORE_SIZE)))
goto end;
error= 0;
end:
return error;
}
prototype_exec_hook(REDO_INSERT_ROW_TAIL)
{
int error= 1;
byte *buff= NULL;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
goto end;
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
fprintf(tracef, "Failed to read record\n");
goto end;
}
buff= log_record_buffer.str;
if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
TAIL_PAGE,
buff + FILEID_STORE_SIZE,
buff +
FILEID_STORE_SIZE +
PAGE_STORE_SIZE +
DIRPOS_STORE_SIZE,
rec->record_length -
(FILEID_STORE_SIZE +
PAGE_STORE_SIZE +
DIRPOS_STORE_SIZE)))
goto end;
error= 0;
end:
return error;
}
prototype_exec_hook(REDO_PURGE_ROW_HEAD)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
goto end;
if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
HEAD_PAGE,
rec->header + FILEID_STORE_SIZE))
goto end;
error= 0;
end:
return error;
}
prototype_exec_hook(REDO_PURGE_ROW_TAIL)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
goto end;
if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
TAIL_PAGE,
rec->header + FILEID_STORE_SIZE))
goto end;
error= 0;
end:
return error;
}
prototype_exec_hook(REDO_PURGE_BLOCKS)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
goto end;
if (_ma_apply_redo_purge_blocks(info, current_group_end_lsn,
rec->header + FILEID_STORE_SIZE))
goto end;
error= 0;
end:
return error;
}
prototype_exec_hook(REDO_DELETE_ALL)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
goto end;
fprintf(tracef, " deleting all %lu rows\n",
(ulong)info->s->state.state.records);
if (maria_delete_all_rows(info))
goto end;
error= 0;
end:
return error;
}
prototype_exec_hook(UNDO_ROW_INSERT)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
goto end;
all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
/*
todo: instead of above, call write_hook_for_undo, it will also set
first_undo_lsn
*/
/*
in an upcoming patch ("recovery of the state"), we introduce
state.is_of_lsn. For now, we just assume the state is old (true when we
recreate tables from scratch - but not idempotent).
*/
{
fprintf(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records++;
}
fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
error= 0;
end:
return error;
}
prototype_exec_hook(UNDO_ROW_DELETE)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
goto end;
all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
/*
todo: instead of above, call write_hook_for_undo, it will also set
first_undo_lsn
*/
{
fprintf(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records--;
}
fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
error= 0;
end:
return error;
}
prototype_exec_hook(UNDO_ROW_PURGE)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
goto end;
/* this a bit broken, but this log record type will be deleted soon */
all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
/*
todo: instead of above, call write_hook_for_undo, it will also set
first_undo_lsn
*/
{
fprintf(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records--;
}
fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
error= 0;
end:
return error;
}
prototype_exec_hook(COMMIT)
{
uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid;
LSN gslsn= all_active_trans[sid].group_start_lsn;
char llbuf[22];
if (long_trid == 0)
{
fprintf(tracef, "We don't know about transaction with short_trid %u;"
"it probably committed long ago, forget it\n", sid);
return 0;
}
llstr(long_trid, llbuf);
fprintf(tracef, "Transaction long_trid %s short_trid %u committed", llbuf, sid);
if (gslsn != LSN_IMPOSSIBLE)
{
/*
It's not an error, it may be that trn got a disk error when writing to a
table, so an unfinished group staid in the log.
*/
fprintf(tracef, ", with group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
else
fprintf(tracef, "\n");
bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
#ifdef MARIA_VERSIONING
/*
Earlier or here, create true transactions in TM.
If done earlier, note that TM should not wake up the delete/update handler
when it receives a commit info, as existing REDO for purge may exist in
the log, and so the delete/update handler may do changes which conflict
with these REDOs.
Even if done here, better to not wake it up now as we're going to free the
page cache.
if real recovery:
transaction was committed, move it to some separate list for later
purging (but don't purge now! purging may have been started before, we
may find REDO_PURGE records soon).
*/
#endif
return 0;
}
/* Just to inform about any aborted groups or unfinished transactions */
static int end_of_redo_phase()
{
uint sid, unfinished= 0, error= 0;
for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
{
TrID long_trid= all_active_trans[sid].long_trid;
LSN gslsn= all_active_trans[sid].group_start_lsn;
if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
{
char llbuf[22];
llstr(long_trid, llbuf);
fprintf(tracef, "Transaction long_trid %s short_trid %u unfinished\n",
llbuf, sid);
unfinished++;
}
if (gslsn != LSN_IMPOSSIBLE)
{
fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
ALERT_USER();
}
/* If real recovery: roll back unfinished transaction */
#ifdef MARIA_VERSIONING
/*
If real recovery: transaction was committed, move it to some separate
list for soon purging. Create TRNs.
*/
#endif
}
/*
We don't close tables if there are some unfinished transactions, because
closing tables normally requires that all unfinished transactions on them
be rolled back. Unfinished transactions are symptom of a crash, we
reproduce the crash.
For example, closing will soon write the state to disk and when doing that
it will think this is a committed state, but it may not be.
*/
if (unfinished > 0)
fprintf(tracef, "WARNING: %u unfinished transactions; some tables may be"
" left inconsistent!\n", unfinished);
for (sid= 0; sid <= SHARE_ID_MAX; sid++)
{
MARIA_HA *info= all_tables[sid];
if (info != NULL)
{
/* if error, still close other tables */
error|= close_recovered_table(info);
}
}
return error;
}
static int close_recovered_table(MARIA_HA *info)
{
int error;
MARIA_SHARE *share= info->s;
fprintf(tracef, " Closing table '%s'\n", share->open_file_name);
_ma_reenable_logging_for_table(share);
/*
Recovery normally corrected problems, don't scare user with "table was not
closed properly" in CHECK TABLE and don't automatically check table at
next open (when we have --maria-recover).
*/
share->state.open_count= share->global_changed ? 1 : 0;
/* this var is set only by non-recovery operations (mi_write() etc) */
DBUG_ASSERT(!share->global_changed);
if ((error= maria_close(info)))
fprintf(tracef, "Failed to close table\n");
return error;
}
static MARIA_HA *get_MARIA_HA_from_REDO_record(const
TRANSLOG_HEADER_BUFFER *rec)
{
uint16 sid;
ulonglong page;
MARIA_HA *info;
char llbuf[22];
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
/* BUG not correct for REDO_PURGE_BLOCKS, page is not at this pos */
llstr(page, llbuf);
fprintf(tracef, " For page %s of table of short id %u", llbuf, sid);
info= all_tables[sid];
if (info == NULL)
{
fprintf(tracef, ", table skipped, so skipping record\n");
return NULL;
}
fprintf(tracef, ", '%s'", info->s->open_file_name);
/* detect if an open instance of a dropped table (internal bug) */
DBUG_ASSERT(info->s->last_version != 0);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
return NULL;
}
fprintf(tracef, ", applying record\n");
return info;
/*
Soon we will also skip the page depending on the rec_lsn for this page in
the checkpoint record, but this is not absolutely needed for now (just
assume we have made no checkpoint). Btw rec_lsn and bitmap's recovery is a
an unsolved problem (rec_lsn is to ignore a REDO without reading the data
page and to do so we need to be sure the corresponding bitmap page does
not need a _ma_bitmap_set()).
*/
}
static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
TRANSLOG_HEADER_BUFFER *rec)
{
uint16 sid;
MARIA_HA *info;
sid= fileid_korr(rec->header + LSN_STORE_SIZE);
fprintf(tracef, " For table of short id %u", sid);
info= all_tables[sid];
if (info == NULL)
{
fprintf(tracef, ", table skipped, so skipping record\n");
return NULL;
}
fprintf(tracef, ", '%s'", info->s->open_file_name);
DBUG_ASSERT(info->s->last_version != 0);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
return NULL;
}
fprintf(tracef, ", applying record\n");
return info;
/*
Soon we will also skip the page depending on the rec_lsn for this page in
the checkpoint record, but this is not absolutely needed for now (just
assume we have made no checkpoint).
*/
}
/* some comments and pseudo-code which we keep for later */
#if 0
/*
MikaelR suggests: support checkpoints during REDO phase too: do checkpoint
after a certain amount of log records have been executed. This helps
against repeated crashes. Those checkpoints could not be user-requested
......@@ -214,8 +1131,7 @@ int recovery()
/**** UNDO PHASE *****/
print_information_to_error_log(nb of trans to roll back, nb of prepared trans);
print_information_to_error_log(nb of trans to roll back, nb of prepared trans
/*
Launch one or more threads to do the background rollback. Don't wait for
them to complete their rollback (background rollback; for debugging, we
......@@ -265,3 +1181,4 @@ pthread_handler_decl rollback_background_thread()
unlock_mutex(rollback_threads);
pthread_exit();
}
#endif
......@@ -22,4 +22,8 @@
/* This is the interface of this module. */
/* Performs recovery of the engine at start */
int recovery();
C_MODE_START
int maria_recover();
int maria_apply_log(LSN lsn, my_bool applyn, FILE *trace_file);
C_MODE_END
......@@ -62,8 +62,8 @@ int maria_rename(const char *old_name, const char *new_name)
this is important; make sure transactionality has been re-enabled.
*/
DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
sync_dir= (share->now_transactional && !share->temporary) ?
MY_SYNC_DIR : 0;
sync_dir= (share->now_transactional && !share->temporary &&
!maria_in_recovery) ? MY_SYNC_DIR : 0;
if (sync_dir)
{
uchar log_data[2 + 2];
......
......@@ -47,7 +47,7 @@ static void copy_key(struct st_maria_info *info,uint inx,
static int verbose=0,testflag=0,
first_key=0,async_io=0,pagecacheing=0,write_cacheing=0,locking=0,
rec_pointer_size=0,pack_fields=1,silent=0,
opt_quick_mode=0, transactional= 0;
opt_quick_mode=0, transactional= 0, skip_update= 0;
static int pack_seg=HA_SPACE_PACK,pack_type=HA_PACK_KEY,remove_count=-1;
static int create_flag= 0, srand_arg= 0;
static ulong pagecache_size=IO_SIZE*16;
......@@ -84,7 +84,24 @@ int main(int argc, char *argv[])
if (! async_io)
my_disable_async_io=1;
maria_init();
maria_data_root= ".";
/* Maria requires that we always have a page cache */
if (maria_init() ||
(init_pagecache(maria_pagecache, pagecache_size, 0, 0,
maria_block_size) == 0) ||
ma_control_file_create_or_open(TRUE) ||
(init_pagecache(maria_log_pagecache,
TRANSLOG_PAGECACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) ||
translog_init(maria_data_root, TRANSLOG_FILE_SIZE,
0, 0, maria_log_pagecache,
TRANSLOG_DEFAULT_FLAGS) ||
(transactional && trnman_init()))
{
fprintf(stderr, "Error in initialization");
exit(1);
}
reclength=STANDARD_LENGTH+60+(use_blob ? 8 : 0);
blob_pos=STANDARD_LENGTH+60;
keyinfo[0].seg= &glob_keyseg[0][0];
......@@ -220,22 +237,6 @@ int main(int argc, char *argv[])
goto err;
if (!silent)
printf("- Writing key:s\n");
maria_data_root= ".";
/* Maria requires that we always have a page cache */
if ((init_pagecache(maria_pagecache, pagecache_size, 0, 0,
maria_block_size) == 0) ||
ma_control_file_create_or_open(TRUE) ||
(init_pagecache(maria_log_pagecache,
TRANSLOG_PAGECACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) ||
translog_init(maria_data_root, TRANSLOG_FILE_SIZE,
0, 0, maria_log_pagecache,
TRANSLOG_DEFAULT_FLAGS))
{
fprintf(stderr, "Error in initialization");
exit(1);
}
if (locking)
maria_lock_database(file,F_WRLCK);
if (write_cacheing)
......@@ -246,6 +247,14 @@ int main(int argc, char *argv[])
for (i=0 ; i < recant ; i++)
{
ulong blob_length;
#if 0
/*
Starting from i==72, there was a difference between runtime and
log-appplying. This is now fixed, by not using non_header_data_len in
log-applying.
*/
if (i == 72) goto end;
#endif
n1=rnd(1000); n2=rnd(100); n3=rnd(5000);
sprintf(record,"%6d:%4d:%8d:Pos: %4d ",n1,n2,n3,write_count);
int4store(record+STANDARD_LENGTH-4,(long) i);
......@@ -260,7 +269,7 @@ int main(int argc, char *argv[])
printf("Error: %d in write at record: %d\n",my_errno,i);
goto err;
}
if (verbose) printf(" Double key: %d\n",n3);
if (verbose) printf(" Double key: %d at record# %d\n", n3, i);
}
else
{
......@@ -294,7 +303,7 @@ int main(int argc, char *argv[])
if (maria_extra(file,HA_EXTRA_NO_CACHE,0))
{
puts("got error from maria_extra(HA_EXTRA_NO_CACHE)");
goto end;
goto err;
}
}
#ifdef REMOVE_WHEN_WE_HAVE_RESIZE
......@@ -376,6 +385,8 @@ int main(int argc, char *argv[])
else
bmove(record+blob_pos,read_record+blob_pos,8);
}
if (skip_update)
continue;
if (maria_update(file,read_record,record2))
{
if (my_errno != HA_ERR_FOUND_DUPP_KEY || key3[n3] == 0)
......@@ -423,7 +434,7 @@ int main(int argc, char *argv[])
if (memcmp(read_record,read_record2,reclength) != 0)
{
printf("maria_rsame didn't find same record\n");
goto end;
goto err;
}
info.recpos=maria_position(file);
if (maria_rfirst(file,read_record2,0) ||
......@@ -431,7 +442,7 @@ int main(int argc, char *argv[])
memcmp(read_record,read_record2,reclength) != 0)
{
printf("maria_rsame_with_pos didn't find same record\n");
goto end;
goto err;
}
{
info.recpos= maria_position(file);
......@@ -442,7 +453,7 @@ int main(int argc, char *argv[])
info.recpos != maria_position(file))
{
printf("maria_rsame_with_pos lost position\n");
goto end;
goto err;
}
}
ant=1;
......@@ -451,7 +462,7 @@ int main(int argc, char *argv[])
if (ant != dupp_keys)
{
printf("next: Found: %d keys of %d\n",ant,dupp_keys);
goto end;
goto err;
}
ant=0;
while (maria_rprev(file,read_record3,0) == 0 &&
......@@ -459,7 +470,7 @@ int main(int argc, char *argv[])
if (ant != dupp_keys)
{
printf("prev: Found: %d records of %d\n",ant,dupp_keys);
goto end;
goto err;
}
/* Check of maria_rnext_same */
......@@ -471,7 +482,7 @@ int main(int argc, char *argv[])
if (ant != dupp_keys || my_errno != HA_ERR_END_OF_FILE)
{
printf("maria_rnext_same: Found: %d records of %d\n",ant,dupp_keys);
goto end;
goto err;
}
}
......@@ -482,7 +493,7 @@ int main(int argc, char *argv[])
if (maria_rfirst(file,read_record,0))
{
printf("Can't find first record\n");
goto end;
goto err;
}
while ((error=maria_rnext(file,read_record3,0)) == 0 && ant < write_count+10)
ant++;
......@@ -490,7 +501,7 @@ int main(int argc, char *argv[])
{
printf("next: I found: %d records of %d (error: %d)\n",
ant, write_count - opt_delete, error);
goto end;
goto err;
}
if (maria_rlast(file,read_record2,0) ||
bcmp(read_record2,read_record3,reclength))
......@@ -498,7 +509,7 @@ int main(int argc, char *argv[])
printf("Can't find last record\n");
DBUG_DUMP("record2",(byte*) read_record2,reclength);
DBUG_DUMP("record3",(byte*) read_record3,reclength);
goto end;
goto err;
}
ant=1;
while (maria_rprev(file,read_record3,0) == 0 && ant < write_count+10)
......@@ -506,12 +517,12 @@ int main(int argc, char *argv[])
if (ant != write_count - opt_delete)
{
printf("prev: I found: %d records of %d\n",ant,write_count);
goto end;
goto err;
}
if (bcmp(read_record,read_record3,reclength))
{
printf("Can't find first record\n");
goto end;
goto err;
}
if (!silent)
......@@ -552,7 +563,7 @@ int main(int argc, char *argv[])
if (bcmp(read_record+start,key,(uint) i))
{
puts("Didn't find right record");
goto end;
goto err;
}
}
if (dupp_keys > 2)
......@@ -570,7 +581,7 @@ int main(int argc, char *argv[])
if (ant != dupp_keys-1)
{
printf("next: I can only find: %d keys of %d\n",ant,dupp_keys-1);
goto end;
goto err;
}
}
if (dupp_keys>4)
......@@ -588,7 +599,7 @@ int main(int argc, char *argv[])
if (ant != dupp_keys-2)
{
printf("next: I can only find: %d keys of %d\n",ant,dupp_keys-2);
goto end;
goto err;
}
}
if (dupp_keys > 6)
......@@ -607,7 +618,7 @@ int main(int argc, char *argv[])
if (ant != dupp_keys-3)
{
printf("next: I can only find: %d keys of %d\n",ant,dupp_keys-3);
goto end;
goto err;
}
if (!silent)
......@@ -622,7 +633,7 @@ int main(int argc, char *argv[])
if (ant != dupp_keys-4)
{
printf("next: I can only find: %d keys of %d\n",ant,dupp_keys-4);
goto end;
goto err;
}
}
......@@ -655,7 +666,7 @@ int main(int argc, char *argv[])
if (bcmp(read_record,read_record2,reclength) != 0)
{
printf("maria_rsame didn't find same record\n");
goto end;
goto err;
}
}
if (!silent)
......@@ -682,7 +693,7 @@ int main(int argc, char *argv[])
{
printf("maria_records_range returned %ld; Should be about %ld\n",
(long) range_records,(long) info.records);
goto end;
goto err;
}
if (verbose)
{
......@@ -719,7 +730,7 @@ int main(int argc, char *argv[])
{
printf("maria_records_range for key: %d returned %lu; Should be about %lu\n",
i, (ulong) range_records, (ulong) records);
goto end;
goto err;
}
if (verbose && records)
{
......@@ -740,6 +751,7 @@ int main(int argc, char *argv[])
puts("Wrong info from maria_info");
printf("Got: records: %lu delete: %lu i_keys: %d\n",
(ulong) info.records, (ulong) info.deleted, info.keys);
goto err;
}
if (verbose)
{
......@@ -764,7 +776,7 @@ int main(int argc, char *argv[])
if (locking || (!use_blob && !pack_fields))
{
puts("got error from maria_extra(HA_EXTRA_CACHE)");
goto end;
goto err;
}
}
ant=0;
......@@ -777,12 +789,12 @@ int main(int argc, char *argv[])
{
printf("scan with cache: I can only find: %d records of %d\n",
ant,write_count-opt_delete);
goto end;
goto err;
}
if (maria_extra(file,HA_EXTRA_NO_CACHE,0))
{
puts("got error from maria_extra(HA_EXTRA_NO_CACHE)");
goto end;
goto err;
}
ant=0;
......@@ -794,7 +806,7 @@ int main(int argc, char *argv[])
{
printf("scan with cache: I can only find: %d records of %d\n",
ant,write_count-opt_delete);
goto end;
goto err;
}
if (testflag == 4)
......@@ -852,6 +864,15 @@ int main(int argc, char *argv[])
goto err;
}
opt_delete++;
#if 0
/
/*
179 is ok, 180 causes a difference between runtime and log-applying.
This is now fixed (we zero the last directory entry during
log-applying, just to eliminate this irrelevant difference).
*/
if (opt_delete==180) goto end;
#endif
}
else
found_parts++;
......@@ -1021,6 +1042,9 @@ static void get_options(int argc, char **argv)
case 'D':
create_flag|=HA_CREATE_DELAY_KEY_WRITE;
break;
case 'g':
skip_update= TRUE;
break;
case '?':
case 'I':
case 'V':
......
......@@ -6,6 +6,9 @@
# If you want to run this in Valgrind, you should use --trace-children=yes,
# so that it detects problems in ma_test* and not in the shell script
# Running in a "shared memory" disk is 10 times faster; you can do
# mkdir /dev/shm/test; cd /dev/shm/test; maria_path=<path_to_maria_binaries>
# Remove # from following line if you need some more information
#set -x -v -e
......@@ -21,6 +24,7 @@ fi
# Delete temporary files
rm -f *.TMD
rm -f maria_log*
run_tests()
{
......@@ -211,8 +215,14 @@ echo "$maria_path/maria_chk$suffix -sm test2 will warn that 'Datafile is almost
$maria_path/maria_chk$suffix -sm test2 >ma_test2_message.txt 2>&1
cat ma_test2_message.txt
grep "warning: Datafile is almost full" ma_test2_message.txt >/dev/null
rm -f ma_test2_message.txt
$maria_path/maria_chk$suffix -ssm test2
#
# Test that removing tables and applying the log leads to identical tables
#
/bin/sh $maria_path/ma_test_recovery
#
# Some timing tests
#
......
set -e
if [ -z "$maria_path" ]
then
maria_path="."
fi
echo "MARIA RECOVERY TESTS - success is if exit code is 0"
# runs a program inserting/deleting rows, then moves the resulting table
# elsewhere; applies the log and checks that the data file is
# identical to the saved original.
# Does not test the index file as we don't have logging for it yet.
rm -f maria_log*
prog="$maria_path/ma_test1 -M -T --skip-update"
echo "TEST WITH $prog"
$prog
mv -f test1.MAD test1.MAD.good
rm test1.MAI
echo "applying log"
$maria_path/maria_read_log -a > /dev/null
cmp test1.MAD test1.MAD.good
rm -f test1.*
rm -f maria_log*
prog="$maria_path/ma_test2 -s -L -K -W -P -M -T -g"
echo "TEST WITH $prog"
$prog
mv -f test2.MAD test2.MAD.good
rm test2.MAI
echo "applying log"
$maria_path/maria_read_log -a > /dev/null
cmp test2.MAD test2.MAD.good
rm -f test2.*
echo "ALL RECOVERY TESTS OK"
......@@ -14,7 +14,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "maria_def.h"
#include <ma_blockrec.h>
#include "ma_recovery.h"
#include <my_getopt.h>
#define PCACHE_SIZE (1024*1024*10)
......@@ -32,60 +32,6 @@ const char *default_dbug_option= "d:t:i:o,/tmp/maria_read_log.trace";
#endif /* DBUG_OFF */
static my_bool opt_only_display, opt_display_and_apply;
struct TRN_FOR_RECOVERY
{
LSN group_start_lsn, undo_lsn;
TrID long_trid;
};
struct TRN_FOR_RECOVERY all_active_trans[SHORT_TRID_MAX + 1];
MARIA_HA *all_tables[SHORT_TRID_MAX + 1];
LSN current_group_end_lsn= LSN_IMPOSSIBLE;
static void end_of_redo_phase();
static void display_record_position(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec,
uint number);
static int display_and_apply_record(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec);
#define prototype_exec_hook(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
prototype_exec_hook(LONG_TRANSACTION_ID);
#ifdef MARIA_CHECKPOINT
prototype_exec_hook(CHECKPOINT);
#endif
prototype_exec_hook(REDO_CREATE_TABLE);
prototype_exec_hook(FILE_ID);
prototype_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_exec_hook(REDO_PURGE_ROW_HEAD);
prototype_exec_hook(REDO_PURGE_ROW_TAIL);
prototype_exec_hook(UNDO_ROW_INSERT);
prototype_exec_hook(UNDO_ROW_DELETE);
prototype_exec_hook(COMMIT);
/*
TODO: Avoid mallocs in exec.
Proposed fix:
Add either a context/buffer argument to all exec_hook functions
or add 'record_buffer' and 'record_buffer_length' to
TRANSLOG_HEADER_BUFFER.
With this we could use my_realloc() instead of my_malloc() to
allocate data and save some mallocs.
*/
/*
To implement REDO_DROP_TABLE and REDO_RENAME_TABLE, we would need to go
through the all_tables[] array, find all open instances of the
table-to-drop-or-rename, and remove them from the array.
We however know that in real Recovery, we don't have to handle those log
records at all, same for REDO_CREATE_TABLE.
So for now, we can use this program to replay/debug a sequence of CREATE +
DMLs, but not DROP/RENAME; it is probably enough for a start.
*/
int main(int argc, char **argv)
{
LSN lsn;
......@@ -97,6 +43,7 @@ int main(int argc, char **argv)
get_options(&argc, &argv);
maria_data_root= ".";
maria_in_recovery= TRUE;
if (maria_init())
{
......@@ -114,6 +61,8 @@ int main(int argc, char **argv)
fprintf(stderr, "Can't find any log\n");
goto err;
}
/* same page cache for log and data; assumes same page size... */
DBUG_ASSERT(maria_block_size == TRANSLOG_PAGE_SIZE);
if (init_pagecache(maria_pagecache, PCACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0)
{
......@@ -133,147 +82,22 @@ int main(int argc, char **argv)
goto err;
}
/* install hooks for execution */
#define install_exec_hook(R) \
log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
exec_LOGREC_ ## R;
install_exec_hook(LONG_TRANSACTION_ID);
#ifdef MARIA_CHECKPOINT
install_exec_hook(CHECKPOINT);
#endif
install_exec_hook(REDO_CREATE_TABLE);
install_exec_hook(FILE_ID);
install_exec_hook(REDO_INSERT_ROW_HEAD);
install_exec_hook(REDO_INSERT_ROW_TAIL);
install_exec_hook(REDO_PURGE_ROW_HEAD);
install_exec_hook(REDO_PURGE_ROW_TAIL);
install_exec_hook(UNDO_ROW_INSERT);
install_exec_hook(UNDO_ROW_DELETE);
install_exec_hook(COMMIT);
if (opt_only_display)
printf("You are using --only-display, NOTHING will be written to disk\n");
lsn= first_lsn_in_log(); /*could also be last_checkpoint_lsn */
TRANSLOG_HEADER_BUFFER rec;
struct st_translog_scanner_data scanner;
uint i= 1;
lsn= first_lsn_in_log(); /* LSN could be also --start-from-lsn=# */
translog_size_t len= translog_read_record_header(lsn, &rec);
if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1))
{
printf("EOF on the log\n");
goto end;
}
if (translog_init_scanner(lsn, 1, &scanner))
{
fprintf(stderr, "Scanner init failed\n");
fprintf(stdout, "TRACE of the last maria_read_log\n");
if (maria_apply_log(lsn, opt_display_and_apply, stdout))
goto err;
}
for (;;i++)
{
uint16 sid= rec.short_trid;
const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
display_record_position(log_desc, &rec, i);
fprintf(stdout, "SUCCESS\n");
/*
A complete group is a set of log records with an "end mark" record
(e.g. a set of REDOs for an operation, terminated by an UNDO for this
operation); if there is no "end mark" record the group is incomplete
and won't be executed.
There are pitfalls: if a table write failed, the transaction may have
put an incomplete group in the log and then a COMMIT record, that will
make a complete group which is wrong. We say that we should mark the
table corrupted if such error happens (what if it cannot be marked?).
*/
if (log_desc->record_ends_group)
{
if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
{
/*
There is a complete group for this transaction, containing more than
this event.
*/
printf(" ends a group:\n");
struct st_translog_scanner_data scanner2;
TRANSLOG_HEADER_BUFFER rec2;
len=
translog_read_record_header(all_active_trans[sid].group_start_lsn, &rec2);
if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1))
{
fprintf(stderr, "Cannot find record where it should be\n");
goto err;
}
if (translog_init_scanner(rec2.lsn, 1, &scanner2))
{
fprintf(stderr, "Scanner2 init failed\n");
goto err;
}
current_group_end_lsn= rec.lsn;
do
{
if (rec2.short_trid == sid) /* it's in our group */
{
const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
display_record_position(log_desc2, &rec2, 0);
if (display_and_apply_record(log_desc2, &rec2))
goto err;
}
len= translog_read_next_record_header(&scanner2, &rec2);
if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1))
{
fprintf(stderr, "Cannot find record where it should be\n");
goto err;
}
}
while (rec2.lsn < rec.lsn);
translog_free_record_header(&rec2);
/* group finished */
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
}
if (display_and_apply_record(log_desc, &rec))
goto err;
}
else /* record does not end group */
{
/* just record the fact, can't know if can execute yet */
if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
{
/* group not yet started */
all_active_trans[sid].group_start_lsn= rec.lsn;
}
}
len= translog_read_next_record_header(&scanner, &rec);
if (len == (TRANSLOG_RECORD_HEADER_MAX_SIZE + 1))
{
printf("EOF on the log\n");
goto end;
}
}
translog_free_record_header(&rec);
/*
So we have applied all REDOs.
We may now have unfinished transactions.
I don't think it's this program's job to roll them back:
to roll back and at the same time stay idempotent, it needs to write log
records (without CLRs, 2nd rollback would hit the effects of first
rollback and fail). But this standalone tool is not allowed to write to
the server's transaction log. So we do not roll back anything.
In the real Recovery code, or the code to do "recover after online
backup", yes we will roll back.
*/
end_of_redo_phase();
goto end;
err:
/* don't touch anything more, in case we hit a bug */
exit(1);
end:
maria_panic(HA_PANIC_CLOSE);
maria_end();
free_defaults(default_argv);
my_end(0);
exit(0);
......@@ -355,629 +179,3 @@ static void get_options(int *argc,char ***argv)
exit(1);
}
}
/* very basic info about the record's header */
static void display_record_position(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec,
uint number)
{
/*
if number==0, we're going over records which we had already seen and which
form a group, so we indent below the group's end record
*/
printf("%sRec#%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u) len %lu\n",
number ? "" : " ", number,
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn),
rec->short_trid, log_desc->name, rec->type,
(ulong)rec->record_length);
}
static int display_and_apply_record(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec)
{
int error;
if (opt_only_display)
return 0;
if (log_desc->record_execute_in_redo_phase == NULL)
{
/* die on all not-yet-handled records :) */
DBUG_ASSERT("one more hook" == "to write");
}
if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
fprintf(stderr, "Got error when executing record\n");
return error;
}
prototype_exec_hook(LONG_TRANSACTION_ID)
{
uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid;
/* abort group of this trn (must be of before a crash) */
LSN gslsn= all_active_trans[sid].group_start_lsn;
char llbuf[22];
if (gslsn != LSN_IMPOSSIBLE)
{
printf("Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
if (long_trid != 0)
{
LSN ulsn= all_active_trans[sid].undo_lsn;
if (ulsn != LSN_IMPOSSIBLE)
{
llstr(long_trid, llbuf);
fprintf(stderr, "Found an old transaction long_trid %s short_trid %u"
" with same short id as this new transaction, and has neither"
" committed nor rollback (undo_lsn: (%lu,0x%lx))\n", llbuf,
sid, (ulong) LSN_FILE_NO(ulsn), (ulong) LSN_OFFSET(ulsn));
goto err;
}
}
long_trid= uint6korr(rec->header);
all_active_trans[sid].long_trid= long_trid;
llstr(long_trid, llbuf);
printf("Transaction long_trid %s short_trid %u starts\n", llbuf, sid);
goto end;
err:
DBUG_ASSERT(0);
return 1;
end:
return 0;
}
#ifdef MARIA_CHECKPOINT
prototype_exec_hook(CHECKPOINT)
{
/* the only checkpoint we care about was found via control file, ignore */
return 0;
}
#endif
prototype_exec_hook(REDO_CREATE_TABLE)
{
File dfile= -1, kfile= -1;
char *linkname_ptr, filename[FN_REFLEN];
char *name, *ptr;
myf create_flag;
uint flags;
int error, create_mode= O_RDWR | O_TRUNC;
MARIA_HA *info= NULL;
if (((name= my_malloc(rec->record_length, MYF(MY_WME))) == NULL) ||
(translog_read_record(rec->lsn, 0, rec->record_length, name, NULL) !=
rec->record_length))
{
fprintf(stderr, "Failed to read record\n");
goto err;
}
printf("Table '%s'", name);
/* we try hard to get create_rename_lsn, to avoid mistakes if possible */
info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
if (info)
{
MARIA_SHARE *share= info->s;
/* check that we're not already using it */
DBUG_ASSERT(share->reopen == 1);
DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
if (!share->base.born_transactional)
{
/*
could be that transactional table was later dropped, and a non-trans
one was renamed to its name, thus create_rename_lsn is 0 and should
not be trusted.
*/
printf(", is not transactional\n");
DBUG_ASSERT(0); /* I want to know this */
goto end;
}
if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
{
printf(", has create_rename_lsn (%lu,0x%lx) is more recent than record",
(ulong) LSN_FILE_NO(rec->lsn),
(ulong) LSN_OFFSET(rec->lsn));
goto end;
}
if (maria_is_crashed(info))
{
printf(", is crashed, overwriting it");
DBUG_ASSERT(0); /* I want to know this */
}
maria_close(info);
info= NULL;
}
/* if does not exist, is older, or its header is corrupted, overwrite it */
// TODO symlinks
ptr= name + strlen(name) + 1;
if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0))
printf(", we will only touch index file");
fn_format(filename, name, "", MARIA_NAME_IEXT,
(MY_UNPACK_FILENAME |
(flags & HA_DONT_TOUCH_DATA) ? MY_RETURN_REAL_PATH : 0) |
MY_APPEND_EXT);
linkname_ptr= NULL;
create_flag= MY_DELETE_OLD;
printf(", creating as '%s'", filename);
if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
MYF(MY_WME|create_flag))) < 0)
{
fprintf(stderr, "Failed to create index file\n");
goto err;
}
ptr++;
uint kfile_size_before_extension= uint2korr(ptr);
ptr+= 2;
uint keystart= uint2korr(ptr);
ptr+= 2;
/* set create_rename_lsn (for maria_read_log to be idempotent) */
lsn_store(ptr + sizeof(info->s->state.header) + 2, rec->lsn);
if (my_pwrite(kfile, ptr,
kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) ||
my_chsize(kfile, keystart, 0, MYF(MY_WME)))
{
fprintf(stderr, "Failed to write to index file\n");
goto err;
}
if (!(flags & HA_DONT_TOUCH_DATA))
{
fn_format(filename,name,"", MARIA_NAME_DEXT,
MY_UNPACK_FILENAME | MY_APPEND_EXT);
linkname_ptr= NULL;
create_flag=MY_DELETE_OLD;
if ((dfile=
my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
MYF(MY_WME | create_flag))) < 0)
{
fprintf(stderr, "Failed to create data file\n");
goto err;
}
/*
we now have an empty data file. To be able to
_ma_initialize_data_file() we need some pieces of the share to be
correctly filled. So we just open the table (fortunately, an empty
data file does not preclude this).
*/
if (((info= maria_open(name, O_RDONLY, 0)) == NULL) ||
_ma_initialize_data_file(info->s, dfile))
{
fprintf(stderr, "Failed to open new table or write to data file\n");
goto err;
}
}
error= 0;
goto end;
err:
DBUG_ASSERT(0);
error= 1;
end:
printf("\n");
if (kfile >= 0)
error|= my_close(kfile, MYF(MY_WME));
if (dfile >= 0)
error|= my_close(dfile, MYF(MY_WME));
if (info != NULL)
error|= maria_close(info);
my_free(name, MYF(MY_ALLOW_ZERO_PTR));
return 0;
}
prototype_exec_hook(FILE_ID)
{
uint16 sid;
int error;
char *name, *buff;
MARIA_HA *info= NULL;
MARIA_SHARE *share;
if (((buff= my_malloc(rec->record_length, MYF(MY_WME))) == NULL) ||
(translog_read_record(rec->lsn, 0, rec->record_length, buff, NULL) !=
rec->record_length))
{
fprintf(stderr, "Failed to read record\n");
goto err;
}
sid= fileid_korr(buff);
name= buff + FILEID_STORE_SIZE;
printf("Table '%s', id %u", name, sid);
info= all_tables[sid];
if (info != NULL)
{
printf(", closing table '%s'", info->s->open_file_name);
all_tables[sid]= NULL;
_ma_reenable_logging_for_table(info->s); /* put back the truth */
if (maria_close(info))
{
fprintf(stderr, "Failed to close table\n");
goto err;
}
}
info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
if (info == NULL)
{
printf(", is absent (must have been dropped later?)"
" or its header is so corrupted that we cannot open it;"
" we skip it\n");
goto end;
}
if (maria_is_crashed(info))
{
fprintf(stderr, "Table is crashed, can't apply log records to it\n");
goto err;
}
share= info->s;
/* check that we're not already using it */
DBUG_ASSERT(share->reopen == 1);
DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
if (!share->base.born_transactional)
{
printf(", is not transactional\n");
DBUG_ASSERT(0); /* I want to know this */
goto end;
}
all_tables[sid]= info;
/* don't log any records for this work */
_ma_tmp_disable_logging_for_table(share);
printf(", opened\n");
error= 0;
goto end;
err:
DBUG_ASSERT(0);
error= 1;
if (info != NULL)
error|= maria_close(info);
end:
my_free(buff, MYF(MY_ALLOW_ZERO_PTR));
return 0;
}
prototype_exec_hook(REDO_INSERT_ROW_HEAD)
{
uint16 sid;
ulonglong page;
MARIA_HA *info;
char llbuf[22];
byte *buff= 0;
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
llstr(page, llbuf);
printf("For page %s of table of short id %u", llbuf, sid);
info= all_tables[sid];
if (info == NULL)
{
printf(", table skipped, so skipping record\n");
goto end;
}
printf(", '%s'", info->s->open_file_name);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
{
printf(", has create_rename_lsn (%lu,0x%lx) is more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
goto end;
}
/*
Soon we will also skip the page depending on the rec_lsn for this page in
the checkpoint record, but this is not absolutely needed for now (just
assume we have made no checkpoint).
*/
printf(", applying record\n");
/*
If REDO's LSN is > page's LSN (read from disk), we are going to modify the
page and change its LSN. The normal runtime code stores the UNDO's LSN
into the page. Here storing the REDO's LSN (rec->lsn) would work
(we are not writing to the log here, so don't have to "flush up to UNDO's
LSN"). But in a test scenario where we do updates at runtime, then remove
tables, apply the log and check that this results in the same table as at
runtime, putting the same LSN as runtime had done will decrease
differences. So we use the UNDO's LSN which is current_group_end_lsn.
*/
if ((!(buff= (byte*) my_malloc(rec->record_length, MYF(MY_WME)))) ||
(translog_read_record(rec->lsn, 0, rec->record_length, buff, NULL) !=
rec->record_length))
{
fprintf(stderr, "Failed to read record\n");
goto end;
}
if (_ma_apply_redo_insert_row_head_or_tail(info, rec->lsn, HEAD_PAGE,
rec->header + FILEID_STORE_SIZE,
buff + (rec->record_length -
rec->non_header_data_len),
rec->non_header_data_len))
goto end;
my_free(buff, MYF(0));
return 0;
end:
/* as we don't have apply working: */
my_free(buff, MYF(MY_ALLOW_ZERO_PTR));
return 1;
}
prototype_exec_hook(REDO_INSERT_ROW_TAIL)
{
uint16 sid;
ulonglong page;
MARIA_HA *info;
char llbuf[22];
byte *buff= 0;
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
llstr(page, llbuf);
printf("For page %s of table of short id %u", llbuf, sid);
info= all_tables[sid];
if (info == NULL)
{
printf(", table skipped, so skipping record\n");
goto end;
}
printf(", '%s'", info->s->open_file_name);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
{
printf(", has create_rename_lsn (%lu,0x%lx) is more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
goto end;
}
/*
Soon we will also skip the page depending on the rec_lsn for this page in
the checkpoint record, but this is not absolutely needed for now (just
assume we have made no checkpoint).
*/
printf(", applying record\n");
/*
If REDO's LSN is > page's LSN (read from disk), we are going to modify the
page and change its LSN. The normal runtime code stores the UNDO's LSN
into the page. Here storing the REDO's LSN (rec->lsn) would work
(we are not writing to the log here, so don't have to "flush up to UNDO's
LSN"). But in a test scenario where we do updates at runtime, then remove
tables, apply the log and check that this results in the same table as at
runtime, putting the same LSN as runtime had done will decrease
differences. So we use the UNDO's LSN which is current_group_end_lsn.
*/
if ((!(buff= (byte*) my_malloc(rec->record_length, MYF(MY_WME)))) ||
(translog_read_record(rec->lsn, 0, rec->record_length, buff, NULL) !=
rec->record_length))
{
fprintf(stderr, "Failed to read record\n");
goto end;
}
if (_ma_apply_redo_insert_row_head_or_tail(info, rec->lsn, TAIL_PAGE,
rec->header + FILEID_STORE_SIZE,
buff + (rec->record_length -
rec->non_header_data_len),
rec->non_header_data_len))
goto end;
my_free(buff, MYF(0));
return 0;
end:
/* as we don't have apply working: */
my_free(buff, MYF(MY_ALLOW_ZERO_PTR));
return 1;
}
prototype_exec_hook(REDO_PURGE_ROW_HEAD)
{
uint16 sid;
ulonglong page;
MARIA_HA *info;
char llbuf[22];
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
llstr(page, llbuf);
printf("For page %s of table of short id %u", llbuf, sid);
info= all_tables[sid];
if (info == NULL)
{
printf(", table skipped, so skipping record\n");
goto end;
}
printf(", '%s'", info->s->open_file_name);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
{
printf(", has create_rename_lsn (%lu,0x%lx) is more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
goto end;
}
/*
Soon we will also skip the page depending on the rec_lsn for this page in
the checkpoint record, but this is not absolutely needed for now (just
assume we have made no checkpoint).
*/
printf(", applying record\n");
/*
If REDO's LSN is > page's LSN (read from disk), we are going to modify the
page and change its LSN. The normal runtime code stores the UNDO's LSN
into the page. Here storing the REDO's LSN (rec->lsn) would work
(we are not writing to the log here, so don't have to "flush up to UNDO's
LSN"). But in a test scenario where we do updates at runtime, then remove
tables, apply the log and check that this results in the same table as at
runtime, putting the same LSN as runtime had done will decrease
differences. So we use the UNDO's LSN which is current_group_end_lsn.
*/
if (_ma_apply_redo_purge_row_head_or_tail(info, rec->lsn, HEAD_PAGE,
rec->header + FILEID_STORE_SIZE))
goto end;
return 0;
end:
/* as we don't have apply working: */
return 1;
}
prototype_exec_hook(REDO_PURGE_ROW_TAIL)
{
uint16 sid;
ulonglong page;
MARIA_HA *info;
char llbuf[22];
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
llstr(page, llbuf);
printf("For page %s of table of short id %u", llbuf, sid);
info= all_tables[sid];
if (info == NULL)
{
printf(", table skipped, so skipping record\n");
goto end;
}
printf(", '%s'", info->s->open_file_name);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
{
printf(", has create_rename_lsn (%lu,0x%lx) is more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
goto end;
}
/*
Soon we will also skip the page depending on the rec_lsn for this page in
the checkpoint record, but this is not absolutely needed for now (just
assume we have made no checkpoint).
*/
printf(", applying record\n");
/*
If REDO's LSN is > page's LSN (read from disk), we are going to modify the
page and change its LSN. The normal runtime code stores the UNDO's LSN
into the page. Here storing the REDO's LSN (rec->lsn) would work
(we are not writing to the log here, so don't have to "flush up to UNDO's
LSN"). But in a test scenario where we do updates at runtime, then remove
tables, apply the log and check that this results in the same table as at
runtime, putting the same LSN as runtime had done will decrease
differences. So we use the UNDO's LSN which is current_group_end_lsn.
*/
if (_ma_apply_redo_purge_row_head_or_tail(info, rec->lsn, TAIL_PAGE,
rec->header + FILEID_STORE_SIZE))
goto end;
return 0;
end:
/* as we don't have apply working: */
return 1;
}
static int exec_LOGREC_UNDO_ROW_INSERT(const TRANSLOG_HEADER_BUFFER *rec
__attribute__((unused)))
{
/* Ignore this during the redo phase */
return 0;
}
static int exec_LOGREC_UNDO_ROW_DELETE(const TRANSLOG_HEADER_BUFFER *rec
__attribute__((unused)))
{
/* Ignore this during the redo phase */
return 0;
}
prototype_exec_hook(COMMIT)
{
uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid;
LSN gslsn= all_active_trans[sid].group_start_lsn;
char llbuf[22];
if (long_trid == 0)
{
printf("We don't know about transaction short_trid %u;"
"it probably committed long ago, forget it\n", sid);
return 0;
}
llstr(long_trid, llbuf);
printf("Transaction long_trid %s short_trid %u committed", llbuf, sid);
if (gslsn != LSN_IMPOSSIBLE)
{
/*
It's not an error, it may be that trn got a disk error when writing to a
table, so an unfinished group staid in the log.
*/
printf(", with group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
else
printf("\n");
all_active_trans[sid].long_trid= 0;
#ifdef MARIA_VERSIONING
/*
if real recovery:
transaction was committed, move it to some separate list for later
purging (but don't purge now! purging may have been started before, we
may find REDO_PURGE records soon).
*/
#endif
return 0;
}
/* Just to inform about any aborted groups or unfinished transactions */
static void end_of_redo_phase()
{
uint sid, unfinished= 0;
for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
{
TrID long_trid= all_active_trans[sid].long_trid;
LSN gslsn= all_active_trans[sid].group_start_lsn;
if (long_trid == 0)
continue;
if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
{
char llbuf[22];
llstr(long_trid, llbuf);
printf("Transaction long_trid %s short_trid %u unfinished\n",
llbuf, sid);
}
if (gslsn != LSN_IMPOSSIBLE)
{
printf("Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
}
/* If real recovery: roll back unfinished transaction */
#ifdef MARIA_VERSIONING
/*
If real recovery: transaction was committed, move it to some separate
list for soon purging.
*/
#endif
}
/*
We don't close tables if there are some unfinished transactions, because
closing tables normally requires that all unfinished transactions on them
be rolled back.
For example, closing will soon write the state to disk and when doing that
it will think this is a committed state, but it may not be.
*/
if (unfinished == 0)
{
for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
{
MARIA_HA *info= all_tables[sid];
if (info != NULL)
{
_ma_reenable_logging_for_table(info->s); /* put back the truth */
maria_close(info);
}
}
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment