Commit 68f5d6a0 authored by unknown's avatar unknown

WL#3072 - Maria recovery.

* fix for bitmap vs checkpoint bug which could lead to corrupted
tables in case of crashes at certain moments: a bitmap could be flushed
to disk even though it was inconsistent with the log (it could be
flushed before REDO-UNDO are written to the log). One bug remains, need
code from others. Tests added. Fix is to pin unflushable bitmap pages,
and let checkpoint wait for them to be flushable.
* fix for long_trid!=0 assertion failure at Recovery.
* less useless wakeups in the background flush|checkpoint thread.
* store global_trid_generator in checkpoint record.


mysql-test/r/maria-recovery.result:
  result update
mysql-test/t/maria-recovery.test:
  make it easier to locate subtests
storage/maria/ma_bitmap.c:
  When we send a bitmap to the pagecache, if this bitmap is not in a
  flushable state we keep it pinned and add it to a list, it will be
  unpinned when the bitmap is flushable again.
  A new function _ma_bitmap_flush_all() used by checkpoint.
  A new function _ma_bitmap_flushable() used by block format to signal
  when it starts modifying a bitmap and when it is done with it.
storage/maria/ma_blockrec.c:
  When starting a row operation (insert/update/delete), mark that
  the bitmap is not flushable (because for example INSERT is going
  to over-allocate in the bitmap to prevent other threads from using
  our data pages). If a checkpoint comes at this moment it will wait
  for the bitmap to be flushable before flushing it.
  When the operation ends, bitmap becomes flushable again; that
  transition is done under the bitmap's mutex (needed for correct
  synchro with a concurrent checkpoint); but for INSERT/UPDATE this
  happens inside _ma_bitmap_release_unused() at a place where it already
  has the mutex, so the only penalty (mutex adding) is in DELETE and UNDO
  of INSERT. In case of errors after setting the bitmap unflushable,
  we must always set it back to flushable or checkpoint would block.
  Debug possibilities to force a sleep while the bitmap is over-allocated.
  In case of error in get_head_or_tail() in allocate_and_write_block_record(),
  we still need to unpin all pages.
  Bugfix: _ma_apply_redo_insert_row_blobs() produced wrong
  data_file_length.
storage/maria/ma_blockrec.h:
  new bitmap calls.
storage/maria/ma_checkpoint.c:
  filter_flush_indirect not needed anymore (flushing bitmap
  pages happens in _ma_bitmap_flush_all() now). So
  st_filter_param::is_data_file|pages_covered_by_bitmap not needed.
  Other filter_flush* don't need to flush bitmap anymore.
  Add debug possibility to flush all bitmap pages outside of a checkpoint,
  to simulate pagecache LRU eviction.
  When the background flush/checkpoint thread notices it has nothing
  to flush, it now sleeps directly until the next potential checkpoint
  moment instead of waking up every second.
  When in checkpoint we decide to not store a table in the checkpoint record
  (because it has logged no writes for example), we can also skip flushing
  this table.
storage/maria/ma_commit.c:
  comment is out-of-date
storage/maria/ma_key_recover.c:
  comment fix
storage/maria/ma_loghandler.c:
  comment is out-of-date
storage/maria/ma_open.c:
  comment is out-of-date
storage/maria/ma_pagecache.c:
  comment for bug to fix. And we don't take checkpoints at end of REDO
  phase yet so can trust block->type.
storage/maria/ma_recovery.c:
  Comments. Now-unneeded code for incomplete REDO-UNDO groups removed.
  When we forget about an old transaction we must really forget
  about it with bzero() (fixes the "long_trid!=0 assertion" recovery
  bug). When we delete a row with maria_delete() we turn on
  STATE_NOT_OPTIMIZED_ROWS so we do the same when we see a CLR_END
  for an UNDO_ROW_INSERT or when we execute an UNDO_ROW_INSERT (in both
  cases a row was deleted). Pick up max_long_trid from the checkpoint record.
storage/maria/maria_chk.c:
  comment
storage/maria/maria_def.h:
  MARIA_FILE_BITMAP gets new members: 'flushable', 'bitmap_cond' and
  'pinned_pages'.
storage/maria/trnman.c:
  I used to think that recovery only needs to know the maximum TrID
  of the lists of active and committed transactions. But no, sometimes
  both lists can even be empty and their TrID should not be reused.
  So Checkpoint now saves global_trid_generator in the checkpoint record.
storage/maria/trnman_public.h:
  macros to read/store a TrID
mysql-test/r/maria-recovery-bitmap.result:
  result is ok. Without the code fix, we would get a corruption message
  about the bitmap page in CHECK TABLE EXTENDED.
mysql-test/t/maria-recovery-bitmap-master.opt:
  usual when we crash mysqld in tests
mysql-test/t/maria-recovery-bitmap.test:
  test of recovery problems specific of the bitmap pages.
parent 2e5aff67
drop database if exists mysqltest;
create database mysqltest;
use mysqltest;
* shut down mysqld, removed logs, restarted it
use mysqltest;
create table t1 (a varchar(10000)) engine=maria;
* TEST of over-allocated bitmap not flushed by checkpoint
insert into t1 values ("bbbbbbb");
flush table t1;
* copied t1 for comparison
insert into t1 values ("bbbbbbb");
delete from t1 limit 1;
set session debug="+d,info,enter,exit,maria_over_alloc_bitmap";
insert into t1 values ("aaaaaaaaa");
set global maria_checkpoint_interval=1;
SET SESSION debug="+d,maria_crash";
* crashing mysqld intentionally
set global maria_checkpoint_interval=1;
ERROR HY000: Lost connection to MySQL server during query
* recovery happens
check table t1 extended;
Table Op Msg_type Msg_text
mysqltest.t1 check status OK
* testing that checksum after recovery is as expected
Checksum-check
ok
use mysqltest;
drop database mysqltest_for_comparison;
drop database mysqltest;
set global maria_log_file_size=4294967296;
drop database if exists mysqltest; drop database if exists mysqltest;
create database mysqltest; create database mysqltest;
use mysqltest; use mysqltest;
...@@ -118,6 +119,7 @@ a ...@@ -118,6 +119,7 @@ a
00000000 00000000
00000000 00000000
drop table t1; drop table t1;
* TEST of two REDOs for same page in one REDO group
* shut down mysqld, removed logs, restarted it * shut down mysqld, removed logs, restarted it
use mysqltest; use mysqltest;
CREATE TABLE t1 ( CREATE TABLE t1 (
...@@ -150,6 +152,7 @@ SELECT LENGTH(b) FROM t1 WHERE i=3; ...@@ -150,6 +152,7 @@ SELECT LENGTH(b) FROM t1 WHERE i=3;
LENGTH(b) LENGTH(b)
5001 5001
drop table t1; drop table t1;
* TEST of INSERT vs state.auto_increment
* shut down mysqld, removed logs, restarted it * shut down mysqld, removed logs, restarted it
use mysqltest; use mysqltest;
CREATE TABLE t1 ( CREATE TABLE t1 (
...@@ -184,6 +187,7 @@ t1 CREATE TABLE `t1` ( ...@@ -184,6 +187,7 @@ t1 CREATE TABLE `t1` (
PRIMARY KEY (`i`), PRIMARY KEY (`i`),
KEY `c` (`c`) KEY `c` (`c`)
) ENGINE=MARIA AUTO_INCREMENT=5 DEFAULT CHARSET=latin1 ) ENGINE=MARIA AUTO_INCREMENT=5 DEFAULT CHARSET=latin1
* TEST of UPDATE vs state.auto_increment
* copied t1 for feeding_recovery * copied t1 for feeding_recovery
update t1 set i=15 where c="a"; update t1 set i=15 where c="a";
flush table t1; flush table t1;
......
--skip-stack-trace --skip-core-file
# Tests of Maria's recovery of the bitmap pages
--source include/not_embedded.inc
# Don't test this under valgrind, memory leaks will occur as we crash
--source include/not_valgrind.inc
# Binary must be compiled with debug for crash to occur
--source include/have_debug.inc
--source include/have_maria.inc
--disable_warnings
drop database if exists mysqltest;
--enable_warnings
create database mysqltest;
# Include scripts can perform SQL. For it to not influence the main test
# they use a separate connection. This way if they use a DDL it would
# not autocommit in the main test.
connect (admin, 127.0.0.1, root,,mysqltest,,);
--enable_reconnect
connection default;
use mysqltest;
--enable_reconnect
-- source include/maria_empty_logs.inc
let $mms_tables=1;
create table t1 (a varchar(10000)) engine=maria;
# we want recovery to use the tables as they were at time of crash
let $mvr_restore_old_snapshot=0;
# UNDO phase prevents physical comparison, normally,
# so we'll only use checksums to compare.
let $mms_compare_physically=0;
let $mvr_crash_statement= set global maria_checkpoint_interval=1;
--echo * TEST of over-allocated bitmap not flushed by checkpoint
let $mvr_debug_option="+d,maria_crash";
insert into t1 values ("bbbbbbb");
-- source include/maria_make_snapshot_for_comparison.inc
# make_snapshot_for_comparison closed the table, which lost its id.
# So we make a null operation just to give a short id to the table so
# that checkpoint includes table in checkpoint (otherwise nothing to
# test).
insert into t1 values ("bbbbbbb");
delete from t1 limit 1;
set session debug="+d,info,enter,exit,maria_over_alloc_bitmap";
send insert into t1 values ("aaaaaaaaa");
connection admin;
# Leave time for INSERT to block after modifying bitmap;
# in the future we should not use sleep but something like
# debug_sync_point().
sleep 5;
# force a checkpoint, which could, if buggy, flush over-allocated
# bitmap page; as REDO-UNDO was not written, bitmap and data page
# would be inconsistent. Correct checkpoint will wait until UNDO is
# written.
set global maria_checkpoint_interval=1;
-- source include/maria_verify_recovery.inc
# disabled until pagecache callback framework is coded at which point
# we can add a get_lsn() callback for bitmaps, fixing the below bug.
if (0)
{
--echo * TEST of bitmap flushed without REDO-UNDO in the log (WAL violation)
# before crashing we'll flush the bitmap page
let $mvr_debug_option="+d,maria_flush_bitmap,maria_crash";
-- source include/maria_make_snapshot_for_comparison.inc
lock tables t1 write;
insert into t1 values (REPEAT('a', 6000));
# bitmap of after-INSERT will be on disk, but data pages will not; if
# log is not flushed the bitmap is inconsistent with the data.
-- source include/maria_verify_recovery.inc
drop table t1;
}
# clean up everything
let $mms_purpose=comparison;
eval drop database mysqltest_for_$mms_purpose;
drop database mysqltest;
...@@ -122,6 +122,7 @@ drop table t1; ...@@ -122,6 +122,7 @@ drop table t1;
# the rewrite was ignored. # the rewrite was ignored.
# #
--echo * TEST of two REDOs for same page in one REDO group
-- source include/maria_empty_logs.inc -- source include/maria_empty_logs.inc
let $mms_tables=1; let $mms_tables=1;
CREATE TABLE t1 ( CREATE TABLE t1 (
...@@ -144,6 +145,7 @@ SELECT LENGTH(b) FROM t1 WHERE i=3; ...@@ -144,6 +145,7 @@ SELECT LENGTH(b) FROM t1 WHERE i=3;
drop table t1; drop table t1;
# Test that INSERT's effect on auto-increment is recovered # Test that INSERT's effect on auto-increment is recovered
--echo * TEST of INSERT vs state.auto_increment
-- source include/maria_empty_logs.inc -- source include/maria_empty_logs.inc
let $mms_tables=1; let $mms_tables=1;
CREATE TABLE t1 ( CREATE TABLE t1 (
...@@ -165,6 +167,7 @@ let $mvr_crash_statement= set global maria_checkpoint_interval=1; ...@@ -165,6 +167,7 @@ let $mvr_crash_statement= set global maria_checkpoint_interval=1;
show create table t1; show create table t1;
# Test that UPDATE's effect on auto-increment is recovered # Test that UPDATE's effect on auto-increment is recovered
--echo * TEST of UPDATE vs state.auto_increment
-- source include/maria_make_snapshot_for_feeding_recovery.inc -- source include/maria_make_snapshot_for_feeding_recovery.inc
update t1 set i=15 where c="a"; update t1 set i=15 where c="a";
-- source include/maria_make_snapshot_for_comparison.inc -- source include/maria_make_snapshot_for_comparison.inc
......
...@@ -132,6 +132,8 @@ uchar maria_bitmap_marker[4]= ...@@ -132,6 +132,8 @@ uchar maria_bitmap_marker[4]=
{(uchar) 255, (uchar) 255, (uchar) 255, (uchar) 254}; {(uchar) 255, (uchar) 255, (uchar) 255, (uchar) 254};
uchar maria_normal_page_marker[4]= uchar maria_normal_page_marker[4]=
{(uchar) 255, (uchar) 255, (uchar) 255, (uchar) 255}; {(uchar) 255, (uchar) 255, (uchar) 255, (uchar) 255};
/*#define WRONG_BITMAP_FLUSH 1*/ /*define only for provoking bugs*/
#undef WRONG_BITMAP_FLUSH
static my_bool _ma_read_bitmap_page(MARIA_SHARE *share, static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
MARIA_FILE_BITMAP *bitmap, MARIA_FILE_BITMAP *bitmap,
...@@ -143,14 +145,48 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share, ...@@ -143,14 +145,48 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
static inline my_bool write_changed_bitmap(MARIA_SHARE *share, static inline my_bool write_changed_bitmap(MARIA_SHARE *share,
MARIA_FILE_BITMAP *bitmap) MARIA_FILE_BITMAP *bitmap)
{ {
DBUG_ENTER("write_changed_bitmap");
DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size); DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size);
return (pagecache_write(share->pagecache, DBUG_PRINT("info", ("bitmap->flushable: %d", bitmap->flushable));
if (bitmap->flushable
#ifdef WRONG_BITMAP_FLUSH
|| 1
#endif
)
{
my_bool res= pagecache_write(share->pagecache,
&bitmap->file, bitmap->page, 0, &bitmap->file, bitmap->page, 0,
(uchar*) bitmap->map, PAGECACHE_PLAIN_PAGE, (uchar*) bitmap->map, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED, PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_WRITE_DELAY, 0, PAGECACHE_WRITE_DELAY, 0, LSN_IMPOSSIBLE);
LSN_IMPOSSIBLE)); DBUG_RETURN(res);
}
else
{
/**
@todo RECOVERY BUG
Not flushable: its content is not reflected by the log, to honour WAL we
must keep the bitmap page pinned. Scenario of INSERT:
REDO - UNDO (written to log but not forced)
bitmap goes to page cache (because other INSERT needs to)
and then to disk (pagecache eviction)
crash: recovery will not find REDO-UNDO, table is corrupted.
Solutions:
give LSNs to bitmap pages or change pagecache to flush all log when
flushing a bitmap page or keep bitmap page pinned until checkpoint.
*/
MARIA_PINNED_PAGE page_link;
int res= pagecache_write(share->pagecache,
&bitmap->file, bitmap->page, 0,
(uchar*) bitmap->map, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_WRITE, PAGECACHE_PIN,
PAGECACHE_WRITE_DELAY, &page_link.link,
LSN_IMPOSSIBLE);
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
push_dynamic(&bitmap->pinned_pages, (void*) &page_link);
DBUG_RETURN(res);
}
} }
/* /*
...@@ -180,7 +216,9 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file) ...@@ -180,7 +216,9 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file)
size*= 2; size*= 2;
#endif #endif
if (!(bitmap->map= (uchar*) my_malloc(size, MYF(MY_WME)))) if (((bitmap->map= (uchar*) my_malloc(size, MYF(MY_WME))) == NULL) ||
my_init_dynamic_array(&bitmap->pinned_pages,
sizeof(MARIA_PINNED_PAGE), 1, 1))
return 1; return 1;
bitmap->file.file= file; bitmap->file.file= file;
...@@ -193,6 +231,7 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file) ...@@ -193,6 +231,7 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file)
The +1 is to add the bitmap page, as this doesn't have to be covered The +1 is to add the bitmap page, as this doesn't have to be covered
*/ */
bitmap->pages_covered= aligned_bit_blocks * 16 + 1; bitmap->pages_covered= aligned_bit_blocks * 16 + 1;
bitmap->flushable= TRUE;
/* Update size for bits */ /* Update size for bits */
/* TODO; Make this dependent of the row size */ /* TODO; Make this dependent of the row size */
...@@ -207,6 +246,7 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file) ...@@ -207,6 +246,7 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file)
bitmap->sizes[7]= 0; bitmap->sizes[7]= 0;
pthread_mutex_init(&share->bitmap.bitmap_lock, MY_MUTEX_INIT_SLOW); pthread_mutex_init(&share->bitmap.bitmap_lock, MY_MUTEX_INIT_SLOW);
pthread_cond_init(&share->bitmap.bitmap_cond, 0);
_ma_bitmap_reset_cache(share); _ma_bitmap_reset_cache(share);
...@@ -231,6 +271,8 @@ my_bool _ma_bitmap_end(MARIA_SHARE *share) ...@@ -231,6 +271,8 @@ my_bool _ma_bitmap_end(MARIA_SHARE *share)
{ {
my_bool res= _ma_bitmap_flush(share); my_bool res= _ma_bitmap_flush(share);
pthread_mutex_destroy(&share->bitmap.bitmap_lock); pthread_mutex_destroy(&share->bitmap.bitmap_lock);
pthread_cond_destroy(&share->bitmap.bitmap_cond);
delete_dynamic(&share->bitmap.pinned_pages);
my_free((uchar*) share->bitmap.map, MYF(MY_ALLOW_ZERO_PTR)); my_free((uchar*) share->bitmap.map, MYF(MY_ALLOW_ZERO_PTR));
share->bitmap.map= 0; share->bitmap.map= 0;
return res; return res;
...@@ -273,6 +315,104 @@ my_bool _ma_bitmap_flush(MARIA_SHARE *share) ...@@ -273,6 +315,104 @@ my_bool _ma_bitmap_flush(MARIA_SHARE *share)
} }
/**
Dirty-page filtering criteria for bitmap pages
@param type Page's type
@param pageno Page's number
@param rec_lsn Page's rec_lsn
@param arg pages_covered of bitmap
*/
static enum pagecache_flush_filter_result
filter_flush_bitmap_pages(enum pagecache_page_type type
__attribute__ ((unused)),
pgcache_page_no_t pageno,
LSN rec_lsn __attribute__ ((unused)),
void *arg)
{
return ((pageno % (*(ulong*)arg)) == 0);
}
/**
Flushes current bitmap page to the pagecache, and then all bitmap pages
from pagecache to the file. Used by Checkpoint.
@param share Table's share
*/
my_bool _ma_bitmap_flush_all(MARIA_SHARE *share)
{
my_bool res= 0;
MARIA_FILE_BITMAP *bitmap= &share->bitmap;
DBUG_ENTER("_ma_bitmap_flush_all");
pthread_mutex_lock(&bitmap->bitmap_lock);
if (bitmap->changed)
{
#ifndef WRONG_BITMAP_FLUSH
while (!bitmap->flushable)
{
DBUG_PRINT("info", ("waiting for bitmap to be flushable"));
pthread_cond_wait(&bitmap->bitmap_cond, &bitmap->bitmap_lock);
}
#endif
/*
Bitmap is in a flushable state: its contents in memory are reflected by
log records (complete REDO-UNDO groups) and all bitmap pages are
unpinned. We keep the mutex to preserve this situation, and flush to the
file.
*/
res= write_changed_bitmap(share, bitmap);
bitmap->changed= FALSE;
/*
We do NOT use FLUSH_KEEP_LAZY because we must be sure that bitmap
pages have been flushed. That's a condition of correctness of
Recovery: data pages may have been all flushed, if we write the
checkpoint record Recovery will start from after their REDOs. If
bitmap page was not flushed, as the REDOs about it will be skipped, it
will wrongly not be recovered. If bitmap pages had a rec_lsn it would
be different.
There should be no pinned pages as bitmap->flushable is true.
*/
if (flush_pagecache_blocks_with_filter(share->pagecache,
&bitmap->file, FLUSH_KEEP,
filter_flush_bitmap_pages,
&bitmap->pages_covered) &
PCFLUSH_PINNED_AND_ERROR)
res= TRUE;
}
pthread_mutex_unlock(&bitmap->bitmap_lock);
DBUG_RETURN(res);
}
/**
@brief Unpin all pinned bitmap pages
@param share Table's share
@return Operation status
@retval 0 ok
*/
static void _ma_bitmap_unpin_all(MARIA_SHARE *share)
{
MARIA_FILE_BITMAP *bitmap= &share->bitmap;
MARIA_PINNED_PAGE *page_link= ((MARIA_PINNED_PAGE*)
dynamic_array_ptr(&bitmap->pinned_pages, 0));
MARIA_PINNED_PAGE *pinned_page= page_link + bitmap->pinned_pages.elements;
DBUG_ENTER("_ma_bitmap_unpin_all");
DBUG_PRINT("info", ("pinned: %u", bitmap->pinned_pages.elements));
while (pinned_page-- != page_link)
pagecache_unlock_by_link(share->pagecache, pinned_page->link,
pinned_page->unlock, PAGECACHE_UNPIN,
LSN_IMPOSSIBLE, LSN_IMPOSSIBLE, TRUE);
bitmap->pinned_pages.elements= 0;
DBUG_VOID_RETURN;
}
/* /*
Intialize bitmap in memory to a zero bitmap Intialize bitmap in memory to a zero bitmap
...@@ -684,12 +824,6 @@ static my_bool _ma_change_bitmap_page(MARIA_HA *info, ...@@ -684,12 +824,6 @@ static my_bool _ma_change_bitmap_page(MARIA_HA *info,
if (bitmap->changed) if (bitmap->changed)
{ {
/**
@todo RECOVERY BUG this is going to flush the bitmap page possibly to
disk even though it could be over-allocated with not yet any REDO-UNDO
complete group (WAL violation: no way to undo the over-allocation if
crash). See also collect_tables().
*/
if (write_changed_bitmap(info->s, bitmap)) if (write_changed_bitmap(info->s, bitmap))
DBUG_RETURN(1); DBUG_RETURN(1);
bitmap->changed= 0; bitmap->changed= 0;
...@@ -1973,6 +2107,46 @@ my_bool _ma_bitmap_set_full_page_bits(MARIA_HA *info, ...@@ -1973,6 +2107,46 @@ my_bool _ma_bitmap_set_full_page_bits(MARIA_HA *info,
} }
/**
Make a transition of MARIA_FILE_BITMAP::flushable.
If the bitmap becomes flushable, which requires that REDO-UNDO has been
logged and all bitmap pages touched by the thread have a correct
allocation, it unpins all bitmap pages, and if checkpoint is waiting, it
wakes it up.
If the bitmap becomes unflushable, it just records it.
@param share Table's share
@param flushable New state
*/
void _ma_bitmap_flushable(MARIA_SHARE *share, my_bool flushable)
{
MARIA_FILE_BITMAP *bitmap= &share->bitmap;
if (flushable)
{
pthread_mutex_lock(&bitmap->bitmap_lock);
_ma_bitmap_unpin_all(share);
bitmap->flushable= TRUE;
pthread_mutex_unlock(&bitmap->bitmap_lock);
/*
Ok to read in_checkpoint without mutex, as it is set before Checkpoint
calls _ma_bitmap_flush_all().
*/
if (share->in_checkpoint)
{
DBUG_PRINT("info", ("bitmap ready waking up checkpoint"));
pthread_cond_broadcast(&bitmap->bitmap_cond);
}
return;
}
/*
Ok to set without mutex: we didn't touch the bitmap yet; when we touch it
we will take the mutex.
*/
bitmap->flushable= FALSE;
}
/* /*
Correct bitmap pages to reflect the true allocation Correct bitmap pages to reflect the true allocation
...@@ -2015,7 +2189,7 @@ my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks) ...@@ -2015,7 +2189,7 @@ my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks)
*/ */
current_bitmap_value= FULL_HEAD_PAGE; current_bitmap_value= FULL_HEAD_PAGE;
pthread_mutex_lock(&info->s->bitmap.bitmap_lock); pthread_mutex_lock(&bitmap->bitmap_lock);
/* First handle head block */ /* First handle head block */
if (block->used & BLOCKUSED_USED) if (block->used & BLOCKUSED_USED)
...@@ -2065,11 +2239,19 @@ my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks) ...@@ -2065,11 +2239,19 @@ my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks)
block->page, page_count)) block->page, page_count))
goto err; goto err;
} }
pthread_mutex_unlock(&info->s->bitmap.bitmap_lock);
_ma_bitmap_unpin_all(info->s);
bitmap->flushable= TRUE;
pthread_mutex_unlock(&bitmap->bitmap_lock);
if (info->s->in_checkpoint)
{
DBUG_PRINT("info", ("bitmap ready waking up checkpoint"));
pthread_cond_broadcast(&bitmap->bitmap_cond);
}
DBUG_RETURN(0); DBUG_RETURN(0);
err: err:
pthread_mutex_unlock(&info->s->bitmap.bitmap_lock); pthread_mutex_unlock(&bitmap->bitmap_lock);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
......
...@@ -2692,32 +2692,21 @@ static my_bool allocate_and_write_block_record(MARIA_HA *info, ...@@ -2692,32 +2692,21 @@ static my_bool allocate_and_write_block_record(MARIA_HA *info,
MARIA_BITMAP_BLOCKS *blocks= &row->insert_blocks; MARIA_BITMAP_BLOCKS *blocks= &row->insert_blocks;
DBUG_ENTER("allocate_and_write_block_record"); DBUG_ENTER("allocate_and_write_block_record");
_ma_bitmap_flushable(info->s, FALSE);
if (_ma_bitmap_find_place(info, row, blocks)) if (_ma_bitmap_find_place(info, row, blocks))
DBUG_RETURN(1); /* Error reading bitmap */ goto err; /* Error reading bitmap */
#ifdef RECOVERY_EXTRA_DEBUG /*
/* Send this over-allocated bitmap to disk and crash, see if recovers */ Sleep; a checkpoint will happen and should not send this over-allocated
DBUG_EXECUTE_IF("maria_flush_bitmap", bitmap to disk but rather wait.
{ */
DBUG_PRINT("maria_flush_bitmap", ("now")); DBUG_EXECUTE_IF("maria_over_alloc_bitmap", sleep(10););
_ma_bitmap_flush(info->s);
_ma_flush_table_files(info, MARIA_FLUSH_DATA |
MARIA_FLUSH_INDEX,
FLUSH_KEEP, FLUSH_KEEP);
});
DBUG_EXECUTE_IF("maria_crash",
{
DBUG_PRINT("maria_crash", ("now"));
fflush(DBUG_FILE);
abort();
});
#endif
/* page will be pinned & locked by get_head_or_tail_page */ /* page will be pinned & locked by get_head_or_tail_page */
if (get_head_or_tail_page(info, blocks->block, info->buff, if (get_head_or_tail_page(info, blocks->block, info->buff,
row->space_on_head_page, HEAD_PAGE, row->space_on_head_page, HEAD_PAGE,
PAGECACHE_LOCK_WRITE, &row_pos)) PAGECACHE_LOCK_WRITE, &row_pos))
DBUG_RETURN(1); goto err;
row->lastpos= ma_recordpos(blocks->block->page, row_pos.rownr); row->lastpos= ma_recordpos(blocks->block->page, row_pos.rownr);
if (info->s->calc_checksum) if (info->s->calc_checksum)
{ {
...@@ -2732,11 +2721,17 @@ static my_bool allocate_and_write_block_record(MARIA_HA *info, ...@@ -2732,11 +2721,17 @@ static my_bool allocate_and_write_block_record(MARIA_HA *info,
if (write_block_record(info, (uchar*) 0, record, row, if (write_block_record(info, (uchar*) 0, record, row,
blocks, blocks->block->org_bitmap_value != 0, blocks, blocks->block->org_bitmap_value != 0,
&row_pos, undo_lsn, 0)) &row_pos, undo_lsn, 0))
DBUG_RETURN(1); /* Error reading bitmap */ goto err; /* Error reading bitmap */
DBUG_PRINT("exit", ("Rowid: %lu (%lu:%u)", (ulong) row->lastpos, DBUG_PRINT("exit", ("Rowid: %lu (%lu:%u)", (ulong) row->lastpos,
(ulong) ma_recordpos_to_page(row->lastpos), (ulong) ma_recordpos_to_page(row->lastpos),
ma_recordpos_to_dir_entry(row->lastpos))); ma_recordpos_to_dir_entry(row->lastpos)));
/* Now let checkpoint happen but don't commit */
DBUG_EXECUTE_IF("maria_over_alloc_bitmap", sleep(1000););
DBUG_RETURN(0); DBUG_RETURN(0);
err:
_ma_bitmap_flushable(info->s, TRUE);
_ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE);
DBUG_RETURN(1);
} }
...@@ -2806,6 +2801,7 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info) ...@@ -2806,6 +2801,7 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info)
MARIA_SHARE *share= info->s; MARIA_SHARE *share= info->s;
DBUG_ENTER("_ma_write_abort_block_record"); DBUG_ENTER("_ma_write_abort_block_record");
_ma_bitmap_flushable(share, FALSE);
if (delete_head_or_tail(info, if (delete_head_or_tail(info,
ma_recordpos_to_page(info->cur_row.lastpos), ma_recordpos_to_page(info->cur_row.lastpos),
ma_recordpos_to_dir_entry(info->cur_row.lastpos), 1, ma_recordpos_to_dir_entry(info->cur_row.lastpos), 1,
...@@ -2840,6 +2836,7 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info) ...@@ -2840,6 +2836,7 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info)
&lsn, (void*) 0)) &lsn, (void*) 0))
res= 1; res= 1;
} }
_ma_bitmap_flushable(share, TRUE);
_ma_unpin_all_pages_and_finalize_row(info, lsn); _ma_unpin_all_pages_and_finalize_row(info, lsn);
DBUG_RETURN(res); DBUG_RETURN(res);
} }
...@@ -2889,12 +2886,13 @@ static my_bool _ma_update_block_record2(MARIA_HA *info, ...@@ -2889,12 +2886,13 @@ static my_bool _ma_update_block_record2(MARIA_HA *info,
calc_record_size(info, record, new_row); calc_record_size(info, record, new_row);
page= ma_recordpos_to_page(record_pos); page= ma_recordpos_to_page(record_pos);
_ma_bitmap_flushable(share, FALSE);
DBUG_ASSERT(share->pagecache->block_size == block_size); DBUG_ASSERT(share->pagecache->block_size == block_size);
if (!(buff= pagecache_read(share->pagecache, if (!(buff= pagecache_read(share->pagecache,
&info->dfile, (pgcache_page_no_t) page, 0, &info->dfile, (pgcache_page_no_t) page, 0,
info->buff, share->page_type, info->buff, share->page_type,
PAGECACHE_LOCK_WRITE, &page_link.link))) PAGECACHE_LOCK_WRITE, &page_link.link)))
DBUG_RETURN(1); goto err;
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK; page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
page_link.changed= 1; page_link.changed= 1;
push_dynamic(&info->pinned_pages, (void*) &page_link); push_dynamic(&info->pinned_pages, (void*) &page_link);
...@@ -2918,7 +2916,7 @@ static my_bool _ma_update_block_record2(MARIA_HA *info, ...@@ -2918,7 +2916,7 @@ static my_bool _ma_update_block_record2(MARIA_HA *info,
if (extend_area_on_page(buff, dir, rownr, share->block_size, if (extend_area_on_page(buff, dir, rownr, share->block_size,
new_row->total_length, &org_empty_size, new_row->total_length, &org_empty_size,
&rec_offset, &length)) &rec_offset, &length))
DBUG_RETURN(1); goto err;
row_pos.buff= buff; row_pos.buff= buff;
row_pos.rownr= rownr; row_pos.rownr= rownr;
...@@ -2980,6 +2978,7 @@ static my_bool _ma_update_block_record2(MARIA_HA *info, ...@@ -2980,6 +2978,7 @@ static my_bool _ma_update_block_record2(MARIA_HA *info,
DBUG_RETURN(res); DBUG_RETURN(res);
err: err:
_ma_bitmap_flushable(share, TRUE);
_ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE); _ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -3288,6 +3287,7 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record) ...@@ -3288,6 +3287,7 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record)
DBUG_PRINT("enter", ("Rowid: %lu (%lu:%u)", (ulong) info->cur_row.lastpos, DBUG_PRINT("enter", ("Rowid: %lu (%lu:%u)", (ulong) info->cur_row.lastpos,
(ulong) page, record_number)); (ulong) page, record_number));
_ma_bitmap_flushable(share, FALSE);
if (delete_head_or_tail(info, page, record_number, 1, 0) || if (delete_head_or_tail(info, page, record_number, 1, 0) ||
delete_tails(info, info->cur_row.tail_positions)) delete_tails(info, info->cur_row.tail_positions))
goto err; goto err;
...@@ -3334,10 +3334,12 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record) ...@@ -3334,10 +3334,12 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record)
} }
_ma_bitmap_flushable(share, TRUE);
_ma_unpin_all_pages_and_finalize_row(info, lsn); _ma_unpin_all_pages_and_finalize_row(info, lsn);
DBUG_RETURN(0); DBUG_RETURN(0);
err: err:
_ma_bitmap_flushable(share, TRUE);
_ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE); _ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -5509,10 +5511,14 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info, ...@@ -5509,10 +5511,14 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
enum pagecache_page_pin unpin_method; enum pagecache_page_pin unpin_method;
uint length; uint length;
if ((page * info->s->block_size) > info->state->data_file_length) if (((page + 1) * info->s->block_size) >
info->state->data_file_length)
{ {
/* New page or half written page at end of file */ /* New page or half written page at end of file */
info->state->data_file_length= page * info->s->block_size; DBUG_PRINT("info", ("Enlarging data file from %lu to %lu",
(ulong) info->state->data_file_length,
(ulong) ((page + 1 ) * info->s->block_size)));
info->state->data_file_length= (page + 1) * info->s->block_size;
buff= info->keyread_buff; buff= info->keyread_buff;
info->keyread_buff_used= 1; info->keyread_buff_used= 1;
make_empty_page(info, buff, BLOB_PAGE); make_empty_page(info, buff, BLOB_PAGE);
...@@ -5540,7 +5546,12 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info, ...@@ -5540,7 +5546,12 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
LSN_IMPOSSIBLE, 0); LSN_IMPOSSIBLE, 0);
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
} }
/* Physical file was too short; Create new page */ /*
Physical file was too short, create new page. It can be that
recovery started with a file with N pages, wrote page N+2 into
pagecache (increased data_file_length but not physical file
length), now reads page N+1: the read fails.
*/
buff= info->keyread_buff; buff= info->keyread_buff;
info->keyread_buff_used= 1; info->keyread_buff_used= 1;
make_empty_page(info, buff, BLOB_PAGE); make_empty_page(info, buff, BLOB_PAGE);
...@@ -5637,6 +5648,7 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn, ...@@ -5637,6 +5648,7 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
if (read_row_extent_info(info, buff, rownr)) if (read_row_extent_info(info, buff, rownr))
DBUG_RETURN(1); DBUG_RETURN(1);
_ma_bitmap_flushable(share, FALSE);
if (delete_head_or_tail(info, page, rownr, 1, 1) || if (delete_head_or_tail(info, page, rownr, 1, 1) ||
delete_tails(info, info->cur_row.tail_positions)) delete_tails(info, info->cur_row.tail_positions))
goto err; goto err;
...@@ -5653,6 +5665,7 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn, ...@@ -5653,6 +5665,7 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
res= 0; res= 0;
err: err:
_ma_bitmap_flushable(share, TRUE);
_ma_unpin_all_pages_and_finalize_row(info, lsn); _ma_unpin_all_pages_and_finalize_row(info, lsn);
DBUG_RETURN(res); DBUG_RETURN(res);
} }
......
...@@ -171,6 +171,7 @@ my_bool _ma_compare_block_record(register MARIA_HA *info, ...@@ -171,6 +171,7 @@ my_bool _ma_compare_block_record(register MARIA_HA *info,
my_bool _ma_bitmap_init(MARIA_SHARE *share, File file); my_bool _ma_bitmap_init(MARIA_SHARE *share, File file);
my_bool _ma_bitmap_end(MARIA_SHARE *share); my_bool _ma_bitmap_end(MARIA_SHARE *share);
my_bool _ma_bitmap_flush(MARIA_SHARE *share); my_bool _ma_bitmap_flush(MARIA_SHARE *share);
my_bool _ma_bitmap_flush_all(MARIA_SHARE *share);
void _ma_bitmap_reset_cache(MARIA_SHARE *share); void _ma_bitmap_reset_cache(MARIA_SHARE *share);
my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row, my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row,
MARIA_BITMAP_BLOCKS *result_blocks); MARIA_BITMAP_BLOCKS *result_blocks);
...@@ -198,6 +199,7 @@ my_bool _ma_check_if_right_bitmap_type(MARIA_HA *info, ...@@ -198,6 +199,7 @@ my_bool _ma_check_if_right_bitmap_type(MARIA_HA *info,
uint *bitmap_pattern); uint *bitmap_pattern);
void _ma_bitmap_delete_all(MARIA_SHARE *share); void _ma_bitmap_delete_all(MARIA_SHARE *share);
int _ma_bitmap_create_first(MARIA_SHARE *share); int _ma_bitmap_create_first(MARIA_SHARE *share);
void _ma_bitmap_flushable(MARIA_SHARE *share, my_bool flushable);
#ifndef DBUG_OFF #ifndef DBUG_OFF
void _ma_print_bitmap(MARIA_FILE_BITMAP *bitmap, uchar *data, void _ma_print_bitmap(MARIA_FILE_BITMAP *bitmap, uchar *data,
ulonglong page); ulonglong page);
......
...@@ -59,9 +59,7 @@ static uint checkpoints_total= 0, /**< all checkpoint requests made */ ...@@ -59,9 +59,7 @@ static uint checkpoints_total= 0, /**< all checkpoint requests made */
struct st_filter_param struct st_filter_param
{ {
my_bool is_data_file; /**< is the file about data or index */
LSN up_to_lsn; /**< only pages with rec_lsn < this LSN */ LSN up_to_lsn; /**< only pages with rec_lsn < this LSN */
ulong pages_covered_by_bitmap; /**< to know which page is a bitmap page */
uint max_pages; /**< stop after flushing this number pages */ uint max_pages; /**< stop after flushing this number pages */
}; /**< information to determine which dirty pages should be flushed */ }; /**< information to determine which dirty pages should be flushed */
...@@ -74,10 +72,6 @@ filter_flush_file_full(enum pagecache_page_type type, ...@@ -74,10 +72,6 @@ filter_flush_file_full(enum pagecache_page_type type,
pgcache_page_no_t page, pgcache_page_no_t page,
LSN rec_lsn, void *arg); LSN rec_lsn, void *arg);
static enum pagecache_flush_filter_result static enum pagecache_flush_filter_result
filter_flush_file_indirect(enum pagecache_page_type type,
pgcache_page_no_t page,
LSN rec_lsn, void *arg);
static enum pagecache_flush_filter_result
filter_flush_file_evenly(enum pagecache_page_type type, filter_flush_file_evenly(enum pagecache_page_type type,
pgcache_page_no_t pageno, pgcache_page_no_t pageno,
LSN rec_lsn, void *arg); LSN rec_lsn, void *arg);
...@@ -264,7 +258,7 @@ static int really_execute_checkpoint(void) ...@@ -264,7 +258,7 @@ static int really_execute_checkpoint(void)
/* checkpoint succeeded */ /* checkpoint succeeded */
ptr= record_pieces[3].str; ptr= record_pieces[3].str;
pages_to_flush_before_next_checkpoint= uint4korr(ptr); pages_to_flush_before_next_checkpoint= uint4korr(ptr);
DBUG_PRINT("info",("%u pages to flush before next checkpoint", DBUG_PRINT("checkpoint",("%u pages to flush before next checkpoint",
(uint)pages_to_flush_before_next_checkpoint)); (uint)pages_to_flush_before_next_checkpoint));
/* compute log's low-water mark */ /* compute log's low-water mark */
...@@ -350,9 +344,11 @@ int ma_checkpoint_init(ulong interval) ...@@ -350,9 +344,11 @@ int ma_checkpoint_init(ulong interval)
@param what_to_flush 0: current bitmap and all data pages @param what_to_flush 0: current bitmap and all data pages
1: state 1: state
2: all bitmap pages
*/ */
static void flush_all_tables(int what_to_flush) static void flush_all_tables(int what_to_flush)
{ {
int res= 0;
LIST *pos; /**< to iterate over open tables */ LIST *pos; /**< to iterate over open tables */
pthread_mutex_lock(&THR_LOCK_maria); pthread_mutex_lock(&THR_LOCK_maria);
for (pos= maria_open_list; pos; pos= pos->next) for (pos= maria_open_list; pos; pos= pos->next)
...@@ -363,17 +359,21 @@ static void flush_all_tables(int what_to_flush) ...@@ -363,17 +359,21 @@ static void flush_all_tables(int what_to_flush)
switch (what_to_flush) switch (what_to_flush)
{ {
case 0: case 0:
_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX, res= _ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
FLUSH_KEEP, FLUSH_KEEP); FLUSH_KEEP, FLUSH_KEEP);
break; break;
case 1: case 1:
_ma_state_info_write(info->s, 1|4); res= _ma_state_info_write(info->s, 1|4);
DBUG_PRINT("maria_flush_states", DBUG_PRINT("maria_flush_states",
("is_of_horizon: LSN (%lu,0x%lx)", ("is_of_horizon: LSN (%lu,0x%lx)",
LSN_IN_PARTS(info->s->state.is_of_horizon))); LSN_IN_PARTS(info->s->state.is_of_horizon)));
break; break;
case 2:
res= _ma_bitmap_flush_all(info->s);
break;
} }
} }
DBUG_ASSERT(res == 0);
} }
pthread_mutex_unlock(&THR_LOCK_maria); pthread_mutex_unlock(&THR_LOCK_maria);
} }
...@@ -387,6 +387,11 @@ static void flush_all_tables(int what_to_flush) ...@@ -387,6 +387,11 @@ static void flush_all_tables(int what_to_flush)
void ma_checkpoint_end(void) void ma_checkpoint_end(void)
{ {
DBUG_ENTER("ma_checkpoint_end"); DBUG_ENTER("ma_checkpoint_end");
DBUG_EXECUTE_IF("maria_flush_bitmap",
{
DBUG_PRINT("maria_flush_bitmap", ("now"));
flush_all_tables(2);
});
DBUG_EXECUTE_IF("maria_flush_whole_page_cache", DBUG_EXECUTE_IF("maria_flush_whole_page_cache",
{ {
DBUG_PRINT("maria_flush_whole_page_cache", ("now")); DBUG_PRINT("maria_flush_whole_page_cache", ("now"));
...@@ -447,8 +452,8 @@ void ma_checkpoint_end(void) ...@@ -447,8 +452,8 @@ void ma_checkpoint_end(void)
We flush data/index pages which have been dirty since the previous We flush data/index pages which have been dirty since the previous
checkpoint (this is the two-checkpoint rule: the REDO phase will not have checkpoint (this is the two-checkpoint rule: the REDO phase will not have
to start from earlier than the next-to-last checkpoint), and all dirty to start from earlier than the next-to-last checkpoint).
bitmap pages. Bitmap pages are handled by _ma_bitmap_flush_all().
@param type Page's type @param type Page's type
@param pageno Page's number @param pageno Page's number
...@@ -458,21 +463,20 @@ void ma_checkpoint_end(void) ...@@ -458,21 +463,20 @@ void ma_checkpoint_end(void)
static enum pagecache_flush_filter_result static enum pagecache_flush_filter_result
filter_flush_file_medium(enum pagecache_page_type type, filter_flush_file_medium(enum pagecache_page_type type,
pgcache_page_no_t pageno, pgcache_page_no_t pageno __attribute__ ((unused)),
LSN rec_lsn, void *arg) LSN rec_lsn, void *arg)
{ {
struct st_filter_param *param= (struct st_filter_param *)arg; struct st_filter_param *param= (struct st_filter_param *)arg;
return ((type == PAGECACHE_LSN_PAGE) && return (type == PAGECACHE_LSN_PAGE) &&
(cmp_translog_addr(rec_lsn, param->up_to_lsn) <= 0)) || (cmp_translog_addr(rec_lsn, param->up_to_lsn) <= 0);
(param->is_data_file &&
((pageno % param->pages_covered_by_bitmap) == 0));
} }
/** /**
@brief dirty-page filtering criteria for FULL checkpoint. @brief dirty-page filtering criteria for FULL checkpoint.
We flush all dirty data/index pages and all dirty bitmap pages. We flush all dirty data/index pages.
Bitmap pages are handled by _ma_bitmap_flush_all().
@param type Page's type @param type Page's type
@param pageno Page's number @param pageno Page's number
...@@ -482,39 +486,11 @@ filter_flush_file_medium(enum pagecache_page_type type, ...@@ -482,39 +486,11 @@ filter_flush_file_medium(enum pagecache_page_type type,
static enum pagecache_flush_filter_result static enum pagecache_flush_filter_result
filter_flush_file_full(enum pagecache_page_type type, filter_flush_file_full(enum pagecache_page_type type,
pgcache_page_no_t pageno, pgcache_page_no_t pageno __attribute__ ((unused)),
LSN rec_lsn __attribute__ ((unused)),
void *arg)
{
struct st_filter_param *param= (struct st_filter_param *)arg;
return (type == PAGECACHE_LSN_PAGE) ||
(param->is_data_file &&
((pageno % param->pages_covered_by_bitmap) == 0));
}
/**
@brief dirty-page filtering criteria for INDIRECT checkpoint.
We flush all dirty bitmap pages.
@param type Page's type
@param pageno Page's number
@param rec_lsn Page's rec_lsn
@param arg filter_param
*/
static enum pagecache_flush_filter_result
filter_flush_file_indirect(enum pagecache_page_type type
__attribute__ ((unused)),
pgcache_page_no_t pageno,
LSN rec_lsn __attribute__ ((unused)), LSN rec_lsn __attribute__ ((unused)),
void *arg) void *arg __attribute__ ((unused)))
{ {
struct st_filter_param *param= (struct st_filter_param *)arg; return (type == PAGECACHE_LSN_PAGE);
return
(param->is_data_file &&
((pageno % param->pages_covered_by_bitmap) == 0));
} }
...@@ -526,6 +502,8 @@ filter_flush_file_indirect(enum pagecache_page_type type ...@@ -526,6 +502,8 @@ filter_flush_file_indirect(enum pagecache_page_type type
to start from earlier than the next-to-last checkpoint), and no to start from earlier than the next-to-last checkpoint), and no
bitmap pages. But we flush no more than a certain number of pages (to have bitmap pages. But we flush no more than a certain number of pages (to have
an even flushing, no write burst). an even flushing, no write burst).
The reason to not flush bitmap pages is that they may not be in a flushable
state at this moment and we don't want to wait for them.
@param type Page's type @param type Page's type
@param pageno Page's number @param pageno Page's number
...@@ -574,9 +552,11 @@ pthread_handler_t ma_checkpoint_background(void *arg) ...@@ -574,9 +552,11 @@ pthread_handler_t ma_checkpoint_background(void *arg)
about the interval's value when it started. about the interval's value when it started.
*/ */
const ulong interval= (ulong)arg; const ulong interval= (ulong)arg;
uint sleeps; uint sleeps, sleep_time;
TRANSLOG_ADDRESS log_horizon_at_last_checkpoint= LSN_IMPOSSIBLE; TRANSLOG_ADDRESS log_horizon_at_last_checkpoint=
ulonglong pagecache_flushes_at_last_checkpoint= 0; translog_get_horizon();
ulonglong pagecache_flushes_at_last_checkpoint=
maria_pagecache->global_cache_write;
uint pages_bunch_size; uint pages_bunch_size;
struct st_filter_param filter_param; struct st_filter_param filter_param;
PAGECACHE_FILE *dfile; /**< data file currently being flushed */ PAGECACHE_FILE *dfile; /**< data file currently being flushed */
...@@ -602,7 +582,7 @@ pthread_handler_t ma_checkpoint_background(void *arg) ...@@ -602,7 +582,7 @@ pthread_handler_t ma_checkpoint_background(void *arg)
sleeps=0; sleeps=0;
#endif #endif
struct timespec abstime; struct timespec abstime;
switch((sleeps++) % interval) switch (sleeps % interval)
{ {
case 0: case 0:
/* /*
...@@ -626,8 +606,10 @@ pthread_handler_t ma_checkpoint_background(void *arg) ...@@ -626,8 +606,10 @@ pthread_handler_t ma_checkpoint_background(void *arg)
{ {
/* don't take checkpoint, so don't know what to flush */ /* don't take checkpoint, so don't know what to flush */
pages_to_flush_before_next_checkpoint= 0; pages_to_flush_before_next_checkpoint= 0;
sleep_time= interval;
break; break;
} }
sleep_time= 1;
ma_checkpoint_execute(CHECKPOINT_MEDIUM, TRUE); ma_checkpoint_execute(CHECKPOINT_MEDIUM, TRUE);
/* /*
Snapshot this kind of "state" of the engine. Note that the value below Snapshot this kind of "state" of the engine. Note that the value below
...@@ -653,11 +635,11 @@ pthread_handler_t ma_checkpoint_background(void *arg) ...@@ -653,11 +635,11 @@ pthread_handler_t ma_checkpoint_background(void *arg)
default: default:
if (pages_bunch_size > 0) if (pages_bunch_size > 0)
{ {
DBUG_PRINT("info", ("Maria background checkpoint thread: %u pages", DBUG_PRINT("checkpoint",
("Maria background checkpoint thread: %u pages",
pages_bunch_size)); pages_bunch_size));
/* flush a bunch of dirty pages */ /* flush a bunch of dirty pages */
filter_param.max_pages= pages_bunch_size; filter_param.max_pages= pages_bunch_size;
filter_param.is_data_file= TRUE;
while (dfile != dfiles_end) while (dfile != dfiles_end)
{ {
/* /*
...@@ -683,7 +665,6 @@ pthread_handler_t ma_checkpoint_background(void *arg) ...@@ -683,7 +665,6 @@ pthread_handler_t ma_checkpoint_background(void *arg)
we wrote enough pages. we wrote enough pages.
*/ */
} }
filter_param.is_data_file= FALSE;
while (kfile != kfiles_end) while (kfile != kfiles_end)
{ {
int res= int res=
...@@ -697,6 +678,12 @@ pthread_handler_t ma_checkpoint_background(void *arg) ...@@ -697,6 +678,12 @@ pthread_handler_t ma_checkpoint_background(void *arg)
break; /* and we will continue with the same file */ break; /* and we will continue with the same file */
kfile++; /* otherwise all this file is flushed, move to next file */ kfile++; /* otherwise all this file is flushed, move to next file */
} }
sleep_time= 1;
}
else
{
/* Can directly sleep until the next checkpoint moment */
sleep_time= interval - (sleeps % interval);
} }
} }
pthread_mutex_lock(&LOCK_checkpoint); pthread_mutex_lock(&LOCK_checkpoint);
...@@ -708,12 +695,14 @@ pthread_handler_t ma_checkpoint_background(void *arg) ...@@ -708,12 +695,14 @@ pthread_handler_t ma_checkpoint_background(void *arg)
pthread_mutex_lock(&LOCK_checkpoint); pthread_mutex_lock(&LOCK_checkpoint);
#else #else
/* To have a killable sleep, we use timedwait like our SQL GET_LOCK() */ /* To have a killable sleep, we use timedwait like our SQL GET_LOCK() */
set_timespec(abstime, 1); DBUG_PRINT("info", ("sleeping %u seconds", sleep_time));
set_timespec(abstime, sleep_time);
pthread_cond_timedwait(&COND_checkpoint, &LOCK_checkpoint, &abstime); pthread_cond_timedwait(&COND_checkpoint, &LOCK_checkpoint, &abstime);
#endif #endif
if (checkpoint_thread_die == 1) if (checkpoint_thread_die == 1)
break; break;
pthread_mutex_unlock(&LOCK_checkpoint); pthread_mutex_unlock(&LOCK_checkpoint);
sleeps+= sleep_time;
} }
pthread_mutex_unlock(&LOCK_checkpoint); pthread_mutex_unlock(&LOCK_checkpoint);
DBUG_PRINT("info",("Maria background checkpoint thread ends")); DBUG_PRINT("info",("Maria background checkpoint thread ends"));
...@@ -855,7 +844,7 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon) ...@@ -855,7 +844,7 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
filter= &filter_flush_file_full; filter= &filter_flush_file_full;
break; break;
case CHECKPOINT_INDIRECT: case CHECKPOINT_INDIRECT:
filter= &filter_flush_file_indirect; filter= NULL;
break; break;
default: default:
DBUG_ASSERT(0); DBUG_ASSERT(0);
...@@ -888,6 +877,7 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon) ...@@ -888,6 +877,7 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
{ {
MARIA_SHARE *share= distinct_shares[i]; MARIA_SHARE *share= distinct_shares[i];
PAGECACHE_FILE kfile, dfile; PAGECACHE_FILE kfile, dfile;
my_bool ignore_share;
if (!(share->in_checkpoint & MARIA_CHECKPOINT_LOOKS_AT_ME)) if (!(share->in_checkpoint & MARIA_CHECKPOINT_LOOKS_AT_ME))
{ {
/* No need for a mutex to read the above, only us can write this flag */ /* No need for a mutex to read the above, only us can write this flag */
...@@ -957,7 +947,6 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon) ...@@ -957,7 +947,6 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
for ( ; state_copy->index != i; state_copy++) for ( ; state_copy->index != i; state_copy++)
DBUG_ASSERT(state_copy < state_copies_end); DBUG_ASSERT(state_copy < state_copies_end);
filter_param.pages_covered_by_bitmap= share->bitmap.pages_covered;
/* OS file descriptors are ints which we stored in 4 bytes */ /* OS file descriptors are ints which we stored in 4 bytes */
compile_time_assert(sizeof(int) <= 4); compile_time_assert(sizeof(int) <= 4);
pthread_mutex_lock(&share->intern_lock); pthread_mutex_lock(&share->intern_lock);
...@@ -978,7 +967,9 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon) ...@@ -978,7 +967,9 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
onto a newer one (assuming the table has been reopened with a different onto a newer one (assuming the table has been reopened with a different
share but of course same physical index file). share but of course same physical index file).
*/ */
if ((share->id != 0) && (share->last_version != 0)) ignore_share= (share->id == 0) | (share->last_version == 0);
DBUG_PRINT("info", ("ignore_share: %d", ignore_share));
if (!ignore_share)
{ {
/** @todo avoid strlen */ /** @todo avoid strlen */
uint open_file_name_len= strlen(share->open_file_name) + 1; uint open_file_name_len= strlen(share->open_file_name) + 1;
...@@ -1061,14 +1052,12 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon) ...@@ -1061,14 +1052,12 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
each checkpoint if the table was once written and then not anymore. each checkpoint if the table was once written and then not anymore.
*/ */
} }
/** if (_ma_bitmap_flush_all(share))
@todo RECOVERY BUG this is going to flush the bitmap page possibly to {
disk even though it could be over-allocated with not yet any sync_error= 1;
REDO-UNDO complete group (WAL violation: no way to undo the /** @todo all write failures should mark table corrupted */
over-allocation if crash); see also _ma_change_bitmap_page(). ma_message_no_user(0, "checkpoint bitmap page flush failed");
*/ }
sync_error|=
_ma_bitmap_flush(share); /* after that, all is in page cache */
DBUG_ASSERT(share->pagecache == maria_pagecache); DBUG_ASSERT(share->pagecache == maria_pagecache);
} }
if (share->in_checkpoint & MARIA_CHECKPOINT_SHOULD_FREE_ME) if (share->in_checkpoint & MARIA_CHECKPOINT_SHOULD_FREE_ME)
...@@ -1135,26 +1124,21 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon) ...@@ -1135,26 +1124,21 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
the evicter will fail to write their page: corruption. the evicter will fail to write their page: corruption.
*/ */
/* if (!ignore_share)
We do NOT use FLUSH_KEEP_LAZY because we must be sure that bitmap pages {
have been flushed. That's a condition of correctness of Recovery: data if (filter != NULL)
pages may have been all flushed, if we write the checkpoint record {
Recovery will start from after their REDOs. If bitmap page was not if ((flush_pagecache_blocks_with_filter(maria_pagecache,
flushed, as the REDOs about it will be skipped, it will wrongly not be &dfile, FLUSH_KEEP_LAZY,
recovered. If bitmap pages had a rec_lsn it would be different.
*/
if ((filter_param.is_data_file= TRUE),
(flush_pagecache_blocks_with_filter(maria_pagecache,
&dfile, FLUSH_KEEP,
filter, &filter_param) & filter, &filter_param) &
PCFLUSH_ERROR)) PCFLUSH_ERROR))
ma_message_no_user(0, "checkpoint data page flush failed"); ma_message_no_user(0, "checkpoint data page flush failed");
if ((filter_param.is_data_file= FALSE), if ((flush_pagecache_blocks_with_filter(maria_pagecache,
(flush_pagecache_blocks_with_filter(maria_pagecache, &kfile, FLUSH_KEEP_LAZY,
&kfile, FLUSH_KEEP,
filter, &filter_param) & filter, &filter_param) &
PCFLUSH_ERROR)) PCFLUSH_ERROR))
ma_message_no_user(0, "checkpoint index page flush failed"); ma_message_no_user(0, "checkpoint index page flush failed");
}
/* /*
fsyncs the fd, that's the loooong operation (e.g. max 150 fsync fsyncs the fd, that's the loooong operation (e.g. max 150 fsync
per second, so if you have touched 1000 files it's 7 seconds). per second, so if you have touched 1000 files it's 7 seconds).
...@@ -1167,6 +1151,7 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon) ...@@ -1167,6 +1151,7 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
still useful. still useful.
*/ */
} }
}
if (sync_error) if (sync_error)
goto err; goto err;
......
...@@ -51,12 +51,6 @@ int ma_commit(TRN *trn) ...@@ -51,12 +51,6 @@ int ma_commit(TRN *trn)
So we need to go the first way. So we need to go the first way.
*/ */
/**
@todo RECOVERY share's state is written to disk only in
maria_lock_database(), so COMMIT record is not the last record of the
transaction! It is probably an issue. Recovery of the state is a problem
not yet solved.
*/
/* /*
We do not store "thd->transaction.xid_state.xid" for now, it will be We do not store "thd->transaction.xid_state.xid" for now, it will be
needed only when we support XA. needed only when we support XA.
......
...@@ -175,7 +175,7 @@ my_bool write_hook_for_clr_end(enum translog_record_type type ...@@ -175,7 +175,7 @@ my_bool write_hook_for_clr_end(enum translog_record_type type
/** /**
@brief write hook for undo key insert @brief write hook for undo key
*/ */
my_bool write_hook_for_undo_key(enum translog_record_type type, my_bool write_hook_for_undo_key(enum translog_record_type type,
......
...@@ -389,8 +389,6 @@ static LOG_DESC INIT_LOGREC_REDO_NOT_USED= ...@@ -389,8 +389,6 @@ static LOG_DESC INIT_LOGREC_REDO_NOT_USED=
{LOGRECTYPE_VARIABLE_LENGTH, 0, 8, NULL, write_hook_for_redo, NULL, 0, {LOGRECTYPE_VARIABLE_LENGTH, 0, 8, NULL, write_hook_for_redo, NULL, 0,
"redo_insert_row_blob", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; "redo_insert_row_blob", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
/** @todo RECOVERY BUG handle it in recovery */
/*QQ:TODO:header???*/
static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_BLOBS= static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_BLOBS=
{LOGRECTYPE_VARIABLE_LENGTH, 0, FILEID_STORE_SIZE, NULL, {LOGRECTYPE_VARIABLE_LENGTH, 0, FILEID_STORE_SIZE, NULL,
write_hook_for_redo, NULL, 0, write_hook_for_redo, NULL, 0,
......
...@@ -1100,7 +1100,6 @@ uint _ma_state_info_write(MARIA_SHARE *share, uint pWrite) ...@@ -1100,7 +1100,6 @@ uint _ma_state_info_write(MARIA_SHARE *share, uint pWrite)
uint _ma_state_info_write_sub(File file, MARIA_STATE_INFO *state, uint pWrite) uint _ma_state_info_write_sub(File file, MARIA_STATE_INFO *state, uint pWrite)
{ {
/** @todo RECOVERY write it only at checkpoint time */
uchar buff[MARIA_STATE_INFO_SIZE + MARIA_STATE_EXTRA_SIZE]; uchar buff[MARIA_STATE_INFO_SIZE + MARIA_STATE_EXTRA_SIZE];
uchar *ptr=buff; uchar *ptr=buff;
uint i, keys= (uint) state->header.keys; uint i, keys= (uint) state->header.keys;
...@@ -1143,7 +1142,6 @@ uint _ma_state_info_write_sub(File file, MARIA_STATE_INFO *state, uint pWrite) ...@@ -1143,7 +1142,6 @@ uint _ma_state_info_write_sub(File file, MARIA_STATE_INFO *state, uint pWrite)
{ {
mi_sizestore(ptr,state->key_root[i]); ptr+= 8; mi_sizestore(ptr,state->key_root[i]); ptr+= 8;
} }
/** @todo RECOVERY BUG key_del is a problem for recovery */
mi_sizestore(ptr,state->key_del); ptr+= 8; mi_sizestore(ptr,state->key_del); ptr+= 8;
if (pWrite & 2) /* From maria_chk */ if (pWrite & 2) /* From maria_chk */
{ {
......
...@@ -601,6 +601,10 @@ static uint pagecache_fwrite(PAGECACHE *pagecache, ...@@ -601,6 +601,10 @@ static uint pagecache_fwrite(PAGECACHE *pagecache,
{ {
DBUG_ENTER("pagecache_fwrite"); DBUG_ENTER("pagecache_fwrite");
DBUG_ASSERT(type != PAGECACHE_READ_UNKNOWN_PAGE); DBUG_ASSERT(type != PAGECACHE_READ_UNKNOWN_PAGE);
/**
@todo RECOVERY BUG Here, we should call a callback get_lsn(): it will use
lsn_korr() for LSN pages, and translog_get_horizon() for bitmap pages.
*/
if (type == PAGECACHE_LSN_PAGE) if (type == PAGECACHE_LSN_PAGE)
{ {
LSN lsn; LSN lsn;
...@@ -4185,18 +4189,8 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache, ...@@ -4185,18 +4189,8 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
*/ */
DBUG_ASSERT(block->hash_link != NULL); DBUG_ASSERT(block->hash_link != NULL);
DBUG_ASSERT(block->status & PCBLOCK_CHANGED); DBUG_ASSERT(block->status & PCBLOCK_CHANGED);
/**
@todo RECOVERY BUG
REDO phase uses PAGECACHE_PLAIN_PAGE, so the lines below would
confuse the indirect Checkpoint taken at the end of the REDO phase.
So we below collect even dirty pages of temporary tables as a result
:( Soon we should have the MARIA_SHARE accessible from the
pagecache's block and then we can test born_transactional.
*/
#ifdef TRANS_TABLES_ALWAYS_USE_LSN_PAGE
if (block->type != PAGECACHE_LSN_PAGE) if (block->type != PAGECACHE_LSN_PAGE)
continue; /* no need to store it */ continue; /* no need to store it */
#endif
stored_list_size++; stored_list_size++;
} }
} }
...@@ -4221,10 +4215,8 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache, ...@@ -4221,10 +4215,8 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
block; block;
block= block->next_changed) block= block->next_changed)
{ {
#ifdef TRANS_TABLES_ALWAYS_USE_LSN_PAGE
if (block->type != PAGECACHE_LSN_PAGE) if (block->type != PAGECACHE_LSN_PAGE)
continue; /* no need to store it in the checkpoint record */ continue; /* no need to store it in the checkpoint record */
#endif
compile_time_assert(sizeof(block->hash_link->file.file) <= 4); compile_time_assert(sizeof(block->hash_link->file.file) <= 4);
compile_time_assert(sizeof(block->hash_link->pageno) <= 4); compile_time_assert(sizeof(block->hash_link->pageno) <= 4);
int4store(ptr, block->hash_link->file.file); int4store(ptr, block->hash_link->file.file);
......
...@@ -348,11 +348,14 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply, ...@@ -348,11 +348,14 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
REDO phase does not fill blocks' rec_lsn, so a checkpoint now would be REDO phase does not fill blocks' rec_lsn, so a checkpoint now would be
wrong: if a future recovery used it, the REDO phase would always wrong: if a future recovery used it, the REDO phase would always
start from the checkpoint and never from before, wrongly skipping REDOs start from the checkpoint and never from before, wrongly skipping REDOs
(tested). (tested). Another problem is that the REDO phase uses
PAGECACHE_PLAIN_PAGE, while Checkpoint only collects PAGECACHE_LSN_PAGE.
@todo fix this; pagecache_write() now can have a rec_lsn argument. @todo fix this. pagecache_write() now can have a rec_lsn argument. And we
could make a function which goes through pages at end of REDO phase and
changes their type.
*/ */
#if 0 #ifdef FIX_AND_ENABLE_LATER
if (take_checkpoints && checkpoint_useful) if (take_checkpoints && checkpoint_useful)
{ {
/* /*
...@@ -478,14 +481,11 @@ prototype_redo_exec_hook(LONG_TRANSACTION_ID) ...@@ -478,14 +481,11 @@ prototype_redo_exec_hook(LONG_TRANSACTION_ID)
{ {
uint16 sid= rec->short_trid; uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid; TrID long_trid= all_active_trans[sid].long_trid;
/* abort group of this trn (must be of before a crash) */ /*
LSN gslsn= all_active_trans[sid].group_start_lsn; Any incomplete group should be of an old crash which already had a
if (gslsn != LSN_IMPOSSIBLE) recovery and thus has logged INCOMPLETE_GROUP which we must have seen.
{ */
tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u incomplete\n", DBUG_ASSERT(all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE);
LSN_IN_PARTS(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
if (long_trid != 0) if (long_trid != 0)
{ {
LSN ulsn= all_active_trans[sid].undo_lsn; LSN ulsn= all_active_trans[sid].undo_lsn;
...@@ -1160,6 +1160,7 @@ static int new_table(uint16 sid, const char *name, ...@@ -1160,6 +1160,7 @@ static int new_table(uint16 sid, const char *name,
} }
if (maria_is_crashed(info)) if (maria_is_crashed(info))
{ {
/** @todo what should we do? how to continue recovery? */
tprint(tracef, "Table is crashed, can't apply log records to it\n"); tprint(tracef, "Table is crashed, can't apply log records to it\n");
goto end; goto end;
} }
...@@ -1566,10 +1567,6 @@ prototype_redo_exec_hook(UNDO_ROW_INSERT) ...@@ -1566,10 +1567,6 @@ prototype_redo_exec_hook(UNDO_ROW_INSERT)
} }
share->state.state.checksum+= ha_checksum_korr(buff); share->state.state.checksum+= ha_checksum_korr(buff);
} }
/**
@todo some bits below will rather be set when executing UNDOs related
to keys
*/
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED;
} }
tprint(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records); tprint(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
...@@ -1605,8 +1602,8 @@ prototype_redo_exec_hook(UNDO_ROW_DELETE) ...@@ -1605,8 +1602,8 @@ prototype_redo_exec_hook(UNDO_ROW_DELETE)
} }
share->state.state.checksum+= ha_checksum_korr(buff); share->state.state.checksum+= ha_checksum_korr(buff);
} }
share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED | share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_ROWS); STATE_NOT_OPTIMIZED_ROWS;
} }
tprint(tracef, " rows' count %lu\n", (ulong)share->state.state.records); tprint(tracef, " rows' count %lu\n", (ulong)share->state.state.records);
_ma_unpin_all_pages(info, rec->lsn); _ma_unpin_all_pages(info, rec->lsn);
...@@ -1743,6 +1740,7 @@ prototype_redo_exec_hook(COMMIT) ...@@ -1743,6 +1740,7 @@ prototype_redo_exec_hook(COMMIT)
{ {
tprint(tracef, "We don't know about transaction with short_trid %u;" tprint(tracef, "We don't know about transaction with short_trid %u;"
"it probably committed long ago, forget it\n", sid); "it probably committed long ago, forget it\n", sid);
bzero(&all_active_trans[sid], sizeof(all_active_trans[sid]));
return 0; return 0;
} }
llstr(long_trid, llbuf); llstr(long_trid, llbuf);
...@@ -1792,6 +1790,7 @@ prototype_redo_exec_hook(CLR_END) ...@@ -1792,6 +1790,7 @@ prototype_redo_exec_hook(CLR_END)
break; break;
case LOGREC_UNDO_ROW_INSERT: case LOGREC_UNDO_ROW_INSERT:
share->state.state.records--; share->state.state.records--;
share->state.changed|= STATE_NOT_OPTIMIZED_ROWS;
row_entry= 1; row_entry= 1;
break; break;
case LOGREC_UNDO_ROW_UPDATE: case LOGREC_UNDO_ROW_UPDATE:
...@@ -1865,7 +1864,8 @@ prototype_undo_exec_hook(UNDO_ROW_INSERT) ...@@ -1865,7 +1864,8 @@ prototype_undo_exec_hook(UNDO_ROW_INSERT)
return 1; return 1;
} }
share= info->s; share= info->s;
share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED; share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_ROWS;
record_ptr= rec->header; record_ptr= rec->header;
if (share->calc_checksum) if (share->calc_checksum)
...@@ -2205,8 +2205,9 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply) ...@@ -2205,8 +2205,9 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
{ {
/* /*
can happen if the transaction got a table write error, then Can happen if the transaction got a table write error, then
unlocked tables thus wrote a COMMIT record. unlocked tables thus wrote a COMMIT record. Or can be an
INCOMPLETE_GROUP record written by a previous recovery.
*/ */
tprint(tracef, "\nDiscarding incomplete group before this record\n"); tprint(tracef, "\nDiscarding incomplete group before this record\n");
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE; all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
...@@ -2677,6 +2678,8 @@ static LSN parse_checkpoint_record(LSN lsn) ...@@ -2677,6 +2678,8 @@ static LSN parse_checkpoint_record(LSN lsn)
tprint(tracef, "%u active transactions\n", nb_active_transactions); tprint(tracef, "%u active transactions\n", nb_active_transactions);
LSN minimum_rec_lsn_of_active_transactions= lsn_korr(ptr); LSN minimum_rec_lsn_of_active_transactions= lsn_korr(ptr);
ptr+= LSN_STORE_SIZE; ptr+= LSN_STORE_SIZE;
max_long_trid= transid_korr(ptr);
ptr+= TRANSID_SIZE;
/* /*
how much brain juice and discussions there was to come to writing this how much brain juice and discussions there was to come to writing this
......
...@@ -104,8 +104,8 @@ int main(int argc, char **argv) ...@@ -104,8 +104,8 @@ int main(int argc, char **argv)
maria_init(); maria_init();
/* /*
If we are doing a repair and we have requested logging (on by default), If we are doing a repair, user may want to store this repair into the log
enable transaction log handling. so that the log has a complete history and can be used to replay.
*/ */
if (opt_transaction_logging && (check_param.testflag & T_REP_ANY) && if (opt_transaction_logging && (check_param.testflag & T_REP_ANY) &&
(ma_control_file_create_or_open() || (ma_control_file_create_or_open() ||
......
...@@ -217,16 +217,19 @@ typedef struct st_maria_file_bitmap ...@@ -217,16 +217,19 @@ typedef struct st_maria_file_bitmap
ulonglong page; /* Page number for current bitmap */ ulonglong page; /* Page number for current bitmap */
uint used_size; /* Size of bitmap head that is not 0 */ uint used_size; /* Size of bitmap head that is not 0 */
my_bool changed; /* 1 if page needs to be flushed */ my_bool changed; /* 1 if page needs to be flushed */
my_bool flushable; /**< If bitmap and log are in sync */
PAGECACHE_FILE file; /* datafile where bitmap is stored */ PAGECACHE_FILE file; /* datafile where bitmap is stored */
#ifdef THREAD #ifdef THREAD
pthread_mutex_t bitmap_lock; pthread_mutex_t bitmap_lock;
pthread_cond_t bitmap_cond; /**< When bitmap becomes flushable */
#endif #endif
/* Constants, allocated when initiating bitmaps */ /* Constants, allocated when initiating bitmaps */
uint sizes[8]; /* Size per bit combination */ uint sizes[8]; /* Size per bit combination */
uint total_size; /* Total usable size of bitmap page */ uint total_size; /* Total usable size of bitmap page */
uint block_size; /* Block size of file */ uint block_size; /* Block size of file */
ulong pages_covered; /* Pages covered by bitmap + 1 */ ulong pages_covered; /* Pages covered by bitmap + 1 */
DYNAMIC_ARRAY pinned_pages; /**< not-yet-flushable bitmap pages */
} MARIA_FILE_BITMAP; } MARIA_FILE_BITMAP;
#define MARIA_CHECKPOINT_LOOKS_AT_ME 1 #define MARIA_CHECKPOINT_LOOKS_AT_ME 1
...@@ -511,7 +514,6 @@ struct st_maria_handler ...@@ -511,7 +514,6 @@ struct st_maria_handler
#define USE_WHOLE_KEY 65535 /* Use whole key in _search() */ #define USE_WHOLE_KEY 65535 /* Use whole key in _search() */
#define F_EXTRA_LCK -1 #define F_EXTRA_LCK -1
#define TRANSID_SIZE 6
/* bits in opt_flag */ /* bits in opt_flag */
#define MEMMAP_USED 32 #define MEMMAP_USED 32
......
...@@ -598,6 +598,7 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com, ...@@ -598,6 +598,7 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com,
pthread_mutex_lock(&LOCK_trn_list); pthread_mutex_lock(&LOCK_trn_list);
str_act->length= 2 + /* number of active transactions */ str_act->length= 2 + /* number of active transactions */
LSN_STORE_SIZE + /* minimum of their rec_lsn */ LSN_STORE_SIZE + /* minimum of their rec_lsn */
TRANSID_SIZE + /* current TrID generator value */
(2 + /* short id */ (2 + /* short id */
6 + /* long id */ 6 + /* long id */
LSN_STORE_SIZE + /* undo_lsn */ LSN_STORE_SIZE + /* undo_lsn */
...@@ -618,6 +619,8 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com, ...@@ -618,6 +619,8 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com,
goto err; goto err;
/* First, the active transactions */ /* First, the active transactions */
ptr= str_act->str + 2 + LSN_STORE_SIZE; ptr= str_act->str + 2 + LSN_STORE_SIZE;
transid_store(ptr, global_trid_generator);
ptr+= TRANSID_SIZE;
for (trn= active_list_min.next; trn != &active_list_max; trn= trn->next) for (trn= active_list_min.next; trn != &active_list_max; trn= trn->next)
{ {
/* /*
......
...@@ -55,6 +55,8 @@ my_bool trnman_has_locked_tables(TRN *trn); ...@@ -55,6 +55,8 @@ my_bool trnman_has_locked_tables(TRN *trn);
void trnman_reset_locked_tables(TRN *trn); void trnman_reset_locked_tables(TRN *trn);
TRN *trnman_recreate_trn_from_recovery(uint16 shortid, TrID longid); TRN *trnman_recreate_trn_from_recovery(uint16 shortid, TrID longid);
TRN *trnman_get_any_trn(); TRN *trnman_get_any_trn();
#define TRANSID_SIZE 6
#define transid_store(dst, id) int6store(dst,id)
#define transid_korr(P) uint6korr(P)
C_MODE_END C_MODE_END
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment