Commit 470842ba authored by unknown's avatar unknown

WL#3072 Maria recovery:

fix for bug: if a crash happened right after writing a REDO, leaving the
log like this: REDO - UNDO - REDO*, then recovery would ignore the last
REDO* (ok) and roll back, producing: REDO - UNDO - REDO* - REDO - CLR;
a next recovery would thus pair REDO* with the records that follow it and
execute it instead of skipping it again. Recovery now logs
LOGREC_INCOMPLETE_GROUP when it meets REDO* for the first time,
to draw a boundary and ensure it is always skipped. Tested by hand.
Note: ma_test_all fails with
"maria_chk: error: Key 1 - Found too many records";
this is not due to this patch (it failed before).


BitKeeper/triggers/post-commit:
  no truncation of the commit mail anymore; otherwise, how to review patches?
mysql-test/include/maria_verify_recovery.inc:
  let caller choose the statement used to crash (sometimes we
  want the crash to happen at special places)
mysql-test/t/maria-recovery.test:
  user of maria_verify_recovery.inc now specifies statement which the
  script should use for crashing.
storage/maria/ma_bitmap.c:
  it's easier to search for all places using functions from the bitmap
  module (like in ma_blockrec.c) if those exported functions all start
  with "_ma_bitmap": renaming some of them.
  Assertion that when we read a bitmap page, overwriting bitmap->map,
  we are not losing information (i.e. bitmap->changed is false).
storage/maria/ma_blockrec.c:
  update to new names. Adding code (disabled, protected by a #ifdef)
  that I use to test certain crash scenarios (more to come).
storage/maria/ma_blockrec.h:
  update to new names
storage/maria/ma_checkpoint.c:
  update to new names
storage/maria/ma_extra.c:
  update to new names
storage/maria/ma_loghandler.c:
  new LOGREC_INCOMPLETE_GROUP
storage/maria/ma_loghandler.h:
  new LOGREC_INCOMPLETE_GROUP
storage/maria/ma_recovery.c:
  When, at the end of the REDO phase, we have identified some transactions
  with incomplete REDO groups (REDOs without an UNDO or CLR_END),
  we log LOGREC_INCOMPLETE_GROUP for each of them. This way, the
  upcoming UNDO phase can write more records for such a transaction, and
  a future recovery won't pair the incomplete group with the
  CLR_END (as there is a LOGREC_INCOMPLETE_GROUP to draw a boundary).
parent ef8ef30b
...@@ -97,7 +97,7 @@ see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html ...@@ -97,7 +97,7 @@ see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
EOF EOF
bk changes -v -r+ bk changes -v -r+
bk rset -r+ -ah | bk gnupatch -h -dup -T bk rset -r+ -ah | bk gnupatch -h -dup -T
) | bk sed -e ${LIMIT}q > $BKROOT/BitKeeper/tmp/commits.txt ) > $BKROOT/BitKeeper/tmp/commits.txt
$SENDMAIL -t < $BKROOT/BitKeeper/tmp/commits.txt $SENDMAIL -t < $BKROOT/BitKeeper/tmp/commits.txt
......
...@@ -4,11 +4,11 @@ ...@@ -4,11 +4,11 @@
# API: # API:
# 1) set $mms_tables to N, the script will cover tables mysqltest.t1,...tN # 1) set $mms_tables to N, the script will cover tables mysqltest.t1,...tN
# 2) set $mvr_debug_option to the crash way # 2) set $mvr_debug_option to the crash way
# 3) set $mvr_restore_old_snapshot to 1 if you want recovery to run on # 3) set $mvr_crash_statement to the statement which will trigger a crash
# 4) set $mvr_restore_old_snapshot to 1 if you want recovery to run on
# an old copy of tables and of the control file, 0 for normal recovery. # an old copy of tables and of the control file, 0 for normal recovery.
# 4) set $mms_compare_physically to 1 if you want a physical byte-for-byte # 5) set $mms_compare_physically to 1 if you want a physical byte-for-byte
# comparison with expected table. Checksum comparison is always done. # comparison with expected table. Checksum comparison is always done.
# "mvr" is a namespace for Maria_Verify_Recovery # "mvr" is a namespace for Maria_Verify_Recovery
connection admin; connection admin;
...@@ -34,7 +34,7 @@ system echo wait-maria_verify_recovery.inc >> $MYSQLTEST_VARDIR/tmp/master0.expe ...@@ -34,7 +34,7 @@ system echo wait-maria_verify_recovery.inc >> $MYSQLTEST_VARDIR/tmp/master0.expe
eval SET SESSION debug=$mvr_debug_option; eval SET SESSION debug=$mvr_debug_option;
--echo * crashing mysqld intentionally --echo * crashing mysqld intentionally
--error 2013 --error 2013
set global maria_checkpoint_interval=1; # this will crash (DBUG magic) eval $mvr_crash_statement; # this will crash (DBUG magic)
if ($mvr_restore_old_snapshot) if ($mvr_restore_old_snapshot)
{ {
......
...@@ -38,6 +38,7 @@ let $mvr_restore_old_snapshot=1; ...@@ -38,6 +38,7 @@ let $mvr_restore_old_snapshot=1;
# produce a physically identical table. # produce a physically identical table.
let $mms_compare_physically=1; let $mms_compare_physically=1;
let $mvr_debug_option="+d,maria_flush_whole_log,maria_crash"; let $mvr_debug_option="+d,maria_flush_whole_log,maria_crash";
let $mvr_crash_statement= set global maria_checkpoint_interval=1;
# the script below will trigger recovery and compare checksums # the script below will trigger recovery and compare checksums
-- source include/maria_verify_recovery.inc -- source include/maria_verify_recovery.inc
let $mms_compare_physically=0; let $mms_compare_physically=0;
...@@ -58,6 +59,7 @@ let $mvr_restore_old_snapshot=0; ...@@ -58,6 +59,7 @@ let $mvr_restore_old_snapshot=0;
# UNDO phase prevents physical comparison, normally, # UNDO phase prevents physical comparison, normally,
# so we'll only use checksums to compare. # so we'll only use checksums to compare.
let $mms_compare_physically=0; let $mms_compare_physically=0;
let $mvr_crash_statement= set global maria_checkpoint_interval=1;
# Note that we don't remove logs between iterations. Test is # Note that we don't remove logs between iterations. Test is
# cumulative (each new recovery processes more log records than the previous). # cumulative (each new recovery processes more log records than the previous).
...@@ -134,6 +136,7 @@ SELECT LENGTH(b) FROM t1 WHERE i=3; ...@@ -134,6 +136,7 @@ SELECT LENGTH(b) FROM t1 WHERE i=3;
let $mvr_restore_old_snapshot=1; let $mvr_restore_old_snapshot=1;
let $mms_compare_physically=0; let $mms_compare_physically=0;
let $mvr_debug_option="+d,maria_flush_whole_log,maria_crash"; let $mvr_debug_option="+d,maria_flush_whole_log,maria_crash";
let $mvr_crash_statement= set global maria_checkpoint_interval=1;
-- source include/maria_verify_recovery.inc -- source include/maria_verify_recovery.inc
SELECT LENGTH(b) FROM t1 WHERE i=3; SELECT LENGTH(b) FROM t1 WHERE i=3;
drop table t1; drop table t1;
......
...@@ -229,7 +229,7 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file) ...@@ -229,7 +229,7 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file)
my_bool _ma_bitmap_end(MARIA_SHARE *share) my_bool _ma_bitmap_end(MARIA_SHARE *share)
{ {
my_bool res= _ma_flush_bitmap(share); my_bool res= _ma_bitmap_flush(share);
pthread_mutex_destroy(&share->bitmap.bitmap_lock); pthread_mutex_destroy(&share->bitmap.bitmap_lock);
my_free((uchar*) share->bitmap.map, MYF(MY_ALLOW_ZERO_PTR)); my_free((uchar*) share->bitmap.map, MYF(MY_ALLOW_ZERO_PTR));
share->bitmap.map= 0; share->bitmap.map= 0;
...@@ -241,11 +241,11 @@ my_bool _ma_bitmap_end(MARIA_SHARE *share) ...@@ -241,11 +241,11 @@ my_bool _ma_bitmap_end(MARIA_SHARE *share)
Send updated bitmap to the page cache Send updated bitmap to the page cache
SYNOPSIS SYNOPSIS
_ma_flush_bitmap() _ma_bitmap_flush()
share Share handler share Share handler
NOTES NOTES
In the future, _ma_flush_bitmap() will be called to flush changes don't In the future, _ma_bitmap_flush() will be called to flush changes don't
by this thread (ie, checking the changed flag is ok). The reason we by this thread (ie, checking the changed flag is ok). The reason we
check it again in the mutex is that if someone else did a flush at the check it again in the mutex is that if someone else did a flush at the
same time, we don't have to do the write. same time, we don't have to do the write.
...@@ -255,10 +255,10 @@ my_bool _ma_bitmap_end(MARIA_SHARE *share) ...@@ -255,10 +255,10 @@ my_bool _ma_bitmap_end(MARIA_SHARE *share)
1 error 1 error
*/ */
my_bool _ma_flush_bitmap(MARIA_SHARE *share) my_bool _ma_bitmap_flush(MARIA_SHARE *share)
{ {
my_bool res= 0; my_bool res= 0;
DBUG_ENTER("_ma_flush_bitmap"); DBUG_ENTER("_ma_bitmap_flush");
if (share->bitmap.changed) if (share->bitmap.changed)
{ {
pthread_mutex_lock(&share->bitmap.bitmap_lock); pthread_mutex_lock(&share->bitmap.bitmap_lock);
...@@ -585,6 +585,7 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share, ...@@ -585,6 +585,7 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
my_bool res; my_bool res;
DBUG_ENTER("_ma_read_bitmap_page"); DBUG_ENTER("_ma_read_bitmap_page");
DBUG_ASSERT(page % bitmap->pages_covered == 0); DBUG_ASSERT(page % bitmap->pages_covered == 0);
DBUG_ASSERT(!bitmap->changed);
bitmap->page= page; bitmap->page= page;
if (end_of_page > share->state.state.data_file_length) if (end_of_page > share->state.state.data_file_length)
...@@ -713,7 +714,7 @@ static my_bool _ma_change_bitmap_page(MARIA_HA *info, ...@@ -713,7 +714,7 @@ static my_bool _ma_change_bitmap_page(MARIA_HA *info,
RETURN RETURN
0 ok 0 ok
1 error (either couldn't save old bitmap or read new one 1 error (either couldn't save old bitmap or read new one)
*/ */
static my_bool move_to_next_bitmap(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap) static my_bool move_to_next_bitmap(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap)
...@@ -1824,7 +1825,7 @@ static uint get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, ...@@ -1824,7 +1825,7 @@ static uint get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
Mark all pages in a region as free Mark all pages in a region as free
SYNOPSIS SYNOPSIS
_ma_reset_full_page_bits() _ma_bitmap_reset_full_page_bits()
info Maria handler info Maria handler
bitmap Bitmap handler bitmap Bitmap handler
page Start page page Start page
...@@ -1839,13 +1840,14 @@ static uint get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, ...@@ -1839,13 +1840,14 @@ static uint get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
1 Error (when reading bitmap) 1 Error (when reading bitmap)
*/ */
my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, my_bool _ma_bitmap_reset_full_page_bits(MARIA_HA *info,
MARIA_FILE_BITMAP *bitmap,
ulonglong page, uint page_count) ulonglong page, uint page_count)
{ {
ulonglong bitmap_page; ulonglong bitmap_page;
uint offset, bit_start, bit_count, tmp; uint offset, bit_start, bit_count, tmp;
uchar *data; uchar *data;
DBUG_ENTER("_ma_reset_full_page_bits"); DBUG_ENTER("_ma_bitmap_reset_full_page_bits");
DBUG_PRINT("enter", ("page: %lu page_count: %u", (ulong) page, page_count)); DBUG_PRINT("enter", ("page: %lu page_count: %u", (ulong) page, page_count));
safe_mutex_assert_owner(&info->s->bitmap.bitmap_lock); safe_mutex_assert_owner(&info->s->bitmap.bitmap_lock);
...@@ -1899,7 +1901,7 @@ my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, ...@@ -1899,7 +1901,7 @@ my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
Set all pages in a region as used Set all pages in a region as used
SYNOPSIS SYNOPSIS
_ma_set_full_page_bits() _ma_bitmap_set_full_page_bits()
info Maria handler info Maria handler
bitmap Bitmap handler bitmap Bitmap handler
page Start page page Start page
...@@ -1914,13 +1916,14 @@ my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, ...@@ -1914,13 +1916,14 @@ my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
1 Error (when reading bitmap) 1 Error (when reading bitmap)
*/ */
my_bool _ma_set_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, my_bool _ma_bitmap_set_full_page_bits(MARIA_HA *info,
MARIA_FILE_BITMAP *bitmap,
ulonglong page, uint page_count) ulonglong page, uint page_count)
{ {
ulonglong bitmap_page; ulonglong bitmap_page;
uint offset, bit_start, bit_count, tmp; uint offset, bit_start, bit_count, tmp;
uchar *data; uchar *data;
DBUG_ENTER("_ma_set_full_page_bits"); DBUG_ENTER("_ma_bitmap_set_full_page_bits");
DBUG_PRINT("enter", ("page: %lu page_count: %u", (ulong) page, page_count)); DBUG_PRINT("enter", ("page: %lu page_count: %u", (ulong) page, page_count));
safe_mutex_assert_owner(&info->s->bitmap.bitmap_lock); safe_mutex_assert_owner(&info->s->bitmap.bitmap_lock);
...@@ -2058,7 +2061,7 @@ my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks) ...@@ -2058,7 +2061,7 @@ my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks)
goto err; goto err;
} }
if (!(block->used & BLOCKUSED_USED) && if (!(block->used & BLOCKUSED_USED) &&
_ma_reset_full_page_bits(info, bitmap, _ma_bitmap_reset_full_page_bits(info, bitmap,
block->page, page_count)) block->page, page_count))
goto err; goto err;
} }
...@@ -2105,7 +2108,8 @@ my_bool _ma_bitmap_free_full_pages(MARIA_HA *info, const uchar *extents, ...@@ -2105,7 +2108,8 @@ my_bool _ma_bitmap_free_full_pages(MARIA_HA *info, const uchar *extents,
continue; /* Not used extent */ continue; /* Not used extent */
if (pagecache_delete_pages(info->s->pagecache, &info->dfile, page, if (pagecache_delete_pages(info->s->pagecache, &info->dfile, page,
page_count, PAGECACHE_LOCK_WRITE, 1) || page_count, PAGECACHE_LOCK_WRITE, 1) ||
_ma_reset_full_page_bits(info, &info->s->bitmap, page, page_count)) _ma_bitmap_reset_full_page_bits(info, &info->s->bitmap, page,
page_count))
{ {
pthread_mutex_unlock(&info->s->bitmap.bitmap_lock); pthread_mutex_unlock(&info->s->bitmap.bitmap_lock);
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -2122,7 +2126,7 @@ my_bool _ma_bitmap_free_full_pages(MARIA_HA *info, const uchar *extents, ...@@ -2122,7 +2126,7 @@ my_bool _ma_bitmap_free_full_pages(MARIA_HA *info, const uchar *extents,
SYNOPSIS SYNOPSIS
_ma_bitmap_set() _ma_bitmap_set()
info Mari handler info Maria handler
page Adress to page page Adress to page
head 1 if page is a head page, 0 if tail page head 1 if page is a head page, 0 if tail page
empty_space How much empty space there is on page empty_space How much empty space there is on page
......
...@@ -1830,8 +1830,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count) ...@@ -1830,8 +1830,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
res= 1; res= 1;
} }
pthread_mutex_lock(&info->s->bitmap.bitmap_lock); pthread_mutex_lock(&info->s->bitmap.bitmap_lock);
if (_ma_reset_full_page_bits(info, &info->s->bitmap, page, if (_ma_bitmap_reset_full_page_bits(info, &info->s->bitmap, page, count))
count))
res= 1; res= 1;
pthread_mutex_unlock(&info->s->bitmap.bitmap_lock); pthread_mutex_unlock(&info->s->bitmap.bitmap_lock);
DBUG_RETURN(res); DBUG_RETURN(res);
...@@ -2357,6 +2356,24 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -2357,6 +2356,24 @@ static my_bool write_block_record(MARIA_HA *info,
goto disk_err; goto disk_err;
} }
#ifdef RECOVERY_EXTRA_DEBUG
if (info->trn->undo_lsn != LSN_IMPOSSIBLE)
{
/* Stop right after the REDO; testing incomplete log record groups */
DBUG_EXECUTE_IF("maria_flush_whole_log",
{
DBUG_PRINT("maria_flush_whole_log", ("now"));
translog_flush(translog_get_horizon());
});
DBUG_EXECUTE_IF("maria_crash",
{
DBUG_PRINT("maria_crash", ("now"));
fflush(DBUG_FILE);
abort();
});
}
#endif
/* Increase data file size, if extended */ /* Increase data file size, if extended */
position= (my_off_t) head_block->page * block_size; position= (my_off_t) head_block->page * block_size;
if (info->state->data_file_length <= position) if (info->state->data_file_length <= position)
...@@ -2677,6 +2694,24 @@ static my_bool allocate_and_write_block_record(MARIA_HA *info, ...@@ -2677,6 +2694,24 @@ static my_bool allocate_and_write_block_record(MARIA_HA *info,
if (_ma_bitmap_find_place(info, row, blocks)) if (_ma_bitmap_find_place(info, row, blocks))
DBUG_RETURN(1); /* Error reading bitmap */ DBUG_RETURN(1); /* Error reading bitmap */
#ifdef RECOVERY_EXTRA_DEBUG
/* Send this over-allocated bitmap to disk and crash, see if recovers */
DBUG_EXECUTE_IF("maria_flush_bitmap",
{
DBUG_PRINT("maria_flush_bitmap", ("now"));
_ma_bitmap_flush(info->s);
_ma_flush_table_files(info, MARIA_FLUSH_DATA |
MARIA_FLUSH_INDEX,
FLUSH_KEEP, FLUSH_KEEP);
});
DBUG_EXECUTE_IF("maria_crash",
{
DBUG_PRINT("maria_crash", ("now"));
fflush(DBUG_FILE);
abort();
});
#endif
/* page will be pinned & locked by get_head_or_tail_page */ /* page will be pinned & locked by get_head_or_tail_page */
if (get_head_or_tail_page(info, blocks->block, info->buff, if (get_head_or_tail_page(info, blocks->block, info->buff,
row->space_on_head_page, HEAD_PAGE, row->space_on_head_page, HEAD_PAGE,
...@@ -4108,7 +4143,7 @@ my_bool _ma_scan_init_block_record(MARIA_HA *info) ...@@ -4108,7 +4143,7 @@ my_bool _ma_scan_init_block_record(MARIA_HA *info)
We have to flush bitmap as we will read the bitmap from the page cache We have to flush bitmap as we will read the bitmap from the page cache
while scanning rows while scanning rows
*/ */
DBUG_RETURN(_ma_flush_bitmap(info->s)); DBUG_RETURN(_ma_bitmap_flush(info->s));
} }
...@@ -5329,7 +5364,7 @@ uint _ma_apply_redo_free_blocks(MARIA_HA *info, ...@@ -5329,7 +5364,7 @@ uint _ma_apply_redo_free_blocks(MARIA_HA *info,
/** @todo leave bitmap lock to the bitmap code... */ /** @todo leave bitmap lock to the bitmap code... */
pthread_mutex_lock(&share->bitmap.bitmap_lock); pthread_mutex_lock(&share->bitmap.bitmap_lock);
res= _ma_reset_full_page_bits(info, &share->bitmap, start_page, res= _ma_bitmap_reset_full_page_bits(info, &share->bitmap, start_page,
page_range); page_range);
pthread_mutex_unlock(&share->bitmap.bitmap_lock); pthread_mutex_unlock(&share->bitmap.bitmap_lock);
if (res) if (res)
...@@ -5404,7 +5439,7 @@ uint _ma_apply_redo_free_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -5404,7 +5439,7 @@ uint _ma_apply_redo_free_head_or_tail(MARIA_HA *info, LSN lsn,
} }
/** @todo leave bitmap lock to the bitmap code... */ /** @todo leave bitmap lock to the bitmap code... */
pthread_mutex_lock(&share->bitmap.bitmap_lock); pthread_mutex_lock(&share->bitmap.bitmap_lock);
res= _ma_reset_full_page_bits(info, &share->bitmap, page, 1); res= _ma_bitmap_reset_full_page_bits(info, &share->bitmap, page, 1);
pthread_mutex_unlock(&share->bitmap.bitmap_lock); pthread_mutex_unlock(&share->bitmap.bitmap_lock);
if (res) if (res)
DBUG_RETURN(res); DBUG_RETURN(res);
...@@ -5553,7 +5588,7 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info, ...@@ -5553,7 +5588,7 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
} }
/** @todo leave bitmap lock to the bitmap code... */ /** @todo leave bitmap lock to the bitmap code... */
pthread_mutex_lock(&share->bitmap.bitmap_lock); pthread_mutex_lock(&share->bitmap.bitmap_lock);
res= _ma_set_full_page_bits(info, &share->bitmap, start_page, res= _ma_bitmap_set_full_page_bits(info, &share->bitmap, start_page,
page_range); page_range);
pthread_mutex_unlock(&share->bitmap.bitmap_lock); pthread_mutex_unlock(&share->bitmap.bitmap_lock);
if (res) if (res)
......
...@@ -170,7 +170,7 @@ my_bool _ma_compare_block_record(register MARIA_HA *info, ...@@ -170,7 +170,7 @@ my_bool _ma_compare_block_record(register MARIA_HA *info,
/* ma_bitmap.c */ /* ma_bitmap.c */
my_bool _ma_bitmap_init(MARIA_SHARE *share, File file); my_bool _ma_bitmap_init(MARIA_SHARE *share, File file);
my_bool _ma_bitmap_end(MARIA_SHARE *share); my_bool _ma_bitmap_end(MARIA_SHARE *share);
my_bool _ma_flush_bitmap(MARIA_SHARE *share); my_bool _ma_bitmap_flush(MARIA_SHARE *share);
void _ma_bitmap_reset_cache(MARIA_SHARE *share); void _ma_bitmap_reset_cache(MARIA_SHARE *share);
my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row, my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row,
MARIA_BITMAP_BLOCKS *result_blocks); MARIA_BITMAP_BLOCKS *result_blocks);
...@@ -179,9 +179,11 @@ my_bool _ma_bitmap_free_full_pages(MARIA_HA *info, const uchar *extents, ...@@ -179,9 +179,11 @@ my_bool _ma_bitmap_free_full_pages(MARIA_HA *info, const uchar *extents,
uint count); uint count);
my_bool _ma_bitmap_set(MARIA_HA *info, ulonglong pos, my_bool head, my_bool _ma_bitmap_set(MARIA_HA *info, ulonglong pos, my_bool head,
uint empty_space); uint empty_space);
my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, my_bool _ma_bitmap_reset_full_page_bits(MARIA_HA *info,
MARIA_FILE_BITMAP *bitmap,
ulonglong page, uint page_count); ulonglong page, uint page_count);
my_bool _ma_set_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, my_bool _ma_bitmap_set_full_page_bits(MARIA_HA *info,
MARIA_FILE_BITMAP *bitmap,
ulonglong page, uint page_count); ulonglong page, uint page_count);
uint _ma_free_size_to_head_pattern(MARIA_FILE_BITMAP *bitmap, uint size); uint _ma_free_size_to_head_pattern(MARIA_FILE_BITMAP *bitmap, uint size);
my_bool _ma_bitmap_find_new_place(MARIA_HA *info, MARIA_ROW *new_row, my_bool _ma_bitmap_find_new_place(MARIA_HA *info, MARIA_ROW *new_row,
......
...@@ -1070,7 +1070,7 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon) ...@@ -1070,7 +1070,7 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
over-allocation if crash); see also _ma_change_bitmap_page(). over-allocation if crash); see also _ma_change_bitmap_page().
*/ */
sync_error|= sync_error|=
_ma_flush_bitmap(share); /* after that, all is in page cache */ _ma_bitmap_flush(share); /* after that, all is in page cache */
DBUG_ASSERT(share->pagecache == maria_pagecache); DBUG_ASSERT(share->pagecache == maria_pagecache);
} }
if (share->in_checkpoint & MARIA_CHECKPOINT_SHOULD_FREE_ME) if (share->in_checkpoint & MARIA_CHECKPOINT_SHOULD_FREE_ME)
......
...@@ -572,7 +572,7 @@ int _ma_flush_table_files(MARIA_HA *info, uint flush_data_or_index, ...@@ -572,7 +572,7 @@ int _ma_flush_table_files(MARIA_HA *info, uint flush_data_or_index,
} }
if (share->data_file_type == BLOCK_RECORD) if (share->data_file_type == BLOCK_RECORD)
{ {
if(_ma_flush_bitmap(share) || if(_ma_bitmap_flush(share) ||
flush_pagecache_blocks(share->pagecache, &info->dfile, flush_pagecache_blocks(share->pagecache, &info->dfile,
flush_type_for_data)) flush_type_for_data))
goto err; goto err;
......
...@@ -531,6 +531,11 @@ static LOG_DESC INIT_LOGREC_INCOMPLETE_LOG= ...@@ -531,6 +531,11 @@ static LOG_DESC INIT_LOGREC_INCOMPLETE_LOG=
NULL, NULL, NULL, 0, NULL, NULL, NULL, 0,
"incomplete_log", LOGREC_IS_GROUP_ITSELF, NULL, NULL}; "incomplete_log", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
/*
  Descriptor for LOGREC_INCOMPLETE_GROUP, the record which recovery logs to
  draw a boundary after an incomplete group of REDOs, so that a later
  recovery does not pair that group with records written afterwards.
  The record is of class LOGRECTYPE_FIXEDLENGTH, is a group by itself
  (LOGREC_IS_GROUP_ITSELF) and has no hooks installed here (all NULL).
  NOTE(review): the two 0 values after the class are presumably the record
  lengths (i.e. the record carries no payload) - confirm against LOG_DESC.
*/
static LOG_DESC INIT_LOGREC_INCOMPLETE_GROUP=
{LOGRECTYPE_FIXEDLENGTH, 0, 0,
NULL, NULL, NULL, 0,
"incomplete_group", LOGREC_IS_GROUP_ITSELF, NULL, NULL};
const myf log_write_flags= MY_WME | MY_NABP | MY_WAIT_IF_FULL; const myf log_write_flags= MY_WME | MY_NABP | MY_WAIT_IF_FULL;
static void loghandler_init() static void loghandler_init()
...@@ -610,12 +615,14 @@ static void loghandler_init() ...@@ -610,12 +615,14 @@ static void loghandler_init()
INIT_LOGREC_LONG_TRANSACTION_ID; INIT_LOGREC_LONG_TRANSACTION_ID;
log_record_type_descriptor[LOGREC_INCOMPLETE_LOG]= log_record_type_descriptor[LOGREC_INCOMPLETE_LOG]=
INIT_LOGREC_INCOMPLETE_LOG; INIT_LOGREC_INCOMPLETE_LOG;
for (i= LOGREC_INCOMPLETE_LOG + 1; log_record_type_descriptor[LOGREC_INCOMPLETE_GROUP]=
INIT_LOGREC_INCOMPLETE_GROUP;
for (i= LOGREC_INCOMPLETE_GROUP + 1;
i < LOGREC_NUMBER_OF_TYPES; i < LOGREC_NUMBER_OF_TYPES;
i++) i++)
log_record_type_descriptor[i].rclass= LOGRECTYPE_NOT_ALLOWED; log_record_type_descriptor[i].rclass= LOGRECTYPE_NOT_ALLOWED;
DBUG_EXECUTE("info", DBUG_EXECUTE("info",
check_translog_description_table(LOGREC_INCOMPLETE_LOG);); check_translog_description_table(LOGREC_INCOMPLETE_GROUP););
}; };
......
...@@ -133,6 +133,7 @@ enum translog_record_type ...@@ -133,6 +133,7 @@ enum translog_record_type
LOGREC_FILE_ID, LOGREC_FILE_ID,
LOGREC_LONG_TRANSACTION_ID, LOGREC_LONG_TRANSACTION_ID,
LOGREC_INCOMPLETE_LOG, LOGREC_INCOMPLETE_LOG,
LOGREC_INCOMPLETE_GROUP,
LOGREC_RESERVED_FUTURE_EXTENSION= 63 LOGREC_RESERVED_FUTURE_EXTENSION= 63
}; };
#define LOGREC_NUMBER_OF_TYPES 64 /* Maximum, can't be extended */ #define LOGREC_NUMBER_OF_TYPES 64 /* Maximum, can't be extended */
......
...@@ -80,6 +80,7 @@ prototype_redo_exec_hook(REDO_REPAIR_TABLE); ...@@ -80,6 +80,7 @@ prototype_redo_exec_hook(REDO_REPAIR_TABLE);
prototype_redo_exec_hook(REDO_DROP_TABLE); prototype_redo_exec_hook(REDO_DROP_TABLE);
prototype_redo_exec_hook(FILE_ID); prototype_redo_exec_hook(FILE_ID);
prototype_redo_exec_hook(INCOMPLETE_LOG); prototype_redo_exec_hook(INCOMPLETE_LOG);
prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP);
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD); prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL); prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS); prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
...@@ -108,7 +109,7 @@ prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT); ...@@ -108,7 +109,7 @@ prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply); static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply);
static uint end_of_redo_phase(my_bool prepare_for_undo_phase); static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
static int run_undo_phase(uint unfinished); static int run_undo_phase(uint uncommitted);
static void display_record_position(const LOG_DESC *log_desc, static void display_record_position(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec, const TRANSLOG_HEADER_BUFFER *rec,
uint number); uint number);
...@@ -276,7 +277,7 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply, ...@@ -276,7 +277,7 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
my_bool take_checkpoints, uint *warnings_count) my_bool take_checkpoints, uint *warnings_count)
{ {
int error= 0; int error= 0;
uint unfinished_trans; uint uncommitted_trans;
ulonglong old_now; ulonglong old_now;
DBUG_ENTER("maria_apply_log"); DBUG_ENTER("maria_apply_log");
...@@ -326,7 +327,7 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply, ...@@ -326,7 +327,7 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
if (run_redo_phase(from_lsn, apply)) if (run_redo_phase(from_lsn, apply))
goto err; goto err;
if ((unfinished_trans= if ((uncommitted_trans=
end_of_redo_phase(should_run_undo_phase)) == (uint)-1) end_of_redo_phase(should_run_undo_phase)) == (uint)-1)
goto err; goto err;
...@@ -366,13 +367,13 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply, ...@@ -366,13 +367,13 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
if (should_run_undo_phase) if (should_run_undo_phase)
{ {
if (run_undo_phase(unfinished_trans)) if (run_undo_phase(uncommitted_trans))
goto err; goto err;
} }
else if (unfinished_trans > 0) else if (uncommitted_trans > 0)
{ {
tprint(tracef, "***WARNING: %u unfinished transactions; some tables may" tprint(tracef, "***WARNING: %u uncommitted transactions; some tables may"
" be left inconsistent!***\n", unfinished_trans); " be left inconsistent!***\n", uncommitted_trans);
warnings++; warnings++;
} }
...@@ -481,7 +482,7 @@ prototype_redo_exec_hook(LONG_TRANSACTION_ID) ...@@ -481,7 +482,7 @@ prototype_redo_exec_hook(LONG_TRANSACTION_ID)
LSN gslsn= all_active_trans[sid].group_start_lsn; LSN gslsn= all_active_trans[sid].group_start_lsn;
if (gslsn != LSN_IMPOSSIBLE) if (gslsn != LSN_IMPOSSIBLE)
{ {
tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n", tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u incomplete\n",
LSN_IN_PARTS(gslsn), sid); LSN_IN_PARTS(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
} }
...@@ -538,6 +539,12 @@ prototype_redo_exec_hook_dummy(CHECKPOINT) ...@@ -538,6 +539,12 @@ prototype_redo_exec_hook_dummy(CHECKPOINT)
} }
/*
  REDO-phase hook for LOGREC_INCOMPLETE_GROUP: does nothing and always
  returns 0 (success).
*/
prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP)
{
/*
  Nothing to do here: the incomplete group which this record terminates has
  already been discarded ("aborted") by the time this hook runs.
  NOTE(review): presumably this is done by run_redo_phase(), which resets
  group_start_lsn when it meets a LOGREC_IS_GROUP_ITSELF record - confirm.
*/
return 0;
}
prototype_redo_exec_hook(INCOMPLETE_LOG) prototype_redo_exec_hook(INCOMPLETE_LOG)
{ {
MARIA_HA *info; MARIA_HA *info;
...@@ -1687,7 +1694,6 @@ prototype_redo_exec_hook(COMMIT) ...@@ -1687,7 +1694,6 @@ prototype_redo_exec_hook(COMMIT)
{ {
uint16 sid= rec->short_trid; uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid; TrID long_trid= all_active_trans[sid].long_trid;
LSN gslsn= all_active_trans[sid].group_start_lsn;
char llbuf[22]; char llbuf[22];
if (long_trid == 0) if (long_trid == 0)
{ {
...@@ -1696,19 +1702,8 @@ prototype_redo_exec_hook(COMMIT) ...@@ -1696,19 +1702,8 @@ prototype_redo_exec_hook(COMMIT)
return 0; return 0;
} }
llstr(long_trid, llbuf); llstr(long_trid, llbuf);
tprint(tracef, "Transaction long_trid %s short_trid %u committed", llbuf, sid); tprint(tracef, "Transaction long_trid %s short_trid %u committed\n",
if (gslsn != LSN_IMPOSSIBLE) llbuf, sid);
{
/*
It's not an error, it may be that trn got a disk error when writing to a
table, so an unfinished group staid in the log.
*/
tprint(tracef, ", with group at LSN (%lu,0x%lx) short_trid %u aborted\n",
LSN_IN_PARTS(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
else
tprint(tracef, "\n");
bzero(&all_active_trans[sid], sizeof(all_active_trans[sid])); bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
#ifdef MARIA_VERSIONING #ifdef MARIA_VERSIONING
/* /*
...@@ -2096,6 +2091,7 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply) ...@@ -2096,6 +2091,7 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
install_redo_exec_hook(REDO_DROP_TABLE); install_redo_exec_hook(REDO_DROP_TABLE);
install_redo_exec_hook(FILE_ID); install_redo_exec_hook(FILE_ID);
install_redo_exec_hook(INCOMPLETE_LOG); install_redo_exec_hook(INCOMPLETE_LOG);
install_redo_exec_hook(INCOMPLETE_GROUP);
install_redo_exec_hook(REDO_INSERT_ROW_HEAD); install_redo_exec_hook(REDO_INSERT_ROW_HEAD);
install_redo_exec_hook(REDO_INSERT_ROW_TAIL); install_redo_exec_hook(REDO_INSERT_ROW_TAIL);
install_redo_exec_hook(REDO_INSERT_ROW_BLOBS); install_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
...@@ -2154,8 +2150,8 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply) ...@@ -2154,8 +2150,8 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
/* /*
A complete group is a set of log records with an "end mark" record A complete group is a set of log records with an "end mark" record
(e.g. a set of REDOs for an operation, terminated by an UNDO for this (e.g. a set of REDOs for an operation, terminated by an UNDO for this
operation); if there is no "end mark" record the group is incomplete operation); if there is no "end mark" record the group is incomplete and
and won't be executed. won't be executed.
*/ */
if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) || if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
(log_desc->record_in_group == LOGREC_LAST_IN_GROUP)) (log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
...@@ -2168,8 +2164,7 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply) ...@@ -2168,8 +2164,7 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
can happen if the transaction got a table write error, then can happen if the transaction got a table write error, then
unlocked tables thus wrote a COMMIT record. unlocked tables thus wrote a COMMIT record.
*/ */
tprint(tracef, "\nDiscarding unfinished group before this record\n"); tprint(tracef, "\nDiscarding incomplete group before this record\n");
ALERT_USER();
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
} }
else else
...@@ -2285,14 +2280,14 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply) ...@@ -2285,14 +2280,14 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
/** /**
@brief Informs about any aborted groups or unfinished transactions, @brief Informs about any aborted groups or uncommitted transactions,
prepares for the UNDO phase if needed. prepares for the UNDO phase if needed.
@note Observe that it may init trnman. @note Observe that it may init trnman.
*/ */
static uint end_of_redo_phase(my_bool prepare_for_undo_phase) static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
{ {
uint sid, unfinished= 0; uint sid, uncommitted= 0;
char llbuf[22]; char llbuf[22];
LSN addr; LSN addr;
...@@ -2316,12 +2311,15 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase) ...@@ -2316,12 +2311,15 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
LSN gslsn= all_active_trans[sid].group_start_lsn; LSN gslsn= all_active_trans[sid].group_start_lsn;
TRN *trn; TRN *trn;
if (gslsn != LSN_IMPOSSIBLE) if (gslsn != LSN_IMPOSSIBLE)
tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n", {
tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u incomplete\n",
LSN_IN_PARTS(gslsn), sid); LSN_IN_PARTS(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE) if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
{ {
llstr(long_trid, llbuf); llstr(long_trid, llbuf);
tprint(tracef, "Transaction long_trid %s short_trid %u unfinished\n", tprint(tracef, "Transaction long_trid %s short_trid %u uncommitted\n",
llbuf, sid); llbuf, sid);
/* dummy_transaction_object serves only for DDLs */ /* dummy_transaction_object serves only for DDLs */
DBUG_ASSERT(long_trid != 0); DBUG_ASSERT(long_trid != 0);
...@@ -2332,9 +2330,24 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase) ...@@ -2332,9 +2330,24 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
trn->undo_lsn= all_active_trans[sid].undo_lsn; trn->undo_lsn= all_active_trans[sid].undo_lsn;
trn->first_undo_lsn= all_active_trans[sid].first_undo_lsn | trn->first_undo_lsn= all_active_trans[sid].first_undo_lsn |
TRANSACTION_LOGGED_LONG_ID; /* because trn is known in log */ TRANSACTION_LOGGED_LONG_ID; /* because trn is known in log */
if (gslsn != LSN_IMPOSSIBLE)
{
/*
UNDO phase will log some records. So, a future recovery may see:
REDO(from incomplete group) - REDO(from rollback) - CLR_END
and thus execute the first REDO (finding it in "a complete
group"). To prevent that:
*/
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS];
LSN lsn;
if (translog_write_record(&lsn, LOGREC_INCOMPLETE_GROUP,
trn, NULL, 0,
TRANSLOG_INTERNAL_PARTS, log_array,
NULL, NULL))
return -1;
}
} }
/* otherwise we will just warn about it */ uncommitted++;
unfinished++;
} }
#ifdef MARIA_VERSIONING #ifdef MARIA_VERSIONING
/* /*
...@@ -2366,13 +2379,13 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase) ...@@ -2366,13 +2379,13 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
translog_assign_id_to_share_from_recovery(info->s, sid); translog_assign_id_to_share_from_recovery(info->s, sid);
} }
} }
return unfinished; return uncommitted;
} }
static int run_undo_phase(uint unfinished) static int run_undo_phase(uint uncommitted)
{ {
if (unfinished > 0) if (uncommitted > 0)
{ {
checkpoint_useful= TRUE; checkpoint_useful= TRUE;
if (tracef != stdout) if (tracef != stdout)
...@@ -2382,12 +2395,12 @@ static int run_undo_phase(uint unfinished) ...@@ -2382,12 +2395,12 @@ static int run_undo_phase(uint unfinished)
fprintf(stderr, "transactions to roll back:"); fprintf(stderr, "transactions to roll back:");
recovery_message_printed= REC_MSG_UNDO; recovery_message_printed= REC_MSG_UNDO;
} }
tprint(tracef, "%u transactions will be rolled back\n", unfinished); tprint(tracef, "%u transactions will be rolled back\n", uncommitted);
for( ; ; ) for( ; ; )
{ {
if (recovery_message_printed == REC_MSG_UNDO) if (recovery_message_printed == REC_MSG_UNDO)
fprintf(stderr, " %u", unfinished); fprintf(stderr, " %u", uncommitted);
if ((unfinished--) == 0) if ((uncommitted--) == 0)
break; break;
char llbuf[22]; char llbuf[22];
TRN *trn= trnman_get_any_trn(); TRN *trn= trnman_get_any_trn();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment