Commit 079aaf43 authored by Michael Widenius's avatar Michael Widenius

Fixed several errors in Aria discovered by test case for lp:727869...

Fixed several errors in Aria discovered by test case for lp:727869 ma_pagecache.c:2103: find_block: Assertion `block->rlocks == 0
- Fixed assert in transaction log handler when aria_check was run on block-record table that was much bigger than expected.
- Fixed warnings about wrong mutex order between bitmap and intern_lock
- Fixed error in bitmap that could cause two rows to use same block for a block record.
- Fixed wrong test that could cause error if last page for a bitmap was used by a blob.
- Fixed several bugs in pagecache for the case where pagecase had very few blocks and there was a lot of threads competing to get the blocks (very unlikely case).


mysql-test/suite/maria/r/maria-recovery3.result:
  Updated results
sql/mysqld.cc:
  Allow mi_check() to send information messages for log file
storage/maria/ma_bitmap.c:
  Fixed problem with wrong mutex order when bitmap was the first page that was flushed out of page cache
  - Fixed by introducing _ma_bitmap_mark_file_changed() that marks file changed without a bitmap lock.
  - Fixed one case in _ma_change_bitmap_page() where we didn't mark the bitmap changed. This could cause to rows to reuse same block if this was the only change to the bitmap.
  - Split _ma_bitmap_get_page_bits() in two parts to not take a bitmap lock when we already have it
  - Fixed bug in _ma_bitmap_set_full_page_bits() that caused an error if last page for a bitmap was used by a blob
storage/maria/ma_check.c:
  Better handling of wrong file length.
  Fixed bug when we tried to write to transaction log when it was not opened (happened when block record file was bigger than expected)
storage/maria/ma_pagecache.c:
  Fixed several bugs in pagecache for the case where pagecase had very few blocks and there was a lot of threads competing to get the blocks:
  - In link_block() mark a block given to another thread with PCBLOCK_REASSIGNED to ensure that no other threads can start re-using the block
    before the thread that requsted a block.
  - In free_block(), don't reset status for a block that is in re-assign by link_block() (we don't want to loose the PCBLOCK_REASSIGNED flag).
  - Added call to wait_for_flush() when we got a new block in find_block() to ensure that we don't use a block that is beeing flushed by another thread.
  - Moved setting of hits_left and last_hit_time in find_block() to where we assign the block.
  
  
  Code cleanup and making code uniform:
  - Changed a lot of KEYCACHE_DBUG_PRINT to use DBUG_PRINT
  - Streamlined all reporting of 'signal' and 'wait' between threads to be identical.
  - Use thread name instead of thread number (for each match against --debug)
  - Added more DBUG_ENTER, DBUG_PRINT and DBUG_ASSERT()
  - Added more comments
storage/myisam/ha_myisam.cc:
  Only print information about that we make a backup if we are really making a backup.
storage/myisam/mi_check.c:
  Inform mysqld that we are creating a backup of the data file (for inclusion in error log).
parent 66136a30
...@@ -78,7 +78,7 @@ ERROR HY000: Lost connection to MySQL server during query ...@@ -78,7 +78,7 @@ ERROR HY000: Lost connection to MySQL server during query
* recovery happens * recovery happens
check table t1 extended; check table t1 extended;
Table Op Msg_type Msg_text Table Op Msg_type Msg_text
mysqltest.t1 check warning Size of indexfile is: 372 Should be: 8192 mysqltest.t1 check warning Size of indexfile is: 372 Expected: 8192
mysqltest.t1 check status OK mysqltest.t1 check status OK
* testing that checksum after recovery is as expected * testing that checksum after recovery is as expected
Checksum-check Checksum-check
......
...@@ -3048,7 +3048,8 @@ int my_message_sql(uint error, const char *str, myf MyFlags) ...@@ -3048,7 +3048,8 @@ int my_message_sql(uint error, const char *str, myf MyFlags)
{ {
/* At least, prevent new abuse ... */ /* At least, prevent new abuse ... */
DBUG_ASSERT(strncmp(str, "MyISAM table", 12) == 0 || DBUG_ASSERT(strncmp(str, "MyISAM table", 12) == 0 ||
strncmp(str, "Aria table", 11) == 0); strncmp(str, "Aria table", 11) == 0 ||
(MyFlags & ME_JUST_INFO));
error= ER_UNKNOWN_ERROR; error= ER_UNKNOWN_ERROR;
} }
......
...@@ -310,6 +310,31 @@ my_bool _ma_bitmap_end(MARIA_SHARE *share) ...@@ -310,6 +310,31 @@ my_bool _ma_bitmap_end(MARIA_SHARE *share)
return res; return res;
} }
/*
Ensure that we have incremented open count before we try to read/write
a page while we have the bitmap lock.
This is needed to ensure that we don't call _ma_mark_file_changed() as
part of flushing a page to disk, as this locks share->internal_lock
and then mutex lock would happen in the wrong order.
*/
static inline void _ma_bitmap_mark_file_changed(MARIA_SHARE *share)
{
/*
It's extremely unlikely that the following test is true as it
only happens once if the table has changed.
*/
if (unlikely(!share->global_changed &&
(share->state.changed & STATE_CHANGED)))
{
/* purecov: begin inspected */
/* unlock mutex as it can't be hold during _ma_mark_file_changed() */
pthread_mutex_unlock(&share->bitmap.bitmap_lock);
_ma_mark_file_changed(share);
pthread_mutex_lock(&share->bitmap.bitmap_lock);
/* purecov: end */
}
}
/* /*
Send updated bitmap to the page cache Send updated bitmap to the page cache
...@@ -413,24 +438,13 @@ my_bool _ma_bitmap_flush_all(MARIA_SHARE *share) ...@@ -413,24 +438,13 @@ my_bool _ma_bitmap_flush_all(MARIA_SHARE *share)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
_ma_bitmap_mark_file_changed(share);
/* /*
Before flusing bitmap, ensure that we have incremented open count. The following should be true as it was tested above. We have to test
This is needed to ensure that we don't call this again as _ma_bitmap_mark_file_changed() did temporarly release
_ma_mark_file_changed() as part of flushing bitmap page as in this the bitmap mutex.
case we would use mutex lock in wrong order.
It's extremely unlikely that the following test is true as normally
this is happening when table is flushed.
*/ */
if (unlikely(!share->global_changed))
{
/* purecov: begin inspected */
/* unlock bitmap mutex as it can't be hold during _ma_mark_file_changed */
pthread_mutex_unlock(&bitmap->bitmap_lock);
_ma_mark_file_changed(share);
pthread_mutex_lock(&bitmap->bitmap_lock);
/* purecov: end */
}
if (bitmap->changed || bitmap->changed_not_flushed) if (bitmap->changed || bitmap->changed_not_flushed)
{ {
bitmap->flush_all_requested++; bitmap->flush_all_requested++;
...@@ -1010,6 +1024,8 @@ static my_bool _ma_change_bitmap_page(MARIA_HA *info, ...@@ -1010,6 +1024,8 @@ static my_bool _ma_change_bitmap_page(MARIA_HA *info,
{ {
DBUG_ENTER("_ma_change_bitmap_page"); DBUG_ENTER("_ma_change_bitmap_page");
_ma_bitmap_mark_file_changed(info->s);
if (bitmap->changed) if (bitmap->changed)
{ {
if (write_changed_bitmap(info->s, bitmap)) if (write_changed_bitmap(info->s, bitmap))
...@@ -1447,10 +1463,7 @@ static ulong allocate_full_pages(MARIA_FILE_BITMAP *bitmap, ...@@ -1447,10 +1463,7 @@ static ulong allocate_full_pages(MARIA_FILE_BITMAP *bitmap,
best_prefix_bits|= tmp; best_prefix_bits|= tmp;
int6store(best_data, best_prefix_bits); int6store(best_data, best_prefix_bits);
if (!(best_area_size-= best_prefix_area_size)) if (!(best_area_size-= best_prefix_area_size))
{ goto end;
DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
DBUG_RETURN(block->page_count);
}
best_data+= 6; best_data+= 6;
} }
best_area_size*= 3; /* Bits to set */ best_area_size*= 3; /* Bits to set */
...@@ -1468,6 +1481,7 @@ static ulong allocate_full_pages(MARIA_FILE_BITMAP *bitmap, ...@@ -1468,6 +1481,7 @@ static ulong allocate_full_pages(MARIA_FILE_BITMAP *bitmap,
bitmap->used_size= (uint) (best_data - bitmap->map); bitmap->used_size= (uint) (best_data - bitmap->map);
DBUG_ASSERT(bitmap->used_size <= bitmap->total_size); DBUG_ASSERT(bitmap->used_size <= bitmap->total_size);
} }
end:
bitmap->changed= 1; bitmap->changed= 1;
DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap);); DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
DBUG_RETURN(block->page_count); DBUG_RETURN(block->page_count);
...@@ -2142,7 +2156,7 @@ static my_bool set_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, ...@@ -2142,7 +2156,7 @@ static my_bool set_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
Get bitmap pattern for a given page Get bitmap pattern for a given page
SYNOPSIS SYNOPSIS
get_page_bits() bitmap_get_page_bits()
info Maria handler info Maria handler
bitmap Bitmap handler bitmap Bitmap handler
page Page number page Page number
...@@ -2152,8 +2166,8 @@ static my_bool set_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, ...@@ -2152,8 +2166,8 @@ static my_bool set_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
~0 Error (couldn't read page) ~0 Error (couldn't read page)
*/ */
uint _ma_bitmap_get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, static uint bitmap_get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
pgcache_page_no_t page) pgcache_page_no_t page)
{ {
pgcache_page_no_t bitmap_page; pgcache_page_no_t bitmap_page;
uint offset_page, offset, tmp; uint offset_page, offset, tmp;
...@@ -2179,6 +2193,19 @@ uint _ma_bitmap_get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, ...@@ -2179,6 +2193,19 @@ uint _ma_bitmap_get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
} }
/* As above, but take a lock while getting the data */
uint _ma_bitmap_get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
pgcache_page_no_t page)
{
uint tmp;
pthread_mutex_lock(&bitmap->bitmap_lock);
tmp= bitmap_get_page_bits(info, bitmap, page);
pthread_mutex_unlock(&bitmap->bitmap_lock);
return tmp;
}
/* /*
Mark all pages in a region as free Mark all pages in a region as free
...@@ -2258,6 +2285,7 @@ my_bool _ma_bitmap_reset_full_page_bits(MARIA_HA *info, ...@@ -2258,6 +2285,7 @@ my_bool _ma_bitmap_reset_full_page_bits(MARIA_HA *info,
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/* /*
Set all pages in a region as used Set all pages in a region as used
...@@ -2290,7 +2318,7 @@ my_bool _ma_bitmap_set_full_page_bits(MARIA_HA *info, ...@@ -2290,7 +2318,7 @@ my_bool _ma_bitmap_set_full_page_bits(MARIA_HA *info,
bitmap_page= page - page % bitmap->pages_covered; bitmap_page= page - page % bitmap->pages_covered;
if (page == bitmap_page || if (page == bitmap_page ||
page + page_count >= bitmap_page + bitmap->pages_covered) page + page_count > bitmap_page + bitmap->pages_covered)
{ {
DBUG_ASSERT(0); /* Wrong in data */ DBUG_ASSERT(0); /* Wrong in data */
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -2494,7 +2522,7 @@ my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks) ...@@ -2494,7 +2522,7 @@ my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks)
else else
{ {
DBUG_ASSERT(current_bitmap_value == DBUG_ASSERT(current_bitmap_value ==
_ma_bitmap_get_page_bits(info, bitmap, block->page)); bitmap_get_page_bits(info, bitmap, block->page));
} }
/* Handle all full pages and tail pages (for head page and blob) */ /* Handle all full pages and tail pages (for head page and blob) */
...@@ -2526,7 +2554,7 @@ my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks) ...@@ -2526,7 +2554,7 @@ my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks)
to not set the bits to the same value as before. to not set the bits to the same value as before.
*/ */
DBUG_ASSERT(current_bitmap_value == DBUG_ASSERT(current_bitmap_value ==
_ma_bitmap_get_page_bits(info, bitmap, block->page)); bitmap_get_page_bits(info, bitmap, block->page));
if (bits != current_bitmap_value) if (bits != current_bitmap_value)
{ {
......
...@@ -412,12 +412,13 @@ int maria_chk_size(HA_CHECK *param, register MARIA_HA *info) ...@@ -412,12 +412,13 @@ int maria_chk_size(HA_CHECK *param, register MARIA_HA *info)
{ {
error=1; error=1;
_ma_check_print_error(param, _ma_check_print_error(param,
"Size of indexfile is: %-8s Should be: %s", "Size of indexfile is: %-8s Expected: %s",
llstr(size,buff), llstr(skr,buff2)); llstr(size,buff), llstr(skr,buff2));
share->state.state.key_file_length= size;
} }
else if (!(param->testflag & T_VERY_SILENT)) else if (!(param->testflag & T_VERY_SILENT))
_ma_check_print_warning(param, _ma_check_print_warning(param,
"Size of indexfile is: %-8s Should be: %s", "Size of indexfile is: %-8s Expected: %s",
llstr(size,buff), llstr(skr,buff2)); llstr(size,buff), llstr(skr,buff2));
} }
if (!(param->testflag & T_VERY_SILENT) && if (!(param->testflag & T_VERY_SILENT) &&
...@@ -439,18 +440,18 @@ int maria_chk_size(HA_CHECK *param, register MARIA_HA *info) ...@@ -439,18 +440,18 @@ int maria_chk_size(HA_CHECK *param, register MARIA_HA *info)
#endif #endif
if (skr != size) if (skr != size)
{ {
share->state.state.data_file_length=size; /* Skip other errors */
if (skr > size && skr != size + MEMMAP_EXTRA_MARGIN) if (skr > size && skr != size + MEMMAP_EXTRA_MARGIN)
{ {
share->state.state.data_file_length=size; /* Skip other errors */
error=1; error=1;
_ma_check_print_error(param,"Size of datafile is: %-9s Should be: %s", _ma_check_print_error(param,"Size of datafile is: %-9s Expected: %s",
llstr(size,buff), llstr(skr,buff2)); llstr(size,buff), llstr(skr,buff2));
param->testflag|=T_RETRY_WITHOUT_QUICK; param->testflag|=T_RETRY_WITHOUT_QUICK;
} }
else else
{ {
_ma_check_print_warning(param, _ma_check_print_warning(param,
"Size of datafile is: %-9s Should be: %s", "Size of datafile is: %-9s Expected: %s",
llstr(size,buff), llstr(skr,buff2)); llstr(size,buff), llstr(skr,buff2));
} }
} }
...@@ -1801,7 +1802,7 @@ static int check_block_record(HA_CHECK *param, MARIA_HA *info, int extend, ...@@ -1801,7 +1802,7 @@ static int check_block_record(HA_CHECK *param, MARIA_HA *info, int extend,
char llbuff[22], llbuff2[22]; char llbuff[22], llbuff2[22];
uint block_size= share->block_size; uint block_size= share->block_size;
ha_rows full_page_count, tail_count; ha_rows full_page_count, tail_count;
my_bool full_dir; my_bool full_dir, now_transactional;
uint offset_page, offset, free_count; uint offset_page, offset, free_count;
LINT_INIT(full_dir); LINT_INIT(full_dir);
...@@ -1812,6 +1813,10 @@ static int check_block_record(HA_CHECK *param, MARIA_HA *info, int extend, ...@@ -1812,6 +1813,10 @@ static int check_block_record(HA_CHECK *param, MARIA_HA *info, int extend,
my_errno); my_errno);
return 1; return 1;
} }
now_transactional= info->s->now_transactional;
info->s->now_transactional= 0; /* Don't log changes */
bitmap_buff= info->scan.bitmap_buff; bitmap_buff= info->scan.bitmap_buff;
page_buff= info->scan.page_buff; page_buff= info->scan.page_buff;
full_page_count= tail_count= 0; full_page_count= tail_count= 0;
...@@ -1831,7 +1836,8 @@ static int check_block_record(HA_CHECK *param, MARIA_HA *info, int extend, ...@@ -1831,7 +1836,8 @@ static int check_block_record(HA_CHECK *param, MARIA_HA *info, int extend,
if (_ma_killed_ptr(param)) if (_ma_killed_ptr(param))
{ {
_ma_scan_end_block_record(info); _ma_scan_end_block_record(info);
return -1; info->s->now_transactional= now_transactional;
return -1; /* Interrupted */
} }
if ((page % share->bitmap.pages_covered) == 0) if ((page % share->bitmap.pages_covered) == 0)
{ {
...@@ -1999,10 +2005,12 @@ static int check_block_record(HA_CHECK *param, MARIA_HA *info, int extend, ...@@ -1999,10 +2005,12 @@ static int check_block_record(HA_CHECK *param, MARIA_HA *info, int extend,
llstr(param->tail_count, llbuff), llstr(param->tail_count, llbuff),
llstr(tail_count, llbuff2)); llstr(tail_count, llbuff2));
info->s->now_transactional= now_transactional;
return param->error_printed != 0; return param->error_printed != 0;
err: err:
_ma_scan_end_block_record(info); _ma_scan_end_block_record(info);
info->s->now_transactional= now_transactional;
return 1; return 1;
} }
......
...@@ -494,6 +494,7 @@ static my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block, ...@@ -494,6 +494,7 @@ static my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block,
#define FLUSH_CACHE 2000 /* sort this many blocks at once */ #define FLUSH_CACHE 2000 /* sort this many blocks at once */
static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block); static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block);
static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link);
#ifndef DBUG_OFF #ifndef DBUG_OFF
static void test_key_cache(PAGECACHE *pagecache, static void test_key_cache(PAGECACHE *pagecache,
const char *where, my_bool lock); const char *where, my_bool lock);
...@@ -540,7 +541,7 @@ static void pagecache_debug_print _VARARGS((const char *fmt, ...)); ...@@ -540,7 +541,7 @@ static void pagecache_debug_print _VARARGS((const char *fmt, ...));
#define KEYCACHE_DBUG_ASSERT(a) \ #define KEYCACHE_DBUG_ASSERT(a) \
{ if (! (a) && pagecache_debug_log) \ { if (! (a) && pagecache_debug_log) \
fclose(pagecache_debug_log); \ fclose(pagecache_debug_log); \
assert(a); } DBUG_ASSERT(a); }
#else #else
#define KEYCACHE_PRINT(l, m) #define KEYCACHE_PRINT(l, m)
#define KEYCACHE_DBUG_PRINT(l, m) DBUG_PRINT(l, m) #define KEYCACHE_DBUG_PRINT(l, m) DBUG_PRINT(l, m)
...@@ -1030,8 +1031,7 @@ ulong resize_pagecache(PAGECACHE *pagecache, ...@@ -1030,8 +1031,7 @@ ulong resize_pagecache(PAGECACHE *pagecache,
#ifdef THREAD #ifdef THREAD
while (pagecache->cnt_for_resize_op) while (pagecache->cnt_for_resize_op)
{ {
KEYCACHE_DBUG_PRINT("resize_pagecache: wait", DBUG_PRINT("wait", ("suspend thread %s %ld", thread->name, thread->id));
("suspend thread %ld", thread->id));
pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock); pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock);
} }
#else #else
...@@ -1050,8 +1050,9 @@ ulong resize_pagecache(PAGECACHE *pagecache, ...@@ -1050,8 +1050,9 @@ ulong resize_pagecache(PAGECACHE *pagecache,
/* Signal for the next resize request to proceeed if any */ /* Signal for the next resize request to proceeed if any */
if (wqueue->last_thread) if (wqueue->last_thread)
{ {
KEYCACHE_DBUG_PRINT("resize_pagecache: signal", DBUG_PRINT("signal",
("thread %ld", wqueue->last_thread->next->id)); ("thread %s %ld", wqueue->last_thread->next->name,
wqueue->last_thread->next->id));
pagecache_pthread_cond_signal(&wqueue->last_thread->next->suspend); pagecache_pthread_cond_signal(&wqueue->last_thread->next->suspend);
} }
#endif #endif
...@@ -1075,6 +1076,7 @@ static inline void inc_counter_for_resize_op(PAGECACHE *pagecache) ...@@ -1075,6 +1076,7 @@ static inline void inc_counter_for_resize_op(PAGECACHE *pagecache)
Decrement counter blocking resize key cache operation; Decrement counter blocking resize key cache operation;
Signal the operation to proceed when counter becomes equal zero Signal the operation to proceed when counter becomes equal zero
*/ */
static inline void dec_counter_for_resize_op(PAGECACHE *pagecache) static inline void dec_counter_for_resize_op(PAGECACHE *pagecache)
{ {
#ifdef THREAD #ifdef THREAD
...@@ -1083,8 +1085,9 @@ static inline void dec_counter_for_resize_op(PAGECACHE *pagecache) ...@@ -1083,8 +1085,9 @@ static inline void dec_counter_for_resize_op(PAGECACHE *pagecache)
if (!--pagecache->cnt_for_resize_op && if (!--pagecache->cnt_for_resize_op &&
(last_thread= pagecache->resize_queue.last_thread)) (last_thread= pagecache->resize_queue.last_thread))
{ {
KEYCACHE_DBUG_PRINT("dec_counter_for_resize_op: signal", DBUG_PRINT("signal",
("thread %ld", last_thread->next->id)); ("thread %s %ld", last_thread->next->name,
last_thread->next->id));
pagecache_pthread_cond_signal(&last_thread->next->suspend); pagecache_pthread_cond_signal(&last_thread->next->suspend);
} }
#else #else
...@@ -1322,6 +1325,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, ...@@ -1322,6 +1325,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
{ {
PAGECACHE_BLOCK_LINK *ins; PAGECACHE_BLOCK_LINK *ins;
PAGECACHE_BLOCK_LINK **ptr_ins; PAGECACHE_BLOCK_LINK **ptr_ins;
DBUG_ENTER("link_block");
PCBLOCK_INFO(block); PCBLOCK_INFO(block);
KEYCACHE_DBUG_ASSERT(! (block->hash_link && block->hash_link->requests)); KEYCACHE_DBUG_ASSERT(! (block->hash_link && block->hash_link->requests));
...@@ -1336,6 +1340,11 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, ...@@ -1336,6 +1340,11 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
PAGECACHE_HASH_LINK *hash_link= PAGECACHE_HASH_LINK *hash_link=
(PAGECACHE_HASH_LINK *) first_thread->opt_info; (PAGECACHE_HASH_LINK *) first_thread->opt_info;
struct st_my_thread_var *thread; struct st_my_thread_var *thread;
DBUG_ASSERT(block->requests + block->wlocks + block->rlocks +
block->pins == 0);
DBUG_ASSERT(block->next_used == NULL);
do do
{ {
thread= next_thread; thread= next_thread;
...@@ -1346,7 +1355,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, ...@@ -1346,7 +1355,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
*/ */
if ((PAGECACHE_HASH_LINK *) thread->opt_info == hash_link) if ((PAGECACHE_HASH_LINK *) thread->opt_info == hash_link)
{ {
KEYCACHE_DBUG_PRINT("link_block: signal", ("thread: %ld", thread->id)); DBUG_PRINT("signal", ("thread: %s %ld", thread->name, thread->id));
pagecache_pthread_cond_signal(&thread->suspend); pagecache_pthread_cond_signal(&thread->suspend);
wqueue_unlink_from_queue(&pagecache->waiting_for_block, thread); wqueue_unlink_from_queue(&pagecache->waiting_for_block, thread);
block->requests++; block->requests++;
...@@ -1354,14 +1363,17 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, ...@@ -1354,14 +1363,17 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
} }
while (thread != last_thread); while (thread != last_thread);
hash_link->block= block; hash_link->block= block;
KEYCACHE_THREAD_TRACE("link_block: after signaling"); /* Ensure that no other thread tries to use this block */
block->status|= PCBLOCK_REASSIGNED;
DBUG_PRINT("signal", ("after signal"));
#if defined(PAGECACHE_DEBUG) #if defined(PAGECACHE_DEBUG)
KEYCACHE_DBUG_PRINT("link_block", KEYCACHE_DBUG_PRINT("link_block",
("linked,unlinked block: %u status: %x #requests: %u #available: %u", ("linked,unlinked block: %u status: %x #requests: %u #available: %u",
PCBLOCK_NUMBER(pagecache, block), block->status, PCBLOCK_NUMBER(pagecache, block), block->status,
block->requests, pagecache->blocks_available)); block->requests, pagecache->blocks_available));
#endif #endif
return; DBUG_VOID_RETURN;
} }
#else /* THREAD */ #else /* THREAD */
KEYCACHE_DBUG_ASSERT(! (!hot && pagecache->waiting_for_block.last_thread)); KEYCACHE_DBUG_ASSERT(! (!hot && pagecache->waiting_for_block.last_thread));
...@@ -1381,8 +1393,6 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, ...@@ -1381,8 +1393,6 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
else else
{ {
/* The LRU chain is empty */ /* The LRU chain is empty */
/* QQ: Ask sanja if next line is correct; Should we really put block
in both chain if one chain is empty ? */
pagecache->used_last= pagecache->used_ins= block->next_used= block; pagecache->used_last= pagecache->used_ins= block->next_used= block;
block->prev_used= &block->next_used; block->prev_used= &block->next_used;
} }
...@@ -1396,6 +1406,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, ...@@ -1396,6 +1406,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
KEYCACHE_DBUG_ASSERT((ulong) pagecache->blocks_available <= KEYCACHE_DBUG_ASSERT((ulong) pagecache->blocks_available <=
pagecache->blocks_used); pagecache->blocks_used);
#endif #endif
DBUG_VOID_RETURN;
} }
...@@ -1404,7 +1415,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, ...@@ -1404,7 +1415,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
SYNOPSIS SYNOPSIS
unlink_block() unlink_block()
pagecache pointer to a page cache data structure pagecache pointer to a page cache data structure
block pointer to the block to unlink from the LRU chain block pointer to the block to unlink from the LRU chain
RETURN VALUE RETURN VALUE
...@@ -1511,7 +1522,7 @@ static void unreg_request(PAGECACHE *pagecache, ...@@ -1511,7 +1522,7 @@ static void unreg_request(PAGECACHE *pagecache,
PAGECACHE_BLOCK_LINK *block, int at_end) PAGECACHE_BLOCK_LINK *block, int at_end)
{ {
DBUG_ENTER("unreg_request"); DBUG_ENTER("unreg_request");
DBUG_PRINT("enter", ("block 0x%lx (%u) status: %x reqs: %u", DBUG_PRINT("enter", ("block 0x%lx (%u) status: %x requests: %u",
(ulong)block, PCBLOCK_NUMBER(pagecache, block), (ulong)block, PCBLOCK_NUMBER(pagecache, block),
block->status, block->requests)); block->status, block->requests));
PCBLOCK_INFO(block); PCBLOCK_INFO(block);
...@@ -1580,18 +1591,23 @@ static inline void remove_reader(PAGECACHE_BLOCK_LINK *block) ...@@ -1580,18 +1591,23 @@ static inline void remove_reader(PAGECACHE_BLOCK_LINK *block)
static inline void wait_for_readers(PAGECACHE *pagecache static inline void wait_for_readers(PAGECACHE *pagecache
__attribute__((unused)), __attribute__((unused)),
PAGECACHE_BLOCK_LINK *block) PAGECACHE_BLOCK_LINK *block
__attribute__((unused)))
{ {
#ifdef THREAD #ifdef THREAD
struct st_my_thread_var *thread= my_thread_var; struct st_my_thread_var *thread= my_thread_var;
DBUG_ASSERT(block->condvar == NULL);
while (block->hash_link->requests) while (block->hash_link->requests)
{ {
KEYCACHE_DBUG_PRINT("wait_for_readers: wait", DBUG_ENTER("wait_for_readers");
("suspend thread: %ld block: %u", DBUG_PRINT("wait",
thread->id, PCBLOCK_NUMBER(pagecache, block))); ("suspend thread: %s %ld block: %u",
thread->name, thread->id,
PCBLOCK_NUMBER(pagecache, block)));
block->condvar= &thread->suspend; block->condvar= &thread->suspend;
pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock); pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock);
block->condvar= NULL; block->condvar= NULL;
DBUG_VOID_RETURN;
} }
#else #else
KEYCACHE_DBUG_ASSERT(block->hash_link->requests == 0); KEYCACHE_DBUG_ASSERT(block->hash_link->requests == 0);
...@@ -1599,6 +1615,30 @@ static inline void wait_for_readers(PAGECACHE *pagecache ...@@ -1599,6 +1615,30 @@ static inline void wait_for_readers(PAGECACHE *pagecache
} }
/*
Wait until the flush of the page is done.
*/
static void wait_for_flush(PAGECACHE *pagecache
__attribute__((unused)),
PAGECACHE_BLOCK_LINK *block
__attribute__((unused)))
{
DBUG_ENTER("wait_for_flush");
struct st_my_thread_var *thread= my_thread_var;
wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
do
{
DBUG_PRINT("wait",
("suspend thread %s %ld", thread->name, thread->id));
pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock);
}
while(thread->next);
DBUG_VOID_RETURN;
}
/* /*
Add a hash link to a bucket in the hash_table Add a hash link to a bucket in the hash_table
*/ */
...@@ -1620,10 +1660,14 @@ static inline void link_hash(PAGECACHE_HASH_LINK **start, ...@@ -1620,10 +1660,14 @@ static inline void link_hash(PAGECACHE_HASH_LINK **start,
static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link) static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link)
{ {
KEYCACHE_DBUG_PRINT("unlink_hash", ("fd: %u pos_ %lu #requests=%u", DBUG_ENTER("unlink_hash");
(uint) hash_link->file.file, (ulong) hash_link->pageno, DBUG_PRINT("enter", ("hash_link: %p fd: %u pos: %lu requests: %u",
hash_link->requests)); hash_link, (uint) hash_link->file.file,
KEYCACHE_DBUG_ASSERT(hash_link->requests == 0); (ulong) hash_link->pageno,
hash_link->requests));
DBUG_ASSERT(hash_link->requests == 0);
DBUG_ASSERT(!hash_link->block || hash_link->block->pins == 0);
if ((*hash_link->prev= hash_link->next)) if ((*hash_link->prev= hash_link->next))
hash_link->next->prev= hash_link->prev; hash_link->next->prev= hash_link->prev;
hash_link->block= NULL; hash_link->block= NULL;
...@@ -1654,23 +1698,32 @@ static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link) ...@@ -1654,23 +1698,32 @@ static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link)
if (page->file.file == hash_link->file.file && if (page->file.file == hash_link->file.file &&
page->pageno == hash_link->pageno) page->pageno == hash_link->pageno)
{ {
KEYCACHE_DBUG_PRINT("unlink_hash: signal", ("thread %ld", thread->id)); DBUG_PRINT("signal", ("thread %s %ld", thread->name, thread->id));
pagecache_pthread_cond_signal(&thread->suspend); pagecache_pthread_cond_signal(&thread->suspend);
wqueue_unlink_from_queue(&pagecache->waiting_for_hash_link, thread); wqueue_unlink_from_queue(&pagecache->waiting_for_hash_link, thread);
} }
} }
while (thread != last_thread); while (thread != last_thread);
/*
Add this to the hash, so that the waiting threads can find it
when they retry the call to get_hash_link(). This entry is special
in that it has no associated block.
*/
link_hash(&pagecache->hash_root[PAGECACHE_HASH(pagecache, link_hash(&pagecache->hash_root[PAGECACHE_HASH(pagecache,
hash_link->file, hash_link->file,
hash_link->pageno)], hash_link->pageno)],
hash_link); hash_link);
return; DBUG_VOID_RETURN;
} }
#else /* THREAD */ #else /* THREAD */
KEYCACHE_DBUG_ASSERT(! (pagecache->waiting_for_hash_link.last_thread)); KEYCACHE_DBUG_ASSERT(! (pagecache->waiting_for_hash_link.last_thread));
#endif /* THREAD */ #endif /* THREAD */
/* Add hash to free hash list */
hash_link->next= pagecache->free_hash_list; hash_link->next= pagecache->free_hash_list;
pagecache->free_hash_list= hash_link; pagecache->free_hash_list= hash_link;
DBUG_VOID_RETURN;
} }
...@@ -1701,8 +1754,6 @@ static PAGECACHE_HASH_LINK *get_present_hash_link(PAGECACHE *pagecache, ...@@ -1701,8 +1754,6 @@ static PAGECACHE_HASH_LINK *get_present_hash_link(PAGECACHE *pagecache,
#endif #endif
DBUG_ENTER("get_present_hash_link"); DBUG_ENTER("get_present_hash_link");
DBUG_PRINT("enter", ("fd: %u pos: %lu", (uint) file->file, (ulong) pageno)); DBUG_PRINT("enter", ("fd: %u pos: %lu", (uint) file->file, (ulong) pageno));
KEYCACHE_PRINT("get_present_hash_link", ("fd: %u pos: %lu",
(uint) file->file, (ulong) pageno));
/* /*
Find the bucket in the hash table for the pair (file, pageno); Find the bucket in the hash table for the pair (file, pageno);
...@@ -1737,6 +1788,7 @@ static PAGECACHE_HASH_LINK *get_present_hash_link(PAGECACHE *pagecache, ...@@ -1737,6 +1788,7 @@ static PAGECACHE_HASH_LINK *get_present_hash_link(PAGECACHE *pagecache,
} }
if (hash_link) if (hash_link)
{ {
DBUG_PRINT("exit", ("hash_link: %p", hash_link));
/* Register the request for the page */ /* Register the request for the page */
hash_link->requests++; hash_link->requests++;
} }
...@@ -1758,9 +1810,7 @@ static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache, ...@@ -1758,9 +1810,7 @@ static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache,
{ {
reg1 PAGECACHE_HASH_LINK *hash_link; reg1 PAGECACHE_HASH_LINK *hash_link;
PAGECACHE_HASH_LINK **start; PAGECACHE_HASH_LINK **start;
DBUG_ENTER("get_hash_link");
KEYCACHE_DBUG_PRINT("get_hash_link", ("fd: %u pos: %lu",
(uint) file->file, (ulong) pageno));
restart: restart:
/* try to find the page in the cache */ /* try to find the page in the cache */
...@@ -1771,6 +1821,9 @@ static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache, ...@@ -1771,6 +1821,9 @@ static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache,
/* There is no hash link in the hash table for the pair (file, pageno) */ /* There is no hash link in the hash table for the pair (file, pageno) */
if (pagecache->free_hash_list) if (pagecache->free_hash_list)
{ {
DBUG_PRINT("info", ("free_hash_list: %p free_hash_list->next: %p",
pagecache->free_hash_list,
pagecache->free_hash_list->next));
hash_link= pagecache->free_hash_list; hash_link= pagecache->free_hash_list;
pagecache->free_hash_list= hash_link->next; pagecache->free_hash_list= hash_link->next;
} }
...@@ -1784,21 +1837,20 @@ static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache, ...@@ -1784,21 +1837,20 @@ static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache,
/* Wait for a free hash link */ /* Wait for a free hash link */
struct st_my_thread_var *thread= my_thread_var; struct st_my_thread_var *thread= my_thread_var;
PAGECACHE_PAGE page; PAGECACHE_PAGE page;
KEYCACHE_DBUG_PRINT("get_hash_link", ("waiting"));
page.file= *file; page.file= *file;
page.pageno= pageno; page.pageno= pageno;
thread->opt_info= (void *) &page; thread->opt_info= (void *) &page;
wqueue_link_into_queue(&pagecache->waiting_for_hash_link, thread); wqueue_link_into_queue(&pagecache->waiting_for_hash_link, thread);
KEYCACHE_DBUG_PRINT("get_hash_link: wait", DBUG_PRINT("wait",
("suspend thread %ld", thread->id)); ("suspend thread %s %ld", thread->name, thread->id));
pagecache_pthread_cond_wait(&thread->suspend, pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock); &pagecache->cache_lock);
thread->opt_info= NULL; thread->opt_info= NULL;
DBUG_PRINT("thread", ("restarting..."));
goto restart;
#else #else
KEYCACHE_DBUG_ASSERT(0); DBUG_ASSERT(0);
#endif #endif
DBUG_PRINT("info", ("restarting..."));
goto restart;
} }
hash_link->file= *file; hash_link->file= *file;
DBUG_ASSERT(pageno < ((ULL(1)) << 40)); DBUG_ASSERT(pageno < ((ULL(1)) << 40));
...@@ -1807,6 +1859,7 @@ static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache, ...@@ -1807,6 +1859,7 @@ static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache,
/* Register the request for the page */ /* Register the request for the page */
hash_link->requests++; hash_link->requests++;
DBUG_ASSERT(hash_link->block == 0); DBUG_ASSERT(hash_link->block == 0);
DBUG_ASSERT(hash_link->requests == 1);
} }
else else
{ {
...@@ -1816,7 +1869,9 @@ static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache, ...@@ -1816,7 +1869,9 @@ static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache,
*/ */
hash_link->file.flush_log_callback= file->flush_log_callback; hash_link->file.flush_log_callback= file->flush_log_callback;
} }
return hash_link; DBUG_PRINT("exit", ("hash_link: %p block: %p", hash_link,
hash_link->block));
DBUG_RETURN(hash_link);
} }
...@@ -1930,16 +1985,7 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, ...@@ -1930,16 +1985,7 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
hash_link->requests--; hash_link->requests--;
{ {
#ifdef THREAD #ifdef THREAD
struct st_my_thread_var *thread= my_thread_var; wait_for_flush(pagecache, block);
wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
do
{
KEYCACHE_DBUG_PRINT("find_block: wait",
("suspend thread %ld", thread->id));
pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock);
}
while(thread->next);
#else #else
KEYCACHE_DBUG_ASSERT(0); KEYCACHE_DBUG_ASSERT(0);
/* /*
...@@ -1952,6 +1998,7 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, ...@@ -1952,6 +1998,7 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
#endif #endif
} }
/* Invalidate page in the block if it has not been done yet */ /* Invalidate page in the block if it has not been done yet */
DBUG_ASSERT(block->status); /* Should always be true */
if (block->status) if (block->status)
free_block(pagecache, block); free_block(pagecache, block);
return 0; return 0;
...@@ -1990,8 +2037,8 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, ...@@ -1990,8 +2037,8 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
/* Wait until the request can be resubmitted */ /* Wait until the request can be resubmitted */
do do
{ {
KEYCACHE_DBUG_PRINT("find_block: wait", DBUG_PRINT("wait",
("suspend thread %ld", thread->id)); ("suspend thread %s %ld", thread->name, thread->id));
pagecache_pthread_cond_wait(&thread->suspend, pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock); &pagecache->cache_lock);
} }
...@@ -2063,32 +2110,39 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, ...@@ -2063,32 +2110,39 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
/* There are no never used blocks, use a block from the LRU chain */ /* There are no never used blocks, use a block from the LRU chain */
/* /*
Wait until a new block is added to the LRU chain; Ensure that we are going to register the block.
several threads might wait here for the same page, (This should be true as a new block could not have been
all of them must get the same block pinned by caller).
*/ */
DBUG_ASSERT(reg_req);
#ifdef THREAD #ifdef THREAD
if (! pagecache->used_last) if (! pagecache->used_last)
{ {
/*
Wait until a new block is added to the LRU chain;
several threads might wait here for the same page,
all of them must get the same block.
The block is given to us by the next thread executing
link_block().
*/
struct st_my_thread_var *thread= my_thread_var; struct st_my_thread_var *thread= my_thread_var;
thread->opt_info= (void *) hash_link; thread->opt_info= (void *) hash_link;
wqueue_link_into_queue(&pagecache->waiting_for_block, thread); wqueue_link_into_queue(&pagecache->waiting_for_block, thread);
do do
{ {
KEYCACHE_DBUG_PRINT("find_block: wait", DBUG_PRINT("wait",
("suspend thread %ld", thread->id)); ("suspend thread %s %ld", thread->name, thread->id));
pagecache_pthread_cond_wait(&thread->suspend, pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock); &pagecache->cache_lock);
} }
while (thread->next); while (thread->next);
thread->opt_info= NULL; thread->opt_info= NULL;
block= hash_link->block; block= hash_link->block;
/* /* Ensure that the block is registered */
Ensure that we are register this block (all blocks not used by this DBUG_ASSERT(block->requests >= 1);
thread has to be registered).
*/
DBUG_ASSERT(reg_req);
} }
else else
#else #else
...@@ -2100,21 +2154,24 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, ...@@ -2100,21 +2154,24 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
unlinking it from the chain unlinking it from the chain
*/ */
block= pagecache->used_last->next_used; block= pagecache->used_last->next_used;
block->hits_left= init_hits_left;
block->last_hit_time= 0;
if (reg_req) if (reg_req)
reg_requests(pagecache, block, 1); reg_requests(pagecache, block, 1);
hash_link->block= block; hash_link->block= block;
DBUG_ASSERT(block->requests == 1);
} }
PCBLOCK_INFO(block); PCBLOCK_INFO(block);
DBUG_ASSERT(block->wlocks == 0);
DBUG_ASSERT(block->rlocks == 0); DBUG_ASSERT(block->hash_link == hash_link ||
DBUG_ASSERT(block->rlocks_queue == 0); !(block->status & PCBLOCK_IN_SWITCH));
DBUG_ASSERT(block->pins == 0);
if (block->hash_link != hash_link && if (block->hash_link != hash_link &&
! (block->status & PCBLOCK_IN_SWITCH) ) ! (block->status & PCBLOCK_IN_SWITCH) )
{ {
/* If another thread is flushing the block, wait for it. */
if (block->status & PCBLOCK_IN_FLUSH)
wait_for_flush(pagecache, block);
/* this is a primary request for a new page */ /* this is a primary request for a new page */
DBUG_ASSERT(block->wlocks == 0); DBUG_ASSERT(block->wlocks == 0);
DBUG_ASSERT(block->rlocks == 0); DBUG_ASSERT(block->rlocks == 0);
...@@ -2161,15 +2218,20 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, ...@@ -2161,15 +2218,20 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
/* Remove the hash link for this page from the hash table */ /* Remove the hash link for this page from the hash table */
unlink_hash(pagecache, block->hash_link); unlink_hash(pagecache, block->hash_link);
/* All pending requests for this page must be resubmitted */
#ifdef THREAD #ifdef THREAD
/* All pending requests for this page must be resubmitted. */
if (block->wqueue[COND_FOR_SAVED].last_thread) if (block->wqueue[COND_FOR_SAVED].last_thread)
wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]); wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
#endif #endif
} }
link_to_file_list(pagecache, block, file, link_to_file_list(pagecache, block, file,
(my_bool)(block->hash_link ? 1 : 0)); (my_bool)(block->hash_link ? 1 : 0));
block->hash_link= hash_link;
PCBLOCK_INFO(block); PCBLOCK_INFO(block);
block->hits_left= init_hits_left;
block->last_hit_time= 0;
block->status= error ? PCBLOCK_ERROR : 0; block->status= error ? PCBLOCK_ERROR : 0;
block->error= error ? (int16) my_errno : 0; block->error= error ? (int16) my_errno : 0;
#ifndef DBUG_OFF #ifndef DBUG_OFF
...@@ -2177,7 +2239,6 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, ...@@ -2177,7 +2239,6 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
if (error) if (error)
my_debug_put_break_here(); my_debug_put_break_here();
#endif #endif
block->hash_link= hash_link;
page_status= PAGE_TO_BE_READ; page_status= PAGE_TO_BE_READ;
DBUG_PRINT("info", ("page to be read set for page 0x%lx", DBUG_PRINT("info", ("page to be read set for page 0x%lx",
(ulong)block)); (ulong)block));
...@@ -2200,12 +2261,18 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, ...@@ -2200,12 +2261,18 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
} }
else else
{ {
/*
The block was found in the cache. It's either a already read
block or a block waiting to be read by another thread.
*/
if (reg_req) if (reg_req)
reg_requests(pagecache, block, 1); reg_requests(pagecache, block, 1);
KEYCACHE_DBUG_PRINT("find_block", KEYCACHE_DBUG_PRINT("find_block",
("block->hash_link: %p hash_link: %p " ("block->hash_link: %p hash_link: %p "
"block->status: %u", block->hash_link, "block->status: %u", block->hash_link,
hash_link, block->status )); hash_link, block->status ));
KEYCACHE_DBUG_ASSERT(block->hash_link == hash_link &&
hash_link->block == block);
page_status= (((block->hash_link == hash_link) && page_status= (((block->hash_link == hash_link) &&
(block->status & PCBLOCK_READ)) ? (block->status & PCBLOCK_READ)) ?
PAGE_READ : PAGE_WAIT_TO_BE_READ); PAGE_READ : PAGE_WAIT_TO_BE_READ);
...@@ -2339,8 +2406,8 @@ static my_bool pagecache_wait_lock(PAGECACHE *pagecache, ...@@ -2339,8 +2406,8 @@ static my_bool pagecache_wait_lock(PAGECACHE *pagecache,
dec_counter_for_resize_op(pagecache); dec_counter_for_resize_op(pagecache);
do do
{ {
KEYCACHE_DBUG_PRINT("get_wrlock: wait", DBUG_PRINT("wait",
("suspend thread %ld", thread->id)); ("suspend thread %s %ld", thread->name, thread->id));
pagecache_pthread_cond_wait(&thread->suspend, pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock); &pagecache->cache_lock);
} }
...@@ -2582,6 +2649,7 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache, ...@@ -2582,6 +2649,7 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache,
DBUG_ASSERT(!any || DBUG_ASSERT(!any ||
((lock == PAGECACHE_LOCK_LEFT_UNLOCKED) && ((lock == PAGECACHE_LOCK_LEFT_UNLOCKED) &&
(pin == PAGECACHE_UNPIN))); (pin == PAGECACHE_UNPIN)));
DBUG_ASSERT(block->hash_link->block == block);
switch (lock) { switch (lock) {
case PAGECACHE_LOCK_WRITE: /* free -> write */ case PAGECACHE_LOCK_WRITE: /* free -> write */
...@@ -2748,8 +2816,8 @@ static void read_block(PAGECACHE *pagecache, ...@@ -2748,8 +2816,8 @@ static void read_block(PAGECACHE *pagecache,
wqueue_add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread); wqueue_add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread);
do do
{ {
DBUG_PRINT("read_block: wait", DBUG_PRINT("wait",
("suspend thread %ld", thread->id)); ("suspend thread %s %ld", thread->name, thread->id));
pagecache_pthread_cond_wait(&thread->suspend, pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock); &pagecache->cache_lock);
} }
...@@ -3634,7 +3702,7 @@ static my_bool pagecache_delete_internal(PAGECACHE *pagecache, ...@@ -3634,7 +3702,7 @@ static my_bool pagecache_delete_internal(PAGECACHE *pagecache,
DBUG_ASSERT(0); DBUG_ASSERT(0);
DBUG_ASSERT(block->hash_link->requests > 0); DBUG_ASSERT(block->hash_link->requests > 0);
page_link->requests--; page_link->requests--;
/* See NOTE for pagecache_unlock about registering requests. */ /* See NOTE for pagecache_unlock() about registering requests. */
free_block(pagecache, block); free_block(pagecache, block);
dec_counter_for_resize_op(pagecache); dec_counter_for_resize_op(pagecache);
return 0; return 0;
...@@ -4239,6 +4307,7 @@ my_bool pagecache_write_part(PAGECACHE *pagecache, ...@@ -4239,6 +4307,7 @@ my_bool pagecache_write_part(PAGECACHE *pagecache,
static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block) static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block)
{ {
uint status= block->status;
KEYCACHE_THREAD_TRACE("free block"); KEYCACHE_THREAD_TRACE("free block");
KEYCACHE_DBUG_PRINT("free_block", KEYCACHE_DBUG_PRINT("free_block",
("block: %u hash_link 0x%lx", ("block: %u hash_link 0x%lx",
...@@ -4264,29 +4333,43 @@ static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block) ...@@ -4264,29 +4333,43 @@ static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block)
DBUG_ASSERT(block->rlocks_queue == 0); DBUG_ASSERT(block->rlocks_queue == 0);
DBUG_ASSERT(block->pins == 0); DBUG_ASSERT(block->pins == 0);
DBUG_ASSERT((block->status & ~(PCBLOCK_ERROR | PCBLOCK_READ | PCBLOCK_IN_FLUSH | PCBLOCK_CHANGED | PCBLOCK_REASSIGNED | PCBLOCK_DEL_WRITE)) == 0); DBUG_ASSERT((block->status & ~(PCBLOCK_ERROR | PCBLOCK_READ | PCBLOCK_IN_FLUSH | PCBLOCK_CHANGED | PCBLOCK_REASSIGNED | PCBLOCK_DEL_WRITE)) == 0);
DBUG_ASSERT(block->requests >= 1);
DBUG_ASSERT(block->next_used == NULL);
block->status= 0; block->status= 0;
#ifndef DBUG_OFF #ifndef DBUG_OFF
block->type= PAGECACHE_EMPTY_PAGE; block->type= PAGECACHE_EMPTY_PAGE;
#endif #endif
block->rec_lsn= LSN_MAX; block->rec_lsn= LSN_MAX;
block->hash_link= NULL;
if (block->temperature == PCBLOCK_WARM)
pagecache->warm_blocks--;
block->temperature= PCBLOCK_COLD;
KEYCACHE_THREAD_TRACE("free block"); KEYCACHE_THREAD_TRACE("free block");
KEYCACHE_DBUG_PRINT("free_block", KEYCACHE_DBUG_PRINT("free_block",
("block is freed")); ("block is freed"));
unreg_request(pagecache, block, 0); unreg_request(pagecache, block, 0);
DBUG_ASSERT(block->requests == 0);
DBUG_ASSERT(block->next_used != 0);
block->hash_link= NULL;
/* Remove the free block from the LRU ring. */ /*
unlink_block(pagecache, block); Block->requests is != 0 if unreg_requests()/link_block() gave the block
if (block->temperature == PCBLOCK_WARM) to a waiting thread
pagecache->warm_blocks--; */
block->temperature= PCBLOCK_COLD; if (!block->requests)
/* Insert the free block in the free list. */ {
block->next_used= pagecache->free_block_list; DBUG_ASSERT(block->next_used != 0);
pagecache->free_block_list= block;
/* Keep track of the number of currently unused blocks. */ /* Remove the free block from the LRU ring. */
pagecache->blocks_unused++; unlink_block(pagecache, block);
/* Insert the free block in the free list. */
block->next_used= pagecache->free_block_list;
pagecache->free_block_list= block;
/* Keep track of the number of currently unused blocks. */
pagecache->blocks_unused++;
}
else
{
/* keep flag set by link_block() */
block->status= status & PCBLOCK_REASSIGNED;
}
#ifdef THREAD #ifdef THREAD
/* All pending requests for this page must be resubmitted. */ /* All pending requests for this page must be resubmitted. */
...@@ -4544,8 +4627,9 @@ static int flush_pagecache_blocks_int(PAGECACHE *pagecache, ...@@ -4544,8 +4627,9 @@ static int flush_pagecache_blocks_int(PAGECACHE *pagecache,
wqueue_add_to_queue(&other_flusher->flush_queue, thread); wqueue_add_to_queue(&other_flusher->flush_queue, thread);
do do
{ {
KEYCACHE_DBUG_PRINT("flush_pagecache_blocks_int: wait1", DBUG_PRINT("wait",
("suspend thread %ld", thread->id)); ("(1) suspend thread %s %ld",
thread->name, thread->id));
pagecache_pthread_cond_wait(&thread->suspend, pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock); &pagecache->cache_lock);
} }
...@@ -4706,8 +4790,9 @@ static int flush_pagecache_blocks_int(PAGECACHE *pagecache, ...@@ -4706,8 +4790,9 @@ static int flush_pagecache_blocks_int(PAGECACHE *pagecache,
wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread); wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
do do
{ {
KEYCACHE_DBUG_PRINT("flush_pagecache_blocks_int: wait2", DBUG_PRINT("wait",
("suspend thread %ld", thread->id)); ("(2) suspend thread %s %ld",
thread->name, thread->id));
pagecache_pthread_cond_wait(&thread->suspend, pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock); &pagecache->cache_lock);
} }
...@@ -4917,8 +5002,8 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache, ...@@ -4917,8 +5002,8 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
wqueue_add_to_queue(&other_flusher->flush_queue, thread); wqueue_add_to_queue(&other_flusher->flush_queue, thread);
do do
{ {
KEYCACHE_DBUG_PRINT("pagecache_collect_changed_blocks_with_lsn: wait", DBUG_PRINT("wait",
("suspend thread %ld", thread->id)); ("suspend thread %s %ld", thread->name, thread->id));
pagecache_pthread_cond_wait(&thread->suspend, pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock); &pagecache->cache_lock);
} }
...@@ -5062,7 +5147,7 @@ static void pagecache_dump(PAGECACHE *pagecache) ...@@ -5062,7 +5147,7 @@ static void pagecache_dump(PAGECACHE *pagecache)
PAGECACHE_PAGE *page; PAGECACHE_PAGE *page;
uint i; uint i;
fprintf(pagecache_dump_file, "thread:%u\n", thread->id); fprintf(pagecache_dump_file, "thread: %s %ld\n", thread->name, thread->id);
i=0; i=0;
thread=last=waiting_for_hash_link.last_thread; thread=last=waiting_for_hash_link.last_thread;
...@@ -5073,8 +5158,9 @@ static void pagecache_dump(PAGECACHE *pagecache) ...@@ -5073,8 +5158,9 @@ static void pagecache_dump(PAGECACHE *pagecache)
thread= thread->next; thread= thread->next;
page= (PAGECACHE_PAGE *) thread->opt_info; page= (PAGECACHE_PAGE *) thread->opt_info;
fprintf(pagecache_dump_file, fprintf(pagecache_dump_file,
"thread:%u, (file,pageno)=(%u,%lu)\n", "thread: %s %ld, (file,pageno)=(%u,%lu)\n",
thread->id,(uint) page->file.file,(ulong) page->pageno); thread->name, thread->id,
(uint) page->file.file,(ulong) page->pageno);
if (++i == MAX_QUEUE_LEN) if (++i == MAX_QUEUE_LEN)
break; break;
} }
...@@ -5089,8 +5175,9 @@ static void pagecache_dump(PAGECACHE *pagecache) ...@@ -5089,8 +5175,9 @@ static void pagecache_dump(PAGECACHE *pagecache)
thread=thread->next; thread=thread->next;
hash_link= (PAGECACHE_HASH_LINK *) thread->opt_info; hash_link= (PAGECACHE_HASH_LINK *) thread->opt_info;
fprintf(pagecache_dump_file, fprintf(pagecache_dump_file,
"thread:%u hash_link:%u (file,pageno)=(%u,%lu)\n", "thread: %s %u hash_link:%u (file,pageno)=(%u,%lu)\n",
thread->id, (uint) PAGECACHE_HASH_LINK_NUMBER(pagecache, hash_link), thread->name, thread->id,
(uint) PAGECACHE_HASH_LINK_NUMBER(pagecache, hash_link),
(uint) hash_link->file.file,(ulong) hash_link->pageno); (uint) hash_link->file.file,(ulong) hash_link->pageno);
if (++i == MAX_QUEUE_LEN) if (++i == MAX_QUEUE_LEN)
break; break;
...@@ -5119,7 +5206,7 @@ static void pagecache_dump(PAGECACHE *pagecache) ...@@ -5119,7 +5206,7 @@ static void pagecache_dump(PAGECACHE *pagecache)
{ {
thread=thread->next; thread=thread->next;
fprintf(pagecache_dump_file, fprintf(pagecache_dump_file,
"thread:%u\n", thread->id); "thread: %s %ld\n", thread->name, thread->id);
if (++i == MAX_QUEUE_LEN) if (++i == MAX_QUEUE_LEN)
break; break;
} }
......
...@@ -1072,7 +1072,7 @@ int ha_myisam::repair(THD* thd, HA_CHECK_OPT *check_opt) ...@@ -1072,7 +1072,7 @@ int ha_myisam::repair(THD* thd, HA_CHECK_OPT *check_opt)
param.testflag&= ~(T_RETRY_WITHOUT_QUICK | T_QUICK); param.testflag&= ~(T_RETRY_WITHOUT_QUICK | T_QUICK);
/* Ensure we don't loose any rows when retrying without quick */ /* Ensure we don't loose any rows when retrying without quick */
param.testflag|= T_SAFE_REPAIR; param.testflag|= T_SAFE_REPAIR;
sql_print_information("Retrying repair of: '%s' without quick", sql_print_information("Retrying repair of: '%s' including modifying data file",
table->s->path.str); table->s->path.str);
continue; continue;
} }
...@@ -1666,15 +1666,15 @@ bool ha_myisam::check_and_repair(THD *thd) ...@@ -1666,15 +1666,15 @@ bool ha_myisam::check_and_repair(THD *thd)
if ((marked_crashed= mi_is_crashed(file)) || check(thd, &check_opt)) if ((marked_crashed= mi_is_crashed(file)) || check(thd, &check_opt))
{ {
sql_print_warning("Recovering table: '%s'",table->s->path.str); sql_print_warning("Recovering table: '%s'",table->s->path.str);
if (myisam_recover_options & (HA_RECOVER_FULL_BACKUP | HA_RECOVER_BACKUP)) if (myisam_recover_options & HA_RECOVER_FULL_BACKUP)
{ {
char buff[MY_BACKUP_NAME_EXTRA_LENGTH+1]; char buff[MY_BACKUP_NAME_EXTRA_LENGTH+1];
my_create_backup_name(buff, "", check_opt.start_time); my_create_backup_name(buff, "", check_opt.start_time);
sql_print_information("Making backup of data with extension '%s'", buff); sql_print_information("Making backup of index file with extension '%s'",
} buff);
if (myisam_recover_options & HA_RECOVER_FULL_BACKUP)
mi_make_backup_of_index(file, check_opt.start_time, mi_make_backup_of_index(file, check_opt.start_time,
MYF(MY_WME | ME_JUST_WARNING)); MYF(MY_WME | ME_JUST_WARNING));
}
check_opt.flags= check_opt.flags=
(((myisam_recover_options & (((myisam_recover_options &
(HA_RECOVER_BACKUP | HA_RECOVER_FULL_BACKUP)) ? T_BACKUP_DATA : 0) | (HA_RECOVER_BACKUP | HA_RECOVER_FULL_BACKUP)) ? T_BACKUP_DATA : 0) |
......
...@@ -85,6 +85,8 @@ static SORT_KEY_BLOCKS *alloc_key_blocks(HA_CHECK *param, uint blocks, ...@@ -85,6 +85,8 @@ static SORT_KEY_BLOCKS *alloc_key_blocks(HA_CHECK *param, uint blocks,
uint buffer_length); uint buffer_length);
static ha_checksum mi_byte_checksum(const uchar *buf, uint length); static ha_checksum mi_byte_checksum(const uchar *buf, uint length);
static void set_data_file_type(MI_SORT_INFO *sort_info, MYISAM_SHARE *share); static void set_data_file_type(MI_SORT_INFO *sort_info, MYISAM_SHARE *share);
static int replace_data_file(HA_CHECK *param, MI_INFO *info,
const char *name, File new_file);
void myisamchk_init(HA_CHECK *param) void myisamchk_init(HA_CHECK *param)
{ {
...@@ -1733,17 +1735,8 @@ int mi_repair(HA_CHECK *param, register MI_INFO *info, ...@@ -1733,17 +1735,8 @@ int mi_repair(HA_CHECK *param, register MI_INFO *info,
/* Replace the actual file with the temporary file */ /* Replace the actual file with the temporary file */
if (new_file >= 0) if (new_file >= 0)
{ {
my_close(new_file,MYF(0)); got_error= replace_data_file(param, info, name, new_file);
info->dfile=new_file= -1; new_file= -1;
if (change_to_newfile(share->data_file_name,MI_NAME_DEXT,
DATA_TMP_EXT,
param->backup_time,
share->base.raid_chunks,
(param->testflag & T_BACKUP_DATA ?
MYF(MY_REDEL_MAKE_BACKUP): MYF(0))) ||
mi_open_datafile(info,share,name,-1))
got_error=1;
param->retry_repair= 0; param->retry_repair= 0;
} }
} }
...@@ -2553,15 +2546,8 @@ int mi_repair_by_sort(HA_CHECK *param, register MI_INFO *info, ...@@ -2553,15 +2546,8 @@ int mi_repair_by_sort(HA_CHECK *param, register MI_INFO *info,
/* Replace the actual file with the temporary file */ /* Replace the actual file with the temporary file */
if (new_file >= 0) if (new_file >= 0)
{ {
my_close(new_file,MYF(0)); got_error= replace_data_file(param, info, name, new_file);
info->dfile=new_file= -1; new_file= -1;
if (change_to_newfile(share->data_file_name,MI_NAME_DEXT,
DATA_TMP_EXT, param->backup_time,
share->base.raid_chunks,
(param->testflag & T_BACKUP_DATA ?
MYF(MY_REDEL_MAKE_BACKUP): MYF(0))) ||
mi_open_datafile(info,share,name,-1))
got_error=1;
} }
} }
if (got_error) if (got_error)
...@@ -3092,15 +3078,8 @@ int mi_repair_parallel(HA_CHECK *param, register MI_INFO *info, ...@@ -3092,15 +3078,8 @@ int mi_repair_parallel(HA_CHECK *param, register MI_INFO *info,
/* Replace the actual file with the temporary file */ /* Replace the actual file with the temporary file */
if (new_file >= 0) if (new_file >= 0)
{ {
my_close(new_file,MYF(0)); got_error= replace_data_file(param, info, name, new_file);
info->dfile=new_file= -1; new_file= -1;
if (change_to_newfile(share->data_file_name,MI_NAME_DEXT,
DATA_TMP_EXT, param->backup_time,
share->base.raid_chunks,
(param->testflag & T_BACKUP_DATA ?
MYF(MY_REDEL_MAKE_BACKUP): MYF(0))) ||
mi_open_datafile(info,share,name,-1))
got_error=1;
} }
} }
if (got_error) if (got_error)
...@@ -4765,3 +4744,29 @@ int mi_make_backup_of_index(MI_INFO *info, time_t backup_time, myf flags) ...@@ -4765,3 +4744,29 @@ int mi_make_backup_of_index(MI_INFO *info, time_t backup_time, myf flags)
my_create_backup_name(backup_name, info->s->index_file_name, backup_time); my_create_backup_name(backup_name, info->s->index_file_name, backup_time);
return my_copy(info->s->index_file_name, backup_name, flags); return my_copy(info->s->index_file_name, backup_name, flags);
} }
static int replace_data_file(HA_CHECK *param, MI_INFO *info,
const char *name, File new_file)
{
MYISAM_SHARE *share=info->s;
my_close(new_file,MYF(0));
info->dfile= -1;
if (param->testflag & T_BACKUP_DATA)
{
char buff[MY_BACKUP_NAME_EXTRA_LENGTH+1];
my_create_backup_name(buff, "", param->backup_time);
my_printf_error(0, /* No error, just info */
"Making backup of data file with extension '%s'",
MYF(ME_JUST_INFO | ME_NOREFRESH), buff);
}
if (change_to_newfile(share->data_file_name,MI_NAME_DEXT,
DATA_TMP_EXT, param->backup_time,
share->base.raid_chunks,
(param->testflag & T_BACKUP_DATA ?
MYF(MY_REDEL_MAKE_BACKUP): MYF(0))) ||
mi_open_datafile(info, share, name, -1))
return 1;
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment