Commit e251add4 authored by unknown's avatar unknown

Merge gbichot@bk-internal.mysql.com:/home/bk/mysql-maria

into  gbichot4.local:/home/mysql_src/mysql-maria-for-undo-phase


storage/maria/ha_maria.cc:
  Auto merged
storage/maria/ma_blockrec.c:
  Auto merged
storage/maria/ma_loghandler.c:
  Auto merged
storage/maria/ma_loghandler.h:
  Auto merged
storage/maria/ma_loghandler_lsn.h:
  Auto merged
storage/maria/maria_chk.c:
  Auto merged
storage/maria/maria_read_log.c:
  Auto merged
parents 6302357d da153a3b
......@@ -22,6 +22,16 @@
/* My memory re allocator */
/**
@brief wrapper around realloc()
@param oldpoint pointer to currently allocated area
@param size new size requested, must be >0
@param my_flags flags
@note if size==0 realloc() may return NULL; my_realloc() treats this as an
error which is not the intention of realloc()
*/
void* my_realloc(void* oldpoint, size_t size, myf my_flags)
{
void *point;
......@@ -29,6 +39,7 @@ void* my_realloc(void* oldpoint, size_t size, myf my_flags)
DBUG_PRINT("my",("ptr: 0x%lx size: %lu my_flags: %d", (long) oldpoint,
(ulong) size, my_flags));
DBUG_ASSERT(size > 0);
if (!oldpoint && (my_flags & MY_ALLOW_ZERO_PTR))
DBUG_RETURN(my_malloc(size,my_flags));
#ifdef USE_HALLOC
......
......@@ -3418,6 +3418,17 @@ server.");
using_update_log=1;
}
/* call ha_init_key_cache() on all key caches to init them */
process_key_caches(&ha_init_key_cache);
/*
Maria's pagecache needs to be ready before Maria engine (Recovery uses
pagecache, and Checkpoint may happen at startup). Maria engine is taken up
in plugin_init().
*/
#ifdef WITH_MARIA_STORAGE_ENGINE
process_pagecaches(&ha_init_pagecache);
#endif /* WITH_MARIA_STORAGE_ENGINE */
/* Allow storage engine to give real error messages */
if (ha_init_errors())
DBUG_RETURN(1);
......@@ -3588,12 +3599,6 @@ server.");
if (opt_myisam_log)
(void) mi_log(1);
/* call ha_init_key_cache() on all key caches to init them */
process_key_caches(&ha_init_key_cache);
#ifdef WITH_MARIA_STORAGE_ENGINE
process_pagecaches(&ha_init_pagecache);
#endif /* WITH_MARIA_STORAGE_ENGINE */
#if defined(HAVE_MLOCKALL) && defined(MCL_CURRENT) && !defined(EMBEDDED_LIBRARY)
if (locked_in_memory && !getuid())
{
......
......@@ -33,6 +33,10 @@ C_MODE_START
#include "ma_blockrec.h"
C_MODE_END
/*
Note that in future versions, only *transactional* Maria tables can
rollback, so this flag should be up or down conditionally.
*/
#define MARIA_CANNOT_ROLLBACK HA_NO_TRANSACTIONS
#ifdef MARIA_CANNOT_ROLLBACK
#define trans_register_ha(A, B, C) do { /* nothing */ } while(0)
......@@ -2385,7 +2389,7 @@ static int ha_maria_init(void *p)
maria_hton->flags= HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES;
bzero(maria_log_pagecache, sizeof(*maria_log_pagecache));
maria_data_root= mysql_real_data_home;
res= maria_init() || ma_control_file_create_or_open(TRUE) ||
res= maria_init() || ma_control_file_create_or_open() ||
(init_pagecache(maria_log_pagecache,
TRANSLOG_PAGECACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) ||
......
......@@ -512,15 +512,19 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
MARIA_FILE_BITMAP *bitmap,
ulonglong page)
{
my_off_t position= page * bitmap->block_size;
my_off_t end_of_page= (page + 1) * bitmap->block_size;
my_bool res;
DBUG_ENTER("_ma_read_bitmap_page");
DBUG_ASSERT(page % bitmap->pages_covered == 0);
bitmap->page= page;
if (position >= share->state.state.data_file_length)
if (end_of_page > share->state.state.data_file_length)
{
share->state.state.data_file_length= position + bitmap->block_size;
/*
Inexistent or half-created page (could be crash in the middle of
_ma_bitmap_create_first(), before appending maria_bitmap_marker).
*/
share->state.state.data_file_length= end_of_page;
bzero(bitmap->map, bitmap->block_size);
memcpy(bitmap->map + bitmap->block_size - sizeof(maria_bitmap_marker),
maria_bitmap_marker, sizeof(maria_bitmap_marker));
......@@ -2047,7 +2051,8 @@ int _ma_bitmap_create_first(MARIA_SHARE *share)
{
uint block_size= share->bitmap.block_size;
File file= share->bitmap.file.file;
if (my_chsize(file, block_size, 0, MYF(MY_WME)) ||
if (my_chsize(file, block_size - sizeof(maria_bitmap_marker),
0, MYF(MY_WME)) ||
my_pwrite(file, maria_bitmap_marker, sizeof(maria_bitmap_marker),
block_size - sizeof(maria_bitmap_marker),
MYF(MY_NABP | MY_WME)))
......
......@@ -4178,9 +4178,6 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
empty_space= (block_size - PAGE_OVERHEAD_SIZE);
rec_offset= PAGE_HEADER_SIZE;
dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE;
/* Update that file is extended */
info->state->data_file_length= (page + 1) * info->s->block_size;
}
else
{
......@@ -4304,6 +4301,16 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
DBUG_RETURN(my_errno);
/*
Data page and bitmap page are in place, we can update data_file_length in
case we extended the file. We could not do it earlier: bitmap code tests
data_file_length to know if it has to create a new page or not.
*/
{
my_off_t end_of_page= (page + 1) * info->s->block_size;
set_if_bigger(info->state->data_file_length, end_of_page);
}
DBUG_RETURN(0);
err:
......
......@@ -2046,15 +2046,8 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
goto err;
}
_ma_reset_status(sort_info.new_info);
#ifdef ASK_MONTY /* cf maria_create() */
/**
@todo ASK_MONTY
without this call, a REPAIR on an empty table leaves the data file of
size 0, which sounds reasonable.
*/
if (_ma_initialize_data_file(sort_info.new_info->s, new_file))
goto err;
#endif
block_record= 1;
}
}
......
......@@ -41,6 +41,10 @@
#define CONTROL_FILE_SIZE (CONTROL_FILE_FILENO_OFFSET + CONTROL_FILE_FILENO_SIZE)
/* This module owns these two vars. */
/**
This LSN serves for the two-checkpoint rule, and also to find the
checkpoint record when doing a recovery.
*/
LSN last_checkpoint_lsn= LSN_IMPOSSIBLE;
uint32 last_logno= FILENO_IMPOSSIBLE;
......@@ -68,8 +72,6 @@ static int control_file_fd= -1;
the last_checkpoint_lsn and last_logno global variables.
Called at engine's start.
@param create_if_missing
@note
The format of the control file is:
4 bytes: magic string
......@@ -78,11 +80,13 @@ static int control_file_fd= -1;
4 bytes: offset in log where last checkpoint is
4 bytes: number of last log
@note If in recovery, file is not created
@return Operation status
@retval 0 OK
@retval 1 Error (in which case the file is left closed)
*/
CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool create_if_missing)
CONTROL_FILE_ERROR ma_control_file_create_or_open()
{
char buffer[CONTROL_FILE_SIZE];
char name[FN_REFLEN];
......@@ -111,7 +115,8 @@ CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool create_if_missing)
if (create_file)
{
if (!create_if_missing)
/* in a recovery, we expect to find a control file */
if (maria_in_recovery)
DBUG_RETURN(CONTROL_FILE_MISSING);
if ((control_file_fd= my_create(name, 0,
open_flags, MYF(MY_SYNC_DIR))) < 0)
......
......@@ -61,7 +61,7 @@ extern "C" {
If present, reads it to find out last checkpoint's LSN and last log.
Called at engine's start.
*/
CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool);
CONTROL_FILE_ERROR ma_control_file_create_or_open();
/*
Write information durably to the control file.
Called when we have created a new log (after syncing this log's creation)
......
......@@ -664,6 +664,14 @@ int maria_create(const char *name, enum data_file_type datafile_type,
share.base.keystart = share.state.state.key_file_length=
MY_ALIGN(info_length, maria_block_size);
if (share.data_file_type == BLOCK_RECORD)
{
/*
we are going to create a first bitmap page, set data_file_length
to reflect this, before the state goes to disk
*/
share.state.state.data_file_length= maria_block_size;
}
share.base.max_key_block_length= maria_block_size;
share.base.max_key_length=ALIGN_SIZE(max_key_length+4);
share.base.records=ci->max_rows;
......@@ -1041,36 +1049,8 @@ int maria_create(const char *name, enum data_file_type datafile_type,
goto err;
errpos=3;
/**
@todo ASK_MONTY
QQ: this sets data_file_length from 0 to 8192, but we wrote the state
already to the index file (because:
- log record is built from index header so state must be written before
log record
- data file must be created after log record, so that "missing log
record" implies "unusable table").
When we wrote the state, we hadn't called ma_initialize_data_file(), so
the data_file_length is 0!
Thus, we below create a 8192-byte data file, but its recorded size is 0,
so next time we read the bitmap (a maria_write() for example) we'll
overwrite the bitmap we just created below.
It's not very efficient.
It also makes maria_chk_size() print
Size of datafile is: 8192 Should be: 0
on a freshly created table (run "check.test" with a Maria table).
Why do we absolutely want to create a 8192-byte page for a freshly
created, empty table? Why don't we leave the data file empty?
Removing the call below at least removes the maria_chk_size() issue.
Monty wrote on IRC, about a size of 0:
"This basically ok; The first block is a bitmap that may or may not
exists", but later he asked that the first block always exists.???
*/
#ifdef ASK_MONTY
if (_ma_initialize_data_file(&share, dfile))
goto err;
#endif
}
/* Enlarge files */
......
......@@ -6158,7 +6158,7 @@ static my_bool write_hook_for_redo(enum translog_record_type type
non-transactional log records (REPAIR, CREATE, RENAME, DROP) should not
call this hook; we trust them but verify ;)
*/
DBUG_ASSERT(!(maria_multi_threaded && (trn->trid == 0)));
DBUG_ASSERT(trn->trid != 0);
/*
If the hook stays so simple, it would be faster to pass
!trn->rec_lsn ? trn->rec_lsn : some_dummy_lsn
......@@ -6187,7 +6187,7 @@ static my_bool write_hook_for_undo(enum translog_record_type type
struct st_translog_parts *parts
__attribute__ ((unused)))
{
DBUG_ASSERT(!(maria_multi_threaded && (trn->trid == 0)));
DBUG_ASSERT(trn->trid != 0);
trn->undo_lsn= *lsn;
if (unlikely(LSN_WITH_FLAGS_TO_LSN(trn->first_undo_lsn) == 0))
trn->first_undo_lsn=
......@@ -6300,6 +6300,17 @@ void translog_deassign_id_from_share(MARIA_SHARE *share)
}
void translog_assign_id_to_share_from_recovery(MARIA_SHARE *share,
uint16 id)
{
DBUG_ASSERT(maria_in_recovery && !maria_multi_threaded);
DBUG_ASSERT(share->data_file_type == BLOCK_RECORD);
DBUG_ASSERT(share->id == 0);
DBUG_ASSERT(id_to_share[id] == NULL);
id_to_share[share->id= id]= share;
}
/**
@brief check if such log file exists
......
......@@ -258,6 +258,9 @@ extern TRANSLOG_ADDRESS translog_get_horizon();
extern int translog_assign_id_to_share(struct st_maria_share *share,
struct st_transaction *trn);
extern void translog_deassign_id_from_share(struct st_maria_share *share);
extern void
translog_assign_id_to_share_from_recovery(struct st_maria_share *share,
uint16 id);
extern my_bool translog_inited;
/*
......
......@@ -84,6 +84,9 @@ typedef LSN LSN_WITH_FLAGS;
/* following LSN also is impossible */
#define LSN_ERROR 1
/** @brief some impossible LSN serve as markers */
#define LSN_REPAIRED_BY_MARIA_CHK ((LSN)1)
/**
@brief the maximum valid LSN.
Unlike ULONGLONG_MAX, it can be safely used in comparison with valid LSNs
......
......@@ -171,7 +171,8 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode,
share->delay_key_write=1;
info.state= &share->state.state; /* Change global values by default */
info.trn= &dummy_transaction_object;
if (!share->base.born_transactional) /* but for transactional ones ... */
info.trn= &dummy_transaction_object; /* ... force crash if no trn given */
pthread_mutex_unlock(&share->intern_lock);
/* Allocate buffer for one record */
......@@ -601,15 +602,30 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
{
share->page_type= PAGECACHE_LSN_PAGE;
share->base_length+= TRANS_ROW_EXTRA_HEADER_SIZE;
if (unlikely((share->state.create_rename_lsn == (LSN)ULONGLONG_MAX) &&
(open_flags & HA_OPEN_FROM_SQL_LAYER)))
if (share->state.create_rename_lsn == LSN_REPAIRED_BY_MARIA_CHK)
{
/*
This table was repaired with maria_chk. Past log records should be
ignored, future log records should not: we define the present.
Was repaired with maria_chk, maybe later maria_pack-ed. Some sort of
import into the server. It starts its existence (from the point of
view of the server, including server's recovery) now.
*/
share->state.create_rename_lsn= translog_get_horizon();
_ma_update_create_rename_lsn_on_disk(share, TRUE);
if ((open_flags & HA_OPEN_FROM_SQL_LAYER) || maria_in_recovery)
{
share->state.create_rename_lsn= translog_get_horizon();
_ma_update_create_rename_lsn_on_disk(share, TRUE);
}
}
else if (!LSN_VALID(share->state.create_rename_lsn) &&
!(open_flags & HA_OPEN_FOR_REPAIR))
{
/*
If in Recovery, it will not work. If LSN is invalid and not
LSN_REPAIRED_BY_MARIA_CHK, header must be corrupted.
In both cases, must repair.
*/
my_errno=((share->state.changed & STATE_CRASHED_ON_REPAIR) ?
HA_ERR_CRASHED_ON_REPAIR : HA_ERR_CRASHED_ON_USAGE);
goto err;
}
}
else
......@@ -699,6 +715,14 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
{
share->lock.get_status=_ma_get_status;
share->lock.copy_status=_ma_copy_status;
/**
@todo RECOVERY
INSERT DELAYED and concurrent inserts are currently disabled for
transactional tables; when enabled again, we should re-evaluate
what problems the call to _ma_update_status() by
thr_reschedule_write_lock() can do (it may hurt Checkpoint as it
would be without intern_lock, and it modifies the state).
*/
share->lock.update_status=_ma_update_status;
share->lock.restore_status=_ma_restore_status;
share->lock.check_status=_ma_check_status;
......@@ -958,6 +982,7 @@ uint _ma_state_info_write(File file, MARIA_STATE_INFO *state, uint pWrite)
uchar buff[MARIA_STATE_INFO_SIZE + MARIA_STATE_EXTRA_SIZE];
uchar *ptr=buff;
uint i, keys= (uint) state->header.keys;
size_t res;
DBUG_ENTER("_ma_state_info_write");
memcpy_fixed(ptr,&state->header,sizeof(state->header));
......@@ -1013,11 +1038,12 @@ uint _ma_state_info_write(File file, MARIA_STATE_INFO *state, uint pWrite)
}
}
if (pWrite & 1)
DBUG_RETURN(my_pwrite(file, buff, (size_t) (ptr-buff), 0L,
MYF(MY_NABP | MY_THREADSAFE)) != 0);
DBUG_RETURN(my_write(file, buff, (size_t) (ptr-buff),
MYF(MY_NABP)) != 0);
res= (pWrite & 1) ?
my_pwrite(file, buff, (size_t) (ptr-buff), 0L,
MYF(MY_NABP | MY_THREADSAFE)) :
my_write(file, buff, (size_t) (ptr-buff),
MYF(MY_NABP));
DBUG_RETURN(res != 0);
}
......@@ -1072,6 +1098,16 @@ uchar *_ma_state_info_read(uchar *ptr, MARIA_STATE_INFO *state)
}
/**
@brief Fills the state by reading its copy on disk.
@note Does nothing in single user mode.
@param file file to read from
@param state state which will be filled
@param pRead if true, use my_pread(), otherwise my_read()
*/
uint _ma_state_info_read_dsk(File file, MARIA_STATE_INFO *state, my_bool pRead)
{
char buff[MARIA_STATE_INFO_SIZE + MARIA_STATE_EXTRA_SIZE];
......
......@@ -23,25 +23,39 @@
#include "maria_def.h"
#include "ma_recovery.h"
#include "ma_blockrec.h"
#include "trnman.h"
struct TRN_FOR_RECOVERY
struct st_trn_for_recovery /* used only in the REDO phase */
{
LSN group_start_lsn, undo_lsn;
LSN group_start_lsn, undo_lsn, first_undo_lsn;
TrID long_trid;
};
struct st_dirty_page /* used only in the REDO phase */
{
uint64 file_and_page_id;
LSN rec_lsn;
};
struct st_table_for_recovery /* used in the REDO and UNDO phase */
{
MARIA_HA *info;
File org_kfile, org_dfile; /**< OS descriptors when Checkpoint saw table */
};
/* Variables used by all functions of this module. Ok as single-threaded */
static struct TRN_FOR_RECOVERY *all_active_trans;
static MARIA_HA **all_tables;
static LSN current_group_end_lsn;
FILE *tracef; /**< trace file for debugging */
#define prototype_exec_hook(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
static struct st_trn_for_recovery *all_active_trans;
static struct st_table_for_recovery *all_tables;
static HASH all_dirty_pages;
static struct st_dirty_page *dirty_pages_pool;
static LSN current_group_end_lsn,
checkpoint_start= LSN_IMPOSSIBLE;
static FILE *tracef; /**< trace file for debugging */
#define prototype_exec_hook(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
#define prototype_exec_hook_dummy(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec \
__attribute ((unused)))
prototype_exec_hook(LONG_TRANSACTION_ID);
#ifdef MARIA_CHECKPOINT
prototype_exec_hook(CHECKPOINT);
#endif
prototype_exec_hook_dummy(CHECKPOINT);
prototype_exec_hook(REDO_CREATE_TABLE);
prototype_exec_hook(REDO_DROP_TABLE);
prototype_exec_hook(FILE_ID);
......@@ -53,10 +67,11 @@ prototype_exec_hook(REDO_PURGE_BLOCKS);
prototype_exec_hook(REDO_DELETE_ALL);
prototype_exec_hook(UNDO_ROW_INSERT);
prototype_exec_hook(UNDO_ROW_DELETE);
prototype_exec_hook(UNDO_ROW_UPDATE);
prototype_exec_hook(UNDO_ROW_PURGE);
prototype_exec_hook(COMMIT);
static int end_of_redo_phase();
static int run_redo_phase(LSN lsn, my_bool apply);
static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
static int run_undo_phase(uint unfinished);
static void display_record_position(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec,
uint number);
......@@ -66,83 +81,57 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
TRANSLOG_HEADER_BUFFER *rec);
static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
TRANSLOG_HEADER_BUFFER *rec);
static int close_recovered_table(MARIA_HA *info);
static void prepare_table_for_close(MARIA_HA *info, LSN at_lsn);
static int parse_checkpoint_record(LSN lsn);
static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
LSN first_undo_lsn);
static int new_table(uint16 sid, const char *name,
File org_kfile, File org_dfile, LSN lsn);
static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
struct st_dirty_page *dirty_page);
static int close_all_tables();
/** @brief global [out] buffer for translog_read_record(); never shrinks */
static LEX_STRING log_record_buffer;
#define enlarge_buffer(rec) \
if (log_record_buffer.length < rec->record_length) \
if (log_record_buffer.length < (rec)->record_length) \
{ \
log_record_buffer.length= rec->record_length; \
log_record_buffer.length= (rec)->record_length; \
log_record_buffer.str= my_realloc(log_record_buffer.str, \
rec->record_length, MYF(MY_WME)); \
(rec)->record_length, MYF(MY_WME)); \
}
#define ALERT_USER() DBUG_ASSERT(0)
#define LSN_IN_HEX(L) (ulong)LSN_FILE_NO(L),(ulong)LSN_OFFSET(L)
/**
@brief Recovers from the last checkpoint
@brief Recovers from the last checkpoint.
Runs the REDO phase using special structures, then sets up the playground
of runtime: recreates transactions inside trnman, open tables with their
two-byte-id mapping; takes a checkpoint and runs the UNDO phase. Closes all
tables.
*/
int maria_recover()
{
my_bool res= TRUE;
LSN from_lsn;
int res= 1;
FILE *trace_file;
DBUG_ENTER("maria_recover");
DBUG_ASSERT(!maria_in_recovery);
maria_in_recovery= TRUE;
if (last_checkpoint_lsn == LSN_IMPOSSIBLE)
{
from_lsn= translog_first_theoretical_lsn();
/*
as far as we have not yet any checkpoint then the very first
log file should be present.
*/
DBUG_ASSERT(from_lsn != LSN_IMPOSSIBLE);
/*
@todo process eroror of getting checkpoint
if (from_lsn == ERROR_LSN)
...
*/
}
else
{
DBUG_ASSERT(0); /* not yet implemented */
/**
@todo read the checkpoint record, fill structures
and use the minimum of checkpoint_start_lsn, rec_lsn of trns, rec_lsn
of dirty pages.
*/
//from_lsn= something;
}
/*
mysqld has not yet initialized any page cache. Let's create a dedicated
one for recovery.
*/
if ((trace_file= fopen("maria_recovery.trace", "w")))
{
fprintf(trace_file, "TRACE of the last MARIA recovery from mysqld\n");
res= (init_pagecache(maria_pagecache,
/** @todo what size? */
1024*1024,
0, 0,
maria_block_size) == 0) ||
maria_apply_log(from_lsn, TRUE, trace_file);
end_pagecache(maria_pagecache, TRUE);
DBUG_ASSERT(maria_pagecache->inited);
res= maria_apply_log(LSN_IMPOSSIBLE, TRUE, trace_file, TRUE);
if (!res)
fprintf(trace_file, "SUCCESS\n");
fclose(trace_file);
}
/**
@todo take checkpoint if log applying did some work.
Be sure to not checkpoint if no work.
*/
maria_in_recovery= FALSE;
DBUG_RETURN(res);
}
......@@ -151,7 +140,8 @@ int maria_recover()
/**
@brief Displays and/or applies the log
@param lsn LSN from which log reading/applying should start
@param from_lsn LSN from which log reading/applying should start;
LSN_IMPOSSIBLE means "use last checkpoint"
@param apply if log records should be applied or not
@param trace_file trace file where progress/debug messages will go
......@@ -164,190 +154,81 @@ int maria_recover()
@retval !=0 Error
*/
int maria_apply_log(LSN lsn, my_bool apply, FILE *trace_file)
int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file,
my_bool should_run_undo_phase)
{
int error= 0;
DBUG_ENTER("maria_apply_log");
DBUG_ASSERT(apply || !should_run_undo_phase);
DBUG_ASSERT(!maria_multi_threaded);
all_active_trans= (struct TRN_FOR_RECOVERY *)
my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct TRN_FOR_RECOVERY),
all_active_trans= (struct st_trn_for_recovery *)
my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct st_trn_for_recovery),
MYF(MY_ZEROFILL));
all_tables= (struct st_table_for_recovery *)
my_malloc((SHARE_ID_MAX + 1) * sizeof(struct st_table_for_recovery),
MYF(MY_ZEROFILL));
all_tables= (MARIA_HA **)my_malloc((SHARE_ID_MAX + 1) * sizeof(MARIA_HA *),
MYF(MY_ZEROFILL));
if (!all_active_trans || !all_tables)
goto err;
tracef= trace_file;
/* install hooks for execution */
#define install_exec_hook(R) \
log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
exec_LOGREC_ ## R;
install_exec_hook(LONG_TRANSACTION_ID);
#ifdef MARIA_CHECKPOINT
install_exec_hook(CHECKPOINT);
#endif
install_exec_hook(REDO_CREATE_TABLE);
install_exec_hook(REDO_DROP_TABLE);
install_exec_hook(FILE_ID);
install_exec_hook(REDO_INSERT_ROW_HEAD);
install_exec_hook(REDO_INSERT_ROW_TAIL);
install_exec_hook(REDO_PURGE_ROW_HEAD);
install_exec_hook(REDO_PURGE_ROW_TAIL);
install_exec_hook(REDO_PURGE_BLOCKS);
install_exec_hook(REDO_DELETE_ALL);
install_exec_hook(UNDO_ROW_INSERT);
install_exec_hook(UNDO_ROW_DELETE);
install_exec_hook(UNDO_ROW_UPDATE);
install_exec_hook(UNDO_ROW_PURGE);
install_exec_hook(COMMIT);
current_group_end_lsn= LSN_IMPOSSIBLE;
TRANSLOG_HEADER_BUFFER rec;
struct st_translog_scanner_data scanner;
uint i= 1;
int len= translog_read_record_header(lsn, &rec);
/** @todo EOF should be detected */
if (len == RECHEADER_READ_ERROR)
if (from_lsn == LSN_IMPOSSIBLE)
{
fprintf(tracef, "Cannot find a first record\n");
goto err;
if (last_checkpoint_lsn == LSN_IMPOSSIBLE)
from_lsn= first_lsn_in_log();
else
{
DBUG_ASSERT(0); /* not yet implemented */
from_lsn= parse_checkpoint_record(last_checkpoint_lsn);
if (from_lsn == LSN_IMPOSSIBLE)
goto err;
}
}
if (translog_init_scanner(lsn, 1, &scanner))
{
fprintf(tracef, "Scanner init failed\n");
if (run_redo_phase(from_lsn, apply))
goto err;
}
for (;;i++)
{
uint16 sid= rec.short_trid;
const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
display_record_position(log_desc, &rec, i);
/*
A complete group is a set of log records with an "end mark" record
(e.g. a set of REDOs for an operation, terminated by an UNDO for this
operation); if there is no "end mark" record the group is incomplete
and won't be executed.
*/
if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
(log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
{
if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
{
if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
{
/*
can happen if the transaction got a table write error, then
unlocked tables thus wrote a COMMIT record.
*/
fprintf(tracef, "\nDiscarding unfinished group before this record\n");
ALERT_USER();
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
else
{
/*
There is a complete group for this transaction, containing more
than this event.
*/
fprintf(tracef, " ends a group:\n");
struct st_translog_scanner_data scanner2;
TRANSLOG_HEADER_BUFFER rec2;
len=
translog_read_record_header(all_active_trans[sid].group_start_lsn, &rec2);
if (len < 0) /* EOF or error */
{
fprintf(tracef, "Cannot find record where it should be\n");
goto err;
}
if (translog_init_scanner(rec2.lsn, 1, &scanner2))
{
fprintf(tracef, "Scanner2 init failed\n");
goto err;
}
current_group_end_lsn= rec.lsn;
do
{
if (rec2.short_trid == sid) /* it's in our group */
{
const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
display_record_position(log_desc2, &rec2, 0);
if (apply && display_and_apply_record(log_desc2, &rec2))
goto err;
}
len= translog_read_next_record_header(&scanner2, &rec2);
if (len < 0) /* EOF or error */
{
fprintf(tracef, "Cannot find record where it should be\n");
goto err;
}
}
while (rec2.lsn < rec.lsn);
translog_free_record_header(&rec2);
/* group finished */
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
display_record_position(log_desc, &rec, 0);
}
}
if (apply && display_and_apply_record(log_desc, &rec))
goto err;
}
else /* record does not end group */
{
/* just record the fact, can't know if can execute yet */
if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
{
/* group not yet started */
all_active_trans[sid].group_start_lsn= rec.lsn;
}
}
len= translog_read_next_record_header(&scanner, &rec);
if (len < 0)
{
switch (len)
{
case RECHEADER_READ_EOF:
fprintf(tracef, "EOF on the log\n");
break;
case RECHEADER_READ_ERROR:
fprintf(stderr, "Error reading log\n");
goto err;
}
break;
}
uint unfinished_trans= end_of_redo_phase(should_run_undo_phase);
if (unfinished_trans == (uint)-1)
goto err;
if (should_run_undo_phase)
{
if (run_undo_phase(unfinished_trans))
return 1;
}
translog_free_record_header(&rec);
else if (unfinished_trans > 0)
fprintf(tracef, "WARNING: %u unfinished transactions; some tables may be"
" left inconsistent!\n", unfinished_trans);
/*
So we have applied all REDOs.
We may now have unfinished transactions.
I don't think it's this program's job to roll them back:
to roll back and at the same time stay idempotent, it needs to write log
records (without CLRs, 2nd rollback would hit the effects of first
rollback and fail). But this standalone tool is not allowed to write to
the server's transaction log. So we do not roll back anything.
In the real Recovery code, or the code to do "recover after online
backup", yes we will roll back.
we don't use maria_panic() because it would maria_end(), and Recovery does
not want that (we want to keep modules initialized for runtime).
*/
if (end_of_redo_phase())
if (close_all_tables())
goto err;
/*
At this stage, end of recovery, trnman is left initialized. This is for
the future, when we have an online UNDO phase or prepared transactions.
*/
goto end;
err:
error= 1;
fprintf(tracef, "Recovery of tables with transaction logs FAILED\n");
end:
hash_free(&all_dirty_pages);
bzero(&all_dirty_pages, sizeof(all_dirty_pages));
my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
dirty_pages_pool= NULL;
my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
all_tables= NULL;
my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
all_active_trans= NULL;
my_free(log_record_buffer.str, MYF(MY_ALLOW_ZERO_PTR));
log_record_buffer.str= NULL;
log_record_buffer.length= 0;
/* we don't cleanly close tables if we hit some error (may corrupt them) */
DBUG_RETURN(error);
}
......@@ -362,9 +243,8 @@ static void display_record_position(const LOG_DESC *log_desc,
form a group, so we indent below the group's end record
*/
fprintf(tracef, "%sRec#%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u) len %lu\n",
number ? "" : " ", number,
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn),
rec->short_trid, log_desc->name, rec->type,
number ? "" : " ", number, LSN_IN_HEX(rec->lsn),
rec->short_trid, log_desc->name, rec->type,
(ulong)rec->record_length);
}
......@@ -391,11 +271,10 @@ prototype_exec_hook(LONG_TRANSACTION_ID)
TrID long_trid= all_active_trans[sid].long_trid;
/* abort group of this trn (must be of before a crash) */
LSN gslsn= all_active_trans[sid].group_start_lsn;
char llbuf[22];
if (gslsn != LSN_IMPOSSIBLE)
{
fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
LSN_IN_HEX(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
if (long_trid != 0)
......@@ -403,18 +282,17 @@ prototype_exec_hook(LONG_TRANSACTION_ID)
LSN ulsn= all_active_trans[sid].undo_lsn;
if (ulsn != LSN_IMPOSSIBLE)
{
char llbuf[22];
llstr(long_trid, llbuf);
fprintf(tracef, "Found an old transaction long_trid %s short_trid %u"
" with same short id as this new transaction, and has neither"
" committed nor rollback (undo_lsn: (%lu,0x%lx))\n", llbuf,
sid, (ulong) LSN_FILE_NO(ulsn), (ulong) LSN_OFFSET(ulsn));
sid, LSN_IN_HEX(ulsn));
goto err;
}
}
long_trid= uint6korr(rec->header);
all_active_trans[sid].long_trid= long_trid;
llstr(long_trid, llbuf);
fprintf(tracef, "Transaction long_trid %s short_trid %u starts\n", llbuf, sid);
new_transaction(sid, long_trid, LSN_IMPOSSIBLE, LSN_IMPOSSIBLE);
goto end;
err:
ALERT_USER();
......@@ -424,13 +302,24 @@ prototype_exec_hook(LONG_TRANSACTION_ID)
}
#ifdef MARIA_CHECKPOINT
prototype_exec_hook(CHECKPOINT)
static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
LSN first_undo_lsn)
{
char llbuf[22];
all_active_trans[sid].long_trid= long_id;
llstr(long_id, llbuf);
fprintf(tracef, "Transaction long_trid %s short_trid %u starts\n",
llbuf, sid);
all_active_trans[sid].undo_lsn= undo_lsn;
all_active_trans[sid].first_undo_lsn= first_undo_lsn;
}
prototype_exec_hook_dummy(CHECKPOINT)
{
/* the only checkpoint we care about was found via control file, ignore */
return 0;
}
#endif
prototype_exec_hook(REDO_CREATE_TABLE)
......@@ -475,9 +364,9 @@ prototype_exec_hook(REDO_CREATE_TABLE)
}
if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than record",
(ulong) LSN_FILE_NO(rec->lsn),
(ulong) LSN_OFFSET(rec->lsn));
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
" record, ignoring",
LSN_IN_HEX(share->state.create_rename_lsn));
error= 0;
goto end;
}
......@@ -490,7 +379,7 @@ prototype_exec_hook(REDO_CREATE_TABLE)
info= NULL;
}
/* if does not exist, is older, or its header is corrupted, overwrite it */
// TODO symlinks
/** @todo symlinks */
ptr= name + strlen(name) + 1;
if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0))
fprintf(tracef, ", we will only touch index file");
......@@ -592,9 +481,9 @@ prototype_exec_hook(REDO_DROP_TABLE)
}
if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than record",
(ulong) LSN_FILE_NO(rec->lsn),
(ulong) LSN_OFFSET(rec->lsn));
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
" record, ignoring",
LSN_IN_HEX(share->state.create_rename_lsn));
error= 0;
goto end;
}
......@@ -633,9 +522,15 @@ prototype_exec_hook(FILE_ID)
{
uint16 sid;
int error= 1;
char *name, *buff;
MARIA_HA *info= NULL;
MARIA_SHARE *share;
const char *name;
MARIA_HA *info;
if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
{
fprintf(tracef, "ignoring because before checkpoint\n");
return 0;
}
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
......@@ -645,21 +540,40 @@ prototype_exec_hook(FILE_ID)
fprintf(tracef, "Failed to read record\n");
goto end;
}
buff= log_record_buffer.str;
sid= fileid_korr(buff);
name= buff + FILEID_STORE_SIZE;
info= all_tables[sid];
sid= fileid_korr(log_record_buffer.str);
info= all_tables[sid].info;
if (info != NULL)
{
all_tables[sid]= NULL;
if (close_recovered_table(info))
fprintf(tracef, " Closing table '%s'\n", info->s->open_file_name);
prepare_table_for_close(info, rec->lsn);
if (maria_close(info))
{
fprintf(tracef, "Failed to close table\n");
goto end;
}
all_tables[sid].info= NULL;
}
name= log_record_buffer.str + FILEID_STORE_SIZE;
if (new_table(sid, name, -1, -1, rec->lsn))
goto end;
error= 0;
end:
return error;
}
static int new_table(uint16 sid, const char *name,
File org_kfile, File org_dfile, LSN lsn)
{
/*
-1 (skip table): close table and return 0;
1 (error): close table and return 1;
0 (success): leave table open and return 0.
*/
int error= 1;
fprintf(tracef, "Table '%s', id %u", name, sid);
info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
MARIA_HA *info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
if (info == NULL)
{
fprintf(tracef, ", is absent (must have been dropped later?)"
......@@ -677,7 +591,7 @@ prototype_exec_hook(FILE_ID)
execute them, we should not reject the crashed table here.
*/
}
share= info->s;
MARIA_SHARE *share= info->s;
/* check that we're not already using it */
DBUG_ASSERT(share->reopen == 1);
DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
......@@ -685,10 +599,17 @@ prototype_exec_hook(FILE_ID)
{
fprintf(tracef, ", is not transactional\n");
ALERT_USER();
error= 0;
error= -1;
goto end;
}
if (cmp_translog_addr(lsn, share->state.create_rename_lsn) <= 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
" record, ignoring",
LSN_IN_HEX(share->state.create_rename_lsn));
error= -1;
goto end;
}
all_tables[sid]= info;
/* don't log any records for this work */
_ma_tmp_disable_logging_for_table(share);
/* execution of some REDO records relies on data_file_length */
......@@ -702,17 +623,25 @@ prototype_exec_hook(FILE_ID)
}
share->state.state.data_file_length= dfile_len;
share->state.state.key_file_length= kfile_len;
if ((dfile_len == 0) || ((dfile_len % share->block_size) > 0))
if ((dfile_len % share->block_size) > 0)
{
fprintf(tracef, ", has too short last page\n");
/* Recovery will fix this, no error */
ALERT_USER();
}
all_tables[sid].info= info;
all_tables[sid].org_kfile= org_kfile;
all_tables[sid].org_dfile= org_dfile;
fprintf(tracef, ", opened\n");
error= 0;
end:
if (error && info != NULL)
error|= maria_close(info);
if (error)
{
if (info != NULL)
maria_close(info);
if (error == -1)
error= 0;
}
return error;
}
......@@ -765,7 +694,7 @@ prototype_exec_hook(REDO_INSERT_ROW_HEAD)
prototype_exec_hook(REDO_INSERT_ROW_TAIL)
{
int error= 1;
uchar *buff;
uchar *buff= NULL;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
goto end;
......@@ -833,24 +762,11 @@ prototype_exec_hook(REDO_PURGE_ROW_TAIL)
prototype_exec_hook(REDO_PURGE_BLOCKS)
{
int error= 1;
uchar *buff;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
goto end;
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
fprintf(tracef, "Failed to read record\n");
goto end;
}
buff= log_record_buffer.str;
if (_ma_apply_redo_purge_blocks(info, current_group_end_lsn,
buff + FILEID_STORE_SIZE))
rec->header + FILEID_STORE_SIZE))
goto end;
error= 0;
end:
......@@ -874,17 +790,18 @@ prototype_exec_hook(REDO_DELETE_ALL)
}
#define set_undo_lsn_for_active_trans(I, L) do { \
all_active_trans[I].undo_lsn= L; \
if (all_active_trans[I].first_undo_lsn == LSN_IMPOSSIBLE) \
all_active_trans[I].first_undo_lsn= L; } while (0)
prototype_exec_hook(UNDO_ROW_INSERT)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
goto end;
all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
/*
todo: instead of above, call write_hook_for_undo, it will also set
first_undo_lsn
*/
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
/*
in an upcoming patch ("recovery of the state"), we introduce
state.is_of_lsn. For now, we just assume the state is old (true when we
......@@ -893,6 +810,7 @@ prototype_exec_hook(UNDO_ROW_INSERT)
{
fprintf(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records++;
/** @todo RECOVERY BUG Also update the table's checksum */
}
fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
error= 0;
......@@ -907,11 +825,7 @@ prototype_exec_hook(UNDO_ROW_DELETE)
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
goto end;
all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
/*
todo: instead of above, call write_hook_for_undo, it will also set
first_undo_lsn
*/
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
{
fprintf(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records--;
......@@ -923,23 +837,6 @@ prototype_exec_hook(UNDO_ROW_DELETE)
}
prototype_exec_hook(UNDO_ROW_UPDATE)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
goto end;
all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
/*
todo: instead of above, call write_hook_for_undo, it will also set
first_undo_lsn
*/
error= 0;
end:
return error;
}
prototype_exec_hook(UNDO_ROW_PURGE)
{
int error= 1;
......@@ -947,11 +844,7 @@ prototype_exec_hook(UNDO_ROW_PURGE)
if (info == NULL)
goto end;
/* this a bit broken, but this log record type will be deleted soon */
all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
/*
todo: instead of above, call write_hook_for_undo, it will also set
first_undo_lsn
*/
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
{
fprintf(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records--;
......@@ -1002,77 +895,284 @@ prototype_exec_hook(COMMIT)
}
/* Just to inform about any aborted groups or unfinished transactions */
static int end_of_redo_phase()
static int run_redo_phase(LSN lsn, my_bool apply)
{
/* install hooks for execution */
#define install_exec_hook(R) \
log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
exec_LOGREC_ ## R;
install_exec_hook(LONG_TRANSACTION_ID);
install_exec_hook(CHECKPOINT);
install_exec_hook(REDO_CREATE_TABLE);
install_exec_hook(REDO_DROP_TABLE);
install_exec_hook(FILE_ID);
install_exec_hook(REDO_INSERT_ROW_HEAD);
install_exec_hook(REDO_INSERT_ROW_TAIL);
install_exec_hook(REDO_PURGE_ROW_HEAD);
install_exec_hook(REDO_PURGE_ROW_TAIL);
install_exec_hook(REDO_PURGE_BLOCKS);
install_exec_hook(REDO_DELETE_ALL);
install_exec_hook(UNDO_ROW_INSERT);
install_exec_hook(UNDO_ROW_DELETE);
install_exec_hook(UNDO_ROW_PURGE);
install_exec_hook(COMMIT);
current_group_end_lsn= LSN_IMPOSSIBLE;
TRANSLOG_HEADER_BUFFER rec;
/*
instead of this block below we will soon use
translog_first_lsn_in_log()...
*/
int len= translog_read_record_header(lsn, &rec);
/** @todo EOF should be detected */
if (len == RECHEADER_READ_ERROR)
{
fprintf(tracef, "Cannot find a first record\n");
return 1;
}
struct st_translog_scanner_data scanner;
if (translog_init_scanner(lsn, 1, &scanner))
{
fprintf(tracef, "Scanner init failed\n");
return 1;
}
uint i;
for (i= 1;;i++)
{
uint16 sid= rec.short_trid;
const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
display_record_position(log_desc, &rec, i);
/*
A complete group is a set of log records with an "end mark" record
(e.g. a set of REDOs for an operation, terminated by an UNDO for this
operation); if there is no "end mark" record the group is incomplete
and won't be executed.
*/
if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
(log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
{
if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
{
if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
{
/*
can happen if the transaction got a table write error, then
unlocked tables thus wrote a COMMIT record.
*/
fprintf(tracef, "\nDiscarding unfinished group before this record\n");
ALERT_USER();
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
else
{
/*
There is a complete group for this transaction, containing more
than this event.
*/
fprintf(tracef, " ends a group:\n");
struct st_translog_scanner_data scanner2;
TRANSLOG_HEADER_BUFFER rec2;
len=
translog_read_record_header(all_active_trans[sid].group_start_lsn, &rec2);
if (len < 0) /* EOF or error */
{
fprintf(tracef, "Cannot find record where it should be\n");
return 1;
}
if (translog_init_scanner(rec2.lsn, 1, &scanner2))
{
fprintf(tracef, "Scanner2 init failed\n");
return 1;
}
current_group_end_lsn= rec.lsn;
do
{
if (rec2.short_trid == sid) /* it's in our group */
{
const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
display_record_position(log_desc2, &rec2, 0);
if (apply && display_and_apply_record(log_desc2, &rec2))
return 1;
}
len= translog_read_next_record_header(&scanner2, &rec2);
if (len < 0) /* EOF or error */
{
fprintf(tracef, "Cannot find record where it should be\n");
return 1;
}
}
while (rec2.lsn < rec.lsn);
translog_free_record_header(&rec2);
/* group finished */
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
display_record_position(log_desc, &rec, 0);
}
}
if (apply && display_and_apply_record(log_desc, &rec))
return 1;
}
else /* record does not end group */
{
/* just record the fact, can't know if can execute yet */
if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
{
/* group not yet started */
all_active_trans[sid].group_start_lsn= rec.lsn;
}
}
len= translog_read_next_record_header(&scanner, &rec);
if (len < 0)
{
switch (len)
{
case RECHEADER_READ_EOF:
fprintf(tracef, "EOF on the log\n");
break;
case RECHEADER_READ_ERROR:
fprintf(stderr, "Error reading log\n");
return 1;
}
break;
}
}
translog_free_record_header(&rec);
return 0;
}
/**
@brief Informs about any aborted groups or unfinished transactions,
prepares for the UNDO phase if needed.
@param prepare_for_undo_phase
@note Observe that it may init trnman.
*/
static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
{
uint sid, unfinished= 0, error= 0;
uint sid, unfinished= 0;
hash_free(&all_dirty_pages);
/*
hash_free() can be called multiple times probably, but be safe it that
changes
*/
bzero(&all_dirty_pages, sizeof(all_dirty_pages));
my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
dirty_pages_pool= NULL;
if (prepare_for_undo_phase && trnman_init())
return -1;
for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
{
TrID long_trid= all_active_trans[sid].long_trid;
LSN gslsn= all_active_trans[sid].group_start_lsn;
TRN *trn;
if (gslsn != LSN_IMPOSSIBLE)
{
fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
ALERT_USER();
}
if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
{
char llbuf[22];
llstr(long_trid, llbuf);
fprintf(tracef, "Transaction long_trid %s short_trid %u unfinished\n",
llbuf, sid);
/* dummy_transaction_object serves only for DDLs */
DBUG_ASSERT(long_trid != 0);
if (prepare_for_undo_phase)
{
if ((trn= trnman_recreate_trn_from_recovery(sid, long_trid)) == NULL)
return -1;
trn->undo_lsn= all_active_trans[sid].undo_lsn;
}
/* otherwise we will just warn about it */
unfinished++;
}
if (gslsn != LSN_IMPOSSIBLE)
{
fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
(ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
ALERT_USER();
}
/* If real recovery: roll back unfinished transaction */
#ifdef MARIA_VERSIONING
/*
If real recovery: transaction was committed, move it to some separate
list for soon purging. Create TRNs.
If real recovery: if transaction was committed, move it to some separate
list for soon purging.
*/
#endif
}
/*
We don't close tables if there are some unfinished transactions, because
closing tables normally requires that all unfinished transactions on them
be rolled back. Unfinished transactions are symptom of a crash, we
reproduce the crash.
For example, closing will soon write the state to disk and when doing that
it will think this is a committed state, but it may not be.
my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
all_active_trans= NULL;
/*
The UNDO phase uses some normal run-time code of ROLLBACK: generates log
records, etc; prepare tables for that
*/
if (unfinished > 0)
fprintf(tracef, "WARNING: %u unfinished transactions; some tables may be"
" left inconsistent!\n", unfinished);
LSN addr= translog_get_horizon();
for (sid= 0; sid <= SHARE_ID_MAX; sid++)
{
MARIA_HA *info= all_tables[sid];
MARIA_HA *info= all_tables[sid].info;
if (info != NULL)
{
/* if error, still close other tables */
error|= close_recovered_table(info);
prepare_table_for_close(info, addr);
/*
But we don't close it; we leave it available for the UNDO phase;
it's likely that the UNDO phase will need it.
*/
if (prepare_for_undo_phase)
translog_assign_id_to_share_from_recovery(info->s, sid);
}
}
return error;
/* we don't need all_tables anymore, maria_open_list is enough */
my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
all_tables= NULL;
/*
We could take a checkpoint here, in case of a crash during the UNDO
phase. The drawback is that a page which got a REDO (thus, flushed
by this would-be checkpoint) is likely to have an UNDO executed on it
soon. And so, the flush was probably lost time.
So for now we prefer to do recovery with maximum speed and take a
checkpoint only at the end of the UNDO phase.
*/
return unfinished;
}
static int close_recovered_table(MARIA_HA *info)
static int run_undo_phase(uint unfinished)
{
if (unfinished > 0)
{
fprintf(tracef, "%u transactions will be rolled back\n", unfinished);
for( ; unfinished-- ; )
{
char llbuf[22];
TRN *trn= trnman_get_any_trn();
DBUG_ASSERT(trn != NULL);
llstr(trn->trid, llbuf);
fprintf(tracef, "Rolling back transaction of long id %s\n", llbuf);
/* of course we miss execution of UNDOs here */
if (trnman_rollback_trn(trn))
return 1;
/* We could want to span a few threads (4?) instead of 1 */
/* In the future, we want to have this phase *online* */
}
}
return 0;
}
static void prepare_table_for_close(MARIA_HA *info,
LSN at_lsn __attribute__ ((unused)))
{
int error;
MARIA_SHARE *share= info->s;
fprintf(tracef, " Closing table '%s'\n", share->open_file_name);
/* we will soon use at_lsn here */
_ma_reenable_logging_for_table(share);
/*
Recovery normally corrected problems, don't scare user with "table was not
closed properly" in CHECK TABLE and don't automatically check table at
next open (when we have --maria-recover).
*/
share->state.open_count= share->global_changed ? 1 : 0;
/* this var is set only by non-recovery operations (mi_write() etc) */
DBUG_ASSERT(!share->global_changed);
if ((error= maria_close(info)))
fprintf(tracef, "Failed to close table\n");
return error;
}
......@@ -1080,16 +1180,22 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
TRANSLOG_HEADER_BUFFER *rec)
{
uint16 sid;
ulonglong page;
pgcache_page_no_t page;
MARIA_HA *info;
char llbuf[22];
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
/* BUG not correct for REDO_PURGE_BLOCKS, page is not at this pos */
/**
@todo RECOVERY BUG
- for REDO_PURGE_BLOCKS, page is not at this pos
- for DELETE_ALL, record ends here! buffer overrun!
Solution: caller should pass a param enum { i_am_about_data_file,
i_am_about_index_file, none }.
*/
llstr(page, llbuf);
fprintf(tracef, " For page %s of table of short id %u", llbuf, sid);
info= all_tables[sid];
info= all_tables[sid].info;
if (info == NULL)
{
fprintf(tracef, ", table skipped, so skipping record\n");
......@@ -1098,23 +1204,38 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
fprintf(tracef, ", '%s'", info->s->open_file_name);
/* detect if an open instance of a dropped table (internal bug) */
DBUG_ASSERT(info->s->last_version != 0);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
return NULL;
/**
@todo RECOVERY BUG always assuming this is REDO for data file, but it
could soon be index file
*/
uint64 file_and_page_id=
(((uint64)all_tables[sid].org_dfile) << 32) | page;
struct st_dirty_page *dirty_page= (struct st_dirty_page *)
hash_search(&all_dirty_pages,
(uchar *)&file_and_page_id, sizeof(file_and_page_id));
if ((dirty_page == NULL) ||
cmp_translog_addr(rec->lsn, dirty_page->rec_lsn) < 0)
{
fprintf(tracef, ", ignoring because of dirty_pages list\n");
return NULL;
}
}
fprintf(tracef, ", applying record\n");
return info;
/*
Soon we will also skip the page depending on the rec_lsn for this page in
the checkpoint record, but this is not absolutely needed for now (just
assume we have made no checkpoint). Btw rec_lsn and bitmap's recovery is a
an unsolved problem (rec_lsn is to ignore a REDO without reading the data
page and to do so we need to be sure the corresponding bitmap page does
not need a _ma_bitmap_set()).
So we are going to read the page, and if its LSN is older than the
record's we will modify the page
*/
fprintf(tracef, ", applying record\n");
/* A future CHECK/OPTIMIZE/REPAIR should not be fooled: */
/**
@todo but the ones about keys should be set only if REDO for keys. Same
in ..._from_UNDO_record
*/
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
return info;
}
......@@ -1126,7 +1247,7 @@ static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
sid= fileid_korr(rec->header + LSN_STORE_SIZE);
fprintf(tracef, " For table of short id %u", sid);
info= all_tables[sid];
info= all_tables[sid].info;
if (info == NULL)
{
fprintf(tracef, ", table skipped, so skipping record\n");
......@@ -1134,24 +1255,180 @@ static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
}
fprintf(tracef, ", '%s'", info->s->open_file_name);
DBUG_ASSERT(info->s->last_version != 0);
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
{
fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than log"
" record\n",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
return NULL;
}
fprintf(tracef, ", applying record\n");
/* execution of UNDOs may increment the records' count: */
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
return info;
}
static int parse_checkpoint_record(LSN lsn)
{
uint i;
TRANSLOG_HEADER_BUFFER rec;
fprintf(tracef, "Loading data from checkpoint record\n");
int len= translog_read_record_header(lsn, &rec);
/** @todo EOF should be detected */
if (len == RECHEADER_READ_ERROR)
{
fprintf(tracef, "Cannot find checkpoint record where it should be\n");
return 1;
}
enlarge_buffer(&rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec.lsn, 0, rec.record_length,
log_record_buffer.str, NULL) !=
rec.record_length)
{
fprintf(tracef, "Failed to read record\n");
return 1;
}
char *ptr= log_record_buffer.str;
checkpoint_start= lsn_korr(ptr);
ptr+= LSN_STORE_SIZE;
/* transactions */
uint nb_active_transactions= uint2korr(ptr);
ptr+= 2;
fprintf(tracef, "%u active transactions\n", nb_active_transactions);
LSN minimum_rec_lsn_of_active_transactions= lsn_korr(ptr);
ptr+= LSN_STORE_SIZE;
/*
Soon we will also skip the page depending on the rec_lsn for this page in
the checkpoint record, but this is not absolutely needed for now (just
assume we have made no checkpoint).
how much brain juice and discussions there was to come to writing this
line
*/
set_if_smaller(checkpoint_start, minimum_rec_lsn_of_active_transactions);
for (i= 0; i < nb_active_transactions; i++)
{
uint16 sid= uint2korr(ptr);
ptr+= 2;
TrID long_id= uint6korr(ptr);
ptr+= 6;
DBUG_ASSERT(sid > 0 && long_id > 0);
LSN undo_lsn= lsn_korr(ptr);
ptr+= LSN_STORE_SIZE;
LSN first_undo_lsn= lsn_korr(ptr);
ptr+= LSN_STORE_SIZE;
new_transaction(sid, long_id, undo_lsn, first_undo_lsn);
}
uint nb_committed_transactions= uint4korr(ptr);
ptr+= 4;
fprintf(tracef, "%lu committed transactions\n",
(ulong)nb_committed_transactions);
/* no purging => committed transactions are not important */
ptr+= (6 + LSN_STORE_SIZE) * nb_committed_transactions;
/* tables */
uint nb_tables= uint4korr(ptr);
fprintf(tracef, "%u open tables\n", nb_tables);
for (i= 0; i< nb_tables; i++)
{
char name[FN_REFLEN];
uint16 sid= uint2korr(ptr);
ptr+= 2;
DBUG_ASSERT(sid > 0);
File kfile= uint4korr(ptr);
ptr+= 4;
File dfile= uint4korr(ptr);
ptr+= 4;
LSN first_log_write_lsn= lsn_korr(ptr);
ptr+= LSN_STORE_SIZE;
uint name_len= strlen(ptr) + 1;
ptr+= name_len;
strnmov(name, ptr, sizeof(name));
if (new_table(sid, name, kfile, dfile, first_log_write_lsn))
return 1;
}
/* dirty pages */
uint nb_dirty_pages= uint4korr(ptr);
ptr+= 4;
if (hash_init(&all_dirty_pages, &my_charset_bin, nb_dirty_pages,
offsetof(struct st_dirty_page, file_and_page_id),
sizeof(((struct st_dirty_page *)NULL)->file_and_page_id),
NULL, NULL, 0))
return 1;
dirty_pages_pool=
(struct st_dirty_page *)my_malloc(nb_dirty_pages *
sizeof(struct st_dirty_page),
MYF(MY_WME));
if (unlikely(dirty_pages_pool == NULL))
return 1;
struct st_dirty_page *next_dirty_page_in_pool= dirty_pages_pool;
LSN minimum_rec_lsn_of_dirty_pages= LSN_MAX;
for (i= 0; i < nb_dirty_pages ; i++)
{
File fileid= uint4korr(ptr);
ptr+= 4;
pgcache_page_no_t pageid= uint4korr(ptr);
ptr+= 4;
LSN rec_lsn= lsn_korr(ptr);
ptr+= LSN_STORE_SIZE;
if (new_page(fileid, pageid, rec_lsn, next_dirty_page_in_pool++))
return 1;
set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn);
}
/* after that, there will be no insert/delete into the hash */
/*
sanity check on record (did we screw up with all those "ptr+=", did the
checkpoint write code and checkpoint read code go out of sync?).
*/
/**
@todo This probably presently and hopefully detects that
first_log_write_lsn is not written by the checkpoint record; we need
to add MARIA_SHARE::first_log_write_lsn, fill it with a inwrite-hook of
LOGREC_FILE_ID (note that when we write this record we hold intern_lock,
so Checkpoint will read the LSN correctly), and store it in the
checkpoint record.
*/
if (ptr != (log_record_buffer.str + log_record_buffer.length))
{
fprintf(tracef, "checkpoint record corrupted\n");
return 1;
}
set_if_smaller(checkpoint_start, minimum_rec_lsn_of_dirty_pages);
return 0;
}
static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
struct st_dirty_page *dirty_page)
{
/* serves as hash key */
dirty_page->file_and_page_id= (((uint64)fileid) << 32) | pageid;
dirty_page->rec_lsn= rec_lsn;
return my_hash_insert(&all_dirty_pages, (uchar *)dirty_page);
}
static int close_all_tables()
{
int error= 0;
LIST *list_element, *next_open;
MARIA_HA *info;
pthread_mutex_lock(&THR_LOCK_maria);
if (maria_open_list == NULL)
goto end;
fprintf(tracef, "Closing all tables\n");
for (list_element= maria_open_list ; list_element ; list_element= next_open)
{
next_open= list_element->next;
info= (MARIA_HA*)list_element->data;
pthread_mutex_unlock(&THR_LOCK_maria); /* ok, UNDO phase not online yet */
error|= maria_close(info);
pthread_mutex_lock(&THR_LOCK_maria);
}
end:
pthread_mutex_unlock(&THR_LOCK_maria);
return error;
}
/* some comments and pseudo-code which we keep for later */
#if 0
......
......@@ -25,5 +25,6 @@
C_MODE_START
int maria_recover();
int maria_apply_log(LSN lsn, my_bool applyn, FILE *trace_file);
int maria_apply_log(LSN lsn, my_bool apply, FILE *trace_file,
my_bool execute_undo_phase);
C_MODE_END
......@@ -1035,7 +1035,7 @@ static int maria_chk(HA_CHECK *param, char *filename)
that it will have to find and store it.
*/
if (share->base.born_transactional)
share->state.create_rename_lsn= (LSN)ULONGLONG_MAX;
share->state.create_rename_lsn= LSN_REPAIRED_BY_MARIA_CHK;
if ((param->testflag & (T_REP_BY_SORT | T_REP_PARALLEL)) &&
(maria_is_any_key_active(share->state.key_map) ||
(rep_quick && !param->keys_in_use && !recreate)) &&
......
......@@ -51,7 +51,7 @@ int main(int argc, char **argv)
goto err;
}
/* we don't want to create a control file, it MUST exist */
if (ma_control_file_create_or_open(FALSE))
if (ma_control_file_create_or_open())
{
fprintf(stderr, "Can't open control file (%d)\n", errno);
goto err;
......@@ -93,7 +93,8 @@ int main(int argc, char **argv)
*/
fprintf(stdout, "TRACE of the last maria_read_log\n");
if (maria_apply_log(lsn, opt_display_and_apply, stdout))
/* Until we have UNDO records, no UNDO phase */
if (maria_apply_log(lsn, opt_display_and_apply, stdout, FALSE))
goto err;
fprintf(stdout, "%s: SUCCESS\n", my_progname);
......
......@@ -18,6 +18,7 @@
#include <my_sys.h>
#include <m_string.h>
#include "trnman.h"
#include "ma_control_file.h"
/*
status variables:
......@@ -708,3 +709,29 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com,
pthread_mutex_unlock(&LOCK_trn_list);
DBUG_RETURN(error);
}
TRN *trnman_recreate_trn_from_recovery(uint16 shortid, TrID longid)
{
TrID old_trid_generator= global_trid_generator;
TRN *trn;
DBUG_ASSERT(maria_in_recovery && !maria_multi_threaded);
if (unlikely((trn= trnman_new_trn(NULL, NULL, NULL)) == NULL))
return NULL;
/* deallocate excessive allocations of trnman_new_trn() */
global_trid_generator= old_trid_generator;
set_if_bigger(global_trid_generator, longid);
short_trid_to_active_trn[trn->short_id]= 0;
DBUG_ASSERT(short_trid_to_active_trn[shortid] == NULL);
short_trid_to_active_trn[shortid]= trn;
trn->trid= longid;
trn->short_id= shortid;
return trn;
}
TRN *trnman_get_any_trn()
{
TRN *trn= active_list_min.next;
return (trn != &active_list_max) ? trn : NULL;
}
......@@ -53,6 +53,8 @@ uint trnman_increment_locked_tables(TRN *trn);
uint trnman_decrement_locked_tables(TRN *trn);
my_bool trnman_has_locked_tables(TRN *trn);
void trnman_reset_locked_tables(TRN *trn);
TRN *trnman_recreate_trn_from_recovery(uint16 shortid, TrID longid);
TRN *trnman_get_any_trn();
C_MODE_END
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment