Commit e27890ca authored by unknown's avatar unknown

WL#3072 Maria recovery

* create page cache before initializing engine and not after, because
Maria's recovery needs a page cache
* make the creation of a bitmap page more crash-resistent
* bugfix (see ma_blockrec.c)
* back to old way: create an 8k bitmap page when creating table
* preparations for the UNDO phase: recreate TRNs
* preparations for Checkpoint: list of dirty pages, testing
of rec_lsn to know if page should be skipped during Recovery
(unused in this patch as no Checkpoint module pushed yet)
* maria_chk tags repaired table with a special LSN
* reworking all around in ma_recovery.c (less duplication)


mysys/my_realloc.c:
  noted an issue in my_realloc()
sql/mysqld.cc:
  page cache needs to be created before engines are initialized,
  because Maria's initialization may do a recovery which needs
  the page cache.
storage/maria/ha_maria.cc:
  update to new prototype
storage/maria/ma_bitmap.c:
  when creating the first bitmap page we used chsize to 8192 bytes then 
  pwrite (overwrite) the last 2 bytes (8191-8192). If crash between
  the two operations, this leaves a bitmap page full without its end
  marker. A later recovery may try to read this page and find it
  exists and misses a marker and conclude it's corrupted and fail.
  Changing the chsize to only 8190 bytes: recovery will then find
  the page is too short and recreate it entirely.
storage/maria/ma_blockrec.c:
  Fix for a bug: when executing a REDO, if the data page is created,
  data_file_length was increased before _ma_bitmap_set():
  _ma_bitmap_set() called _ma_read_bitmap_page() which, due to the
  increased data_file_length, expected to find a bitmap page on disk
  with a correct end marker; if the bitmap page didn't exist already
  in fact, this failed. Fixed by increasing data_file_length only after
  _ma_read_bitmap_page() has created the new bitmap page correctly.
  This bug could happen every time a REDO is about creating a new
  bitmap page.
storage/maria/ma_check.c:
  empty data file has a bitmap page
storage/maria/ma_control_file.c:
  useless parameter to ma_control_file_create_or_open(), just
  test if this is recovery.
storage/maria/ma_control_file.h:
  new prototype
storage/maria/ma_create.c:
  Back to how it was before: maria_create() creates an 8k bitmap page.
  Thus (bugfix) data_file_length needs to reflect this instead of being 0.
storage/maria/ma_loghandler.c:
  as ma_test1 and ma_test2 now use real transactions and not
  dummy_transaction_object, REDO for INSERT/UPDATE/DELETE are always
  about real transactions, can assert this.
  A function for Recovery to assign a short id to a table.
storage/maria/ma_loghandler.h:
  new function
storage/maria/ma_loghandler_lsn.h:
  maria_chk tags repaired tables with this LSN
storage/maria/ma_open.c:
  * enforce that DMLs on transactional tables use real transactions
  and not dummy_transaction_object.
  * test if table was repaired with maria_chk (which has to been
  seen as an import of an external table into the server), test
  validity of create_rename_lsn (header corruption detection)
  * comments.
storage/maria/ma_recovery.c:
  * preparations for the UNDO phase: recreate TRNs
  * preparations for Checkpoint: list of dirty pages, testing
  of rec_lsn to know if page should be skipped during Recovery
  (unused in this patch as no Checkpoint module pushed yet)
  * reworking all around (less duplication)
storage/maria/ma_recovery.h:
  a parameter to say if the UNDO phase should be skipped
storage/maria/maria_chk.c:
  tag repaired tables with a special LSN
storage/maria/maria_read_log.c:
  * update to new prototype
  * no UNDO phase in maria_read_log for now
storage/maria/trnman.c:
  * a function for Recovery to create a transaction (TRN), needed
  in the UNDO phase
  * a function for Recovery to grab an existing transaction, needed
  in the UNDO phase (rollback all existing transactions)
storage/maria/trnman_public.h:
  new functions
parent dd20a9a7
...@@ -22,6 +22,16 @@ ...@@ -22,6 +22,16 @@
/* My memory re allocator */ /* My memory re allocator */
/**
@brief wrapper around realloc()
@param oldpoint pointer to currently allocated area
@param size new size requested, must be >0
@param my_flags flags
@note if size==0 realloc() may return NULL; my_realloc() treats this as an
error which is not the intention of realloc()
*/
void* my_realloc(void* oldpoint, size_t size, myf my_flags) void* my_realloc(void* oldpoint, size_t size, myf my_flags)
{ {
void *point; void *point;
...@@ -29,6 +39,7 @@ void* my_realloc(void* oldpoint, size_t size, myf my_flags) ...@@ -29,6 +39,7 @@ void* my_realloc(void* oldpoint, size_t size, myf my_flags)
DBUG_PRINT("my",("ptr: 0x%lx size: %lu my_flags: %d", (long) oldpoint, DBUG_PRINT("my",("ptr: 0x%lx size: %lu my_flags: %d", (long) oldpoint,
(ulong) size, my_flags)); (ulong) size, my_flags));
DBUG_ASSERT(size > 0);
if (!oldpoint && (my_flags & MY_ALLOW_ZERO_PTR)) if (!oldpoint && (my_flags & MY_ALLOW_ZERO_PTR))
DBUG_RETURN(my_malloc(size,my_flags)); DBUG_RETURN(my_malloc(size,my_flags));
#ifdef USE_HALLOC #ifdef USE_HALLOC
......
...@@ -3418,6 +3418,17 @@ server."); ...@@ -3418,6 +3418,17 @@ server.");
using_update_log=1; using_update_log=1;
} }
/* call ha_init_key_cache() on all key caches to init them */
process_key_caches(&ha_init_key_cache);
/*
Maria's pagecache needs to be ready before Maria engine (Recovery uses
pagecache, and Checkpoint may happen at startup). Maria engine is taken up
in plugin_init().
*/
#ifdef WITH_MARIA_STORAGE_ENGINE
process_pagecaches(&ha_init_pagecache);
#endif /* WITH_MARIA_STORAGE_ENGINE */
/* Allow storage engine to give real error messages */ /* Allow storage engine to give real error messages */
if (ha_init_errors()) if (ha_init_errors())
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -3588,12 +3599,6 @@ server."); ...@@ -3588,12 +3599,6 @@ server.");
if (opt_myisam_log) if (opt_myisam_log)
(void) mi_log(1); (void) mi_log(1);
/* call ha_init_key_cache() on all key caches to init them */
process_key_caches(&ha_init_key_cache);
#ifdef WITH_MARIA_STORAGE_ENGINE
process_pagecaches(&ha_init_pagecache);
#endif /* WITH_MARIA_STORAGE_ENGINE */
#if defined(HAVE_MLOCKALL) && defined(MCL_CURRENT) && !defined(EMBEDDED_LIBRARY) #if defined(HAVE_MLOCKALL) && defined(MCL_CURRENT) && !defined(EMBEDDED_LIBRARY)
if (locked_in_memory && !getuid()) if (locked_in_memory && !getuid())
{ {
......
...@@ -32,6 +32,10 @@ ...@@ -32,6 +32,10 @@
#include "ma_blockrec.h" #include "ma_blockrec.h"
#include "ma_commit.h" #include "ma_commit.h"
/*
Note that in future versions, only *transactional* Maria tables can
rollback, so this flag should be up or down conditionally.
*/
#define MARIA_CANNOT_ROLLBACK HA_NO_TRANSACTIONS #define MARIA_CANNOT_ROLLBACK HA_NO_TRANSACTIONS
#ifdef MARIA_CANNOT_ROLLBACK #ifdef MARIA_CANNOT_ROLLBACK
#define trans_register_ha(A, B, C) do { /* nothing */ } while(0) #define trans_register_ha(A, B, C) do { /* nothing */ } while(0)
...@@ -2383,7 +2387,7 @@ static int ha_maria_init(void *p) ...@@ -2383,7 +2387,7 @@ static int ha_maria_init(void *p)
maria_hton->flags= HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES; maria_hton->flags= HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES;
bzero(maria_log_pagecache, sizeof(*maria_log_pagecache)); bzero(maria_log_pagecache, sizeof(*maria_log_pagecache));
maria_data_root= mysql_real_data_home; maria_data_root= mysql_real_data_home;
res= maria_init() || ma_control_file_create_or_open(TRUE) || res= maria_init() || ma_control_file_create_or_open() ||
(init_pagecache(maria_log_pagecache, (init_pagecache(maria_log_pagecache,
TRANSLOG_PAGECACHE_SIZE, 0, 0, TRANSLOG_PAGECACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) || TRANSLOG_PAGE_SIZE) == 0) ||
......
...@@ -512,15 +512,19 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share, ...@@ -512,15 +512,19 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
MARIA_FILE_BITMAP *bitmap, MARIA_FILE_BITMAP *bitmap,
ulonglong page) ulonglong page)
{ {
my_off_t position= page * bitmap->block_size; my_off_t end_of_page= (page + 1) * bitmap->block_size;
my_bool res; my_bool res;
DBUG_ENTER("_ma_read_bitmap_page"); DBUG_ENTER("_ma_read_bitmap_page");
DBUG_ASSERT(page % bitmap->pages_covered == 0); DBUG_ASSERT(page % bitmap->pages_covered == 0);
bitmap->page= page; bitmap->page= page;
if (position >= share->state.state.data_file_length) if (end_of_page > share->state.state.data_file_length)
{ {
share->state.state.data_file_length= position + bitmap->block_size; /*
Inexistent or half-created page (could be crash in the middle of
_ma_bitmap_create_first(), before appending maria_bitmap_marker).
*/
share->state.state.data_file_length= end_of_page;
bzero(bitmap->map, bitmap->block_size); bzero(bitmap->map, bitmap->block_size);
memcpy(bitmap->map + bitmap->block_size - sizeof(maria_bitmap_marker), memcpy(bitmap->map + bitmap->block_size - sizeof(maria_bitmap_marker),
maria_bitmap_marker, sizeof(maria_bitmap_marker)); maria_bitmap_marker, sizeof(maria_bitmap_marker));
...@@ -2047,7 +2051,8 @@ int _ma_bitmap_create_first(MARIA_SHARE *share) ...@@ -2047,7 +2051,8 @@ int _ma_bitmap_create_first(MARIA_SHARE *share)
{ {
uint block_size= share->bitmap.block_size; uint block_size= share->bitmap.block_size;
File file= share->bitmap.file.file; File file= share->bitmap.file.file;
if (my_chsize(file, block_size, 0, MYF(MY_WME)) || if (my_chsize(file, block_size - sizeof(maria_bitmap_marker),
0, MYF(MY_WME)) ||
my_pwrite(file, maria_bitmap_marker, sizeof(maria_bitmap_marker), my_pwrite(file, maria_bitmap_marker, sizeof(maria_bitmap_marker),
block_size - sizeof(maria_bitmap_marker), block_size - sizeof(maria_bitmap_marker),
MYF(MY_NABP | MY_WME))) MYF(MY_NABP | MY_WME)))
......
...@@ -4163,9 +4163,6 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -4163,9 +4163,6 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
empty_space= (block_size - PAGE_OVERHEAD_SIZE); empty_space= (block_size - PAGE_OVERHEAD_SIZE);
rec_offset= PAGE_HEADER_SIZE; rec_offset= PAGE_HEADER_SIZE;
dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE; dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE;
/* Update that file is extended */
info->state->data_file_length= (page + 1) * info->s->block_size;
} }
else else
{ {
...@@ -4302,6 +4299,16 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -4302,6 +4299,16 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space)) if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
/*
Data page and bitmap page are in place, we can update data_file_length in
case we extended the file. We could not do it earlier: bitmap code tests
data_file_length to know if it has to create a new page or not.
*/
{
my_off_t end_of_page= (page + 1) * info->s->block_size;
set_if_bigger(info->state->data_file_length, end_of_page);
}
DBUG_RETURN(0); DBUG_RETURN(0);
err: err:
......
...@@ -2046,15 +2046,8 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info, ...@@ -2046,15 +2046,8 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
goto err; goto err;
} }
_ma_reset_status(sort_info.new_info); _ma_reset_status(sort_info.new_info);
#ifdef ASK_MONTY /* cf maria_create() */
/**
@todo ASK_MONTY
without this call, a REPAIR on an empty table leaves the data file of
size 0, which sounds reasonable.
*/
if (_ma_initialize_data_file(sort_info.new_info->s, new_file)) if (_ma_initialize_data_file(sort_info.new_info->s, new_file))
goto err; goto err;
#endif
block_record= 1; block_record= 1;
} }
} }
......
...@@ -41,6 +41,10 @@ ...@@ -41,6 +41,10 @@
#define CONTROL_FILE_SIZE (CONTROL_FILE_FILENO_OFFSET + CONTROL_FILE_FILENO_SIZE) #define CONTROL_FILE_SIZE (CONTROL_FILE_FILENO_OFFSET + CONTROL_FILE_FILENO_SIZE)
/* This module owns these two vars. */ /* This module owns these two vars. */
/**
This LSN serves for the two-checkpoint rule, and also to find the
checkpoint record when doing a recovery.
*/
LSN last_checkpoint_lsn= LSN_IMPOSSIBLE; LSN last_checkpoint_lsn= LSN_IMPOSSIBLE;
uint32 last_logno= FILENO_IMPOSSIBLE; uint32 last_logno= FILENO_IMPOSSIBLE;
...@@ -68,8 +72,6 @@ static int control_file_fd= -1; ...@@ -68,8 +72,6 @@ static int control_file_fd= -1;
the last_checkpoint_lsn and last_logno global variables. the last_checkpoint_lsn and last_logno global variables.
Called at engine's start. Called at engine's start.
@param create_if_missing
@note @note
The format of the control file is: The format of the control file is:
4 bytes: magic string 4 bytes: magic string
...@@ -78,11 +80,13 @@ static int control_file_fd= -1; ...@@ -78,11 +80,13 @@ static int control_file_fd= -1;
4 bytes: offset in log where last checkpoint is 4 bytes: offset in log where last checkpoint is
4 bytes: number of last log 4 bytes: number of last log
@note If in recovery, file is not created
@return Operation status @return Operation status
@retval 0 OK @retval 0 OK
@retval 1 Error (in which case the file is left closed) @retval 1 Error (in which case the file is left closed)
*/ */
CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool create_if_missing) CONTROL_FILE_ERROR ma_control_file_create_or_open()
{ {
char buffer[CONTROL_FILE_SIZE]; char buffer[CONTROL_FILE_SIZE];
char name[FN_REFLEN]; char name[FN_REFLEN];
...@@ -111,7 +115,8 @@ CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool create_if_missing) ...@@ -111,7 +115,8 @@ CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool create_if_missing)
if (create_file) if (create_file)
{ {
if (!create_if_missing) /* in a recovery, we expect to find a control file */
if (maria_in_recovery)
DBUG_RETURN(CONTROL_FILE_MISSING); DBUG_RETURN(CONTROL_FILE_MISSING);
if ((control_file_fd= my_create(name, 0, if ((control_file_fd= my_create(name, 0,
open_flags, MYF(MY_SYNC_DIR))) < 0) open_flags, MYF(MY_SYNC_DIR))) < 0)
......
...@@ -61,7 +61,7 @@ extern "C" { ...@@ -61,7 +61,7 @@ extern "C" {
If present, reads it to find out last checkpoint's LSN and last log. If present, reads it to find out last checkpoint's LSN and last log.
Called at engine's start. Called at engine's start.
*/ */
CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool); CONTROL_FILE_ERROR ma_control_file_create_or_open();
/* /*
Write information durably to the control file. Write information durably to the control file.
Called when we have created a new log (after syncing this log's creation) Called when we have created a new log (after syncing this log's creation)
......
...@@ -664,6 +664,14 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -664,6 +664,14 @@ int maria_create(const char *name, enum data_file_type datafile_type,
share.base.keystart = share.state.state.key_file_length= share.base.keystart = share.state.state.key_file_length=
MY_ALIGN(info_length, maria_block_size); MY_ALIGN(info_length, maria_block_size);
if (share.data_file_type == BLOCK_RECORD)
{
/*
we are going to create a first bitmap page, set data_file_length
to reflect this, before the state goes to disk
*/
share.state.state.data_file_length= maria_block_size;
}
share.base.max_key_block_length= maria_block_size; share.base.max_key_block_length= maria_block_size;
share.base.max_key_length=ALIGN_SIZE(max_key_length+4); share.base.max_key_length=ALIGN_SIZE(max_key_length+4);
share.base.records=ci->max_rows; share.base.records=ci->max_rows;
...@@ -1041,36 +1049,8 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -1041,36 +1049,8 @@ int maria_create(const char *name, enum data_file_type datafile_type,
goto err; goto err;
errpos=3; errpos=3;
/**
@todo ASK_MONTY
QQ: this sets data_file_length from 0 to 8192, but we wrote the state
already to the index file (because:
- log record is built from index header so state must be written before
log record
- data file must be created after log record, so that "missing log
record" implies "unusable table").
When we wrote the state, we hadn't called ma_initialize_data_file(), so
the data_file_length is 0!
Thus, we below create a 8192-byte data file, but its recorded size is 0,
so next time we read the bitmap (a maria_write() for example) we'll
overwrite the bitmap we just created below.
It's not very efficient.
It also makes maria_chk_size() print
Size of datafile is: 8192 Should be: 0
on a freshly created table (run "check.test" with a Maria table).
Why do we absolutely want to create a 8192-byte page for a freshly
created, empty table? Why don't we leave the data file empty?
Removing the call below at least removes the maria_chk_size() issue.
Monty wrote on IRC, about a size of 0:
"This basically ok; The first block is a bitmap that may or may not
exists", but later he asked that the first block always exists.???
*/
#ifdef ASK_MONTY
if (_ma_initialize_data_file(&share, dfile)) if (_ma_initialize_data_file(&share, dfile))
goto err; goto err;
#endif
} }
/* Enlarge files */ /* Enlarge files */
......
...@@ -5636,7 +5636,7 @@ static my_bool write_hook_for_redo(enum translog_record_type type ...@@ -5636,7 +5636,7 @@ static my_bool write_hook_for_redo(enum translog_record_type type
non-transactional log records (REPAIR, CREATE, RENAME, DROP) should not non-transactional log records (REPAIR, CREATE, RENAME, DROP) should not
call this hook; we trust them but verify ;) call this hook; we trust them but verify ;)
*/ */
DBUG_ASSERT(!(maria_multi_threaded && (trn->trid == 0))); DBUG_ASSERT(trn->trid != 0);
/* /*
If the hook stays so simple, it would be faster to pass If the hook stays so simple, it would be faster to pass
!trn->rec_lsn ? trn->rec_lsn : some_dummy_lsn !trn->rec_lsn ? trn->rec_lsn : some_dummy_lsn
...@@ -5665,7 +5665,7 @@ static my_bool write_hook_for_undo(enum translog_record_type type ...@@ -5665,7 +5665,7 @@ static my_bool write_hook_for_undo(enum translog_record_type type
struct st_translog_parts *parts struct st_translog_parts *parts
__attribute__ ((unused))) __attribute__ ((unused)))
{ {
DBUG_ASSERT(!(maria_multi_threaded && (trn->trid == 0))); DBUG_ASSERT(trn->trid != 0);
trn->undo_lsn= *lsn; trn->undo_lsn= *lsn;
if (unlikely(LSN_WITH_FLAGS_TO_LSN(trn->first_undo_lsn) == 0)) if (unlikely(LSN_WITH_FLAGS_TO_LSN(trn->first_undo_lsn) == 0))
trn->first_undo_lsn= trn->first_undo_lsn=
...@@ -5778,6 +5778,17 @@ void translog_deassign_id_from_share(MARIA_SHARE *share) ...@@ -5778,6 +5778,17 @@ void translog_deassign_id_from_share(MARIA_SHARE *share)
} }
void translog_assign_id_to_share_from_recovery(MARIA_SHARE *share,
uint16 id)
{
DBUG_ASSERT(maria_in_recovery && !maria_multi_threaded);
DBUG_ASSERT(share->data_file_type == BLOCK_RECORD);
DBUG_ASSERT(share->id == 0);
DBUG_ASSERT(id_to_share[id] == NULL);
id_to_share[share->id= id]= share;
}
/** /**
@brief returns the LSN of the first record starting in this log @brief returns the LSN of the first record starting in this log
......
...@@ -257,6 +257,9 @@ extern TRANSLOG_ADDRESS translog_get_horizon(); ...@@ -257,6 +257,9 @@ extern TRANSLOG_ADDRESS translog_get_horizon();
extern int translog_assign_id_to_share(struct st_maria_share *share, extern int translog_assign_id_to_share(struct st_maria_share *share,
struct st_transaction *trn); struct st_transaction *trn);
extern void translog_deassign_id_from_share(struct st_maria_share *share); extern void translog_deassign_id_from_share(struct st_maria_share *share);
extern void
translog_assign_id_to_share_from_recovery(struct st_maria_share *share,
uint16 id);
extern my_bool translog_inited; extern my_bool translog_inited;
/* /*
......
...@@ -82,6 +82,9 @@ typedef LSN LSN_WITH_FLAGS; ...@@ -82,6 +82,9 @@ typedef LSN LSN_WITH_FLAGS;
#define LOG_OFFSET_IMPOSSIBLE 0 /**< log always has a header */ #define LOG_OFFSET_IMPOSSIBLE 0 /**< log always has a header */
#define LSN_IMPOSSIBLE 0 #define LSN_IMPOSSIBLE 0
/** @brief some impossible LSN serve as markers */
#define LSN_REPAIRED_BY_MARIA_CHK ((LSN)1)
/** /**
@brief the maximum valid LSN. @brief the maximum valid LSN.
Unlike ULONGLONG_MAX, it can be safely used in comparison with valid LSNs Unlike ULONGLONG_MAX, it can be safely used in comparison with valid LSNs
......
...@@ -171,7 +171,8 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode, ...@@ -171,7 +171,8 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode,
share->delay_key_write=1; share->delay_key_write=1;
info.state= &share->state.state; /* Change global values by default */ info.state= &share->state.state; /* Change global values by default */
info.trn= &dummy_transaction_object; if (!share->base.born_transactional) /* but for transactional ones ... */
info.trn= &dummy_transaction_object; /* ... force crash if no trn given */
pthread_mutex_unlock(&share->intern_lock); pthread_mutex_unlock(&share->intern_lock);
/* Allocate buffer for one record */ /* Allocate buffer for one record */
...@@ -601,17 +602,32 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) ...@@ -601,17 +602,32 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
{ {
share->page_type= PAGECACHE_LSN_PAGE; share->page_type= PAGECACHE_LSN_PAGE;
share->base_length+= TRANS_ROW_EXTRA_HEADER_SIZE; share->base_length+= TRANS_ROW_EXTRA_HEADER_SIZE;
if (unlikely((share->state.create_rename_lsn == (LSN)ULONGLONG_MAX) && if (share->state.create_rename_lsn == LSN_REPAIRED_BY_MARIA_CHK)
(open_flags & HA_OPEN_FROM_SQL_LAYER)))
{ {
/* /*
This table was repaired with maria_chk. Past log records should be Was repaired with maria_chk, maybe later maria_pack-ed. Some sort of
ignored, future log records should not: we define the present. import into the server. It starts its existence (from the point of
view of the server, including server's recovery) now.
*/ */
if ((open_flags & HA_OPEN_FROM_SQL_LAYER) || maria_in_recovery)
{
share->state.create_rename_lsn= translog_get_horizon(); share->state.create_rename_lsn= translog_get_horizon();
_ma_update_create_rename_lsn_on_disk(share, TRUE); _ma_update_create_rename_lsn_on_disk(share, TRUE);
} }
} }
else if (!LSN_VALID(share->state.create_rename_lsn) &&
!(open_flags & HA_OPEN_FOR_REPAIR))
{
/*
If in Recovery, it will not work. If LSN is invalid and not
LSN_REPAIRED_BY_MARIA_CHK, header must be corrupted.
In both cases, must repair.
*/
my_errno=((share->state.changed & STATE_CRASHED_ON_REPAIR) ?
HA_ERR_CRASHED_ON_REPAIR : HA_ERR_CRASHED_ON_USAGE);
goto err;
}
}
else else
share->page_type= PAGECACHE_PLAIN_PAGE; share->page_type= PAGECACHE_PLAIN_PAGE;
share->now_transactional= share->base.born_transactional; share->now_transactional= share->base.born_transactional;
...@@ -699,6 +715,14 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) ...@@ -699,6 +715,14 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
{ {
share->lock.get_status=_ma_get_status; share->lock.get_status=_ma_get_status;
share->lock.copy_status=_ma_copy_status; share->lock.copy_status=_ma_copy_status;
/**
@todo RECOVERY
INSERT DELAYED and concurrent inserts are currently disabled for
transactional tables; when enabled again, we should re-evaluate
what problems the call to _ma_update_status() by
thr_reschedule_write_lock() can do (it may hurt Checkpoint as it
would be without intern_lock, and it modifies the state).
*/
share->lock.update_status=_ma_update_status; share->lock.update_status=_ma_update_status;
share->lock.restore_status=_ma_restore_status; share->lock.restore_status=_ma_restore_status;
share->lock.check_status=_ma_check_status; share->lock.check_status=_ma_check_status;
...@@ -958,6 +982,7 @@ uint _ma_state_info_write(File file, MARIA_STATE_INFO *state, uint pWrite) ...@@ -958,6 +982,7 @@ uint _ma_state_info_write(File file, MARIA_STATE_INFO *state, uint pWrite)
uchar buff[MARIA_STATE_INFO_SIZE + MARIA_STATE_EXTRA_SIZE]; uchar buff[MARIA_STATE_INFO_SIZE + MARIA_STATE_EXTRA_SIZE];
uchar *ptr=buff; uchar *ptr=buff;
uint i, keys= (uint) state->header.keys; uint i, keys= (uint) state->header.keys;
size_t res;
DBUG_ENTER("_ma_state_info_write"); DBUG_ENTER("_ma_state_info_write");
memcpy_fixed(ptr,&state->header,sizeof(state->header)); memcpy_fixed(ptr,&state->header,sizeof(state->header));
...@@ -1013,11 +1038,12 @@ uint _ma_state_info_write(File file, MARIA_STATE_INFO *state, uint pWrite) ...@@ -1013,11 +1038,12 @@ uint _ma_state_info_write(File file, MARIA_STATE_INFO *state, uint pWrite)
} }
} }
if (pWrite & 1) res= (pWrite & 1) ?
DBUG_RETURN(my_pwrite(file, buff, (size_t) (ptr-buff), 0L, my_pwrite(file, buff, (size_t) (ptr-buff), 0L,
MYF(MY_NABP | MY_THREADSAFE)) != 0); MYF(MY_NABP | MY_THREADSAFE)) :
DBUG_RETURN(my_write(file, buff, (size_t) (ptr-buff), my_write(file, buff, (size_t) (ptr-buff),
MYF(MY_NABP)) != 0); MYF(MY_NABP));
DBUG_RETURN(res != 0);
} }
...@@ -1072,6 +1098,16 @@ uchar *_ma_state_info_read(uchar *ptr, MARIA_STATE_INFO *state) ...@@ -1072,6 +1098,16 @@ uchar *_ma_state_info_read(uchar *ptr, MARIA_STATE_INFO *state)
} }
/**
@brief Fills the state by reading its copy on disk.
@note Does nothing in single user mode.
@param file file to read from
@param state state which will be filled
@param pRead if true, use my_pread(), otherwise my_read()
*/
uint _ma_state_info_read_dsk(File file, MARIA_STATE_INFO *state, my_bool pRead) uint _ma_state_info_read_dsk(File file, MARIA_STATE_INFO *state, my_bool pRead)
{ {
char buff[MARIA_STATE_INFO_SIZE + MARIA_STATE_EXTRA_SIZE]; char buff[MARIA_STATE_INFO_SIZE + MARIA_STATE_EXTRA_SIZE];
......
This diff is collapsed.
...@@ -25,5 +25,6 @@ ...@@ -25,5 +25,6 @@
C_MODE_START C_MODE_START
int maria_recover(); int maria_recover();
int maria_apply_log(LSN lsn, my_bool applyn, FILE *trace_file); int maria_apply_log(LSN lsn, my_bool apply, FILE *trace_file,
my_bool execute_undo_phase);
C_MODE_END C_MODE_END
...@@ -1034,7 +1034,7 @@ static int maria_chk(HA_CHECK *param, char *filename) ...@@ -1034,7 +1034,7 @@ static int maria_chk(HA_CHECK *param, char *filename)
that it will have to find and store it. that it will have to find and store it.
*/ */
if (share->base.born_transactional) if (share->base.born_transactional)
share->state.create_rename_lsn= (LSN)ULONGLONG_MAX; share->state.create_rename_lsn= LSN_REPAIRED_BY_MARIA_CHK;
if ((param->testflag & (T_REP_BY_SORT | T_REP_PARALLEL)) && if ((param->testflag & (T_REP_BY_SORT | T_REP_PARALLEL)) &&
(maria_is_any_key_active(share->state.key_map) || (maria_is_any_key_active(share->state.key_map) ||
(rep_quick && !param->keys_in_use && !recreate)) && (rep_quick && !param->keys_in_use && !recreate)) &&
......
...@@ -51,7 +51,7 @@ int main(int argc, char **argv) ...@@ -51,7 +51,7 @@ int main(int argc, char **argv)
goto err; goto err;
} }
/* we don't want to create a control file, it MUST exist */ /* we don't want to create a control file, it MUST exist */
if (ma_control_file_create_or_open(FALSE)) if (ma_control_file_create_or_open())
{ {
fprintf(stderr, "Can't open control file (%d)\n", errno); fprintf(stderr, "Can't open control file (%d)\n", errno);
goto err; goto err;
...@@ -88,7 +88,8 @@ int main(int argc, char **argv) ...@@ -88,7 +88,8 @@ int main(int argc, char **argv)
lsn= first_lsn_in_log(); /* LSN could be also --start-from-lsn=# */ lsn= first_lsn_in_log(); /* LSN could be also --start-from-lsn=# */
fprintf(stdout, "TRACE of the last maria_read_log\n"); fprintf(stdout, "TRACE of the last maria_read_log\n");
if (maria_apply_log(lsn, opt_display_and_apply, stdout)) /* Until we have UNDO records, no UNDO phase */
if (maria_apply_log(lsn, opt_display_and_apply, stdout, FALSE))
goto err; goto err;
fprintf(stdout, "%s: SUCCESS\n", my_progname); fprintf(stdout, "%s: SUCCESS\n", my_progname);
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <my_sys.h> #include <my_sys.h>
#include <m_string.h> #include <m_string.h>
#include "trnman.h" #include "trnman.h"
#include "ma_control_file.h"
/* /*
status variables: status variables:
...@@ -708,3 +709,29 @@ end: ...@@ -708,3 +709,29 @@ end:
pthread_mutex_unlock(&LOCK_trn_list); pthread_mutex_unlock(&LOCK_trn_list);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
TRN *trnman_recreate_trn_from_recovery(uint16 shortid, TrID longid)
{
TrID old_trid_generator= global_trid_generator;
TRN *trn;
DBUG_ASSERT(maria_in_recovery && !maria_multi_threaded);
if (unlikely((trn= trnman_new_trn(NULL, NULL, NULL)) == NULL))
return NULL;
/* deallocate excessive allocations of trnman_new_trn() */
global_trid_generator= old_trid_generator;
set_if_bigger(global_trid_generator, longid);
short_trid_to_active_trn[trn->short_id]= 0;
DBUG_ASSERT(short_trid_to_active_trn[shortid] == NULL);
short_trid_to_active_trn[shortid]= trn;
trn->trid= longid;
trn->short_id= shortid;
return trn;
}
TRN *trnman_get_any_trn()
{
TRN *trn= active_list_min.next;
return (trn != &active_list_max) ? trn : NULL;
}
...@@ -53,6 +53,8 @@ uint trnman_increment_locked_tables(TRN *trn); ...@@ -53,6 +53,8 @@ uint trnman_increment_locked_tables(TRN *trn);
uint trnman_decrement_locked_tables(TRN *trn); uint trnman_decrement_locked_tables(TRN *trn);
my_bool trnman_has_locked_tables(TRN *trn); my_bool trnman_has_locked_tables(TRN *trn);
void trnman_reset_locked_tables(TRN *trn); void trnman_reset_locked_tables(TRN *trn);
TRN *trnman_recreate_trn_from_recovery(uint16 shortid, TrID longid);
TRN *trnman_get_any_trn();
C_MODE_END C_MODE_END
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment