Commit 4d44697d authored by unknown's avatar unknown

Merge bk-internal.mysql.com:/home/bk/mysql-maria

into  mysql.com:/home/my/mysql-maria


storage/maria/ma_pagecache.c:
  Auto merged
parents a564b67d 5f473af7
...@@ -281,9 +281,11 @@ typedef struct st_maria_extent_cursor ...@@ -281,9 +281,11 @@ typedef struct st_maria_extent_cursor
/* Position to all tails in the row. Updated when reading a row */ /* Position to all tails in the row. Updated when reading a row */
MARIA_RECORD_POS *tail_positions; MARIA_RECORD_POS *tail_positions;
/* Current page */ /* Current page */
my_off_t page; ulonglong page;
/* How many pages in the page region */ /* How many pages in the page region */
uint page_count; uint page_count;
/* What kind of lock to use for tail pages */
enum pagecache_page_lock lock_for_tail_pages;
/* Total number of extents (i.e., entries in the 'extent' slot) */ /* Total number of extents (i.e., entries in the 'extent' slot) */
uint extent_count; uint extent_count;
/* <> 0 if current extent is a tail page; Set while using cursor */ /* <> 0 if current extent is a tail page; Set while using cursor */
...@@ -2435,7 +2437,7 @@ my_bool _ma_write_block_record(MARIA_HA *info __attribute__ ((unused)), ...@@ -2435,7 +2437,7 @@ my_bool _ma_write_block_record(MARIA_HA *info __attribute__ ((unused)),
/** /**
@brief Remove row written by _ma_write_block_record() @brief Remove row written by _ma_write_block_record() and log undo
@param info Maria handler @param info Maria handler
...@@ -2894,8 +2896,6 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record) ...@@ -2894,8 +2896,6 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record)
delete_tails(info, info->cur_row.tail_positions)) delete_tails(info, info->cur_row.tail_positions))
goto err; goto err;
info->s->state.split--;
if (info->cur_row.extents && free_full_pages(info, &info->cur_row)) if (info->cur_row.extents && free_full_pages(info, &info->cur_row))
goto err; goto err;
...@@ -3023,6 +3023,7 @@ static void init_extent(MARIA_EXTENT_CURSOR *extent, uchar *extent_info, ...@@ -3023,6 +3023,7 @@ static void init_extent(MARIA_EXTENT_CURSOR *extent, uchar *extent_info,
else else
extent->page_count= page_count; extent->page_count= page_count;
extent->tail_positions= tail_positions; extent->tail_positions= tail_positions;
extent->lock_for_tail_pages= PAGECACHE_LOCK_LEFT_UNLOCKED;
} }
...@@ -3050,6 +3051,8 @@ static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent, ...@@ -3050,6 +3051,8 @@ static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent,
{ {
MARIA_SHARE *share= info->s; MARIA_SHARE *share= info->s;
uchar *buff, *data; uchar *buff, *data;
MARIA_PINNED_PAGE page_link;
enum pagecache_page_lock lock;
DBUG_ENTER("read_next_extent"); DBUG_ENTER("read_next_extent");
if (!extent->page_count) if (!extent->page_count)
...@@ -3073,17 +3076,22 @@ static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent, ...@@ -3073,17 +3076,22 @@ static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent,
} }
extent->first_extent= 0; extent->first_extent= 0;
lock= PAGECACHE_LOCK_LEFT_UNLOCKED;
if (extent->tail)
lock= extent->lock_for_tail_pages;
DBUG_ASSERT(share->pagecache->block_size == share->block_size); DBUG_ASSERT(share->pagecache->block_size == share->block_size);
if (!(buff= pagecache_read(share->pagecache, if (!(buff= pagecache_read(share->pagecache,
&info->dfile, extent->page, 0, &info->dfile, extent->page, 0,
info->buff, share->page_type, info->buff, share->page_type,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0))) lock, &page_link.link)))
{ {
/* check if we tried to read over end of file (ie: bad data in record) */ /* check if we tried to read over end of file (ie: bad data in record) */
if ((extent->page + 1) * share->block_size > info->state->data_file_length) if ((extent->page + 1) * share->block_size > info->state->data_file_length)
goto crashed; goto crashed;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
if (!extent->tail) if (!extent->tail)
{ {
/* Full data page */ /* Full data page */
...@@ -3095,7 +3103,14 @@ static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent, ...@@ -3095,7 +3103,14 @@ static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent,
info->cur_row.full_page_count++; /* For maria_chk */ info->cur_row.full_page_count++; /* For maria_chk */
DBUG_RETURN(extent->data_start= buff + LSN_SIZE + PAGE_TYPE_SIZE); DBUG_RETURN(extent->data_start= buff + LSN_SIZE + PAGE_TYPE_SIZE);
} }
/* Found tail */ /* Found tail */
if (lock != PAGECACHE_LOCK_LEFT_UNLOCKED)
{
/* Read during redo */
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
push_dynamic(&info->pinned_pages, (void*) &page_link);
}
if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != TAIL_PAGE) if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != TAIL_PAGE)
goto crashed; goto crashed;
...@@ -3492,6 +3507,105 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -3492,6 +3507,105 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
} }
/** @brief Read positions to tail blocks and full blocks
@fn read_row_extent_info()
@param info Handler
@notes
This function is a simpler version of _ma_read_block_record2()
The data about the used pages is stored in info->cur_row.
@return
@retval 0 ok
@retval 1 Error. my_errno contains error number
*/
static my_bool read_row_extent_info(MARIA_HA *info, uchar *buff,
uint record_number)
{
MARIA_SHARE *share= info->s;
uchar *data, *end_of_data;
uint flag, row_extents, field_lengths;
MARIA_EXTENT_CURSOR extent;
DBUG_ENTER("read_row_extent_info");
if (!(data= get_record_position(buff, share->block_size,
record_number, &end_of_data)))
DBUG_RETURN(1); /* Wrong in record */
flag= (uint) (uchar) data[0];
/* Skip trans header */
data+= total_header_size[(flag & PRECALC_HEADER_BITMASK)];
row_extents= 0;
if (flag & ROW_FLAG_EXTENTS)
{
uint row_extent_size;
/*
Record is split over many data pages.
Get number of extents and first extent
*/
get_key_length(row_extents, data);
row_extent_size= row_extents * ROW_EXTENT_SIZE;
if (info->cur_row.extents_buffer_length < row_extent_size &&
_ma_alloc_buffer(&info->cur_row.extents,
&info->cur_row.extents_buffer_length,
row_extent_size))
DBUG_RETURN(1);
memcpy(info->cur_row.extents, data, ROW_EXTENT_SIZE);
data+= ROW_EXTENT_SIZE;
init_extent(&extent, info->cur_row.extents, row_extents,
info->cur_row.tail_positions);
extent.first_extent= 1;
}
else
(*info->cur_row.tail_positions)= 0;
info->cur_row.extents_count= row_extents;
if (share->base.max_field_lengths)
get_key_length(field_lengths, data);
if (share->calc_checksum)
info->cur_row.checksum= (uint) (uchar) *data++;
if (row_extents > 1)
{
MARIA_RECORD_POS *tail_pos;
uchar *extents, *end;
data+= share->base.null_bytes;
data+= share->base.pack_bytes;
data+= share->base.field_offsets * FIELD_OFFSET_SIZE;
/*
Read row extents (note that first extent was already read into
info->cur_row.extents above)
Lock tails with write lock as we will delete them later.
*/
extent.lock_for_tail_pages= PAGECACHE_LOCK_LEFT_WRITELOCKED;
if (read_long_data(info, info->cur_row.extents + ROW_EXTENT_SIZE,
(row_extents - 1) * ROW_EXTENT_SIZE,
&extent, &data, &end_of_data))
DBUG_RETURN(1);
/* Update tail_positions with pointer to tails */
tail_pos= info->cur_row.tail_positions;
for (extents= info->cur_row.extents, end= extents+ row_extents;
extents < end;
extents += ROW_EXTENT_SIZE)
{
ulonglong page= uint5korr(extents);
uint page_count= uint2korr(extents + ROW_EXTENT_PAGE_SIZE);
if (page_count & TAIL_BIT)
*(tail_pos++)= ma_recordpos(page, (page_count & ~TAIL_BIT));
}
*tail_pos= 0; /* End marker */
}
DBUG_RETURN(0);
}
/* /*
Read a record based on record position Read a record based on record position
...@@ -4575,3 +4689,62 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info, ...@@ -4575,3 +4689,62 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/****************************************************************************
Applying of UNDO entries
****************************************************************************/
my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header)
{
ulonglong page;
uint record_number;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE], *buff;
my_bool res= 1;
MARIA_PINNED_PAGE page_link;
LSN lsn;
DBUG_ENTER("_ma_apply_undo_row_insert");
page= page_korr(header);
record_number= dirpos_korr(header + PAGE_STORE_SIZE);
DBUG_PRINT("enter", ("Page: %lu record_number: %u", (ulong) page,
record_number));
if (!(buff= pagecache_read(info->s->pagecache,
&info->dfile, page, 0,
info->buff, info->s->page_type,
PAGECACHE_LOCK_WRITE,
&page_link.link)))
DBUG_RETURN(1);
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
push_dynamic(&info->pinned_pages, (void*) &page_link);
if (read_row_extent_info(info, buff, record_number))
DBUG_RETURN(1);
if (delete_head_or_tail(info, page, record_number, 1, 1) ||
delete_tails(info, info->cur_row.tail_positions))
goto err;
if (info->cur_row.extents && free_full_pages(info, &info->cur_row))
goto err;
lsn_store(log_data + FILEID_STORE_SIZE, undo_lsn);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, LOGREC_CLR_END,
info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data))
goto err;
info->s->state.state.records--;
res= 0;
err:
_ma_unpin_all_pages(info, lsn);
DBUG_RETURN(res);
}
...@@ -187,3 +187,5 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -187,3 +187,5 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
const uchar *header); const uchar *header);
uint _ma_apply_redo_purge_blocks(MARIA_HA *info, LSN lsn, uint _ma_apply_redo_purge_blocks(MARIA_HA *info, LSN lsn,
const uchar *header); const uchar *header);
my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header);
...@@ -329,7 +329,7 @@ static LOG_DESC INIT_LOGREC_REDO_UNDELETE_ROW= ...@@ -329,7 +329,7 @@ static LOG_DESC INIT_LOGREC_REDO_UNDELETE_ROW=
"redo_undelete_row", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL}; "redo_undelete_row", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_CLR_END= static LOG_DESC INIT_LOGREC_CLR_END=
{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, write_hook_for_redo, NULL, 1, {LOGRECTYPE_FIXEDLENGTH, 9, 9, NULL, write_hook_for_redo, NULL, 0,
"clr_end", LOGREC_LAST_IN_GROUP, NULL, NULL}; "clr_end", LOGREC_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_PURGE_END= static LOG_DESC INIT_LOGREC_PURGE_END=
...@@ -6211,7 +6211,6 @@ static my_bool write_hook_for_undo(enum translog_record_type type ...@@ -6211,7 +6211,6 @@ static my_bool write_hook_for_undo(enum translog_record_type type
*/ */
} }
/** /**
@brief Gives a 2-byte-id to MARIA_SHARE and logs this fact @brief Gives a 2-byte-id to MARIA_SHARE and logs this fact
...@@ -6353,7 +6352,6 @@ my_bool translog_is_file(uint file_no) ...@@ -6353,7 +6352,6 @@ my_bool translog_is_file(uint file_no)
static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected) static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected)
{ {
TRANSLOG_ADDRESS addr;
uint min_file= 1, max_file; uint min_file= 1, max_file;
DBUG_ENTER("translog_first_file"); DBUG_ENTER("translog_first_file");
if (!is_protected) if (!is_protected)
......
...@@ -353,7 +353,7 @@ typedef struct st_log_record_type_descriptor ...@@ -353,7 +353,7 @@ typedef struct st_log_record_type_descriptor
/* a function to execute when we see the record during the REDO phase */ /* a function to execute when we see the record during the REDO phase */
int (*record_execute_in_redo_phase)(const TRANSLOG_HEADER_BUFFER *); int (*record_execute_in_redo_phase)(const TRANSLOG_HEADER_BUFFER *);
/* a function to execute when we see the record during the UNDO phase */ /* a function to execute when we see the record during the UNDO phase */
int (*record_execute_in_undo_phase)(const TRANSLOG_HEADER_BUFFER *); int (*record_execute_in_undo_phase)(const TRANSLOG_HEADER_BUFFER *, TRN *);
} LOG_DESC; } LOG_DESC;
extern LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES]; extern LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES];
......
...@@ -2885,6 +2885,7 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache, ...@@ -2885,6 +2885,7 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache,
&page_st); &page_st);
DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE || DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE ||
block->type == type || block->type == type ||
type == PAGECACHE_LSN_PAGE ||
type == PAGECACHE_READ_UNKNOWN_PAGE || type == PAGECACHE_READ_UNKNOWN_PAGE ||
block->type == PAGECACHE_READ_UNKNOWN_PAGE); block->type == PAGECACHE_READ_UNKNOWN_PAGE);
if (type != PAGECACHE_READ_UNKNOWN_PAGE || if (type != PAGECACHE_READ_UNKNOWN_PAGE ||
......
...@@ -49,27 +49,34 @@ static LSN current_group_end_lsn, ...@@ -49,27 +49,34 @@ static LSN current_group_end_lsn,
checkpoint_start= LSN_IMPOSSIBLE; checkpoint_start= LSN_IMPOSSIBLE;
static FILE *tracef; /**< trace file for debugging */ static FILE *tracef; /**< trace file for debugging */
#define prototype_exec_hook(R) \ #define prototype_redo_exec_hook(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec) static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
#define prototype_exec_hook_dummy(R) \
static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec \ #define prototype_redo_exec_hook_dummy(R) \
static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec \
__attribute ((unused))) __attribute ((unused)))
prototype_exec_hook(LONG_TRANSACTION_ID);
prototype_exec_hook_dummy(CHECKPOINT); #define prototype_undo_exec_hook(R) \
prototype_exec_hook(REDO_CREATE_TABLE); static int exec_UNDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec, TRN *trn)
prototype_exec_hook(REDO_DROP_TABLE);
prototype_exec_hook(FILE_ID); prototype_redo_exec_hook(LONG_TRANSACTION_ID);
prototype_exec_hook(REDO_INSERT_ROW_HEAD); prototype_redo_exec_hook_dummy(CHECKPOINT);
prototype_exec_hook(REDO_INSERT_ROW_TAIL); prototype_redo_exec_hook(REDO_CREATE_TABLE);
prototype_exec_hook(REDO_PURGE_ROW_HEAD); prototype_redo_exec_hook(REDO_DROP_TABLE);
prototype_exec_hook(REDO_PURGE_ROW_TAIL); prototype_redo_exec_hook(FILE_ID);
prototype_exec_hook(REDO_PURGE_BLOCKS); prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_exec_hook(REDO_DELETE_ALL); prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_exec_hook(UNDO_ROW_INSERT); prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD);
prototype_exec_hook(UNDO_ROW_DELETE); prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL);
prototype_exec_hook(UNDO_ROW_UPDATE); prototype_redo_exec_hook(REDO_PURGE_BLOCKS);
prototype_exec_hook(UNDO_ROW_PURGE); prototype_redo_exec_hook(REDO_DELETE_ALL);
prototype_exec_hook(COMMIT); prototype_redo_exec_hook(UNDO_ROW_INSERT);
prototype_redo_exec_hook(UNDO_ROW_DELETE);
prototype_redo_exec_hook(UNDO_ROW_UPDATE);
prototype_redo_exec_hook(UNDO_ROW_PURGE);
prototype_redo_exec_hook(COMMIT);
prototype_undo_exec_hook(UNDO_ROW_INSERT);
static int run_redo_phase(LSN lsn, my_bool apply); static int run_redo_phase(LSN lsn, my_bool apply);
static uint end_of_redo_phase(my_bool prepare_for_undo_phase); static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
static int run_undo_phase(uint unfinished); static int run_undo_phase(uint unfinished);
...@@ -159,6 +166,7 @@ int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file, ...@@ -159,6 +166,7 @@ int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file,
my_bool should_run_undo_phase) my_bool should_run_undo_phase)
{ {
int error= 0; int error= 0;
uint unfinished_trans;
DBUG_ENTER("maria_apply_log"); DBUG_ENTER("maria_apply_log");
DBUG_ASSERT(apply || !should_run_undo_phase); DBUG_ASSERT(apply || !should_run_undo_phase);
...@@ -199,7 +207,7 @@ int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file, ...@@ -199,7 +207,7 @@ int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file,
if (run_redo_phase(from_lsn, apply)) if (run_redo_phase(from_lsn, apply))
goto err; goto err;
uint unfinished_trans= end_of_redo_phase(should_run_undo_phase); unfinished_trans= end_of_redo_phase(should_run_undo_phase);
if (unfinished_trans == (uint)-1) if (unfinished_trans == (uint)-1)
goto err; goto err;
if (should_run_undo_phase) if (should_run_undo_phase)
...@@ -270,12 +278,12 @@ static int display_and_apply_record(const LOG_DESC *log_desc, ...@@ -270,12 +278,12 @@ static int display_and_apply_record(const LOG_DESC *log_desc,
return 1; return 1;
} }
if ((error= (*log_desc->record_execute_in_redo_phase)(rec))) if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
fprintf(tracef, "Got error when executing record\n"); fprintf(tracef, "Got error when executing redo on record\n");
return error; return error;
} }
prototype_exec_hook(LONG_TRANSACTION_ID) prototype_redo_exec_hook(LONG_TRANSACTION_ID)
{ {
uint16 sid= rec->short_trid; uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid; TrID long_trid= all_active_trans[sid].long_trid;
...@@ -325,14 +333,14 @@ static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn, ...@@ -325,14 +333,14 @@ static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
} }
prototype_exec_hook_dummy(CHECKPOINT) prototype_redo_exec_hook_dummy(CHECKPOINT)
{ {
/* the only checkpoint we care about was found via control file, ignore */ /* the only checkpoint we care about was found via control file, ignore */
return 0; return 0;
} }
prototype_exec_hook(REDO_CREATE_TABLE) prototype_redo_exec_hook(REDO_CREATE_TABLE)
{ {
File dfile= -1, kfile= -1; File dfile= -1, kfile= -1;
char *linkname_ptr, filename[FN_REFLEN]; char *linkname_ptr, filename[FN_REFLEN];
...@@ -458,7 +466,7 @@ prototype_exec_hook(REDO_CREATE_TABLE) ...@@ -458,7 +466,7 @@ prototype_exec_hook(REDO_CREATE_TABLE)
} }
prototype_exec_hook(REDO_DROP_TABLE) prototype_redo_exec_hook(REDO_DROP_TABLE)
{ {
char *name; char *name;
int error= 1; int error= 1;
...@@ -528,7 +536,7 @@ prototype_exec_hook(REDO_DROP_TABLE) ...@@ -528,7 +536,7 @@ prototype_exec_hook(REDO_DROP_TABLE)
} }
prototype_exec_hook(FILE_ID) prototype_redo_exec_hook(FILE_ID)
{ {
uint16 sid; uint16 sid;
int error= 1; int error= 1;
...@@ -656,7 +664,7 @@ static int new_table(uint16 sid, const char *name, ...@@ -656,7 +664,7 @@ static int new_table(uint16 sid, const char *name,
} }
prototype_exec_hook(REDO_INSERT_ROW_HEAD) prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD)
{ {
int error= 1; int error= 1;
uchar *buff= NULL; uchar *buff= NULL;
...@@ -701,7 +709,7 @@ prototype_exec_hook(REDO_INSERT_ROW_HEAD) ...@@ -701,7 +709,7 @@ prototype_exec_hook(REDO_INSERT_ROW_HEAD)
} }
prototype_exec_hook(REDO_INSERT_ROW_TAIL) prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL)
{ {
int error= 1; int error= 1;
uchar *buff; uchar *buff;
...@@ -737,7 +745,7 @@ prototype_exec_hook(REDO_INSERT_ROW_TAIL) ...@@ -737,7 +745,7 @@ prototype_exec_hook(REDO_INSERT_ROW_TAIL)
} }
prototype_exec_hook(REDO_PURGE_ROW_HEAD) prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD)
{ {
int error= 1; int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
...@@ -753,7 +761,7 @@ prototype_exec_hook(REDO_PURGE_ROW_HEAD) ...@@ -753,7 +761,7 @@ prototype_exec_hook(REDO_PURGE_ROW_HEAD)
} }
prototype_exec_hook(REDO_PURGE_ROW_TAIL) prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL)
{ {
int error= 1; int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
...@@ -769,7 +777,7 @@ prototype_exec_hook(REDO_PURGE_ROW_TAIL) ...@@ -769,7 +777,7 @@ prototype_exec_hook(REDO_PURGE_ROW_TAIL)
} }
prototype_exec_hook(REDO_PURGE_BLOCKS) prototype_redo_exec_hook(REDO_PURGE_BLOCKS)
{ {
int error= 1; int error= 1;
uchar *buff; uchar *buff;
...@@ -797,7 +805,7 @@ prototype_exec_hook(REDO_PURGE_BLOCKS) ...@@ -797,7 +805,7 @@ prototype_exec_hook(REDO_PURGE_BLOCKS)
} }
prototype_exec_hook(REDO_DELETE_ALL) prototype_redo_exec_hook(REDO_DELETE_ALL)
{ {
int error= 1; int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec); MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
...@@ -813,12 +821,12 @@ prototype_exec_hook(REDO_DELETE_ALL) ...@@ -813,12 +821,12 @@ prototype_exec_hook(REDO_DELETE_ALL)
} }
#define set_undo_lsn_for_active_trans(I, L) do { \ #define set_undo_lsn_for_active_trans(TRID, LSN) do { \
all_active_trans[I].undo_lsn= L; \ all_active_trans[TRID].undo_lsn= LSN; \
if (all_active_trans[I].first_undo_lsn == LSN_IMPOSSIBLE) \ if (all_active_trans[TRID].first_undo_lsn == LSN_IMPOSSIBLE) \
all_active_trans[I].first_undo_lsn= L; } while (0) all_active_trans[TRID].first_undo_lsn= LSN; } while (0)
prototype_exec_hook(UNDO_ROW_INSERT) prototype_redo_exec_hook(UNDO_ROW_INSERT)
{ {
int error= 1; int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
...@@ -843,12 +851,13 @@ prototype_exec_hook(UNDO_ROW_INSERT) ...@@ -843,12 +851,13 @@ prototype_exec_hook(UNDO_ROW_INSERT)
} }
fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records); fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
error= 0; error= 0;
end: end:
return error; return error;
} }
prototype_exec_hook(UNDO_ROW_DELETE) prototype_redo_exec_hook(UNDO_ROW_DELETE)
{ {
int error= 1; int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
...@@ -868,7 +877,7 @@ prototype_exec_hook(UNDO_ROW_DELETE) ...@@ -868,7 +877,7 @@ prototype_exec_hook(UNDO_ROW_DELETE)
} }
prototype_exec_hook(UNDO_ROW_UPDATE) prototype_redo_exec_hook(UNDO_ROW_UPDATE)
{ {
int error= 1; int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
...@@ -885,7 +894,7 @@ prototype_exec_hook(UNDO_ROW_UPDATE) ...@@ -885,7 +894,7 @@ prototype_exec_hook(UNDO_ROW_UPDATE)
} }
prototype_exec_hook(UNDO_ROW_PURGE) prototype_redo_exec_hook(UNDO_ROW_PURGE)
{ {
int error= 1; int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec); MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
...@@ -906,7 +915,7 @@ prototype_exec_hook(UNDO_ROW_PURGE) ...@@ -906,7 +915,7 @@ prototype_exec_hook(UNDO_ROW_PURGE)
} }
prototype_exec_hook(COMMIT) prototype_redo_exec_hook(COMMIT)
{ {
uint16 sid= rec->short_trid; uint16 sid= rec->short_trid;
TrID long_trid= all_active_trans[sid].long_trid; TrID long_trid= all_active_trans[sid].long_trid;
...@@ -945,28 +954,55 @@ prototype_exec_hook(COMMIT) ...@@ -945,28 +954,55 @@ prototype_exec_hook(COMMIT)
} }
prototype_undo_exec_hook(UNDO_ROW_INSERT)
{
my_bool error;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
return 1;
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
/* Set undo to point to previous undo record */
info->trn= trn;
info->trn->undo_lsn= lsn_korr(rec->header);
error= _ma_apply_undo_row_insert(info, rec->lsn,
rec->header + LSN_STORE_SIZE +
FILEID_STORE_SIZE);
info->trn= 0;
return error;
}
static int run_redo_phase(LSN lsn, my_bool apply) static int run_redo_phase(LSN lsn, my_bool apply)
{ {
/* install hooks for execution */ /* install hooks for execution */
#define install_exec_hook(R) \ #define install_redo_exec_hook(R) \
log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \ log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
exec_LOGREC_ ## R; exec_REDO_LOGREC_ ## R;
install_exec_hook(LONG_TRANSACTION_ID); #define install_undo_exec_hook(R) \
install_exec_hook(CHECKPOINT); log_record_type_descriptor[LOGREC_ ## R].record_execute_in_undo_phase= \
install_exec_hook(REDO_CREATE_TABLE); exec_UNDO_LOGREC_ ## R;
install_exec_hook(REDO_DROP_TABLE); install_redo_exec_hook(LONG_TRANSACTION_ID);
install_exec_hook(FILE_ID); install_redo_exec_hook(CHECKPOINT);
install_exec_hook(REDO_INSERT_ROW_HEAD); install_redo_exec_hook(REDO_CREATE_TABLE);
install_exec_hook(REDO_INSERT_ROW_TAIL); install_redo_exec_hook(REDO_DROP_TABLE);
install_exec_hook(REDO_PURGE_ROW_HEAD); install_redo_exec_hook(FILE_ID);
install_exec_hook(REDO_PURGE_ROW_TAIL); install_redo_exec_hook(REDO_INSERT_ROW_HEAD);
install_exec_hook(REDO_PURGE_BLOCKS); install_redo_exec_hook(REDO_INSERT_ROW_TAIL);
install_exec_hook(REDO_DELETE_ALL); install_redo_exec_hook(REDO_PURGE_ROW_HEAD);
install_exec_hook(UNDO_ROW_INSERT); install_redo_exec_hook(REDO_PURGE_ROW_TAIL);
install_exec_hook(UNDO_ROW_DELETE); install_redo_exec_hook(REDO_PURGE_BLOCKS);
install_exec_hook(UNDO_ROW_UPDATE); install_redo_exec_hook(REDO_DELETE_ALL);
install_exec_hook(UNDO_ROW_PURGE); install_redo_exec_hook(UNDO_ROW_INSERT);
install_exec_hook(COMMIT); install_redo_exec_hook(UNDO_ROW_DELETE);
install_redo_exec_hook(UNDO_ROW_UPDATE);
install_redo_exec_hook(UNDO_ROW_PURGE);
install_redo_exec_hook(COMMIT);
install_undo_exec_hook(UNDO_ROW_INSERT);
current_group_end_lsn= LSN_IMPOSSIBLE; current_group_end_lsn= LSN_IMPOSSIBLE;
...@@ -1178,10 +1214,6 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase) ...@@ -1178,10 +1214,6 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
} }
} }
/* we don't need all_tables anymore, maria_open_list is enough */
my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
all_tables= NULL;
/* /*
We could take a checkpoint here, in case of a crash during the UNDO We could take a checkpoint here, in case of a crash during the UNDO
phase. The drawback is that a page which got a REDO (thus, flushed phase. The drawback is that a page which got a REDO (thus, flushed
...@@ -1207,7 +1239,23 @@ static int run_undo_phase(uint unfinished) ...@@ -1207,7 +1239,23 @@ static int run_undo_phase(uint unfinished)
DBUG_ASSERT(trn != NULL); DBUG_ASSERT(trn != NULL);
llstr(trn->trid, llbuf); llstr(trn->trid, llbuf);
fprintf(tracef, "Rolling back transaction of long id %s\n", llbuf); fprintf(tracef, "Rolling back transaction of long id %s\n", llbuf);
/* of course we miss execution of UNDOs here */
/* Execute all undo entries */
while (trn->undo_lsn)
{
TRANSLOG_HEADER_BUFFER rec;
LOG_DESC *log_desc;
if (translog_read_record_header(trn->undo_lsn, &rec) ==
RECHEADER_READ_ERROR)
return 1;
log_desc= &log_record_type_descriptor[rec.type];
if (log_desc->record_execute_in_undo_phase(&rec, trn))
{
fprintf(tracef, "Got error when executing undo\n");
return 1;
}
}
if (trnman_rollback_trn(trn)) if (trnman_rollback_trn(trn))
return 1; return 1;
/* We could want to span a few threads (4?) instead of 1 */ /* We could want to span a few threads (4?) instead of 1 */
......
...@@ -38,7 +38,7 @@ static uint insert_count, update_count, remove_count; ...@@ -38,7 +38,7 @@ static uint insert_count, update_count, remove_count;
static uint pack_keys=0, pack_seg=0, key_length; static uint pack_keys=0, pack_seg=0, key_length;
static uint unique_key=HA_NOSAME; static uint unique_key=HA_NOSAME;
static my_bool pagecacheing, null_fields, silent, skip_update, opt_unique, static my_bool pagecacheing, null_fields, silent, skip_update, opt_unique,
verbose, skip_delete, transactional; verbose, skip_delete, transactional, die_in_middle_of_transaction;
static MARIA_COLUMNDEF recinfo[4]; static MARIA_COLUMNDEF recinfo[4];
static MARIA_KEYDEF keyinfo[10]; static MARIA_KEYDEF keyinfo[10];
static HA_KEYSEG keyseg[10]; static HA_KEYSEG keyseg[10];
...@@ -50,6 +50,19 @@ static void create_key(char *key,uint rownr); ...@@ -50,6 +50,19 @@ static void create_key(char *key,uint rownr);
static void create_record(char *record,uint rownr); static void create_record(char *record,uint rownr);
static void update_record(char *record); static void update_record(char *record);
/*
These are here only for testing of recovery with undo. We are not
including maria_def.h here as this test is also to be an example of
how to use maria outside of the maria directory
*/
extern int _ma_flush_table_files(MARIA_HA *info, uint flush_data_or_index,
enum flush_type flush_type_for_data,
enum flush_type flush_type_for_index);
#define MARIA_FLUSH_DATA 1
int main(int argc,char *argv[]) int main(int argc,char *argv[])
{ {
MY_INIT(argv[0]); MY_INIT(argv[0]);
...@@ -86,6 +99,9 @@ static int run_test(const char *filename) ...@@ -86,6 +99,9 @@ static int run_test(const char *filename)
MARIA_UNIQUEDEF uniquedef; MARIA_UNIQUEDEF uniquedef;
MARIA_CREATE_INFO create_info; MARIA_CREATE_INFO create_info;
if (die_in_middle_of_transaction)
null_fields= 1;
bzero((char*) recinfo,sizeof(recinfo)); bzero((char*) recinfo,sizeof(recinfo));
bzero((char*) &create_info,sizeof(create_info)); bzero((char*) &create_info,sizeof(create_info));
...@@ -198,6 +214,9 @@ static int run_test(const char *filename) ...@@ -198,6 +214,9 @@ static int run_test(const char *filename)
printf("J= %2d maria_write: %d errno: %d\n", j,error,my_errno); printf("J= %2d maria_write: %d errno: %d\n", j,error,my_errno);
} }
if (maria_commit(file) || maria_begin(file))
goto err;
/* Insert 2 rows with null values */ /* Insert 2 rows with null values */
if (null_fields) if (null_fields)
{ {
...@@ -215,6 +234,17 @@ static int run_test(const char *filename) ...@@ -215,6 +234,17 @@ static int run_test(const char *filename)
flags[0]=2; flags[0]=2;
} }
if (die_in_middle_of_transaction)
{
/*
Ensure we get changed pages and log to disk
As commit record is not done, the undo entries needs to be rolled back.
*/
_ma_flush_table_files(file, MARIA_FLUSH_DATA, FLUSH_RELEASE,
FLUSH_RELEASE);
exit(1);
}
if (!skip_update) if (!skip_update)
{ {
if (opt_unique) if (opt_unique)
...@@ -627,6 +657,11 @@ static struct my_option my_long_options[] = ...@@ -627,6 +657,11 @@ static struct my_option my_long_options[] =
(uchar**) &skip_delete, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, (uchar**) &skip_delete, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"skip-update", 'D', "Don't test updates", (uchar**) &skip_update, {"skip-update", 'D', "Don't test updates", (uchar**) &skip_update,
(uchar**) &skip_update, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, (uchar**) &skip_update, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"test-undo", 'A',
"Abort hard after doing inserts. Used for testing recovery with undo",
(uchar**) &die_in_middle_of_transaction,
(uchar**) &die_in_middle_of_transaction,
0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"transactional", 'T', {"transactional", 'T',
"Test in transactional mode. (Only works with block format)", "Test in transactional mode. (Only works with block format)",
(uchar**) &transactional, (uchar**) &transactional, 0, GET_BOOL, NO_ARG, (uchar**) &transactional, (uchar**) &transactional, 0, GET_BOOL, NO_ARG,
......
...@@ -93,8 +93,7 @@ int main(int argc, char **argv) ...@@ -93,8 +93,7 @@ int main(int argc, char **argv)
*/ */
fprintf(stdout, "TRACE of the last maria_read_log\n"); fprintf(stdout, "TRACE of the last maria_read_log\n");
/* Until we have UNDO records, no UNDO phase */ if (maria_apply_log(lsn, opt_display_and_apply, stdout, TRUE))
if (maria_apply_log(lsn, opt_display_and_apply, stdout, FALSE))
goto err; goto err;
fprintf(stdout, "%s: SUCCESS\n", my_progname); fprintf(stdout, "%s: SUCCESS\n", my_progname);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment