Commit 896fb79e authored by Michael Widenius's avatar Michael Widenius

Bug fixing in Aria:

- Fixed some bugs in recovery of blobs
- Don't ASSERT() on checksum errors when running check table
- Added to maria_read_log option --tables-to-redo=list-of-tables to only recover some tables (good for debugging)


storage/maria/ma_blockrec.c:
  Don't ASSERT() on checksum errors when running check table
  Fixed bug in recovery of blog page that was not in dirty pages list
storage/maria/ma_check.c:
  Don't ASSERT() on checksum errors when running check table
storage/maria/ma_recovery.c:
  Handling of --tables-to-redo
storage/maria/ma_recovery.h:
  Handling of --tables-to-redo
storage/maria/ma_recovery_util.c:
  Give better warning if table was not in dirty pages list
storage/maria/maria_def.h:
  Added in_check_table
storage/maria/maria_read_log.c:
  Added --tables-to-redo=list-of-tables to only recover some tables (good for debugging)
  Cleaned up message when wrong arguments
parent 9f76e97e
...@@ -4934,7 +4934,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -4934,7 +4934,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
goto err; goto err;
} }
#ifdef EXTRA_DEBUG #ifdef EXTRA_DEBUG
if (share->calc_checksum) if (share->calc_checksum && !info->in_check_table)
{ {
/* Esnure that row checksum is correct */ /* Esnure that row checksum is correct */
DBUG_ASSERT(((share->calc_checksum)(info, record) & 255) == DBUG_ASSERT(((share->calc_checksum)(info, record) & 255) ==
...@@ -6705,21 +6705,23 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info, ...@@ -6705,21 +6705,23 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
uint page_range; uint page_range;
pgcache_page_no_t page, start_page; pgcache_page_no_t page, start_page;
uchar *buff; uchar *buff;
uint data_on_page= data_size;
start_page= page= page_korr(header); start_page= page= page_korr(header);
header+= PAGE_STORE_SIZE; header+= PAGE_STORE_SIZE;
page_range= pagerange_korr(header); page_range= pagerange_korr(header);
header+= PAGERANGE_STORE_SIZE; header+= PAGERANGE_STORE_SIZE;
for (i= page_range; i-- > 0 ; page++) for (i= page_range; i-- > 0 ; page++, data+= data_on_page)
{ {
MARIA_PINNED_PAGE page_link; MARIA_PINNED_PAGE page_link;
enum pagecache_page_lock unlock_method; enum pagecache_page_lock unlock_method;
enum pagecache_page_pin unpin_method; enum pagecache_page_pin unpin_method;
uint length;
set_if_smaller(first_page2, page); set_if_smaller(first_page2, page);
set_if_bigger(last_page2, page); set_if_bigger(last_page2, page);
if (i == 0 && sub_ranges == 0)
data_on_page= data_size - empty_space; /* data on last page */
if (_ma_redo_not_needed_for_page(sid, redo_lsn, page, FALSE)) if (_ma_redo_not_needed_for_page(sid, redo_lsn, page, FALSE))
continue; continue;
...@@ -6798,19 +6800,16 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info, ...@@ -6798,19 +6800,16 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
lsn_store(buff, lsn); lsn_store(buff, lsn);
buff[PAGE_TYPE_OFFSET]= BLOB_PAGE; buff[PAGE_TYPE_OFFSET]= BLOB_PAGE;
length= data_size; if (data_on_page != data_size)
if (i == 0 && sub_ranges == 0)
{ {
/* /*
Last page may be only partly filled. We zero the rest, like Last page may be only partly filled. We zero the rest, like
write_full_pages() does. write_full_pages() does.
*/ */
length-= empty_space;
bzero(buff + share->block_size - PAGE_SUFFIX_SIZE - empty_space, bzero(buff + share->block_size - PAGE_SUFFIX_SIZE - empty_space,
empty_space); empty_space);
} }
memcpy(buff+ PAGE_TYPE_OFFSET + 1, data, length); memcpy(buff+ PAGE_TYPE_OFFSET + 1, data, data_on_page);
data+= length;
if (pagecache_write(share->pagecache, if (pagecache_write(share->pagecache,
&info->dfile, page, 0, &info->dfile, page, 0,
buff, PAGECACHE_PLAIN_PAGE, buff, PAGECACHE_PLAIN_PAGE,
......
...@@ -2039,6 +2039,8 @@ int maria_chk_data_link(HA_CHECK *param, MARIA_HA *info, my_bool extend) ...@@ -2039,6 +2039,8 @@ int maria_chk_data_link(HA_CHECK *param, MARIA_HA *info, my_bool extend)
bzero((char*) param->tmp_key_crc, bzero((char*) param->tmp_key_crc,
share->base.keys * sizeof(param->tmp_key_crc[0])); share->base.keys * sizeof(param->tmp_key_crc[0]));
info->in_check_table= 1; /* Don't assert on checksum errors */
switch (share->data_file_type) { switch (share->data_file_type) {
case BLOCK_RECORD: case BLOCK_RECORD:
error= check_block_record(param, info, extend, record); error= check_block_record(param, info, extend, record);
...@@ -2054,6 +2056,8 @@ int maria_chk_data_link(HA_CHECK *param, MARIA_HA *info, my_bool extend) ...@@ -2054,6 +2056,8 @@ int maria_chk_data_link(HA_CHECK *param, MARIA_HA *info, my_bool extend)
break; break;
} /* switch */ } /* switch */
info->in_check_table= 0;
if (error) if (error)
goto err; goto err;
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include "trnman.h" #include "trnman.h"
#include "ma_key_recover.h" #include "ma_key_recover.h"
#include "ma_recovery_util.h" #include "ma_recovery_util.h"
#include "hash.h"
struct st_trn_for_recovery /* used only in the REDO phase */ struct st_trn_for_recovery /* used only in the REDO phase */
{ {
...@@ -58,6 +59,7 @@ static ulonglong now; /**< for tracking execution time of phases */ ...@@ -58,6 +59,7 @@ static ulonglong now; /**< for tracking execution time of phases */
static int (*save_error_handler_hook)(uint, const char *,myf); static int (*save_error_handler_hook)(uint, const char *,myf);
static uint recovery_warnings; /**< count of warnings */ static uint recovery_warnings; /**< count of warnings */
static uint recovery_found_crashed_tables; static uint recovery_found_crashed_tables;
HASH tables_to_redo; /* For maria_read_log */
#define prototype_redo_exec_hook(R) \ #define prototype_redo_exec_hook(R) \
static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec) static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
...@@ -184,6 +186,21 @@ static void print_preamble() ...@@ -184,6 +186,21 @@ static void print_preamble()
} }
static my_bool table_is_part_of_recovery_set(LEX_STRING *file_name)
{
uint offset =0;
if (!tables_to_redo.records)
return 1; /* Default, recover table */
/* Skip base directory */
if (file_name->str[0] == '.' &&
(file_name->str[1] == '/' || file_name->str[1] == '\\'))
offset= 2;
/* Only recover if table is in hash */
return my_hash_search(&tables_to_redo, (uchar*) file_name->str + offset,
file_name->length - offset) != 0;
}
/** /**
@brief Recovers from the last checkpoint. @brief Recovers from the last checkpoint.
...@@ -3037,6 +3054,12 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const ...@@ -3037,6 +3054,12 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
share= info->s; share= info->s;
tprint(tracef, ", '%s'", share->open_file_name.str); tprint(tracef, ", '%s'", share->open_file_name.str);
DBUG_ASSERT(in_redo_phase); DBUG_ASSERT(in_redo_phase);
if (!table_is_part_of_recovery_set(&share->open_file_name))
{
tprint(tracef, ", skipped by user\n");
return NULL;
}
if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0) if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
{ {
/* /*
...@@ -3070,7 +3093,6 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const ...@@ -3070,7 +3093,6 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
REDO_INSERT_ROW_BLOBS will consult list by itself, as it covers several REDO_INSERT_ROW_BLOBS will consult list by itself, as it covers several
pages. pages.
*/ */
tprint(tracef, " page %s", llbuf);
if (_ma_redo_not_needed_for_page(sid, rec->lsn, page, if (_ma_redo_not_needed_for_page(sid, rec->lsn, page,
index_page_redo_entry)) index_page_redo_entry))
return NULL; return NULL;
...@@ -3107,6 +3129,13 @@ static MARIA_HA *get_MARIA_HA_from_UNDO_record(const ...@@ -3107,6 +3129,13 @@ static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
} }
share= info->s; share= info->s;
tprint(tracef, ", '%s'", share->open_file_name.str); tprint(tracef, ", '%s'", share->open_file_name.str);
if (!table_is_part_of_recovery_set(&share->open_file_name))
{
tprint(tracef, ", skipped by user\n");
return NULL;
}
if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0) if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
{ {
tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent" tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent"
......
...@@ -30,4 +30,6 @@ int maria_apply_log(LSN lsn, LSN lsn_end, enum maria_apply_log_way apply, ...@@ -30,4 +30,6 @@ int maria_apply_log(LSN lsn, LSN lsn_end, enum maria_apply_log_way apply,
FILE *trace_file, FILE *trace_file,
my_bool execute_undo_phase, my_bool skip_DDLs, my_bool execute_undo_phase, my_bool skip_DDLs,
my_bool take_checkpoints, uint *warnings_count); my_bool take_checkpoints, uint *warnings_count);
/* Table of tables to recover */
extern HASH tables_to_redo;
C_MODE_END C_MODE_END
...@@ -129,16 +129,20 @@ my_bool _ma_redo_not_needed_for_page(uint16 shortid, LSN lsn, ...@@ -129,16 +129,20 @@ my_bool _ma_redo_not_needed_for_page(uint16 shortid, LSN lsn,
Next 2 bytes: table's short id Next 2 bytes: table's short id
Next 5 bytes: page number Next 5 bytes: page number
*/ */
char llbuf[22];
uint64 file_and_page_id= uint64 file_and_page_id=
(((uint64)((index << 16) | shortid)) << 40) | page; (((uint64)((index << 16) | shortid)) << 40) | page;
struct st_dirty_page *dirty_page= (struct st_dirty_page *) struct st_dirty_page *dirty_page= (struct st_dirty_page *)
hash_search(&all_dirty_pages, hash_search(&all_dirty_pages,
(uchar *)&file_and_page_id, sizeof(file_and_page_id)); (uchar *)&file_and_page_id, sizeof(file_and_page_id));
DBUG_PRINT("info", ("in dirty pages list: %d", dirty_page != NULL)); DBUG_PRINT("info", ("page %lld in dirty pages list: %d",
(ulonglong) page,
dirty_page != NULL));
if ((dirty_page == NULL) || if ((dirty_page == NULL) ||
cmp_translog_addr(lsn, dirty_page->rec_lsn) < 0) cmp_translog_addr(lsn, dirty_page->rec_lsn) < 0)
{ {
tprint(tracef, ", ignoring because of dirty_pages list\n"); tprint(tracef, ", ignoring page %s because of dirty_pages list\n",
llstr((ulonglong) page, llbuf));
return TRUE; return TRUE;
} }
} }
......
...@@ -570,6 +570,7 @@ struct st_maria_handler ...@@ -570,6 +570,7 @@ struct st_maria_handler
my_bool was_locked; /* Was locked in panic */ my_bool was_locked; /* Was locked in panic */
my_bool append_insert_at_end; /* Set if concurrent insert */ my_bool append_insert_at_end; /* Set if concurrent insert */
my_bool quick_mode; my_bool quick_mode;
my_bool in_check_table; /* We are running check tables */
/* Marker if key_del_changed */ /* Marker if key_del_changed */
/* If info->keyread_buff can't be used for rnext */ /* If info->keyread_buff can't be used for rnext */
my_bool page_changed; my_bool page_changed;
......
...@@ -213,6 +213,9 @@ static struct my_option my_long_options[] = ...@@ -213,6 +213,9 @@ static struct my_option my_long_options[] =
{"silent", 's', "Print less information during apply/undo phase", {"silent", 's', "Print less information during apply/undo phase",
&opt_silent, &opt_silent, 0, &opt_silent, &opt_silent, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"tables-to-redo", 'T',
"List of tables sepearated with , that we should apply REDO on. Use this if you only want to recover some tables",
0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"verbose", 'v', "Print more information during apply/undo phase", {"verbose", 'v', "Print more information during apply/undo phase",
&maria_recovery_verbose, &maria_recovery_verbose, 0, &maria_recovery_verbose, &maria_recovery_verbose, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
...@@ -245,7 +248,7 @@ static void print_version(void) ...@@ -245,7 +248,7 @@ static void print_version(void)
static void usage(void) static void usage(void)
{ {
print_version(); print_version();
puts("Copyright (C) 2007 MySQL AB"); puts("Copyright (C) 2007 MySQL AB, 2009-2011 Monty Program Ab");
puts("This software comes with ABSOLUTELY NO WARRANTY. This is free software,"); puts("This software comes with ABSOLUTELY NO WARRANTY. This is free software,");
puts("and you are welcome to modify and redistribute it under the GPL license\n"); puts("and you are welcome to modify and redistribute it under the GPL license\n");
...@@ -266,10 +269,18 @@ static void usage(void) ...@@ -266,10 +269,18 @@ static void usage(void)
#include <help_end.h> #include <help_end.h>
static uchar* my_hash_get_string(const uchar *record, size_t *length,
my_bool first __attribute__ ((unused)))
{
*length= (size_t) (strcend((const char*) record,',')- (const char*) record);
return (uchar*) record;
}
static my_bool static my_bool
get_one_option(int optid __attribute__((unused)), get_one_option(int optid __attribute__((unused)),
const struct my_option *opt __attribute__((unused)), const struct my_option *opt __attribute__((unused)),
char *argument __attribute__((unused))) char *argument)
{ {
switch (optid) { switch (optid) {
case '?': case '?':
...@@ -278,6 +289,23 @@ get_one_option(int optid __attribute__((unused)), ...@@ -278,6 +289,23 @@ get_one_option(int optid __attribute__((unused)),
case 'V': case 'V':
print_version(); print_version();
exit(0); exit(0);
case 'T':
{
char *pos;
if (!my_hash_inited(&tables_to_redo))
{
my_hash_init2(&tables_to_redo, 16, &my_charset_bin,
16, 0, 0, my_hash_get_string, 0, HASH_UNIQUE);
}
do
{
pos= strcend(argument, ',');
if (pos != argument) /* Skip empty strings */
my_hash_insert(&tables_to_redo, (uchar*) argument);
argument= pos+1;
} while (*(pos++));
break;
}
#ifndef DBUG_OFF #ifndef DBUG_OFF
case '#': case '#':
DBUG_SET_INITIAL(argument ? argument : default_dbug_option); DBUG_SET_INITIAL(argument ? argument : default_dbug_option);
...@@ -290,6 +318,7 @@ get_one_option(int optid __attribute__((unused)), ...@@ -290,6 +318,7 @@ get_one_option(int optid __attribute__((unused)),
static void get_options(int *argc,char ***argv) static void get_options(int *argc,char ***argv)
{ {
int ho_error; int ho_error;
my_bool need_help= 0;
if ((ho_error=handle_options(argc, argv, my_long_options, get_one_option))) if ((ho_error=handle_options(argc, argv, my_long_options, get_one_option)))
exit(ho_error); exit(ho_error);
...@@ -297,8 +326,23 @@ static void get_options(int *argc,char ***argv) ...@@ -297,8 +326,23 @@ static void get_options(int *argc,char ***argv)
if (!opt_apply) if (!opt_apply)
opt_apply_undo= FALSE; opt_apply_undo= FALSE;
if (((opt_display_only + opt_apply) != 1) || (*argc > 0)) if (*argc > 0)
{
need_help= 1;
fprintf(stderr, "Too many arguments given\n");
}
if ((opt_display_only + opt_apply) != 1)
{
need_help= 1;
fprintf(stderr,
"You must use one and only one of the options 'display-only' or "
"'apply'\n");
}
if (need_help)
{ {
fflush(stderr);
need_help =1;
usage(); usage();
exit(1); exit(1);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment