Commit 7e292d6b authored by Michael Widenius's avatar Michael Widenius

Fix of LP#616253 Crash in _ma_bitmap_set_full_page_bits on Aria recovery

The bug was based on wrong undo data in recovery file and not enough checking of bad data.

sql/sql_select.h:
  Added comment
storage/maria/ma_blockrec.c:
  - Removed wrong sanity checks (didn't work for UNDO records)
  - More sanity checks and DBUG_ASSERT
  - More DBUG_ENTER and DBUG_PRINT
  - Removed filler blocks in extent_to_bitmap_blocks() as it caused problems in write_block_record().
    This was the main cause of the bug.
    (This change can make records generated by UNDO slightly smaller than original record, which we have to fix
    by correcting row_pos.length before calling write_block_record())
  - Fixed some problems in write_block_record() while doing UNDO.
  - Store header_length without TRANSID_SIZE into recovery log (as UNDO entires doesn't have TRANSID_SIZE)
  - Mark table crashed if something goes wrong during UNDO
storage/maria/maria_def.h:
  Added header_length
parent 0bdc300d
......@@ -365,6 +365,11 @@ class JOIN :public Sql_alloc
the number of rows in it may vary from one subquery execution to another.
*/
bool no_const_tables;
/*
This flag is set if we call no_rows_in_result() as par of end_group().
This is used as a simple speed optimization to avoiding calling
restore_no_rows_in_result() in ::reinit()
*/
bool no_rows_in_result_called;
/**
......
......@@ -334,6 +334,7 @@ typedef struct st_maria_extent_cursor
} \
} while (0)
static my_bool delete_tails(MARIA_HA *info, MARIA_RECORD_POS *tails);
static my_bool delete_head_or_tail(MARIA_HA *info,
pgcache_page_no_t page, uint record_number,
......@@ -1850,16 +1851,11 @@ static my_bool get_rowpos_in_head_or_tail_page(MARIA_HA *info,
goto err;
}
/*
The following dir entry is unused in case of insert / update but
not in case of undo_update / undo_delete
*/
dir= dir_entry_pos(buff, block_size, rownr);
#ifdef SANITY_CHECKS
/* Tail's should always be unused */
if (page_type == TAIL_PAGE && max_entry > rownr &&
(dir[0] != 0 || dir[1] != 0))
{
DBUG_ASSERT(0);
goto err;
}
#endif
if (extend_area_on_page(page_type == HEAD_PAGE ? info : 0, buff, dir,
rownr, block_size, length,
......@@ -2236,6 +2232,8 @@ static void store_extent_info(uchar *to,
MARIA_BITMAP_BLOCK *block, *end_block;
uint copy_length;
my_bool first_found= 0;
DBUG_ENTER("store_extent_info");
DBUG_PRINT("enter", ("count: %u", count));
for (block= first_block, end_block= first_block+count ;
block < end_block; block++)
......@@ -2255,6 +2253,7 @@ static void store_extent_info(uchar *to,
page_count|= START_EXTENT_BIT;
}
pagerange_store(to + PAGE_STORE_SIZE, page_count);
DBUG_DUMP("extent", to, ROW_EXTENT_SIZE);
to+= ROW_EXTENT_SIZE;
if (!first_found)
{
......@@ -2269,6 +2268,7 @@ static void store_extent_info(uchar *to,
data.
*/
bzero(to, (size_t) (row_extents_second_part + copy_length - to));
DBUG_VOID_RETURN;
}
......@@ -2287,7 +2287,8 @@ static void store_extent_info(uchar *to,
@return
@retval 0 ok
@retval 1 Error (out of memory or disk error changing bitmap)
@retval 1 Error (out of memory or disk error changing bitmap) or
wrong information in extent information
*/
static my_bool extent_to_bitmap_blocks(MARIA_HA *info,
......@@ -2298,7 +2299,7 @@ static my_bool extent_to_bitmap_blocks(MARIA_HA *info,
{
MARIA_BITMAP_BLOCK *block, *start_block;
MARIA_SHARE *share= info->s;
uint i;
uint i, tail_page;
DBUG_ENTER("extent_to_bitmap_blocks");
if (allocate_dynamic(&info->bitmap_blocks, extent_count + 2))
......@@ -2324,13 +2325,36 @@ static my_bool extent_to_bitmap_blocks(MARIA_HA *info,
page_count&= ~START_EXTENT_BIT;
start_block->sub_blocks= (uint) (block - start_block);
start_block= block;
}
block->page= page_korr(extent_info);
block->page_count= page_count;
block->sub_blocks= 0;
if (block->page_count == 0)
{
/* Extend allocated but not used by write_block_record() */
DBUG_ASSERT(block->page == 0);
/* This is the last block */
blocks->count= i;
break;
}
if ((tail_page= page_count & TAIL_BIT))
page_count= 1;
if (page_count & TAIL_BIT)
/* Check if wrong data */
if (block->page == 0 || page_count == 0 ||
(block->page + page_count) * share->block_size >
share->state.state.data_file_length)
{
DBUG_PRINT("error", ("page: %lu page_count: %u tail: %u length: %ld data_length: %ld",
(ulong) block->page,
(block->page_count & ~TAIL_BIT),
(uint) test(block->page_count & TAIL_BIT),
(ulong) ((block->page + (page_count & ~TAIL_BIT)) *
share->block_size),
(ulong) share->state.state.data_file_length));
DBUG_RETURN(1);
}
if (tail_page)
{
block->org_bitmap_value= _ma_bitmap_get_page_bits(info, &share->bitmap,
block->page);
......@@ -2342,7 +2366,7 @@ static my_bool extent_to_bitmap_blocks(MARIA_HA *info,
my_bool res;
pthread_mutex_lock(&share->bitmap.bitmap_lock);
res= _ma_bitmap_set_full_page_bits(info, &share->bitmap,
block->page, block->page_count);
block->page, page_count);
pthread_mutex_unlock(&share->bitmap.bitmap_lock);
if (res)
DBUG_RETURN(1);
......@@ -2764,9 +2788,16 @@ static my_bool write_block_record(MARIA_HA *info,
sizeof(char*));
memcpy(data, tmp_pos, *blob_lengths);
data+= *blob_lengths;
/* Skip over tail page that was to be used to store blob */
block++;
bitmap_blocks->tail_page_skipped= 1;
/*
The following is not true when we want to insert data into original
place. In this case we don't have any extra blocks allocated
*/
if (likely(undo_lsn == LSN_ERROR))
{
/* Skip over tail page that was prepared for storing blob */
block++;
bitmap_blocks->tail_page_skipped= 1;
}
}
if (head_block->sub_blocks > 1)
{
......@@ -2779,7 +2810,9 @@ static my_bool write_block_record(MARIA_HA *info,
/* Update page directory */
head_length= (uint) (data - row_pos->data);
DBUG_PRINT("info", ("Used head length on page: %u", head_length));
DBUG_PRINT("info", ("Used head length on page: %u header_length: %u",
head_length,
(uint) (flag & ROW_FLAG_TRANSID ? TRANSID_SIZE : 0)));
DBUG_ASSERT(data <= end_of_data);
if (head_length < share->base.min_block_length)
{
......@@ -2789,6 +2822,7 @@ static my_bool write_block_record(MARIA_HA *info,
data+= diff_length;
head_length= share->base.min_block_length;
}
DBUG_ASSERT(undo_lsn == LSN_ERROR || head_length == row_pos->length);
int2store(row_pos->dir + 2, head_length);
/* update empty space at start of block */
row_pos->empty_space-= head_length;
......@@ -2852,11 +2886,13 @@ static my_bool write_block_record(MARIA_HA *info,
{
/*
Set only a bit, to not cause bitmap code to believe a block is full
when there is still a lot of entries in it
when there is still a lot of entries in it.
*/
block->used|= BLOCKUSED_USED;
}
}
DBUG_ASSERT((undo_lsn == LSN_ERROR ||
block == bitmap_blocks->block + bitmap_blocks->count));
column= save_column;
block= save_block;
blob_lengths= save_blob_lengths;
......@@ -3250,9 +3286,10 @@ static my_bool write_block_record(MARIA_HA *info,
else
{
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE + 2 +
HA_CHECKSUM_STORE_SIZE + 2 + PAGERANGE_STORE_SIZE +
ROW_EXTENT_SIZE];
uchar *log_pos;
ha_checksum checksum_delta;
/* LOGREC_UNDO_ROW_INSERT & LOGREC_UNDO_ROW_UPDATE share same header */
......@@ -3262,18 +3299,17 @@ static my_bool write_block_record(MARIA_HA *info,
dirpos_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE,
row_pos->rownr);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length=
(LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE +
DIRPOS_STORE_SIZE);
log_pos= (log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE);
store_checksum_in_rec(share, checksum_delta,
row->checksum - old_record_checksum,
log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
log_array[TRANSLOG_INTERNAL_PARTS + 0].length);
log_pos, log_pos);
compile_time_assert(sizeof(ha_checksum) == HA_CHECKSUM_STORE_SIZE);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
log_data);
if (!old_record)
{
/* Store undo_lsn in case we are aborting the insert */
......@@ -3292,16 +3328,18 @@ static my_bool write_block_record(MARIA_HA *info,
else
{
/* Write UNDO log record for the UPDATE */
uchar *log_pos= (log_data +
log_array[TRANSLOG_INTERNAL_PARTS + 0].length);
size_t row_length, extents_length;
uint row_parts_count;
uint row_parts_count, cur_head_length;
/*
Write head length and extents of the original row so that we
during UNDO can put it back in the original position
during UNDO can put it back in the original position.
We don't store size for TRANSID, as we don't write this during
UNDO.
*/
int2store(log_pos, info->cur_row.head_length);
cur_head_length= (info->cur_row.head_length -
info->cur_row.header_length);
int2store(log_pos, cur_head_length);
pagerange_store(log_pos + 2, info->cur_row.extents_count);
log_pos+= 2 + PAGERANGE_STORE_SIZE;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length+= (2 +
......@@ -3783,6 +3821,8 @@ static my_bool _ma_update_block_record2(MARIA_HA *info,
This is the main reason we don't make a lot of subfunctions that are
common between _ma_update_block_record2() and this function.
Note: If something goes wrong we mark the file crashed
*/
static my_bool _ma_update_at_original_place(MARIA_HA *info,
......@@ -3838,6 +3878,10 @@ static my_bool _ma_update_at_original_place(MARIA_HA *info,
if ((org_empty_size + cur_row->head_length) < length_on_head_page)
{
DBUG_PRINT("error",
("org_empty_size: %u head_length: %u length_on_page: %u",
org_empty_size, (uint) cur_row->head_length,
length_on_head_page));
my_errno= HA_ERR_WRONG_IN_RECORD;
goto err;
}
......@@ -3857,7 +3901,6 @@ static my_bool _ma_update_at_original_place(MARIA_HA *info,
row_pos.empty_space= empty_size;
row_pos.dir= dir;
row_pos.data= buff + rec_offset;
row_pos.length= length_on_head_page;
/* Delete old row */
if (*cur_row->tail_positions &&
......@@ -3887,12 +3930,17 @@ static my_bool _ma_update_at_original_place(MARIA_HA *info,
max(new_row->total_length, share->base.min_block_length) <=
length_on_head_page);
/* Store same amount of data on head page as on original page */
row_pos.length= (length_on_head_page -
(extent_count + 1 - blocks->count) * ROW_EXTENT_SIZE);
set_if_bigger(row_pos.length, share->base.min_block_length);
if ((res= write_block_record(info, oldrec, record, new_row, blocks,
1, &row_pos, undo_lsn, old_checksum)))
goto err;
DBUG_RETURN(0);
err:
_ma_mark_file_crashed(share);
if (info->non_flushable_state)
_ma_bitmap_flushable(info, -1);
_ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE);
......@@ -4223,7 +4271,8 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record)
log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE;
dirpos_store(log_pos, record_number);
log_pos+= DIRPOS_STORE_SIZE;
int2store(log_pos, info->cur_row.head_length);
int2store(log_pos, info->cur_row.head_length -
info->cur_row.header_length);
log_pos+= 2;
pagerange_store(log_pos, info->cur_row.extents_count);
log_pos+= PAGERANGE_STORE_SIZE;
......@@ -4601,6 +4650,8 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
cur_row->head_length= (uint) (end_of_data - data);
cur_row->full_page_count= cur_row->tail_count= 0;
cur_row->blob_length= 0;
/* Number of bytes in header that we don't need to write during undo */
cur_row->header_length= total_header_size[(flag & PRECALC_HEADER_BITMASK)]-1;
if (flag & ROW_FLAG_TRANSID)
{
......@@ -4612,7 +4663,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
}
/* Skip trans header (for now, until we have MVCC csupport) */
data+= total_header_size[(flag & PRECALC_HEADER_BITMASK)];
data+= cur_row->header_length + 1 ;
if (flag & ROW_FLAG_NULLS_EXTENDED)
cur_null_bytes+= data[-1];
......@@ -5008,7 +5059,10 @@ int _ma_read_block_record(MARIA_HA *info, uchar *record,
uint offset;
uint block_size= share->block_size;
DBUG_ENTER("_ma_read_block_record");
DBUG_PRINT("enter", ("rowid: %lu", (long) record_pos));
DBUG_PRINT("enter", ("rowid: %lu page: %lu rownr: %u",
(ulong) record_pos,
(ulong) ma_recordpos_to_page(record_pos),
ma_recordpos_to_dir_entry(record_pos)));
offset= ma_recordpos_to_dir_entry(record_pos);
......@@ -6756,7 +6810,7 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
pgcache_page_no_t page;
uint rownr;
uchar *buff;
my_bool res= 1;
my_bool res;
MARIA_PINNED_PAGE page_link;
MARIA_SHARE *share= info->s;
ha_checksum checksum;
......@@ -6802,11 +6856,16 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
goto err;
res= 0;
err:
end:
if (info->non_flushable_state)
_ma_bitmap_flushable(info, -1);
_ma_unpin_all_pages_and_finalize_row(info, lsn);
DBUG_RETURN(res);
err:
res= 1;
_ma_mark_file_crashed(share);
goto end;
}
......@@ -7029,6 +7088,10 @@ my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
{
DBUG_ASSERT(row.checksum == (share->calc_checksum)(info, record));
}
/* Store same amount of data on head page as on original page */
row_pos.length= (length_on_head_page -
(extent_count + 1 - blocks->count) * ROW_EXTENT_SIZE);
set_if_bigger(row_pos.length, share->base.min_block_length);
if (write_block_record(info, (uchar*) 0, record, &row,
blocks, blocks->block->org_bitmap_value != 0,
&row_pos, undo_lsn, 0))
......@@ -7038,6 +7101,7 @@ my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
DBUG_RETURN(0);
err:
_ma_mark_file_crashed(share);
if (info->non_flushable_state)
_ma_bitmap_flushable(info, -1);
_ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE);
......@@ -7068,7 +7132,7 @@ my_bool _ma_apply_undo_row_update(MARIA_HA *info, LSN undo_lsn,
pgcache_page_no_t page;
ha_checksum checksum_delta;
uint rownr, field_length_header, extent_count, length_on_head_page;
int error= 1;
int error;
DBUG_ENTER("_ma_apply_undo_row_update");
LINT_INIT(checksum_delta);
......@@ -7076,6 +7140,7 @@ my_bool _ma_apply_undo_row_update(MARIA_HA *info, LSN undo_lsn,
header+= PAGE_STORE_SIZE;
rownr= dirpos_korr(header);
header+= DIRPOS_STORE_SIZE;
record_pos= ma_recordpos(page, rownr);
DBUG_PRINT("enter", ("rowid: %lu page: %lu rownr: %u",
(ulong) record_pos, (ulong) page, rownr));
......@@ -7205,9 +7270,14 @@ my_bool _ma_apply_undo_row_update(MARIA_HA *info, LSN undo_lsn,
goto err;
error= 0;
err:
end:
my_free(current_record, MYF(0));
DBUG_RETURN(error);
err:
error= 1;
_ma_mark_file_crashed(share);
goto end;
}
......
......@@ -459,8 +459,9 @@ typedef struct st_maria_row
uint *null_field_lengths; /* All null field lengths */
ulong *blob_lengths; /* Length for each blob */
ulong min_length, normal_length, char_length, varchar_length;
ulong blob_length, head_length, total_length;
ulong blob_length, total_length;
size_t extents_buffer_length; /* Size of 'extents' buffer */
uint head_length, header_length;
uint field_lengths_length; /* Length of data in field_lengths */
uint extents_count; /* number of extents in 'extents' */
uint full_page_count, tail_count; /* For maria_chk */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment