Commit 5bcf3a24 authored by Michael Widenius's avatar Michael Widenius

Aria fixes:

- Fixed a bug where we didn't signal a thread waiting for bitmap flush that it's now time to continue which caused a deadlock in Aria.
- Fix for lp:700623 "Aria recovery: ma_blockrec.c:3930: _ma_update_at_original_place: Assertion `block->org_bitmap_value == _ma_bitmap_get_page_bits(info, &info->s->bitmap, page)' failed"
- Fixed a bug in pagecache where a block could change while it was in use.
- In maria_chk set --update-state to on by default so that open_count is cleared if table was ok during check.

storage/maria/ma_bitmap.c:
  Fixed a bug where we didn't signal a thread waiting for bitmap flush that it's now time to continue.
  This fix adds counters for the different conditions that may be waited upon and signals if there is a waiter when the condition changes.
storage/maria/ma_blockrec.c:
  Check if directory if full when calculating what should be in the bitmap.
  Fixes lp:700623 "Aria recovery: ma_blockrec.c:3930: _ma_update_at_original_place: Assertion `block->org_bitmap_value == _ma_bitmap_get_page_bits(info, &info->s->bitmap, page)' failed"
storage/maria/ma_pagecache.c:
  Added more comments
  Removed some duplicated DBUG_PRINT and DBUG_ASSERT()
  find_block() now waits for block to be usable if we are not copying it directly. This fixes a bug where a block changed information while we where using it. Fixed by adding an extra parameter to find_block()
  Simplified code in make_lock_and_pin() as block can never be == 0 here.
storage/maria/ma_recovery.c:
  Reset open_count for tables that are closed in middle of recovery.
storage/maria/maria_chk.c:
  Set --update-state to on by default so that open_count is cleared if table was ok during check.
  Update time when table was recovered/checked if --update-state was used.
  Updated --help message with missing information.
storage/maria/maria_def.h:
  Added wait counters
parent 785695e7
......@@ -235,7 +235,8 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file)
The +1 is to add the bitmap page, as this doesn't have to be covered
*/
bitmap->pages_covered= aligned_bit_blocks * 16 + 1;
bitmap->flush_all_requested= 0;
bitmap->flush_all_requested= bitmap->waiting_for_flush_all_requested=
bitmap->waiting_for_non_flushable= 0;
bitmap->non_flushable= 0;
/* Update size for bits */
......@@ -365,6 +366,7 @@ filter_flush_bitmap_pages(enum pagecache_page_type type
my_bool _ma_bitmap_flush_all(MARIA_SHARE *share)
{
my_bool res= 0;
uint send_signal= 0;
MARIA_FILE_BITMAP *bitmap= &share->bitmap;
DBUG_ENTER("_ma_bitmap_flush_all");
......@@ -374,7 +376,7 @@ my_bool _ma_bitmap_flush_all(MARIA_SHARE *share)
uint len= my_sprintf(buff,
(buff, "bitmap_flush: fd: %d id: %u "
"changed: %d changed_not_flushed: %d "
"flush_all_requsted: %d",
"flush_all_requested: %d",
share->bitmap.file.file,
share->id,
bitmap->changed,
......@@ -389,6 +391,7 @@ my_bool _ma_bitmap_flush_all(MARIA_SHARE *share)
if (bitmap->changed || bitmap->changed_not_flushed)
{
bitmap->flush_all_requested++;
bitmap->waiting_for_non_flushable++;
#ifndef WRONG_BITMAP_FLUSH
while (bitmap->non_flushable > 0)
{
......@@ -396,6 +399,7 @@ my_bool _ma_bitmap_flush_all(MARIA_SHARE *share)
pthread_cond_wait(&bitmap->bitmap_cond, &bitmap->bitmap_lock);
}
#endif
bitmap->waiting_for_non_flushable--;
#ifdef EXTRA_DEBUG_BITMAP
{
char tmp[MAX_BITMAP_INFO_LENGTH];
......@@ -440,10 +444,12 @@ my_bool _ma_bitmap_flush_all(MARIA_SHARE *share)
become false, wake them up.
*/
DBUG_PRINT("info", ("bitmap flusher waking up others"));
if (bitmap->flush_all_requested)
pthread_cond_broadcast(&bitmap->bitmap_cond);
send_signal= (bitmap->waiting_for_flush_all_requested |
bitmap->waiting_for_non_flushable);
}
pthread_mutex_unlock(&bitmap->bitmap_lock);
if (send_signal)
pthread_cond_broadcast(&bitmap->bitmap_cond);
DBUG_RETURN(res);
}
......@@ -473,11 +479,13 @@ void _ma_bitmap_lock(MARIA_SHARE *share)
pthread_mutex_lock(&bitmap->bitmap_lock);
bitmap->flush_all_requested++;
bitmap->waiting_for_non_flushable++;
while (bitmap->non_flushable)
{
DBUG_PRINT("info", ("waiting for bitmap to be flushable"));
pthread_cond_wait(&bitmap->bitmap_cond, &bitmap->bitmap_lock);
}
bitmap->waiting_for_non_flushable--;
/*
Ensure that _ma_bitmap_flush_all() and _ma_bitmap_lock() are blocked.
ma_bitmap_flushable() is blocked thanks to 'flush_all_requested'.
......@@ -497,6 +505,7 @@ void _ma_bitmap_lock(MARIA_SHARE *share)
void _ma_bitmap_unlock(MARIA_SHARE *share)
{
MARIA_FILE_BITMAP *bitmap= &share->bitmap;
uint send_signal;
DBUG_ENTER("_ma_bitmap_unlock");
if (!share->now_transactional)
......@@ -504,10 +513,12 @@ void _ma_bitmap_unlock(MARIA_SHARE *share)
DBUG_ASSERT(bitmap->flush_all_requested > 0 && bitmap->non_flushable == 1);
pthread_mutex_lock(&bitmap->bitmap_lock);
bitmap->flush_all_requested--;
bitmap->non_flushable= 0;
send_signal= bitmap->waiting_for_non_flushable;
if (!--bitmap->flush_all_requested)
send_signal|= bitmap->waiting_for_flush_all_requested;
pthread_mutex_unlock(&bitmap->bitmap_lock);
if (bitmap->flush_all_requested > 0)
if (send_signal)
pthread_cond_broadcast(&bitmap->bitmap_cond);
DBUG_VOID_RETURN;
}
......@@ -2334,7 +2345,7 @@ void _ma_bitmap_flushable(MARIA_HA *info, int non_flushable_inc)
the bitmap's mutex.
*/
_ma_bitmap_unpin_all(share);
if (unlikely(bitmap->flush_all_requested))
if (unlikely(bitmap->waiting_for_non_flushable))
{
DBUG_PRINT("info", ("bitmap flushable waking up flusher"));
pthread_cond_broadcast(&bitmap->bitmap_cond);
......@@ -2347,6 +2358,8 @@ void _ma_bitmap_flushable(MARIA_HA *info, int non_flushable_inc)
}
DBUG_ASSERT(non_flushable_inc == 1);
DBUG_ASSERT(info->non_flushable_state == 0);
bitmap->waiting_for_flush_all_requested++;
while (unlikely(bitmap->flush_all_requested))
{
/*
......@@ -2363,6 +2376,7 @@ void _ma_bitmap_flushable(MARIA_HA *info, int non_flushable_inc)
DBUG_PRINT("info", ("waiting for bitmap flusher"));
pthread_cond_wait(&bitmap->bitmap_cond, &bitmap->bitmap_lock);
}
bitmap->waiting_for_flush_all_requested--;
bitmap->non_flushable++;
DBUG_PRINT("info", ("bitmap->non_flushable: %u", bitmap->non_flushable));
pthread_mutex_unlock(&bitmap->bitmap_lock);
......@@ -2490,7 +2504,7 @@ my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks)
if (--bitmap->non_flushable == 0)
{
_ma_bitmap_unpin_all(info->s);
if (unlikely(bitmap->flush_all_requested))
if (unlikely(bitmap->waiting_for_non_flushable))
{
DBUG_PRINT("info", ("bitmap flushable waking up flusher"));
pthread_cond_broadcast(&bitmap->bitmap_cond);
......
......@@ -3922,8 +3922,11 @@ static my_bool _ma_update_at_original_place(MARIA_HA *info,
goto err;
block= blocks->block;
block->empty_space= row_pos.empty_space;
block->org_bitmap_value= _ma_free_size_to_head_pattern(&share->bitmap,
org_empty_size);
block->org_bitmap_value=
_ma_free_size_to_head_pattern(&share->bitmap,
(enough_free_entries_on_page(share, buff) ?
org_empty_size : 0));
DBUG_ASSERT(block->org_bitmap_value ==
_ma_bitmap_get_page_bits(info, &info->s->bitmap, page));
block->used|= BLOCKUSED_USE_ORG_BITMAP;
......
......@@ -113,7 +113,7 @@
(uint)((B)->hash_link ? \
(B)->hash_link->requests : \
0), \
block->wlocks, block->rlocks, block->rlocks_queue, \
(B)->wlocks, (B)->rlocks, (B)->rlocks_queue, \
(uint)(B)->pins, (uint)(B)->status, \
page_cache_page_type_str[(B)->type]))
......@@ -153,8 +153,20 @@ struct st_pagecache_hash_link
/* simple states of a block */
#define PCBLOCK_ERROR 1 /* an error occurred when performing disk i/o */
#define PCBLOCK_READ 2 /* the is page in the block buffer */
#define PCBLOCK_IN_SWITCH 4 /* block is preparing to read new page */
#define PCBLOCK_REASSIGNED 8 /* block does not accept requests for old page */
/*
A tread is reading the data to the page. The page is not yet ready
to be used.
*/
#define PCBLOCK_IN_SWITCH 4
/*
Block does not accept new requests for old page that would cause
the page to be pinned or written to.
(Reads that copies the block can still continue).
This state happens when another thread is waiting for readers to finish
to read/write new data to the block.
*/
#define PCBLOCK_REASSIGNED 8
#define PCBLOCK_IN_FLUSH 16 /* block is in flush operation */
#define PCBLOCK_CHANGED 32 /* block buffer contains a dirty page */
#define PCBLOCK_DIRECT_W 64 /* possible direct write to the block */
......@@ -1807,7 +1819,9 @@ static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache,
pageno number of the page in the file
init_hits_left how initialize the block counter for the page
wrmode <-> get for writing
reg_req Register request to thye page
block_is_copied 1 if block will be copied from page cache under
the pagelock mutex.
reg_req Register request to the page
page_st out {PAGE_READ,PAGE_TO_BE_READ,PAGE_WAIT_TO_BE_READ}
RETURN VALUE
......@@ -1836,6 +1850,7 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
pgcache_page_no_t pageno,
int init_hits_left,
my_bool wrmode,
my_bool block_is_copied,
my_bool reg_req,
int *page_st)
{
......@@ -1941,7 +1956,7 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
Only reading requests can proceed until the old dirty page is flushed,
all others are to be suspended, then resubmitted
*/
if (!wrmode && !(block->status & PCBLOCK_REASSIGNED))
if (!wrmode && block_is_copied && !(block->status & PCBLOCK_REASSIGNED))
{
if (reg_req)
reg_requests(pagecache, block, 1);
......@@ -2135,7 +2150,7 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
(my_bool)(block->hash_link ? 1 : 0));
PCBLOCK_INFO(block);
block->status= error ? PCBLOCK_ERROR : 0;
block->error= (int16) my_errno;
block->error= error ? (int16) my_errno : 0;
#ifndef DBUG_OFF
block->type= PAGECACHE_EMPTY_PAGE;
if (error)
......@@ -2536,22 +2551,13 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache,
my_bool any)
{
DBUG_ENTER("make_lock_and_pin");
DBUG_PRINT("enter", ("block: 0x%lx (%u) lock: %s pin: %s any %d",
(ulong)block, PCBLOCK_NUMBER(pagecache, block),
page_cache_page_lock_str[lock],
page_cache_page_pin_str[pin], (int)any));
PCBLOCK_INFO(block);
DBUG_PRINT("enter", ("block: 0x%lx", (ulong)block));
#ifndef DBUG_OFF
if (block)
{
DBUG_PRINT("enter", ("block: 0x%lx (%u) wrlocks: %u rdlocks: %u "
"rdlocks_q: %u pins: %u lock: %s pin: %s any %d",
(ulong)block, PCBLOCK_NUMBER(pagecache, block),
block->wlocks, block->rlocks, block->rlocks_queue,
block->pins,
page_cache_page_lock_str[lock],
page_cache_page_pin_str[pin], (int)any));
PCBLOCK_INFO(block);
}
#endif
DBUG_ASSERT(block);
DBUG_ASSERT(!any ||
((lock == PAGECACHE_LOCK_LEFT_UNLOCKED) &&
(pin == PAGECACHE_UNPIN)));
......@@ -2619,17 +2625,13 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache,
DBUG_ASSERT(0); /* Never should happened */
}
#ifndef DBUG_OFF
if (block)
PCBLOCK_INFO(block);
#endif
PCBLOCK_INFO(block);
DBUG_RETURN(0);
retry:
DBUG_PRINT("INFO", ("Retry block 0x%lx", (ulong)block));
PCBLOCK_INFO(block);
DBUG_ASSERT(block->hash_link->requests > 0);
block->hash_link->requests--;
PCBLOCK_INFO(block);
DBUG_RETURN(1);
}
......@@ -2661,7 +2663,6 @@ static void read_block(PAGECACHE *pagecache,
PAGECACHE_BLOCK_LINK *block,
my_bool primary)
{
DBUG_ENTER("read_block");
DBUG_PRINT("enter", ("read block: 0x%lx primary: %d",
(ulong)block, primary));
......@@ -2823,8 +2824,7 @@ void pagecache_unlock(PAGECACHE *pagecache,
page_cache_page_pin_str[pin]));
/* we do not allow any lock/pin increasing here */
DBUG_ASSERT(pin != PAGECACHE_PIN);
DBUG_ASSERT(lock != PAGECACHE_LOCK_READ);
DBUG_ASSERT(lock != PAGECACHE_LOCK_WRITE);
DBUG_ASSERT(lock != PAGECACHE_LOCK_READ && lock != PAGECACHE_LOCK_WRITE);
pagecache_pthread_mutex_lock(&pagecache->cache_lock);
/*
......@@ -2835,7 +2835,7 @@ void pagecache_unlock(PAGECACHE *pagecache,
inc_counter_for_resize_op(pagecache);
/* See NOTE for pagecache_unlock about registering requests */
block= find_block(pagecache, file, pageno, 0, 0,
block= find_block(pagecache, file, pageno, 0, 0, 0,
pin == PAGECACHE_PIN_LEFT_UNPINNED, &page_st);
PCBLOCK_INFO(block);
DBUG_ASSERT(block != 0 && page_st == PAGE_READ);
......@@ -2924,7 +2924,7 @@ void pagecache_unpin(PAGECACHE *pagecache,
inc_counter_for_resize_op(pagecache);
/* See NOTE for pagecache_unlock about registering requests */
block= find_block(pagecache, file, pageno, 0, 0, 0, &page_st);
block= find_block(pagecache, file, pageno, 0, 0, 0, 0, &page_st);
DBUG_ASSERT(block != 0);
DBUG_ASSERT(page_st == PAGE_READ);
/* we can't unpin such page without unlock */
......@@ -3367,7 +3367,7 @@ uchar *pagecache_read(PAGECACHE *pagecache,
/* See NOTE for pagecache_unlock about registering requests. */
reg_request= ((new_pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
(new_pin == PAGECACHE_PIN));
block= find_block(pagecache, file, pageno, level,
block= find_block(pagecache, file, pageno, level, buff != 0,
lock == PAGECACHE_LOCK_WRITE,
reg_request, &page_st);
DBUG_PRINT("info", ("Block type: %s current type %s",
......@@ -3785,7 +3785,6 @@ my_bool pagecache_delete(PAGECACHE *pagecache,
/* See NOTE for pagecache_unlock about registering requests. */
if (pin == PAGECACHE_PIN)
reg_requests(pagecache, block, 1);
DBUG_ASSERT(block != 0);
if (make_lock_and_pin(pagecache, block, lock, pin, FALSE))
{
/*
......@@ -3964,7 +3963,7 @@ my_bool pagecache_write_part(PAGECACHE *pagecache,
reg_request= ((pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
(pin == PAGECACHE_PIN));
block= find_block(pagecache, file, pageno, level,
TRUE,
TRUE, FALSE,
reg_request, &page_st);
if (!block)
{
......
/* Copyright (C) 2006, 2007 MySQL AB
Copyright (C) 2010 Monty Program Ab
Copyright (C) 2010-2011 Monty Program Ab
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -1292,6 +1292,21 @@ prototype_redo_exec_hook(FILE_ID)
{
tprint(tracef, " Closing table '%s'\n", info->s->open_file_name.str);
prepare_table_for_close(info, rec->lsn);
/*
Ensure that open count is 1 on close. This is needed as the
table may initially had an open_count > 0 when we initially
opened it as the server may have crashed without closing it
properly. As we now have applied all redo's for the table up to
now, we know the table is ok, so it's safe to reset the open
count to 0.
*/
if (info->s->state.open_count != 0 && info->s->reopen == 1)
{
/* let ma_close() mark the table properly closed */
info->s->state.open_count= 1;
info->s->global_changed= 1;
}
if (maria_close(info))
{
eprint(tracef, "Failed to close table");
......@@ -3412,7 +3427,7 @@ static int close_all_tables(void)
*/
if (info->s->state.open_count != 0)
{
/* let ma_close() mark the table properly closed */
/* let maria_close() mark the table properly closed */
info->s->state.open_count= 1;
info->s->global_changed= 1;
}
......
......@@ -42,7 +42,7 @@ static CHARSET_INFO *set_collation;
static int stopwords_inited= 0;
static MY_TMPDIR maria_chk_tmpdir;
static my_bool opt_transaction_logging, opt_debug, opt_require_control_file;
static my_bool opt_warning_for_wrong_transid;
static my_bool opt_warning_for_wrong_transid, opt_update_state;
static const char *type_names[]=
{
......@@ -348,7 +348,8 @@ static struct my_option my_long_options[] =
"Mark tables as crashed if any errors were found and clean if check didn't "
"find any errors. This allows one to get rid of warnings like 'table not "
"properly closed'",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
&opt_update_state, &opt_update_state, 0, GET_BOOL, NO_ARG,
1, 0, 0, 0, 0, 0},
{"unpack", 'u',
"Unpack file packed with maria_pack.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
......@@ -483,8 +484,12 @@ static void usage(void)
-i, --information Print statistics information about table that is checked.\n\
-m, --medium-check Faster than extend-check, but only finds 99.99% of\n\
all errors. Should be good enough for most cases.\n\
-T, --read-only Don't mark table as checked.\n\
-U, --update-state Mark tables as crashed if you find any errors.\n\
-T, --read-only Don't mark table as checked.\n");
--warning-for-wrong-transaction-id\n\
Give a warning if we find a transaction id in the table that is bigger\n\
than what exists in the control file. Use --skip-... to disable warning\n\
");
puts("\
Recover (repair)/ options (When using '--recover' or '--safe-recover'):\n\
......@@ -856,6 +861,7 @@ static void get_options(register int *argc,register char ***argv)
default_argv= *argv;
if (isatty(fileno(stdout)))
check_param.testflag|=T_WRITE_LOOP;
check_param.testflag= T_UPDATE_STATE;
if ((ho_error=handle_options(argc, argv, my_long_options, get_one_option)))
exit(ho_error);
......@@ -1326,13 +1332,18 @@ static int maria_chk(HA_CHECK *param, char *filename)
(my_bool) !test(param->testflag & T_AUTO_INC));
if (info->update & HA_STATE_CHANGED && ! (param->testflag & T_READONLY))
{
error|=maria_update_state_info(param, info,
UPDATE_OPEN_COUNT |
(((param->testflag & T_REP_ANY) ?
(((param->testflag &
(T_REP_ANY | T_UPDATE_STATE)) ?
UPDATE_TIME : 0) |
(state_updated ? UPDATE_STAT : 0) |
((param->testflag & T_SORT_RECORDS) ?
UPDATE_SORT : 0)));
if (!(param->testflag & T_SILENT))
printf("State updated\n");
}
info->update&= ~HA_STATE_CHANGED;
_ma_reenable_logging_for_table(info, FALSE);
maria_lock_database(info, F_UNLCK);
......@@ -1438,7 +1449,7 @@ static void descript(HA_CHECK *param, register MARIA_HA *info, char *name)
if (share->state.check_time)
{
get_date(buff,1,share->state.check_time);
printf("Recover time: %s\n",buff);
printf("Check/recover time: %s\n",buff);
}
if (share->base.born_transactional)
{
......
......@@ -249,7 +249,9 @@ typedef struct st_maria_file_bitmap
my_bool changed; /* 1 if page needs to be written */
my_bool changed_not_flushed; /* 1 if some bitmap is not flushed */
uint flush_all_requested; /**< If _ma_bitmap_flush_all waiting */
uint waiting_for_flush_all_requested; /* If someone is waiting for above */
uint non_flushable; /**< 0 if bitmap and log are in sync */
uint waiting_for_non_flushable; /* If someone is waiting for above */
PAGECACHE_FILE file; /* datafile where bitmap is stored */
#ifdef THREAD
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment