Commit 18bc7b69 authored by unknown's avatar unknown

WL#3072 - Maria Recovery

* to honour WAL we now force the whole log when flushing a bitmap page.
* ability to intentionally crash in various places for recovery testing
* bugfix (dirty pages list found in checkpoint record was ignored)
* smaller checkpoint record
* misc small cleanups and comments


mysql-test/include/maria_empty_logs.inc:
  maria-purge.test creates ~11 logs, remove them all
mysql-test/r/maria-recovery-bitmap.result:
  result is good; without the _ma_bitmap_get_log_address() call,
  we got
  check   error   Bitmap at 0 has pages reserved outside of data file length
mysql-test/r/maria-recovery.result:
  result update
mysql-test/t/maria-recovery-bitmap.test:
  enable test of "bitmap-flush should flush whole log otherwise
  corrupted data file (bitmap ahead of data pages)".
mysql-test/t/maria-recovery.test:
  test of checkpoint
sql/sql_table.cc:
  comment
storage/maria/ha_maria.cc:
  _ma_reenable_logging_for_table() now includes file->trn=0.
  At the end of repair() we don't need to re-enable logging, it is
  done already by caller (like copy_data_between_tables()); it sounds
  strange that this function could decide to re-enable, it should be
  up to caller who knows what other operations it plans. Removing this
  line led to assertion failure in maria_lock_database(F_UNLCK), fixed
  by removing the assertion: maria_lock_database()
  is here called in a context where F_UNLCK does not make the
  table visible to others so assertion is excessive, and external_lock()
  is already designed to honour the asserted condition.
  Ability to crash at the end of bulk insert when indices
  have been enabled.
storage/maria/ma_bitmap.c:
  Better use pagecache_file_init() than set pagecache callbacks directly;
  and a new function to set those callbacks for bitmap so that we can
  reuse it.
  _ma_bitmap_get_log_address() is a pagecache get_log_address callback
  which causes the whole log to be flushed when a bitmap page
  is flushed by the page cache. This was required by WAL.
storage/maria/ma_blockrec.c:
  get_log_address pagecache callback for data (non bitmap) pages:
  just reads the LSN from the page's content, like was hard-coded
  before in ma_pagecache.c.
storage/maria/ma_blockrec.h:
  functions which need to be exported
storage/maria/ma_check.c:
  create_new_data_handle() can be static.
  Ability to crash after rebuilding the index in OPTIMIZE,
  in REPAIR. my_lock() implemented already.
storage/maria/ma_checkpoint.c:
  As MARIA_SHARE* is now accessible to pagecache_collect_changed_blocks_LSN(),
  we don't need to store kfile/dfile descriptors in checkpoint record,
  2-byte-id of the table plus one byte to say if this is data or index
  file is enough. So we go from 4+4 bytes per table down to 2+1.
storage/maria/ma_commit.c:
  removing duplicate functions (see _ma_tmp_disable_logging_for_table())
storage/maria/ma_extra.c:
  Monty fixed
storage/maria/ma_key_recover.c:
  comment
storage/maria/ma_locking.c:
  Sometimes other code does funny things with maria_lock_database(),
  like ha_maria::repair() calling it at start and end without going
  through ha_maria::external_lock(). So it happens that maria_lock_database()
  is called with now_transactional!=born_transactional.
storage/maria/ma_loghandler.c:
  update to new prototype
storage/maria/ma_open.c:
  set_data|index_pagecache_callbacks() need to be exported as
  they are now called when disabling/enabling transactionality.
storage/maria/ma_pagecache.c:
  Removing PAGE_LSN_OFFSET, as much of the code relies on it being
  0 anyway (let's not give impression we can just change this constant).
  When flushing a page to disk, call the get_log_address callback to
  know up to which LSN the log should be flushed.
  As we now can access MARIA_SHARE* we can know share->id and store
  it into the checkpoint record; we thus go from 4 bytes per dirty page
  to 2+1.
storage/maria/ma_pagecache.h:
  get_log_address callback
storage/maria/ma_panic.c:
  No reason to reset pagecache callbacks in HA_PANIC_READ:
  all we do is reopen files if they were closed; callbacks should
  be in place already as 'info' exists; we just want to modify
  the file descriptors, not the full PAGECACHE_FILE structure.
  If we open data file and it was closed, share->bitmap.file needs
  to be set.
  Note that the modified code is disabled anyway.
storage/maria/ma_recovery.c:
  Checkpoint record does not contain kfile/dfile descriptors anymore
  so code can be simplified. Hash key in all_dirty_pages is 
  not made from file_descriptor & pageno anymore, but
  index_or_data & table-short-id & pageno.
  If a table's create_rename_lsn is higher than record's LSN,
  we skip the table and don't fail if it's corrupted (because the LSNs
  say that we don't have to look at this table).
  If a table is skipped (for example due to create_rename_lsn),
  its UNDOs still cause undo_lsn to advance; this is so that if later
  we notice the transaction has to rollback we fail (as table should
  not be skipped in this case).
  Fixing a bug: the dirty_pages list was never used, because
  the LSN below which it was used was the minimum rec_lsn of dirty pages!
  It is now the min(checkpoint_start_log_horizon, min(trn's rec_lsn)).
  When we disable/reenable transactionality, we modify pagecache
  callbacks (needed for example for get_log_address: changing
  share->page_type is not enough anymore).
storage/maria/ma_write.c:
  'records' and 'checksum' are protected: they are updated under
  log's mutex in write-hooks when UNDO is written.
storage/maria/maria_chk.c:
  remove use of duplicate functions.
storage/maria/maria_def.h:
  set_data|index_pagecache_callbacks() need to be exported;
  _ma_reenable_logging_for_table() changes to a real function.
storage/maria/unittest/ma_pagecache_consist.c:
  new prototype
storage/maria/unittest/ma_pagecache_single.c:
  new prototype
storage/maria/unittest/ma_test_loghandler_pagecache-t.c:
  new prototype
parent 630169c6
......@@ -12,11 +12,50 @@ EOF
--exec $MYSQLADMIN --no-defaults -S $MASTER_MYSOCK -P $MASTER_MYPORT -u root --password= shutdown 2>&1;
# Depending on what tests were run before, the number of logs
# may vary (maria-purge.test creates ~11 logs).
remove_file $MYSQLTEST_VARDIR/master-data/maria_log_control;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000001;
-- error 0,1 # maybe there is just one log
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000002;
# Hope there were not more than these logs.
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000003;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000004;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000005;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000006;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000007;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000008;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000009;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000010;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000011;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000012;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000013;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000014;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000015;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000016;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000017;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000018;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000019;
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_log.00000020;
# hope there are not more than these logs...
-- error 0,1
remove_file $MYSQLTEST_VARDIR/master-data/maria_recovery.trace;
......
......@@ -25,5 +25,23 @@ mysqltest.t1 check status OK
Checksum-check
ok
use mysqltest;
* TEST of bitmap flushed without REDO-UNDO in the log (WAL violation)
flush table t1;
* copied t1 for comparison
lock tables t1 write;
insert into t1 values (REPEAT('a', 6000));
SET SESSION debug="+d,maria_flush_bitmap,maria_crash";
* crashing mysqld intentionally
set global maria_checkpoint_interval=1;
ERROR HY000: Lost connection to MySQL server during query
* recovery happens
check table t1 extended;
Table Op Msg_type Msg_text
mysqltest.t1 check status OK
* testing that checksum after recovery is as expected
Checksum-check
ok
use mysqltest;
drop table t1;
drop database mysqltest_for_comparison;
drop database mysqltest;
......@@ -214,6 +214,37 @@ t1 CREATE TABLE `t1` (
KEY `c` (`c`)
) ENGINE=MARIA AUTO_INCREMENT=16 DEFAULT CHARSET=latin1
drop table t1;
* TEST of checkpoint
set global debug="+d,info,query,enter,exit,loop,maria_checkpoint_indirect";
set global maria_checkpoint_interval=10000;
create table t1(a int, b varchar(10), index(a,b)) engine=maria;
insert into t1 values(1,"a"),(2,"b"),(3,"c");
delete from t1 where b="b";
update t1 set b="d" where a=1;
flush table t1;
* copied t1 for comparison
lock tables t1 write;
insert into t1 values(4,"e"),(5,"f"),(6,"g");
update t1 set b="h" where a=5;
delete from t1 where b="g";
show status like "Maria_pagecache_blocks_not_flushed";
Variable_name Value
Maria_pagecache_blocks_not_flushed 3
set global maria_checkpoint_interval=10000;
update t1 set b="i" where a=5;
SET SESSION debug="+d,maria_crash";
* crashing mysqld intentionally
set global maria_checkpoint_interval=1;
ERROR HY000: Lost connection to MySQL server during query
* recovery happens
check table t1 extended;
Table Op Msg_type Msg_text
mysqltest.t1 check status OK
* testing that checksum after recovery is as expected
Checksum-check
ok
use mysqltest;
drop table t1;
drop database mysqltest_for_feeding_recovery;
drop database mysqltest_for_comparison;
drop database mysqltest;
......@@ -57,10 +57,6 @@ sleep 5;
set global maria_checkpoint_interval=1;
-- source include/maria_verify_recovery.inc
# disabled until pagecache callback framework is coded at which point
# we can add a get_lsn() callback for bitmaps, fixing the below bug.
if (0)
{
--echo * TEST of bitmap flushed without REDO-UNDO in the log (WAL violation)
# before crashing we'll flush the bitmap page
let $mvr_debug_option="+d,maria_flush_bitmap,maria_crash";
......@@ -71,7 +67,6 @@ insert into t1 values (REPEAT('a', 6000));
# log is not flushed the bitmap is inconsistent with the data.
-- source include/maria_verify_recovery.inc
drop table t1;
}
# clean up everything
let $mms_purpose=comparison;
......
......@@ -179,6 +179,35 @@ let $mvr_crash_statement= set global maria_checkpoint_interval=1;
show create table t1;
drop table t1;
# A basic checkpoint test
--echo * TEST of checkpoint
# Don't take a full checkpoints, we want to test checkpoint vs dirty pages
set global debug="+d,info,query,enter,exit,loop,maria_checkpoint_indirect";
# restart checkpoint thread for it to notice the above
set global maria_checkpoint_interval=10000;
create table t1(a int, b varchar(10), index(a,b)) engine=maria;
insert into t1 values(1,"a"),(2,"b"),(3,"c");
delete from t1 where b="b";
update t1 set b="d" where a=1;
-- source include/maria_make_snapshot_for_comparison.inc
lock tables t1 write;
insert into t1 values(4,"e"),(5,"f"),(6,"g");
update t1 set b="h" where a=5;
delete from t1 where b="g";
show status like "Maria_pagecache_blocks_not_flushed";
# force a checkpoint; there should be dirty pages and an open transaction
set global maria_checkpoint_interval=10000;
# do some more work
update t1 set b="i" where a=5;
let $mvr_restore_old_snapshot=0;
let $mms_compare_physically=0;
let $mvr_debug_option="+d,maria_crash";
let $mvr_crash_statement= set global maria_checkpoint_interval=1;
# Now we have a recovery, which should use the checkpoint record
# and its dirty pages list.
-- source include/maria_verify_recovery.inc
drop table t1;
# clean up everything
let $mms_purpose=feeding_recovery;
eval drop database mysqltest_for_$mms_purpose;
......
......@@ -3706,6 +3706,7 @@ static void wait_while_table_is_used(THD *thd,TABLE *table,
remove_table_from_cache(thd, table->s->db.str,
table->s->table_name.str,
RTFC_WAIT_OTHER_THREAD_FLAG);
/* extra() call must come only after all instances above are closed */
VOID(table->file->extra(function));
DBUG_VOID_RETURN;
}
......
......@@ -120,8 +120,8 @@ static MYSQL_SYSVAR_ULONG(block_size, maria_block_size,
static MYSQL_SYSVAR_ULONG(checkpoint_interval, checkpoint_interval,
PLUGIN_VAR_RQCMDARG,
"Interval between automatic checkpoints, in seconds;"
" 0 means 'no automatic checkpoints'.",
"Interval between automatic checkpoints, in seconds; 0 means"
" 'no automatic checkpoints' which makes sense only for testing.",
NULL, update_checkpoint_interval, 30, 0, UINT_MAX, 1);
static MYSQL_SYSVAR_BOOL(page_checksum, maria_page_checksums, 0,
......@@ -1250,6 +1250,7 @@ int ha_maria::repair(THD *thd, HA_CHECK &param, bool do_optimize)
DBUG_RETURN(HA_ADMIN_FAILED);
}
/** @todo BUG the if() below is always false for BLOCK_RECORD */
if (!do_optimize ||
((file->state->del ||
((file->s->data_file_type != BLOCK_RECORD) &&
......@@ -1294,6 +1295,12 @@ int ha_maria::repair(THD *thd, HA_CHECK &param, bool do_optimize)
{
thd->proc_info= "Repair with keycache";
param.testflag &= ~(T_REP_BY_SORT | T_REP_PARALLEL);
/**
@todo In REPAIR TABLE EXTENDED this will log
REDO_INDEX_NEW_PAGE and UNDO_KEY_INSERT though unneeded.
maria_chk -o does not have this problem as it disables
transactionality.
*/
error= maria_repair(&param, file, fixed_name, param.testflag & T_QUICK);
/**
@todo RECOVERY BUG we do things with the index file
......@@ -1367,15 +1374,7 @@ int ha_maria::repair(THD *thd, HA_CHECK &param, bool do_optimize)
pthread_mutex_unlock(&share->intern_lock);
thd->proc_info= old_proc_info;
if (!thd->locked_tables)
{
/**
@todo RECOVERY BUG find why this is needed. Monty says it's because a
new non-transactional table is created by maria_repair(): find how this
new table's state influences the old one's.
*/
_ma_reenable_logging_for_table(file->s);
maria_lock_database(file, F_UNLCK);
}
DBUG_RETURN(error ? HA_ADMIN_FAILED :
!optimize_done ? HA_ADMIN_ALREADY_DONE : HA_ADMIN_OK);
}
......@@ -1627,6 +1626,17 @@ int ha_maria::enable_indexes(uint mode)
/* mode not implemented */
error= HA_ERR_WRONG_COMMAND;
}
DBUG_EXECUTE_IF("maria_flush_whole_log",
{
DBUG_PRINT("maria_flush_whole_log", ("now"));
translog_flush(translog_get_horizon());
});
DBUG_EXECUTE_IF("maria_crash_enable_index",
{
DBUG_PRINT("maria_crash_enable_index", ("now"));
fflush(DBUG_FILE);
abort();
});
return error;
}
......@@ -1698,6 +1708,11 @@ void ha_maria::start_bulk_insert(ha_rows rows)
{
maria_init_bulk_insert(file, thd->variables.bulk_insert_buff_size, rows);
}
/**
@todo If we have 0 records here, there is no need to log REDO/UNDO for
each data row, we can just log some special UNDO which will empty the
data file if need to rollback.
*/
}
DBUG_VOID_RETURN;
}
......@@ -2097,8 +2112,8 @@ int ha_maria::external_lock(THD *thd, int lock_type)
}
else
{
_ma_reenable_logging_for_table(file->s);
this->file->trn= 0; /* TODO: remove it also in commit and rollback */
_ma_reenable_logging_for_table(file);
/** @todo zero file->trn also in commit and rollback */
if (trn && trnman_has_locked_tables(trn))
{
if (!trnman_decrement_locked_tables(trn))
......
......@@ -133,7 +133,7 @@
static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
MARIA_FILE_BITMAP *bitmap,
ulonglong page);
static TRANSLOG_ADDRESS _ma_bitmap_get_log_address();
/* Write bitmap page to key cache */
......@@ -221,20 +221,7 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file)
bitmap->block_size= share->block_size;
bitmap->file.file= file;
bitmap->file.callback_data= (uchar*) share;
if (share->temporary)
{
bitmap->file.read_callback= &maria_page_crc_check_none;
bitmap->file.write_callback= &maria_page_filler_set_none;
}
else
{
bitmap->file.read_callback= &maria_page_crc_check_bitmap;
if (share->options & HA_OPTION_PAGE_CHECKSUM)
bitmap->file.write_callback= &maria_page_crc_set_normal;
else
bitmap->file.write_callback= &maria_page_filler_set_bitmap;
}
_ma_bitmap_set_pagecache_callbacks(&bitmap->file, share);
/* Size needs to be aligned on 6 */
aligned_bit_blocks= (share->block_size - PAGE_SUFFIX_SIZE) / 6;
......@@ -2507,3 +2494,49 @@ int _ma_bitmap_create_first(MARIA_SHARE *share)
_ma_bitmap_delete_all(share);
return 0;
}
/**
@brief Pagecache callback to get the TRANSLOG_ADDRESS to flush up to, when a
bitmap page needs to be flushed.
@param page Page's content
@param page_no Page's number (<offset>/<page length>)
@param data_ptr Callback data pointer (pointer to MARIA_SHARE)
@retval TRANSLOG_ADDRESS to flush up to.
*/
TRANSLOG_ADDRESS
_ma_bitmap_get_log_address(uchar *page __attribute__((unused)),
pgcache_page_no_t page_no __attribute__((unused)),
uchar* data_ptr)
{
#ifndef DBUG_OFF
const MARIA_SHARE *share= (MARIA_SHARE*)data_ptr;
#endif
DBUG_ENTER("_ma_bitmap_get_log_address");
DBUG_ASSERT(share->page_type == PAGECACHE_LSN_PAGE &&
share->now_transactional);
/*
WAL imposes that UNDOs reach disk before bitmap is flushed. We don't know
the LSN of the last UNDO about this bitmap page, so we flush whole log.
*/
DBUG_RETURN(translog_get_horizon());
}
void _ma_bitmap_set_pagecache_callbacks(PAGECACHE_FILE *file,
MARIA_SHARE *share)
{
if (share->temporary)
pagecache_file_init(*file, &maria_page_crc_check_none,
&maria_page_filler_set_none, NULL, share);
else
pagecache_file_init(*file, &maria_page_crc_check_bitmap,
((share->options & HA_OPTION_PAGE_CHECKSUM) ?
&maria_page_crc_set_normal :
&maria_page_filler_set_bitmap),
share->now_transactional ?
&_ma_bitmap_get_log_address : NULL, share);
}
......@@ -2597,6 +2597,15 @@ static my_bool write_block_record(MARIA_HA *info,
This is the char/varchar data that didn't fit into the head page.
*/
DBUG_ASSERT(bitmap_blocks->count != 0);
/**
@todo RECOVERY BUG
we are tagging full pages with trn->undo_lsn, but we could here be
executing an UNDO_ROW_DELETE/UPDATE, in which case the above LSN is the
LSN of the UNDO before this UNDO; we should rather use the CLR's LSN in
this case. Is this really causing a bug now?
Simple solution is to use 'lsn' set above. It is always LSN of newly
written UNDO or CLR. Monty may have fixed it already.
*/
if (write_full_pages(info, info->trn->undo_lsn, head_block + 1,
info->rec_buff, (ulong) (tmp_data - info->rec_buff)))
goto disk_err;
......@@ -2618,6 +2627,7 @@ static my_bool write_block_record(MARIA_HA *info,
if (block[block->sub_blocks - 1].used & BLOCKUSED_TAIL)
blob_length-= (blob_length % FULL_PAGE_SIZE(block_size));
/** @todo RECOVERY BUG same as above */
if (blob_length && write_full_pages(info, info->trn->undo_lsn, block,
blob_pos, blob_length))
goto disk_err;
......@@ -6098,3 +6108,28 @@ err:
my_free(current_record, MYF(0));
DBUG_RETURN(error);
}
/**
@brief Pagecache callback to get the TRANSLOG_ADDRESS to flush up to, when a
data (non-bitmap) or index page needs to be flushed. Returns a real LSN.
@param page Page's content
@param page_no Page's number (<offset>/<page length>)
@param data_ptr Callback data pointer (pointer to MARIA_SHARE)
@retval LSN to flush up to
*/
TRANSLOG_ADDRESS
maria_page_get_lsn(uchar *page,
pgcache_page_no_t page_no __attribute__((unused)),
uchar* data_ptr __attribute__((unused)))
{
#ifndef DBUG_OFF
const MARIA_SHARE *share= (MARIA_SHARE*)data_ptr;
DBUG_ASSERT(share->page_type == PAGECACHE_LSN_PAGE &&
share->now_transactional);
#endif
return lsn_korr(page);
}
......@@ -170,6 +170,8 @@ my_bool _ma_write_block_record(MARIA_HA *info, const uchar *record);
my_bool _ma_write_abort_block_record(MARIA_HA *info);
my_bool _ma_compare_block_record(register MARIA_HA *info,
register const uchar *record);
TRANSLOG_ADDRESS
maria_page_get_lsn(uchar *page, pgcache_page_no_t page_no, uchar* data_ptr);
/* ma_bitmap.c */
my_bool _ma_bitmap_init(MARIA_SHARE *share, File file);
......@@ -204,6 +206,8 @@ my_bool _ma_check_if_right_bitmap_type(MARIA_HA *info,
void _ma_bitmap_delete_all(MARIA_SHARE *share);
int _ma_bitmap_create_first(MARIA_SHARE *share);
void _ma_bitmap_flushable(MARIA_SHARE *share, int non_flushable_inc);
void _ma_bitmap_set_pagecache_callbacks(PAGECACHE_FILE *file,
MARIA_SHARE *share);
#ifndef DBUG_OFF
void _ma_print_bitmap(MARIA_FILE_BITMAP *bitmap, uchar *data,
ulonglong page);
......
......@@ -94,7 +94,7 @@ static void copy_data_file_state(MARIA_STATE_INFO *to,
MARIA_STATE_INFO *from);
static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info);
static void report_keypage_fault(HA_CHECK *param, my_off_t position);
my_bool create_new_data_handle(MARIA_SORT_PARAM *param, File new_file);
static my_bool create_new_data_handle(MARIA_SORT_PARAM *param, File new_file);
void maria_chk_init(HA_CHECK *param)
......@@ -2361,6 +2361,11 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
VOID(end_io_cache(&sort_info.new_info->rec_cache));
info->opt_flag&= ~WRITE_CACHE_USED;
/**
@todo RECOVERY BUG seems misplaced in some cases. We modify state after
writing it below. But if we move the call below too much down, flushing
of pages may happen too late, after files have been closed.
*/
if (_ma_flush_table_files_after_repair(param, info))
goto err;
......@@ -2614,15 +2619,16 @@ void maria_lock_memory(HA_CHECK *param __attribute__((unused)))
int _ma_flush_table_files_after_repair(HA_CHECK *param, MARIA_HA *info)
{
MARIA_SHARE *share= info->s;
DBUG_ENTER("_ma_flush_table_files_after_repair");
if (_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
FLUSH_RELEASE, FLUSH_RELEASE) ||
_ma_state_info_write(share, 1|4) ||
(share->base.born_transactional && _ma_sync_table_files(info)))
{
_ma_check_print_error(param,"%d when trying to write bufferts",my_errno);
return 1;
DBUG_RETURN(1);
}
return 0;
DBUG_RETURN(0);
} /* _ma_flush_table_files_after_repair */
......@@ -2720,6 +2726,17 @@ int maria_sort_index(HA_CHECK *param, register MARIA_HA *info, char *name)
share->state.key_del= HA_OFFSET_ERROR;
share->state.changed&= ~STATE_NOT_SORTED_PAGES;
DBUG_EXECUTE_IF("maria_flush_whole_log",
{
DBUG_PRINT("maria_flush_whole_log", ("now"));
translog_flush(translog_get_horizon());
});
DBUG_EXECUTE_IF("maria_crash_sort_index",
{
DBUG_PRINT("maria_crash_sort_index", ("now"));
fflush(DBUG_FILE);
abort();
});
DBUG_RETURN(0);
err:
......@@ -3134,6 +3151,17 @@ int maria_repair_by_sort(HA_CHECK *param, register MARIA_HA *info,
param->retry_repair=1;
goto err;
}
DBUG_EXECUTE_IF("maria_flush_whole_log",
{
DBUG_PRINT("maria_flush_whole_log", ("now"));
translog_flush(translog_get_horizon());
});
DBUG_EXECUTE_IF("maria_crash_create_index_by_sort",
{
DBUG_PRINT("maria_crash_create_index_by_sort", ("now"));
fflush(DBUG_FILE);
abort();
});
if (scan_inited)
{
scan_inited= 0;
......@@ -3174,6 +3202,7 @@ int maria_repair_by_sort(HA_CHECK *param, register MARIA_HA *info,
}
}
/** @todo RECOVERY BUG seems misplaced in some cases */
if (_ma_flush_table_files_after_repair(param, info))
goto err;
......@@ -3312,6 +3341,17 @@ err:
Now that we have flushed and forced everything, we can bump
create_rename_lsn:
*/
DBUG_EXECUTE_IF("maria_flush_whole_log",
{
DBUG_PRINT("maria_flush_whole_log", ("now"));
translog_flush(translog_get_horizon());
});
DBUG_EXECUTE_IF("maria_crash_repair",
{
DBUG_PRINT("maria_crash_repair", ("now"));
fflush(DBUG_FILE);
abort();
});
write_log_record_for_repair(param, info);
}
share->state.changed|= STATE_NOT_SORTED_PAGES;
......@@ -3789,6 +3829,7 @@ err:
*/
if (!rep_quick)
VOID(end_io_cache(&new_data_cache));
/** @todo RECOVERY BUG seems misplaced in some cases */
got_error|= _ma_flush_table_files_after_repair(param, info);
if (!got_error)
{
......@@ -5588,7 +5629,7 @@ my_bool maria_test_if_sort_rep(MARIA_HA *info, ha_rows rows,
because the one we create here is not transactional
*/
my_bool create_new_data_handle(MARIA_SORT_PARAM *param, File new_file)
static my_bool create_new_data_handle(MARIA_SORT_PARAM *param, File new_file)
{
MARIA_SORT_INFO *sort_info= param->sort_info;
......@@ -5604,11 +5645,11 @@ my_bool create_new_data_handle(MARIA_SORT_PARAM *param, File new_file)
pagecache_file_init(new_info->s->bitmap.file, &maria_page_crc_check_bitmap,
(new_info->s->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_bitmap), new_info->s);
&maria_page_filler_set_bitmap), NULL, new_info->s);
pagecache_file_init(new_info->dfile, &maria_page_crc_check_data,
(new_info->s->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_normal), new_info->s);
&maria_page_filler_set_normal), NULL, new_info->s);
change_data_file_descriptor(new_info, new_file);
maria_lock_database(new_info, F_EXTRA_LCK);
if ((sort_info->param->testflag & T_UNPACK) &&
......@@ -5913,11 +5954,6 @@ static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info)
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
share->now_transactional= 1;
/**
@todo RECOVERY maria_chk --transaction-log may come here; to be sure
that ha_maria is not using the log too, we should do a my_lock() on the
control file when Maria starts.
*/
if (unlikely(translog_write_record(&lsn, LOGREC_REDO_REPAIR_TABLE,
&dummy_transaction_object, info,
sizeof(log_data),
......
......@@ -157,6 +157,7 @@ static int really_execute_checkpoint(void)
TRANSLOG_ADDRESS checkpoint_start_log_horizon;
char checkpoint_start_log_horizon_char[LSN_STORE_SIZE];
DBUG_ENTER("really_execute_checkpoint");
DBUG_PRINT("enter", ("level: %d", checkpoint_in_progress));
bzero(&record_pieces, sizeof(record_pieces));
/*
......@@ -389,6 +390,10 @@ static void flush_all_tables(int what_to_flush)
void ma_checkpoint_end(void)
{
DBUG_ENTER("ma_checkpoint_end");
/*
Some intentional crash methods, usually triggered by
SET MARIA_CHECKPOINT_INTERVAL=X
*/
DBUG_EXECUTE_IF("maria_flush_bitmap",
{
DBUG_PRINT("maria_flush_bitmap", ("now"));
......@@ -708,11 +713,15 @@ pthread_handler_t ma_checkpoint_background(void *arg)
}
pthread_mutex_unlock(&LOCK_checkpoint);
DBUG_PRINT("info",("Maria background checkpoint thread ends"));
/*
That's the final one, which guarantees that a clean shutdown always ends
with a checkpoint.
*/
ma_checkpoint_execute(CHECKPOINT_FULL, FALSE);
{
CHECKPOINT_LEVEL level= CHECKPOINT_FULL;
/*
That's the final one, which guarantees that a clean shutdown always ends
with a checkpoint.
*/
DBUG_EXECUTE_IF("maria_checkpoint_indirect", level= CHECKPOINT_INDIRECT;);
ma_checkpoint_execute(level, FALSE);
}
pthread_mutex_lock(&LOCK_checkpoint);
checkpoint_thread_die= 2; /* indicate that we are dead */
/* wake up ma_checkpoint_end() which may be waiting for our death */
......@@ -824,8 +833,6 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
str->length=
4 + /* number of tables */
(2 + /* short id */
4 + /* kfile */
4 + /* dfile */
LSN_STORE_SIZE + /* first_log_write_at_lsn */
1 /* end-of-name 0 */
) * nb + total_names_length;
......@@ -982,19 +989,6 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
nb_stored++;
int2store(ptr, share->id);
ptr+= 2;
/*
We must store the OS file descriptors, because the pagecache, which
tells us the list of dirty pages, refers to these pages by OS file
descriptors. An alternative is to make the page cache aware of the
2-byte id and of the location of a page ("is it a data file page or an
index file page?").
If one descriptor is -1, normally there should be no dirty pages
collected for this file, it's ok to store -1, it will not be used.
*/
int4store(ptr, kfile.file);
ptr+= 4;
int4store(ptr, dfile.file);
ptr+= 4;
lsn_store(ptr, share->lsn_of_file_id);
ptr+= LSN_STORE_SIZE;
/*
......
......@@ -116,26 +116,3 @@ int maria_begin(MARIA_HA *info)
}
DBUG_RETURN(0);
}
/*
@brief Disable logging for this table
@note
Mainly used during repair table, where we don't want to log all
changes to index or rows
*/
void maria_disable_logging(MARIA_HA *info)
{
info->s->now_transactional= 0;
info->trn= &dummy_transaction_object;
info->s->page_type= PAGECACHE_PLAIN_PAGE;
}
void maria_enable_logging(MARIA_HA *info)
{
if ((info->s->now_transactional= info->s->base.born_transactional))
info->s->page_type= PAGECACHE_LSN_PAGE;
}
......@@ -338,10 +338,8 @@ int maria_extra(MARIA_HA *info, enum ha_extra_function function,
if (_ma_state_info_write(share, 1 | 2) ||
my_sync(share->kfile.file, MYF(0)))
error= my_errno;
#ifdef ASK_MONTY /* see same tag in HA_EXTRA_FORCE_REOPEN */
else
share->changed= 0;
#endif
}
else
{
......
......@@ -32,6 +32,9 @@
@param undo_lsn LSN for undo pages. LSN_IMPOSSIBLE if we shouldn't write
undo (like on duplicate key errors)
info->pinned_pages is the list of pages to unpin. Each member of the list
must have its 'changed' saying if the page was changed or not.
@note
We unpin pages in the reverse order as they where pinned; This may not
be strictly necessary but may simplify things in the future.
......
......@@ -135,13 +135,6 @@ int maria_lock_database(MARIA_HA *info, int lock_type)
}
info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
info->lock_type= F_UNLCK;
/*
Verify that user of the table cleaned up after itself. Not in
recovery, as for example maria_extra(HA_EXTRA_PREPARE_FOR_RENAME) may
call us here, with transactionality temporarily disabled.
*/
DBUG_ASSERT(maria_in_recovery ||
share->now_transactional == share->base.born_transactional);
break;
case F_RDLCK:
if (info->lock_type == F_WRLCK)
......
......@@ -1342,7 +1342,7 @@ static void translog_file_init(TRANSLOG_FILE *file, uint32 number,
my_bool is_sync)
{
pagecache_file_init(file->handler, &translog_page_validator,
&translog_dummy_callback, file);
&translog_dummy_callback, NULL, file);
file->number= number;
file->was_recovered= 0;
file->is_sync= is_sync;
......
......@@ -36,11 +36,6 @@ static my_bool maria_once_init_dummy(MARIA_SHARE *, File);
static my_bool maria_once_end_dummy(MARIA_SHARE *);
static uchar *_ma_base_info_read(uchar *ptr, MARIA_BASE_INFO *base);
static uchar *_ma_state_info_read(uchar *ptr, MARIA_STATE_INFO *state);
static void set_data_pagecache_callbacks(PAGECACHE_FILE *file,
MARIA_SHARE *share);
static void set_index_pagecache_callbacks(PAGECACHE_FILE *file,
MARIA_SHARE *share);
#define get_next_element(to,pos,size) { memcpy((char*) to,pos,(size_t) size); \
pos+=size;}
......@@ -1534,43 +1529,40 @@ uchar *_ma_column_nr_read(uchar *ptr, uint16 *offsets, uint columns)
}
static void set_data_pagecache_callbacks(PAGECACHE_FILE *file,
MARIA_SHARE *share)
void set_data_pagecache_callbacks(PAGECACHE_FILE *file, MARIA_SHARE *share)
{
file->callback_data= (uchar*) share;
/*
Note that non-BLOCK_RECORD formats don't use the pagecache for their data
files, so it does not matter that maria_page* calls are passed below for
them. On the other hand, index file can always have page CRCs, for all
data formats.
*/
if (share->temporary)
{
file->read_callback= &maria_page_crc_check_none;
file->write_callback= &maria_page_filler_set_none;
}
pagecache_file_init(*file, &maria_page_crc_check_none,
&maria_page_filler_set_none, NULL, share);
else
{
file->read_callback= &maria_page_crc_check_data;
if (share->options & HA_OPTION_PAGE_CHECKSUM)
file->write_callback= &maria_page_crc_set_normal;
else
file->write_callback= &maria_page_filler_set_normal;
}
pagecache_file_init(*file, &maria_page_crc_check_data,
((share->options & HA_OPTION_PAGE_CHECKSUM) ?
&maria_page_crc_set_normal :
&maria_page_filler_set_normal),
share->now_transactional ?
&maria_page_get_lsn : NULL, share);
}
static void set_index_pagecache_callbacks(PAGECACHE_FILE *file,
MARIA_SHARE *share)
void set_index_pagecache_callbacks(PAGECACHE_FILE *file, MARIA_SHARE *share)
{
file->callback_data= (uchar*) share;
if (share->temporary)
{
file->read_callback= &maria_page_crc_check_none;
file->write_callback= &maria_page_filler_set_none;
}
pagecache_file_init(*file, &maria_page_crc_check_none,
&maria_page_filler_set_none, NULL, share);
else
{
file->read_callback= &maria_page_crc_check_index;
if (share->options & HA_OPTION_PAGE_CHECKSUM)
file->write_callback= &maria_page_crc_set_index;
else
file->write_callback= &maria_page_filler_set_normal;
}
pagecache_file_init(*file, &maria_page_crc_check_index,
((share->options & HA_OPTION_PAGE_CHECKSUM) ?
&maria_page_crc_set_index :
&maria_page_filler_set_normal),
share->now_transactional ?
&maria_page_get_lsn : NULL,
share);
}
......
......@@ -42,6 +42,7 @@
#include "maria_def.h"
#include <m_string.h>
#include "ma_pagecache.h"
#include "ma_blockrec.h"
#include <my_bit.h>
#include <errno.h>
......@@ -124,9 +125,6 @@ my_bool my_disable_flush_pagecache_blocks= 0;
#define COND_FOR_WRLOCK 2 /* queue of write lock */
#define COND_SIZE 3 /* number of COND_* queues */
/* offset of LSN on the page */
#define PAGE_LSN_OFFSET 0
typedef pthread_cond_t KEYCACHE_CONDVAR;
/* descriptor of the page in the page cache block buffer */
......@@ -574,7 +572,7 @@ static int ___pagecache_pthread_cond_signal(pthread_cond_t *cond);
#define pagecache_pthread_cond_signal pthread_cond_signal
#endif /* defined(PAGECACHE_DEBUG) */
extern my_bool translog_flush(LSN lsn);
extern my_bool translog_flush(TRANSLOG_ADDRESS lsn);
/*
Write page to the disk
......@@ -599,26 +597,24 @@ static uint pagecache_fwrite(PAGECACHE *pagecache,
enum pagecache_page_type type,
myf flags)
{
TRANSLOG_ADDRESS (*addr_callback)
(uchar *page, pgcache_page_no_t offset, uchar *data)=
filedesc->get_log_address_callback;
DBUG_ENTER("pagecache_fwrite");
DBUG_ASSERT(type != PAGECACHE_READ_UNKNOWN_PAGE);
/**
@todo RECOVERY BUG Here, we should call a callback get_lsn(): it will use
lsn_korr() for LSN pages, and translog_get_horizon() for bitmap pages.
*/
if (type == PAGECACHE_LSN_PAGE)
if (addr_callback != NULL)
{
LSN lsn;
TRANSLOG_ADDRESS addr=
(*addr_callback)(buffer, pageno, filedesc->callback_data);
DBUG_PRINT("info", ("Log handler call"));
/* TODO: integrate with page format */
lsn= lsn_korr(buffer + PAGE_LSN_OFFSET);
DBUG_ASSERT(LSN_VALID(lsn));
if (translog_flush(lsn))
DBUG_ASSERT(LSN_VALID(addr));
if (translog_flush(addr))
DBUG_RETURN(1);
}
DBUG_PRINT("info", ("write_callback: 0x%lx data: 0x%lx",
(ulong) filedesc->write_callback,
(ulong) filedesc->callback_data));
if ((filedesc->write_callback)(buffer, pageno, filedesc->callback_data))
if ((*filedesc->write_callback)(buffer, pageno, filedesc->callback_data))
{
DBUG_PRINT("error", ("write callback problem"));
DBUG_RETURN(1);
......@@ -2527,14 +2523,14 @@ static void check_and_set_lsn(PAGECACHE *pagecache,
to not log REDOs).
*/
DBUG_ASSERT((block->type == PAGECACHE_LSN_PAGE) || maria_in_recovery);
old= lsn_korr(block->buffer + PAGE_LSN_OFFSET);
old= lsn_korr(block->buffer);
DBUG_PRINT("info", ("old lsn: (%lu, 0x%lx) new lsn: (%lu, 0x%lx)",
LSN_IN_PARTS(old), LSN_IN_PARTS(lsn)));
if (cmp_translog_addr(lsn, old) > 0)
{
DBUG_ASSERT(block->type != PAGECACHE_READ_UNKNOWN_PAGE);
lsn_store(block->buffer + PAGE_LSN_OFFSET, lsn);
lsn_store(block->buffer, lsn);
/* we stored LSN in page so we dirtied it */
if (!(block->status & PCBLOCK_CHANGED))
link_to_changed_list(pagecache, block);
......@@ -2948,7 +2944,7 @@ uchar *pagecache_read(PAGECACHE *pagecache,
int error= 0;
enum pagecache_page_pin pin= lock_to_pin[test(buff==0)][lock];
PAGECACHE_BLOCK_LINK *fake_link;
DBUG_ENTER("pagecache_valid_read");
DBUG_ENTER("pagecache_read");
DBUG_PRINT("enter", ("fd: %u page: %lu buffer: 0x%lx level: %u "
"t:%s %s %s",
(uint) file->file, (ulong) pageno,
......@@ -3676,8 +3672,8 @@ static int flush_cached_blocks(PAGECACHE *pagecache,
block->pins));
DBUG_ASSERT(block->pins == 1);
/**
@todo If page is contiguous with next page to flush, group flushes in
one single my_pwrite().
@todo IO If page is contiguous with next page to flush, group flushes
in one single my_pwrite().
*/
error= pagecache_fwrite(pagecache, &block->hash_link->file,
block->buffer,
......@@ -4190,7 +4186,7 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
wqueue_add_to_queue(&other_flusher->flush_queue, thread);
do
{
KEYCACHE_DBUG_PRINT("pagecache_collect_çhanged_blocks_with_lsn: wait",
KEYCACHE_DBUG_PRINT("pagecache_collect_changed_blocks_with_lsn: wait",
("suspend thread %ld", thread->id));
pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock);
......@@ -4214,6 +4210,7 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
*/
DBUG_ASSERT(block->hash_link != NULL);
DBUG_ASSERT(block->status & PCBLOCK_CHANGED);
/* Note that we don't store bitmap pages */
if (block->type != PAGECACHE_LSN_PAGE)
continue; /* no need to store it */
stored_list_size++;
......@@ -4222,7 +4219,8 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
compile_time_assert(sizeof(pagecache->blocks) <= 8);
str->length= 8 + /* number of dirty pages */
(4 + /* file */
(2 + /* table id */
1 + /* data or index file */
4 + /* pageno */
LSN_STORE_SIZE /* rec_lsn */
) * stored_list_size;
......@@ -4231,7 +4229,8 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
ptr= str->str;
int8store(ptr, (ulonglong)stored_list_size);
ptr+= 8;
if (!stored_list_size)
DBUG_PRINT("info", ("found %lu dirty pages", stored_list_size));
if (stored_list_size == 0)
goto end;
for (file_hash= 0; file_hash < PAGECACHE_CHANGED_BLOCKS_HASH; file_hash++)
{
......@@ -4240,16 +4239,17 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
block;
block= block->next_changed)
{
uint16 table_id;
MARIA_SHARE *share;
if (block->type != PAGECACHE_LSN_PAGE)
continue; /* no need to store it in the checkpoint record */
compile_time_assert(sizeof(block->hash_link->file.file) <= 4);
compile_time_assert(sizeof(block->hash_link->pageno) <= 4);
/**
@todo RECOVERY when we have a pointer to MARIA_SHARE, store share->id
instead of this file.
*/
int4store(ptr, block->hash_link->file.file);
ptr+= 4;
share= (MARIA_SHARE *)(block->hash_link->file.callback_data);
table_id= share->id;
int2store(ptr, table_id);
ptr+= 2;
ptr[0]= (share->kfile.file == block->hash_link->file.file);
ptr++;
int4store(ptr, block->hash_link->pageno);
ptr+= 4;
lsn_store(ptr, block->rec_lsn);
......
......@@ -81,10 +81,15 @@ typedef uint32 pgcache_page_no_t;
typedef struct st_pagecache_file
{
File file;
/** Cannot be NULL */
my_bool (*read_callback)(uchar *page, pgcache_page_no_t offset,
uchar *data);
/** Cannot be NULL */
my_bool (*write_callback)(uchar *page, pgcache_page_no_t offset,
uchar *data);
/** Can be NULL */
TRANSLOG_ADDRESS (*get_log_address_callback)
(uchar *page, pgcache_page_no_t offset, uchar *data);
uchar *callback_data;
} PAGECACHE_FILE;
......@@ -257,10 +262,10 @@ extern void pagecache_unpin_by_link(PAGECACHE *pagecache,
/* PCFLUSH_ERROR and PCFLUSH_PINNED. */
#define PCFLUSH_PINNED_AND_ERROR (PCFLUSH_ERROR|PCFLUSH_PINNED)
#define pagecache_file_init(F,RC,WC,D) \
#define pagecache_file_init(F,RC,WC,GLC,D) \
do{ \
(F).read_callback= (RC); (F).write_callback= (WC); \
(F).callback_data= (uchar*)(D); \
(F).get_log_address_callback= (GLC); (F).callback_data= (uchar*)(D); \
} while(0)
#define flush_pagecache_blocks(A,B,C) \
......
......@@ -98,19 +98,16 @@ int maria_panic(enum ha_panic_function flag)
#ifdef CANT_OPEN_FILES_TWICE
{ /* Open closed files */
char name_buff[FN_REFLEN];
if (info->s->kfile.file < 0)
MARIA_SHARE *share= info->s;
if (share->kfile.file < 0)
{
if ((info->s->kfile.file= my_open(fn_format(name_buff,
info->filename, "",
N_NAME_IEXT,4),
info->mode,
MYF(MY_WME))) < 0)
if ((share->kfile.file= my_open(fn_format(name_buff,
info->filename, "",
N_NAME_IEXT,4),
info->mode,
MYF(MY_WME))) < 0)
error = my_errno;
pagecache_file_init(info->s->kfile, &maria_page_crc_check_index,
(info->s->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_index :
&maria_page_filler_set_normal), info->s);
}
if (info->dfile.file < 0)
{
......@@ -119,12 +116,10 @@ int maria_panic(enum ha_panic_function flag)
info->mode,
MYF(MY_WME))) < 0)
error = my_errno;
pagecache_file_init(info->dfile, &maria_page_crc_check_data,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal:
&maria_page_filler_set_normal), share);
info->rec_cache.file= info->dfile.file;
}
if (share->bitmap.file.file < 0)
share->bitmap.file.file= info->dfile.file;
}
#endif
if (info->was_locked)
......
......@@ -40,15 +40,18 @@ struct st_dirty_page /* used only in the REDO phase */
struct st_table_for_recovery /* used in the REDO and UNDO phase */
{
MARIA_HA *info;
File org_kfile, org_dfile; /**< OS descriptors when Checkpoint saw table */
};
/* Variables used by all functions of this module. Ok as single-threaded */
static struct st_trn_for_recovery *all_active_trans;
static struct st_table_for_recovery *all_tables;
static HASH all_dirty_pages;
static struct st_dirty_page *dirty_pages_pool;
static LSN current_group_end_lsn,
checkpoint_start= LSN_IMPOSSIBLE;
static LSN current_group_end_lsn;
/*
LSN after which dirty pages list does not apply. Can be slightly before
when ma_checkpoint_execute() started.
*/
static LSN checkpoint_start= LSN_IMPOSSIBLE;
#ifndef DBUG_OFF
/** Current group of REDOs is about this table and only this one */
static MARIA_HA *current_group_table;
......@@ -58,6 +61,7 @@ static FILE *tracef; /**< trace file for debugging */
static my_bool skip_DDLs; /**< if REDO phase should skip DDL records */
/** @brief to avoid writing a checkpoint if recovery did nothing. */
static my_bool checkpoint_useful;
/** @todo looks like duplicate of recovery_message_printed */
static my_bool procent_printed;
static ulonglong now; /**< for tracking execution time of phases */
uint warnings; /**< count of warnings */
......@@ -123,10 +127,8 @@ static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon);
static LSN parse_checkpoint_record(LSN lsn);
static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
LSN first_undo_lsn);
static int new_table(uint16 sid, const char *name,
File org_kfile, File org_dfile,
LSN lsn_of_file_id);
static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id);
static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn,
struct st_dirty_page *dirty_page);
static int close_all_tables(void);
static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr);
......@@ -135,6 +137,10 @@ static void print_redo_phase_progress(TRANSLOG_ADDRESS addr);
/** @brief global [out] buffer for translog_read_record(); never shrinks */
static struct
{
/*
uchar* is more adapted (less casts) than char*, thus we don't use
LEX_STRING.
*/
uchar *str;
size_t length;
} log_record_buffer;
......@@ -1137,7 +1143,7 @@ prototype_redo_exec_hook(FILE_ID)
all_tables[sid].info= NULL;
}
name= (char *)log_record_buffer.str + FILEID_STORE_SIZE;
if (new_table(sid, name, -1, -1, rec->lsn))
if (new_table(sid, name, rec->lsn))
goto end;
error= 0;
end:
......@@ -1145,9 +1151,7 @@ end:
}
static int new_table(uint16 sid, const char *name,
File org_kfile, File org_dfile,
LSN lsn_of_file_id)
static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id)
{
/*
-1 (skip table): close table and return 0;
......@@ -1180,12 +1184,6 @@ static int new_table(uint16 sid, const char *name,
error= 0;
goto end;
}
if (maria_is_crashed(info))
{
/** @todo what should we do? how to continue recovery? */
tprint(tracef, "Table is crashed, can't apply log records to it\n");
goto end;
}
share= info->s;
/* check that we're not already using it */
if (share->reopen != 1)
......@@ -1214,6 +1212,16 @@ static int new_table(uint16 sid, const char *name,
LSN_IN_PARTS(lsn_of_file_id));
error= -1;
goto end;
/*
Note that we tested that before testing corruption; a recent corrupted
table is not a blocker for the present log record.
*/
}
if (maria_is_crashed(info))
{
/** @todo what should we do? how to continue recovery? */
tprint(tracef, "Table is crashed, can't apply log records to it\n");
goto end;
}
/* don't log any records for this work */
_ma_tmp_disable_logging_for_table(info, FALSE);
......@@ -1255,8 +1263,6 @@ static int new_table(uint16 sid, const char *name,
*/
info->s->lsn_of_file_id= lsn_of_file_id;
all_tables[sid].info= info;
all_tables[sid].org_kfile= org_kfile;
all_tables[sid].org_dfile= org_dfile;
/*
We don't set info->s->id, it would be useless (no logging in REDO phase);
if you change that, know that some records in REDO phase call
......@@ -1567,10 +1573,17 @@ prototype_redo_exec_hook(UNDO_ROW_INSERT)
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
MARIA_SHARE *share;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (info == NULL)
{
/*
Note that we set undo_lsn anyway. So that if the transaction is later
rolled back, this UNDO is tried for execution and we get an error (as it
would then be abnormal that info==NULL).
*/
return 0;
}
share= info->s;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
{
tprint(tracef, " state has LSN (%lu,0x%lx) older than record, updating"
......@@ -1603,10 +1616,10 @@ prototype_redo_exec_hook(UNDO_ROW_DELETE)
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
MARIA_SHARE *share;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (info == NULL)
return 0;
share= info->s;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
{
tprint(tracef, " state older than record\n");
......@@ -1637,10 +1650,11 @@ prototype_redo_exec_hook(UNDO_ROW_UPDATE)
{
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
MARIA_SHARE *share;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (info == NULL)
return 0;
share= info->s;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
{
if (share->calc_checksum)
......@@ -1667,10 +1681,11 @@ prototype_redo_exec_hook(UNDO_KEY_INSERT)
{
MARIA_HA *info;
MARIA_SHARE *share;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
return 0;
share= info->s;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
{
const uchar *ptr= rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE;
......@@ -1721,9 +1736,10 @@ prototype_redo_exec_hook(UNDO_KEY_INSERT)
prototype_redo_exec_hook(UNDO_KEY_DELETE)
{
MARIA_HA *info;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
return 0;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
_ma_unpin_all_pages(info, rec->lsn);
return 0;
}
......@@ -1733,10 +1749,11 @@ prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
{
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
MARIA_SHARE *share;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (info == NULL)
return 0;
share= info->s;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
{
uint key_nr;
......@@ -1810,15 +1827,15 @@ prototype_redo_exec_hook(CLR_END)
uchar *logpos;
DBUG_ENTER("exec_REDO_LOGREC_CLR_END");
if (info == NULL)
DBUG_RETURN(0);
share= info->s;
previous_undo_lsn= lsn_korr(rec->header);
undone_record_type=
clr_type_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
log_desc= &log_record_type_descriptor[undone_record_type];
set_undo_lsn_for_active_trans(rec->short_trid, previous_undo_lsn);
if (info == NULL)
DBUG_RETURN(0);
share= info->s;
tprint(tracef, " CLR_END was about %s, undo_lsn now LSN (%lu,0x%lx)\n",
log_desc->name, LSN_IN_PARTS(previous_undo_lsn));
......@@ -2559,7 +2576,7 @@ static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon)
share->state.is_of_horizon= horizon;
_ma_state_info_write_sub(share->kfile.file, &share->state, 1);
}
_ma_reenable_logging_for_table(share);
_ma_reenable_logging_for_table(info);
info->trn= NULL; /* safety */
}
......@@ -2625,12 +2642,19 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
DBUG_ASSERT(info->s->last_version != 0);
if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
{
/*
64-bit key is formed like this:
Most significant byte: 0
Next byte: 0 if data page, 1 if index page
Next 2 bytes: table's short id
Next 4 bytes: page number
*/
uint64 file_and_page_id=
(((uint64) (index_page_redo_entry ? all_tables[sid].org_kfile :
all_tables[sid].org_dfile)) << 32) | page;
(((uint64)((index_page_redo_entry << 16) | sid)) << 32) | page;
struct st_dirty_page *dirty_page= (struct st_dirty_page *)
hash_search(&all_dirty_pages,
(uchar *)&file_and_page_id, sizeof(file_and_page_id));
DBUG_PRINT("info", ("in dirty pages list: %d", dirty_page != NULL));
if ((dirty_page == NULL) ||
cmp_translog_addr(rec->lsn, dirty_page->rec_lsn) < 0)
{
......@@ -2737,7 +2761,8 @@ static LSN parse_checkpoint_record(LSN lsn)
/*
how much brain juice and discussions there was to come to writing this
line
line. It may make start_address slightly decrease (only by the time it
takes to write one or a few rows, roughly).
*/
set_if_smaller(start_address, minimum_rec_lsn_of_active_transactions);
......@@ -2770,22 +2795,17 @@ static LSN parse_checkpoint_record(LSN lsn)
for (i= 0; i< nb_tables; i++)
{
char name[FN_REFLEN];
File kfile, dfile;
LSN first_log_write_lsn;
uint name_len;
uint16 sid= uint2korr(ptr);
ptr+= 2;
DBUG_ASSERT(sid > 0);
kfile= uint4korr(ptr);
ptr+= 4;
dfile= uint4korr(ptr);
ptr+= 4;
first_log_write_lsn= lsn_korr(ptr);
ptr+= LSN_STORE_SIZE;
name_len= strlen((char *)ptr) + 1;
strmake(name, (char *)ptr, sizeof(name)-1);
ptr+= name_len;
if (new_table(sid, name, kfile, dfile, first_log_write_lsn))
if (new_table(sid, name, first_log_write_lsn))
return LSN_ERROR;
}
......@@ -2808,15 +2828,18 @@ static LSN parse_checkpoint_record(LSN lsn)
minimum_rec_lsn_of_dirty_pages= LSN_MAX;
for (i= 0; i < nb_dirty_pages ; i++)
{
pgcache_page_no_t pageid;
pgcache_page_no_t page_id;
LSN rec_lsn;
File fileid= uint4korr(ptr);
ptr+= 4;
pageid= uint4korr(ptr);
uint16 table_id= uint2korr(ptr);
ptr+= 2;
uint32 is_index= ptr[0];
ptr++;
page_id= uint4korr(ptr);
ptr+= 4;
rec_lsn= lsn_korr(ptr);
ptr+= LSN_STORE_SIZE;
if (new_page(fileid, pageid, rec_lsn, next_dirty_page_in_pool++))
if (new_page((is_index << 16) | table_id,
page_id, rec_lsn, next_dirty_page_in_pool++))
return LSN_ERROR;
set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn);
}
......@@ -2830,11 +2853,11 @@ static LSN parse_checkpoint_record(LSN lsn)
eprint(tracef, "checkpoint record corrupted\n");
return LSN_ERROR;
}
set_if_smaller(start_address, minimum_rec_lsn_of_dirty_pages);
/*
start_address is now from where the dirty pages list can be ignored.
Find LSN higher or equal to this TRANSLOG_ADDRESS, suitable for
translog_read_record() functions
translog_read_record() functions.
*/
checkpoint_start= translog_next_LSN(start_address, LSN_IMPOSSIBLE);
if (checkpoint_start == LSN_IMPOSSIBLE)
......@@ -2845,10 +2868,16 @@ static LSN parse_checkpoint_record(LSN lsn)
*/
return LSN_ERROR;
}
return checkpoint_start;
/* now, where the REDO phase should start reading log: */
set_if_smaller(start_address, minimum_rec_lsn_of_dirty_pages);
DBUG_PRINT("info",
("checkpoint_start: (%lu,0x%lx) start_address: (%lu,0x%lx)",
LSN_IN_PARTS(checkpoint_start), LSN_IN_PARTS(start_address)));
return start_address;
}
static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn,
struct st_dirty_page *dirty_page)
{
/* serves as hash key */
......@@ -2954,6 +2983,7 @@ static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr)
@note for example in the REDO phase we disable logging but that does not
make the log incomplete.
*/
void _ma_tmp_disable_logging_for_table(MARIA_HA *info,
my_bool log_incomplete)
{
......@@ -2966,15 +2996,52 @@ void _ma_tmp_disable_logging_for_table(MARIA_HA *info,
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
translog_write_record(&lsn, LOGREC_INCOMPLETE_LOG,
info->trn, info, sizeof(log_data),
&dummy_transaction_object, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data, NULL);
}
/* if we disabled before writing the record, record wouldn't reach log */
share->now_transactional= FALSE;
/*
Some code in ma_blockrec.c assumes a trn.
info->trn in some cases can be not NULL and not dummy_transaction_object
when arriving here, but overwriting it does not leak as it is still
remembered in THD_TRN.
*/
info->trn= &dummy_transaction_object;
share->page_type= PAGECACHE_PLAIN_PAGE;
/* Functions below will pick up now_transactional and change callbacks */
set_data_pagecache_callbacks(&info->dfile, share);
set_index_pagecache_callbacks(&share->kfile, share);
_ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share);
}
/**
Re-enables logging for a table which had it temporarily disabled.
@param info table
*/
void _ma_reenable_logging_for_table(MARIA_HA *info)
{
MARIA_SHARE *share= info->s;
if ((share->now_transactional= share->base.born_transactional))
{
/*
The change below does NOT affect pages already in the page cache, so you
should have flushed them out already, or write a pagecache function to
change their type.
*/
share->page_type= PAGECACHE_LSN_PAGE;
info->trn= NULL; /* safety */
}
set_data_pagecache_callbacks(&info->dfile, share);
set_index_pagecache_callbacks(&share->kfile, share);
_ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share);
}
static void print_redo_phase_progress(TRANSLOG_ADDRESS addr)
{
static int end_logno= FILENO_IMPOSSIBLE, end_offset, percentage_printed= 0;
......
......@@ -201,10 +201,6 @@ int maria_write(MARIA_HA *info, uchar *record)
{
if ((*share->write_record)(info,record))
goto err;
/**
@todo when we enable multiple writers, we will have to protect
'records' and 'checksum' somehow.
*/
info->state->checksum+= !share->now_transactional *
info->cur_row.checksum;
}
......
......@@ -1038,7 +1038,7 @@ static int maria_chk(HA_CHECK *param, char *filename)
if (param->testflag & (T_REP_ANY | T_SORT_RECORDS | T_SORT_INDEX))
{
/* Mark table as not transactional to avoid logging */
maria_disable_logging(info);
_ma_tmp_disable_logging_for_table(info, FALSE);
if (param->testflag & T_REP_ANY)
{
......@@ -1214,7 +1214,7 @@ static int maria_chk(HA_CHECK *param, char *filename)
((param->testflag & T_SORT_RECORDS) ?
UPDATE_SORT : 0)));
info->update&= ~HA_STATE_CHANGED;
maria_enable_logging(info);
_ma_reenable_logging_for_table(info);
maria_lock_database(info, F_UNLCK);
end2:
......@@ -1673,7 +1673,7 @@ static int maria_sort_records(HA_CHECK *param,
pagecache_file_init(info->dfile, &maria_page_crc_check_data,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_normal), share);
&maria_page_filler_set_normal), NULL, share);
info->state->del=0;
info->state->empty=0;
share->state.dellink= HA_OFFSET_ERROR;
......
......@@ -1054,12 +1054,13 @@ int _ma_update_create_rename_lsn(MARIA_SHARE *share,
LSN lsn, my_bool do_sync);
int _ma_update_create_rename_lsn_sub(MARIA_SHARE *share,
LSN lsn, my_bool do_sync);
void set_data_pagecache_callbacks(PAGECACHE_FILE *file,
MARIA_SHARE *share);
void set_index_pagecache_callbacks(PAGECACHE_FILE *file,
MARIA_SHARE *share);
void _ma_tmp_disable_logging_for_table(MARIA_HA *info,
my_bool log_incomplete);
#define _ma_reenable_logging_for_table(S) \
{ if (((S)->now_transactional= (S)->base.born_transactional)) \
(S)->page_type= PAGECACHE_LSN_PAGE; }
void _ma_reenable_logging_for_table(MARIA_HA *info);
#define MARIA_NO_CRC_NORMAL_PAGE 0xffffffff
#define MARIA_NO_CRC_BITMAP_PAGE 0xfffffffe
......
......@@ -333,7 +333,7 @@ int main(int argc __attribute__((unused)),
errno);
exit(1);
}
pagecache_file_init(file1, &dummy_callback, &dummy_callback, NULL);
pagecache_file_init(file1, &dummy_callback, &dummy_callback, NULL, NULL);
DBUG_PRINT("info", ("file1: %d", file1.file));
if (chmod(file1_name, S_IRWXU | S_IRWXG | S_IRWXO) != 0)
{
......
......@@ -520,7 +520,7 @@ int main(int argc __attribute__((unused)),
errno);
exit(1);
}
pagecache_file_init(file1, &dummy_callback, &dummy_callback, NULL);
pagecache_file_init(file1, &dummy_callback, &dummy_callback, NULL, NULL);
my_close(tmp_file, MYF(0));
my_delete(file2_name, MYF(0));
......
......@@ -124,7 +124,7 @@ int main(int argc __attribute__((unused)), char *argv[])
errno);
exit(1);
}
pagecache_file_init(file1, &dummy_callback, &dummy_callback, NULL);
pagecache_file_init(file1, &dummy_callback, &dummy_callback, NULL, NULL);
if (chmod(file1_name, S_IRWXU | S_IRWXG | S_IRWXO) != 0)
{
fprintf(stderr, "Got error during file1 chmod() (errno: %d)\n",
......@@ -136,8 +136,7 @@ int main(int argc __attribute__((unused)), char *argv[])
uchar page[PCACHE_PAGE];
bzero(page, PCACHE_PAGE);
#define PAGE_LSN_OFFSET 0
lsn_store(page + PAGE_LSN_OFFSET, lsn);
lsn_store(page, lsn);
pagecache_write(&pagecache, &file1, 0, 3, (char*)page,
PAGECACHE_LSN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment