Commit 5b0afd8a authored by Michael Widenius's avatar Michael Widenius

Bug fixing in Aria:

- Fixed some bugs in recovery of blobs
- Don't ASSERT() on checksum errors when running check table
- Added to maria_read_log option --tables-to-redo=list-of-tables to only recover some tables (good for debugging)


storage/maria/ma_blockrec.c:
  Don't ASSERT() on checksum errors when running check table
  Fixed bug in recovery of blog page that was not in dirty pages list
storage/maria/ma_check.c:
  Don't ASSERT() on checksum errors when running check table
storage/maria/ma_recovery.c:
  Handling of --tables-to-redo
storage/maria/ma_recovery.h:
  Handling of --tables-to-redo
storage/maria/ma_recovery_util.c:
  Give better warning if table was not in dirty pages list
storage/maria/maria_def.h:
  Added in_check_table
storage/maria/maria_read_log.c:
  Added --tables-to-redo=list-of-tables to only recover some tables (good for debugging)
  Cleaned up message when wrong arguments
parent 6b03fbf9
......@@ -4934,7 +4934,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
goto err;
}
#ifdef EXTRA_DEBUG
if (share->calc_checksum)
if (share->calc_checksum && !info->in_check_table)
{
/* Esnure that row checksum is correct */
DBUG_ASSERT(((share->calc_checksum)(info, record) & 255) ==
......@@ -6705,21 +6705,23 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
uint page_range;
pgcache_page_no_t page, start_page;
uchar *buff;
uint data_on_page= data_size;
start_page= page= page_korr(header);
header+= PAGE_STORE_SIZE;
page_range= pagerange_korr(header);
header+= PAGERANGE_STORE_SIZE;
for (i= page_range; i-- > 0 ; page++)
for (i= page_range; i-- > 0 ; page++, data+= data_on_page)
{
MARIA_PINNED_PAGE page_link;
enum pagecache_page_lock unlock_method;
enum pagecache_page_pin unpin_method;
uint length;
set_if_smaller(first_page2, page);
set_if_bigger(last_page2, page);
if (i == 0 && sub_ranges == 0)
data_on_page= data_size - empty_space; /* data on last page */
if (_ma_redo_not_needed_for_page(sid, redo_lsn, page, FALSE))
continue;
......@@ -6798,19 +6800,16 @@ uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
lsn_store(buff, lsn);
buff[PAGE_TYPE_OFFSET]= BLOB_PAGE;
length= data_size;
if (i == 0 && sub_ranges == 0)
if (data_on_page != data_size)
{
/*
Last page may be only partly filled. We zero the rest, like
write_full_pages() does.
*/
length-= empty_space;
bzero(buff + share->block_size - PAGE_SUFFIX_SIZE - empty_space,
empty_space);
}
memcpy(buff+ PAGE_TYPE_OFFSET + 1, data, length);
data+= length;
memcpy(buff+ PAGE_TYPE_OFFSET + 1, data, data_on_page);
if (pagecache_write(share->pagecache,
&info->dfile, page, 0,
buff, PAGECACHE_PLAIN_PAGE,
......
......@@ -2039,6 +2039,8 @@ int maria_chk_data_link(HA_CHECK *param, MARIA_HA *info, my_bool extend)
bzero((char*) param->tmp_key_crc,
share->base.keys * sizeof(param->tmp_key_crc[0]));
info->in_check_table= 1; /* Don't assert on checksum errors */
switch (share->data_file_type) {
case BLOCK_RECORD:
error= check_block_record(param, info, extend, record);
......@@ -2054,6 +2056,8 @@ int maria_chk_data_link(HA_CHECK *param, MARIA_HA *info, my_bool extend)
break;
} /* switch */
info->in_check_table= 0;
if (error)
goto err;
......
......@@ -28,6 +28,7 @@
#include "trnman.h"
#include "ma_key_recover.h"
#include "ma_recovery_util.h"
#include "hash.h"
struct st_trn_for_recovery /* used only in the REDO phase */
{
......@@ -58,6 +59,7 @@ static ulonglong now; /**< for tracking execution time of phases */
static int (*save_error_handler_hook)(uint, const char *,myf);
static uint recovery_warnings; /**< count of warnings */
static uint recovery_found_crashed_tables;
HASH tables_to_redo; /* For maria_read_log */
#define prototype_redo_exec_hook(R) \
static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
......@@ -184,6 +186,21 @@ static void print_preamble()
}
static my_bool table_is_part_of_recovery_set(LEX_STRING *file_name)
{
uint offset =0;
if (!tables_to_redo.records)
return 1; /* Default, recover table */
/* Skip base directory */
if (file_name->str[0] == '.' &&
(file_name->str[1] == '/' || file_name->str[1] == '\\'))
offset= 2;
/* Only recover if table is in hash */
return my_hash_search(&tables_to_redo, (uchar*) file_name->str + offset,
file_name->length - offset) != 0;
}
/**
@brief Recovers from the last checkpoint.
......@@ -3037,6 +3054,12 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
share= info->s;
tprint(tracef, ", '%s'", share->open_file_name.str);
DBUG_ASSERT(in_redo_phase);
if (!table_is_part_of_recovery_set(&share->open_file_name))
{
tprint(tracef, ", skipped by user\n");
return NULL;
}
if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
{
/*
......@@ -3070,7 +3093,6 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
REDO_INSERT_ROW_BLOBS will consult list by itself, as it covers several
pages.
*/
tprint(tracef, " page %s", llbuf);
if (_ma_redo_not_needed_for_page(sid, rec->lsn, page,
index_page_redo_entry))
return NULL;
......@@ -3107,6 +3129,13 @@ static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
}
share= info->s;
tprint(tracef, ", '%s'", share->open_file_name.str);
if (!table_is_part_of_recovery_set(&share->open_file_name))
{
tprint(tracef, ", skipped by user\n");
return NULL;
}
if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
{
tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent"
......
......@@ -30,4 +30,6 @@ int maria_apply_log(LSN lsn, LSN lsn_end, enum maria_apply_log_way apply,
FILE *trace_file,
my_bool execute_undo_phase, my_bool skip_DDLs,
my_bool take_checkpoints, uint *warnings_count);
/* Table of tables to recover */
extern HASH tables_to_redo;
C_MODE_END
......@@ -129,16 +129,20 @@ my_bool _ma_redo_not_needed_for_page(uint16 shortid, LSN lsn,
Next 2 bytes: table's short id
Next 5 bytes: page number
*/
char llbuf[22];
uint64 file_and_page_id=
(((uint64)((index << 16) | shortid)) << 40) | page;
struct st_dirty_page *dirty_page= (struct st_dirty_page *)
hash_search(&all_dirty_pages,
(uchar *)&file_and_page_id, sizeof(file_and_page_id));
DBUG_PRINT("info", ("in dirty pages list: %d", dirty_page != NULL));
DBUG_PRINT("info", ("page %lld in dirty pages list: %d",
(ulonglong) page,
dirty_page != NULL));
if ((dirty_page == NULL) ||
cmp_translog_addr(lsn, dirty_page->rec_lsn) < 0)
{
tprint(tracef, ", ignoring because of dirty_pages list\n");
tprint(tracef, ", ignoring page %s because of dirty_pages list\n",
llstr((ulonglong) page, llbuf));
return TRUE;
}
}
......
......@@ -570,6 +570,7 @@ struct st_maria_handler
my_bool was_locked; /* Was locked in panic */
my_bool append_insert_at_end; /* Set if concurrent insert */
my_bool quick_mode;
my_bool in_check_table; /* We are running check tables */
/* Marker if key_del_changed */
/* If info->keyread_buff can't be used for rnext */
my_bool page_changed;
......
......@@ -213,6 +213,9 @@ static struct my_option my_long_options[] =
{"silent", 's', "Print less information during apply/undo phase",
&opt_silent, &opt_silent, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"tables-to-redo", 'T',
"List of tables sepearated with , that we should apply REDO on. Use this if you only want to recover some tables",
0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"verbose", 'v', "Print more information during apply/undo phase",
&maria_recovery_verbose, &maria_recovery_verbose, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
......@@ -245,7 +248,7 @@ static void print_version(void)
static void usage(void)
{
print_version();
puts("Copyright (C) 2007 MySQL AB");
puts("Copyright (C) 2007 MySQL AB, 2009-2011 Monty Program Ab");
puts("This software comes with ABSOLUTELY NO WARRANTY. This is free software,");
puts("and you are welcome to modify and redistribute it under the GPL license\n");
......@@ -266,10 +269,18 @@ static void usage(void)
#include <help_end.h>
static uchar* my_hash_get_string(const uchar *record, size_t *length,
my_bool first __attribute__ ((unused)))
{
*length= (size_t) (strcend((const char*) record,',')- (const char*) record);
return (uchar*) record;
}
static my_bool
get_one_option(int optid __attribute__((unused)),
const struct my_option *opt __attribute__((unused)),
char *argument __attribute__((unused)))
char *argument)
{
switch (optid) {
case '?':
......@@ -278,6 +289,23 @@ get_one_option(int optid __attribute__((unused)),
case 'V':
print_version();
exit(0);
case 'T':
{
char *pos;
if (!my_hash_inited(&tables_to_redo))
{
my_hash_init2(&tables_to_redo, 16, &my_charset_bin,
16, 0, 0, my_hash_get_string, 0, HASH_UNIQUE);
}
do
{
pos= strcend(argument, ',');
if (pos != argument) /* Skip empty strings */
my_hash_insert(&tables_to_redo, (uchar*) argument);
argument= pos+1;
} while (*(pos++));
break;
}
#ifndef DBUG_OFF
case '#':
DBUG_SET_INITIAL(argument ? argument : default_dbug_option);
......@@ -290,6 +318,7 @@ get_one_option(int optid __attribute__((unused)),
static void get_options(int *argc,char ***argv)
{
int ho_error;
my_bool need_help= 0;
if ((ho_error=handle_options(argc, argv, my_long_options, get_one_option)))
exit(ho_error);
......@@ -297,8 +326,23 @@ static void get_options(int *argc,char ***argv)
if (!opt_apply)
opt_apply_undo= FALSE;
if (((opt_display_only + opt_apply) != 1) || (*argc > 0))
if (*argc > 0)
{
need_help= 1;
fprintf(stderr, "Too many arguments given\n");
}
if ((opt_display_only + opt_apply) != 1)
{
need_help= 1;
fprintf(stderr,
"You must use one and only one of the options 'display-only' or "
"'apply'\n");
}
if (need_help)
{
fflush(stderr);
need_help =1;
usage();
exit(1);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment