Commit f366cd84 authored by unknown's avatar unknown

Added undo of deleted row

Added part of undo of update row
Extended ma_test1 for recovery testing
Some bug fixes


storage/maria/ha_maria.cc:
  Ignore 'state.split' in case of block records
storage/maria/ma_bitmap.c:
  Added return value for _ma_bitmap_find_place() for how much data we should put on head page
storage/maria/ma_blockrec.c:
  Added undo of deleted row.
  - Added logging of CLR_END records in write_block_record()
  - Split ma_write_init_block_record() to two functions to get better code reuse
  - Added _ma_apply_undo_row_delete()
  - Added ma_get_length()
  
  Added 'empty' prototype for undo_row_update()
  
  Fixed bug when moving data withing a head/tail page.
  Fixed bug when reading a page with bigger LSN but of different type than was expected.
  Store undo_lsn first in CLR_END record
  
  Simplified some code by adding local variables.
  Changed log format for UNDO_ROW_DELETE to store total length of used blobs
storage/maria/ma_blockrec.h:
  Added prototypes for undo code.
storage/maria/ma_pagecache.c:
  Allow plain page to change to LSN page (needed in recovery to apply UNDO)
storage/maria/ma_recovery.c:
  Added undo handling of UNDO_ROW_DELETE and UNDO_ROW_UPDATE
storage/maria/ma_test1.c:
  Extended --test-undo option to allow us to die after insert or after delete.
  Fixed bug in printing key values when using -v
storage/maria/maria_def.h:
  Moved some variables around to be getter alignment
  Added length_buff buffer to be used during undo handling
parent 4d44697d
...@@ -1219,7 +1219,9 @@ int ha_maria::repair(THD *thd, HA_CHECK &param, bool do_optimize) ...@@ -1219,7 +1219,9 @@ int ha_maria::repair(THD *thd, HA_CHECK &param, bool do_optimize)
} }
if (!do_optimize || if (!do_optimize ||
((file->state->del || share->state.split != file->state->records) && ((file->state->del ||
((file->s->data_file_type != BLOCK_RECORD) &&
share->state.split != file->state->records)) &&
(!(param.testflag & T_QUICK) || (!(param.testflag & T_QUICK) ||
(share->state.changed & (STATE_NOT_OPTIMIZED_KEYS | (share->state.changed & (STATE_NOT_OPTIMIZED_KEYS |
STATE_NOT_OPTIMIZED_ROWS))))) STATE_NOT_OPTIMIZED_ROWS)))))
......
...@@ -1421,6 +1421,8 @@ static my_bool write_rest_of_head(MARIA_HA *info, uint position, ...@@ -1421,6 +1421,8 @@ static my_bool write_rest_of_head(MARIA_HA *info, uint position,
RETURN RETURN
0 ok 0 ok
row->space_on_head_page contains minimum number of bytes we
expect to put on the head page.
1 error 1 error
*/ */
...@@ -1457,6 +1459,7 @@ my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row, ...@@ -1457,6 +1459,7 @@ my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row,
position= ELEMENTS_RESERVED_FOR_MAIN_PART - 1; position= ELEMENTS_RESERVED_FOR_MAIN_PART - 1;
if (find_head(info, (uint) row->total_length, position)) if (find_head(info, (uint) row->total_length, position))
goto abort; goto abort;
row->space_on_head_page= row->total_length;
goto end; goto end;
} }
...@@ -1474,6 +1477,7 @@ my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row, ...@@ -1474,6 +1477,7 @@ my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row,
position= ELEMENTS_RESERVED_FOR_MAIN_PART - 1; position= ELEMENTS_RESERVED_FOR_MAIN_PART - 1;
if (find_head(info, head_length, position)) if (find_head(info, head_length, position))
goto abort; goto abort;
row->space_on_head_page= head_length;
goto end; goto end;
} }
...@@ -1490,6 +1494,7 @@ my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row, ...@@ -1490,6 +1494,7 @@ my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row,
position= ELEMENTS_RESERVED_FOR_MAIN_PART -2; /* Only head and tail */ position= ELEMENTS_RESERVED_FOR_MAIN_PART -2; /* Only head and tail */
if (find_head(info, row_length, position)) if (find_head(info, row_length, position))
goto abort; goto abort;
row->space_on_head_page= row_length;
rest_length= head_length - row_length; rest_length= head_length - row_length;
if (write_rest_of_head(info, position, rest_length)) if (write_rest_of_head(info, position, rest_length))
goto abort; goto abort;
......
...@@ -449,7 +449,7 @@ my_bool _ma_init_block_record(MARIA_HA *info) ...@@ -449,7 +449,7 @@ my_bool _ma_init_block_record(MARIA_HA *info)
&info->log_row_parts, &info->log_row_parts,
sizeof(*info->log_row_parts) * sizeof(*info->log_row_parts) *
(TRANSLOG_INTERNAL_PARTS + 2 + (TRANSLOG_INTERNAL_PARTS + 2 +
info->s->base.fields + 2), info->s->base.fields + 3),
&info->update_field_data, &info->update_field_data,
(info->s->base.fields * 4 + (info->s->base.fields * 4 +
info->s->base.max_field_lengths + 1 + 4), info->s->base.max_field_lengths + 1 + 4),
...@@ -655,9 +655,6 @@ static my_bool extend_area_on_page(uchar *buff, uchar *dir, ...@@ -655,9 +655,6 @@ static my_bool extend_area_on_page(uchar *buff, uchar *dir,
} }
/* /*
Check that a region is all zero Check that a region is all zero
...@@ -726,6 +723,23 @@ void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn) ...@@ -726,6 +723,23 @@ void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn)
} }
#ifdef NOT_YET_NEEDED
/* Calculate empty space on a page */
static uint empty_space_on_page(uchar *buff, uint block_size)
{
enum en_page_type;
page_type= (enum en_page_type) (buff[PAGE_TYPE_OFFSET] &
~(uchar) PAGE_CAN_BE_COMPACTED);
if (page_type == UNALLOCATED_PAGE)
return block_size;
if ((uint) page_type <= TAIL_PAGE)
return uint2korr(buff+EMPTY_SPACE_OFFSET);
return 0; /* Blob page */
}
#endif
/* /*
Find free position in directory Find free position in directory
...@@ -917,8 +931,8 @@ static void calc_record_size(MARIA_HA *info, const uchar *record, ...@@ -917,8 +931,8 @@ static void calc_record_size(MARIA_HA *info, const uchar *record,
{ {
uint length, field_length_data_length; uint length, field_length_data_length;
const uchar *field_pos= record + column->offset; const uchar *field_pos= record + column->offset;
/* 256 is correct as this includes the length uchar */
/* 256 is correct as this includes the length uchar */
field_length_data[0]= field_pos[0]; field_length_data[0]= field_pos[0];
if (column->length <= 256) if (column->length <= 256)
{ {
...@@ -1174,9 +1188,9 @@ static void make_empty_page(uchar *buff, uint block_size, uint page_type) ...@@ -1174,9 +1188,9 @@ static void make_empty_page(uchar *buff, uint block_size, uint page_type)
struct st_row_pos_info struct st_row_pos_info
{ {
uchar *buff; /* page buffer */ uchar *buff; /* page buffer */
uchar *data; /* Place for data */ uchar *data; /* Place for data */
uchar *dir; /* Directory */ uchar *dir; /* Directory */
uint length; /* Length for data */ uint length; /* Length for data */
uint rownr; /* Offset in directory */ uint rownr; /* Offset in directory */
uint empty_space; /* Space left on page */ uint empty_space; /* Space left on page */
...@@ -1228,16 +1242,20 @@ static my_bool get_head_or_tail_page(MARIA_HA *info, ...@@ -1228,16 +1242,20 @@ static my_bool get_head_or_tail_page(MARIA_HA *info,
if (res->length < length) if (res->length < length)
{ {
if (res->empty_space + res->length < length) if (res->empty_space + res->length >= length)
{ {
compact_page(res->buff, block_size, res->rownr, 1); compact_page(res->buff, block_size, res->rownr, 1);
/* All empty space are now after current position */ /* All empty space are now after current position */
dir= (res->buff + block_size - DIR_ENTRY_SIZE * res->rownr - dir= (res->buff + block_size - DIR_ENTRY_SIZE * res->rownr -
PAGE_SUFFIX_SIZE); DIR_ENTRY_SIZE - PAGE_SUFFIX_SIZE);
res->length= res->empty_space= uint2korr(dir+2); res->length= res->empty_space= uint2korr(dir+2);
} }
if (res->length < length) if (res->length < length)
{
DBUG_PRINT("error", ("length: %u res->length: %u empty_space: %u",
length, res->length, res->empty_space));
goto crashed; /* Wrong bitmap information */ goto crashed; /* Wrong bitmap information */
}
} }
res->dir= dir; res->dir= dir;
res->data= res->buff + uint2korr(dir); res->data= res->buff + uint2korr(dir);
...@@ -1631,6 +1649,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count) ...@@ -1631,6 +1649,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
/** /**
@brief Write a record to a (set of) pages @brief Write a record to a (set of) pages
@fn write_block_record()
@param info Maria handler @param info Maria handler
@param old_record Original record in case of update; NULL in case of @param old_record Original record in case of update; NULL in case of
insert insert
...@@ -1640,6 +1659,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count) ...@@ -1640,6 +1659,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
@param map_blocks On which pages the record should be stored @param map_blocks On which pages the record should be stored
@param row_pos Position on head page where to put head part of @param row_pos Position on head page where to put head part of
record record
@param undo_lsn <> 0 if we are in UNDO
@note @note
On return all pinned pages are released. On return all pinned pages are released.
...@@ -1654,7 +1674,8 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -1654,7 +1674,8 @@ static my_bool write_block_record(MARIA_HA *info,
MARIA_ROW *row, MARIA_ROW *row,
MARIA_BITMAP_BLOCKS *bitmap_blocks, MARIA_BITMAP_BLOCKS *bitmap_blocks,
my_bool head_block_is_read, my_bool head_block_is_read,
struct st_row_pos_info *row_pos) struct st_row_pos_info *row_pos,
LSN undo_lsn)
{ {
uchar *data, *end_of_data, *tmp_data_used, *tmp_data; uchar *data, *end_of_data, *tmp_data_used, *tmp_data;
uchar *row_extents_first_part, *row_extents_second_part; uchar *row_extents_first_part, *row_extents_second_part;
...@@ -1862,7 +1883,7 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -1862,7 +1883,7 @@ static my_bool write_block_record(MARIA_HA *info,
{ {
/* Update page directory */ /* Update page directory */
uint length= (uint) (data - row_pos->data); uint length= (uint) (data - row_pos->data);
DBUG_PRINT("info", ("head length: %u", length)); DBUG_PRINT("info", ("Used head length on page: %u", length));
if (length < info->s->base.min_row_length) if (length < info->s->base.min_row_length)
{ {
uint diff_length= info->s->base.min_row_length - length; uint diff_length= info->s->base.min_row_length - length;
...@@ -2256,51 +2277,70 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -2256,51 +2277,70 @@ static my_bool write_block_record(MARIA_HA *info,
goto disk_err; goto disk_err;
} }
/* Write UNDO record */ /* Write UNDO or CLR record */
lsn= 0;
if (share->now_transactional) if (share->now_transactional)
{ {
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE];
LEX_STRING *log_array= info->log_row_parts; LEX_STRING *log_array= info->log_row_parts;
/* LOGREC_UNDO_ROW_INSERT & LOGREC_UNDO_ROW_INSERT share same header */ if (undo_lsn)
lsn_store(log_data, info->trn->undo_lsn); {
page_store(log_data+ LSN_STORE_SIZE + FILEID_STORE_SIZE, uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE];
head_block->page);
dirpos_store(log_data+ LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE,
row_pos->rownr);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; /* undo_lsn must be first for compression to work */
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); lsn_store(log_data, undo_lsn);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (!old_record) if (translog_write_record(&lsn, LOGREC_CLR_END,
{
/* Write UNDO log record for the INSERT */
if (translog_write_record(&lsn, LOGREC_UNDO_ROW_INSERT,
info->trn, info, sizeof(log_data), info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array, TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data + LSN_STORE_SIZE)) log_data+ FILEID_STORE_SIZE))
goto disk_err; goto disk_err;
} }
else else
{ {
/* Write UNDO log record for the UPDATE */ uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
size_t row_length; PAGE_STORE_SIZE + DIRPOS_STORE_SIZE];
uint row_parts_count;
row_length= fill_update_undo_parts(info, old_record, record, /* LOGREC_UNDO_ROW_INSERT & LOGREC_UNDO_ROW_INSERT share same header */
info->log_row_parts + lsn_store(log_data, info->trn->undo_lsn);
TRANSLOG_INTERNAL_PARTS + 1, page_store(log_data+ LSN_STORE_SIZE + FILEID_STORE_SIZE,
&row_parts_count); head_block->page);
if (translog_write_record(&lsn, LOGREC_UNDO_ROW_UPDATE, info->trn, dirpos_store(log_data+ LSN_STORE_SIZE + FILEID_STORE_SIZE +
info, sizeof(log_data) + row_length, PAGE_STORE_SIZE,
TRANSLOG_INTERNAL_PARTS + 1 + row_parts_count, row_pos->rownr);
log_array, log_data + LSN_STORE_SIZE))
goto disk_err; log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (!old_record)
{
/* Write UNDO log record for the INSERT */
if (translog_write_record(&lsn, LOGREC_UNDO_ROW_INSERT,
info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data + LSN_STORE_SIZE))
goto disk_err;
}
else
{
/* Write UNDO log record for the UPDATE */
size_t row_length;
uint row_parts_count;
row_length= fill_update_undo_parts(info, old_record, record,
info->log_row_parts +
TRANSLOG_INTERNAL_PARTS + 1,
&row_parts_count);
if (translog_write_record(&lsn, LOGREC_UNDO_ROW_UPDATE, info->trn,
info, sizeof(log_data) + row_length,
TRANSLOG_INTERNAL_PARTS + 1 + row_parts_count,
log_array, log_data + LSN_STORE_SIZE))
goto disk_err;
}
} }
} }
_ma_unpin_all_pages(info, lsn);
_ma_unpin_all_pages(info, info->trn->undo_lsn);
if (tmp_data_used) if (tmp_data_used)
{ {
...@@ -2379,7 +2419,49 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -2379,7 +2419,49 @@ static my_bool write_block_record(MARIA_HA *info,
/* /*
Write a record (to get the row id for it) @brief Write a record
@fn allocate_and_write_block_record()
@param info Maria handler
@param record Record to write
@param row Information about fields in 'record'
@param undo_lsn <> 0 if in undo
@return
@retval 0 ok
@retval 1 Error
*/
static my_bool allocate_and_write_block_record(MARIA_HA *info,
const uchar *record,
MARIA_ROW *row,
LSN undo_lsn)
{
struct st_row_pos_info row_pos;
MARIA_BITMAP_BLOCKS *blocks= &row->insert_blocks;
DBUG_ENTER("allocate_and_write_block_record");
if (_ma_bitmap_find_place(info, row, blocks))
DBUG_RETURN(1); /* Error reading bitmap */
/* page will be pinned & locked by get_head_or_tail_page */
if (get_head_or_tail_page(info, blocks->block, info->buff,
row->space_on_head_page, HEAD_PAGE,
PAGECACHE_LOCK_WRITE, &row_pos))
DBUG_RETURN(1);
row->lastpos= ma_recordpos(blocks->block->page, row_pos.rownr);
if (info->s->calc_checksum)
row->checksum= (info->s->calc_checksum)(info,record);
if (write_block_record(info, (uchar*) 0, record, row,
blocks, blocks->block->org_bitmap_value != 0,
&row_pos, undo_lsn))
DBUG_RETURN(1); /* Error reading bitmap */
DBUG_PRINT("exit", ("Rowid: %lu", (ulong) row->lastpos));
DBUG_RETURN(0);
}
/*
Write a record and return rowid for it
SYNOPSIS SYNOPSIS
_ma_write_init_block_record() _ma_write_init_block_record()
...@@ -2397,27 +2479,11 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -2397,27 +2479,11 @@ static my_bool write_block_record(MARIA_HA *info,
MARIA_RECORD_POS _ma_write_init_block_record(MARIA_HA *info, MARIA_RECORD_POS _ma_write_init_block_record(MARIA_HA *info,
const uchar *record) const uchar *record)
{ {
MARIA_BITMAP_BLOCKS *blocks= &info->cur_row.insert_blocks;
struct st_row_pos_info row_pos;
DBUG_ENTER("_ma_write_init_block_record"); DBUG_ENTER("_ma_write_init_block_record");
calc_record_size(info, record, &info->cur_row); calc_record_size(info, record, &info->cur_row);
if (_ma_bitmap_find_place(info, &info->cur_row, blocks)) if (allocate_and_write_block_record(info, record, &info->cur_row, 0))
DBUG_RETURN(HA_OFFSET_ERROR); /* Error reading bitmap */
/* page will be pinned & locked by get_head_or_tail_page */
if (get_head_or_tail_page(info, blocks->block, info->buff,
info->s->base.min_row_length, HEAD_PAGE,
PAGECACHE_LOCK_WRITE, &row_pos))
DBUG_RETURN(HA_OFFSET_ERROR); DBUG_RETURN(HA_OFFSET_ERROR);
info->cur_row.lastpos= ma_recordpos(blocks->block->page, row_pos.rownr);
if (info->s->calc_checksum)
info->cur_row.checksum= (info->s->calc_checksum)(info,record);
if (write_block_record(info, (uchar*) 0, record, &info->cur_row,
blocks, blocks->block->org_bitmap_value != 0,
&row_pos))
DBUG_RETURN(HA_OFFSET_ERROR); /* Error reading bitmap */
DBUG_PRINT("exit", ("Rowid: %lu", (ulong) info->cur_row.lastpos));
info->s->state.split++;
DBUG_RETURN(info->cur_row.lastpos); DBUG_RETURN(info->cur_row.lastpos);
} }
...@@ -2603,7 +2669,7 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, ...@@ -2603,7 +2669,7 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
if (cur_row->extents_count && free_full_pages(info, cur_row)) if (cur_row->extents_count && free_full_pages(info, cur_row))
goto err; goto err;
DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks, DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks,
1, &row_pos)); 1, &row_pos, 0));
} }
/* /*
Allocate all size in block for record Allocate all size in block for record
...@@ -2636,7 +2702,7 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, ...@@ -2636,7 +2702,7 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
row_pos.data= buff + uint2korr(dir); row_pos.data= buff + uint2korr(dir);
row_pos.length= head_length; row_pos.length= head_length;
DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks, 1, DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks, 1,
&row_pos)); &row_pos, 0));
err: err:
_ma_unpin_all_pages(info, 0); _ma_unpin_all_pages(info, 0);
...@@ -3218,6 +3284,7 @@ static my_bool read_long_data(MARIA_HA *info, uchar *to, ulong length, ...@@ -3218,6 +3284,7 @@ static my_bool read_long_data(MARIA_HA *info, uchar *to, ulong length,
cur_row.extents_counts contains number of extents cur_row.extents_counts contains number of extents
cur_row.empty_bits is set to empty bits cur_row.empty_bits is set to empty bits
cur_row.field_lengths contains packed length of all fields cur_row.field_lengths contains packed length of all fields
cur_row.blob_length contains total length of all blobs.
RETURN RETURN
0 ok 0 ok
...@@ -3233,6 +3300,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -3233,6 +3300,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
my_bool found_blob= 0; my_bool found_blob= 0;
MARIA_EXTENT_CURSOR extent; MARIA_EXTENT_CURSOR extent;
MARIA_COLUMNDEF *column, *end_column; MARIA_COLUMNDEF *column, *end_column;
MARIA_ROW *cur_row= &info->cur_row;
DBUG_ENTER("_ma_read_block_record2"); DBUG_ENTER("_ma_read_block_record2");
LINT_INIT(field_length_data); LINT_INIT(field_length_data);
...@@ -3242,8 +3310,9 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -3242,8 +3310,9 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
flag= (uint) (uchar) data[0]; flag= (uint) (uchar) data[0];
cur_null_bytes= share->base.original_null_bytes; cur_null_bytes= share->base.original_null_bytes;
null_bytes= share->base.null_bytes; null_bytes= share->base.null_bytes;
info->cur_row.head_length= (uint) (end_of_data - data); cur_row->head_length= (uint) (end_of_data - data);
info->cur_row.full_page_count= info->cur_row.tail_count= 0; cur_row->full_page_count= cur_row->tail_count= 0;
cur_row->blob_length= 0;
/* Skip trans header (for now, until we have MVCC csupport) */ /* Skip trans header (for now, until we have MVCC csupport) */
data+= total_header_size[(flag & PRECALC_HEADER_BITMASK)]; data+= total_header_size[(flag & PRECALC_HEADER_BITMASK)];
...@@ -3259,22 +3328,22 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -3259,22 +3328,22 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
Get number of extents and first extent Get number of extents and first extent
*/ */
get_key_length(row_extents, data); get_key_length(row_extents, data);
info->cur_row.extents_count= row_extents; cur_row->extents_count= row_extents;
row_extent_size= row_extents * ROW_EXTENT_SIZE; row_extent_size= row_extents * ROW_EXTENT_SIZE;
if (info->cur_row.extents_buffer_length < row_extent_size && if (cur_row->extents_buffer_length < row_extent_size &&
_ma_alloc_buffer(&info->cur_row.extents, _ma_alloc_buffer(&cur_row->extents,
&info->cur_row.extents_buffer_length, &cur_row->extents_buffer_length,
row_extent_size)) row_extent_size))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
memcpy(info->cur_row.extents, data, ROW_EXTENT_SIZE); memcpy(cur_row->extents, data, ROW_EXTENT_SIZE);
data+= ROW_EXTENT_SIZE; data+= ROW_EXTENT_SIZE;
init_extent(&extent, info->cur_row.extents, row_extents, init_extent(&extent, cur_row->extents, row_extents,
info->cur_row.tail_positions); cur_row->tail_positions);
} }
else else
{ {
info->cur_row.extents_count= 0; cur_row->extents_count= 0;
(*info->cur_row.tail_positions)= 0; (*cur_row->tail_positions)= 0;
extent.page_count= 0; extent.page_count= 0;
extent.extent_count= 1; extent.extent_count= 1;
} }
...@@ -3284,7 +3353,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -3284,7 +3353,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
if (share->base.max_field_lengths) if (share->base.max_field_lengths)
{ {
get_key_length(field_lengths, data); get_key_length(field_lengths, data);
info->cur_row.field_lengths_length= field_lengths; cur_row->field_lengths_length= field_lengths;
#ifdef SANITY_CHECKS #ifdef SANITY_CHECKS
if (field_lengths > share->base.max_field_lengths) if (field_lengths > share->base.max_field_lengths)
goto err; goto err;
...@@ -3292,7 +3361,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -3292,7 +3361,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
} }
if (share->calc_checksum) if (share->calc_checksum)
info->cur_row.checksum= (uint) (uchar) *data++; cur_row->checksum= (uint) (uchar) *data++;
/* data now points on null bits */ /* data now points on null bits */
memcpy(record, data, cur_null_bytes); memcpy(record, data, cur_null_bytes);
if (unlikely(cur_null_bytes != null_bytes)) if (unlikely(cur_null_bytes != null_bytes))
...@@ -3305,7 +3374,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -3305,7 +3374,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
} }
data+= null_bytes; data+= null_bytes;
/* We copy the empty bits to be able to use them for delete/update */ /* We copy the empty bits to be able to use them for delete/update */
memcpy(info->cur_row.empty_bits, data, share->base.pack_bytes); memcpy(cur_row->empty_bits, data, share->base.pack_bytes);
data+= share->base.pack_bytes; data+= share->base.pack_bytes;
/* TODO: Use field offsets, instead of just skipping them */ /* TODO: Use field offsets, instead of just skipping them */
...@@ -3313,11 +3382,11 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -3313,11 +3382,11 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
/* /*
Read row extents (note that first extent was already read into Read row extents (note that first extent was already read into
info->cur_row.extents above) cur_row->extents above)
*/ */
if (row_extents > 1) if (row_extents > 1)
{ {
if (read_long_data(info, info->cur_row.extents + ROW_EXTENT_SIZE, if (read_long_data(info, cur_row->extents + ROW_EXTENT_SIZE,
(row_extents - 1) * ROW_EXTENT_SIZE, (row_extents - 1) * ROW_EXTENT_SIZE,
&extent, &data, &end_of_data)) &extent, &data, &end_of_data))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
...@@ -3342,7 +3411,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -3342,7 +3411,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
/* Read array of field lengths. This may be stored in several extents */ /* Read array of field lengths. This may be stored in several extents */
if (field_lengths) if (field_lengths)
{ {
field_length_data= info->cur_row.field_lengths; field_length_data= cur_row->field_lengths;
if (read_long_data(info, field_length_data, field_lengths, &extent, if (read_long_data(info, field_length_data, field_lengths, &extent,
&data, &end_of_data)) &data, &end_of_data))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
...@@ -3356,7 +3425,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -3356,7 +3425,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
uchar *field_pos= record + column->offset; uchar *field_pos= record + column->offset;
/* First check if field is present in record */ /* First check if field is present in record */
if ((record[column->null_pos] & column->null_bit) || if ((record[column->null_pos] & column->null_bit) ||
(info->cur_row.empty_bits[column->empty_pos] & column->empty_bit)) (cur_row->empty_bits[column->empty_pos] & column->empty_bit))
{ {
if (type == FIELD_SKIP_ENDSPACE) if (type == FIELD_SKIP_ENDSPACE)
bfill(record + column->offset, column->length, ' '); bfill(record + column->offset, column->length, ' ');
...@@ -3432,13 +3501,14 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -3432,13 +3501,14 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
{ {
uint size_length; uint size_length;
if ((record[blob_field->null_pos] & blob_field->null_bit) || if ((record[blob_field->null_pos] & blob_field->null_bit) ||
(info->cur_row.empty_bits[blob_field->empty_pos] & (cur_row->empty_bits[blob_field->empty_pos] &
blob_field->empty_bit)) blob_field->empty_bit))
continue; continue;
size_length= blob_field->length - portable_sizeof_char_ptr; size_length= blob_field->length - portable_sizeof_char_ptr;
blob_lengths+= _ma_calc_blob_length(size_length, length_data); blob_lengths+= _ma_calc_blob_length(size_length, length_data);
length_data+= size_length; length_data+= size_length;
} }
cur_row->blob_length= blob_lengths;
DBUG_PRINT("info", ("Total blob length: %lu", blob_lengths)); DBUG_PRINT("info", ("Total blob length: %lu", blob_lengths));
if (_ma_alloc_buffer(&info->rec_buff, &info->rec_buff_size, if (_ma_alloc_buffer(&info->rec_buff, &info->rec_buff_size,
blob_lengths)) blob_lengths))
...@@ -4012,6 +4082,37 @@ uint ma_calc_length_for_store_length(ulong nr) ...@@ -4012,6 +4082,37 @@ uint ma_calc_length_for_store_length(ulong nr)
} }
/* Retrive a stored number */
static ulong ma_get_length(uchar **packet)
{
reg1 uchar *pos= *packet;
if (*pos < 251)
{
(*packet)++;
return (ulong) *pos;
}
if (*pos == 251)
{
(*packet)+= 2;
return (ulong) pos[1];
}
if (*pos == 252)
{
(*packet)+= 3;
return (ulong) uint2korr(pos+1);
}
if (*pos == 253)
{
(*packet)+= 4;
return (ulong) uint3korr(pos+1);
}
DBUG_ASSERT(*pos == 254);
(*packet)+= 5;
return (ulong) uint4korr(pos+1);
}
/* /*
Fill array with pointers to field parts to be stored in log for insert Fill array with pointers to field parts to be stored in log for insert
...@@ -4058,7 +4159,7 @@ static size_t fill_insert_undo_parts(MARIA_HA *info, const uchar *record, ...@@ -4058,7 +4159,7 @@ static size_t fill_insert_undo_parts(MARIA_HA *info, const uchar *record,
if (share->base.max_field_lengths) if (share->base.max_field_lengths)
{ {
/* Store field lenghts, with a prefix of number of bytes */ /* Store length of all not empty char, varchar and blob fields */
log_parts->str= field_lengths-2; log_parts->str= field_lengths-2;
log_parts->length= info->cur_row.field_lengths_length+2; log_parts->length= info->cur_row.field_lengths_length+2;
int2store(log_parts->str, info->cur_row.field_lengths_length); int2store(log_parts->str, info->cur_row.field_lengths_length);
...@@ -4066,6 +4167,17 @@ static size_t fill_insert_undo_parts(MARIA_HA *info, const uchar *record, ...@@ -4066,6 +4167,17 @@ static size_t fill_insert_undo_parts(MARIA_HA *info, const uchar *record,
log_parts++; log_parts++;
} }
if (share->base.blobs)
{
/* Store total blob length to make buffer allocation easier during undo */
log_parts->str= info->length_buff;
log_parts->length= (uint) (ma_store_length(log_parts->str,
info->cur_row.blob_length) -
(uchar*) log_parts->str);
row_length+= log_parts->length;
log_parts++;
}
/* Handle constant length fields that are always present */ /* Handle constant length fields that are always present */
for (column= share->columndef, for (column= share->columndef,
end_column= column+ share->base.fixed_not_null_fields; end_column= column+ share->base.fixed_not_null_fields;
...@@ -4574,14 +4686,18 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -4574,14 +4686,18 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
{ {
MARIA_SHARE *share= info->s; MARIA_SHARE *share= info->s;
ulonglong page; ulonglong page;
uint record_number, empty_space; uint rownr, empty_space;
uint block_size= share->block_size; uint block_size= share->block_size;
uchar *buff= info->keyread_buff; uchar *buff= info->keyread_buff;
DBUG_ENTER("_ma_apply_redo_purge_row_head_or_tail"); DBUG_ENTER("_ma_apply_redo_purge_row_head_or_tail");
info->keyread_buff_used= 1;
page= page_korr(header); page= page_korr(header);
record_number= dirpos_korr(header+PAGE_STORE_SIZE); rownr= dirpos_korr(header+PAGE_STORE_SIZE);
DBUG_PRINT("enter", ("rowid: %lu page: %lu rownr: %u",
(ulong) ma_recordpos(page, rownr),
(ulong) page, rownr));
info->keyread_buff_used= 1;
if (!(buff= pagecache_read(share->pagecache, if (!(buff= pagecache_read(share->pagecache,
&info->dfile, &info->dfile,
...@@ -4589,18 +4705,27 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn, ...@@ -4589,18 +4705,27 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
buff, PAGECACHE_PLAIN_PAGE, buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0))) PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == (uchar) page_type);
if (lsn_korr(buff) >= lsn) if (lsn_korr(buff) >= lsn)
{ {
/* Already applied */ /*
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET); Already applied
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space)) Note that in case the page is not anymore a head or tail page
DBUG_RETURN(my_errno); a future redo will fix the bitmap.
*/
if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == page_type)
{
empty_space= uint2korr(buff+EMPTY_SPACE_OFFSET);
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE,
empty_space))
DBUG_RETURN(my_errno);
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
if (delete_dir_entry(buff, block_size, record_number, &empty_space) < 0) DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == (uchar) page_type);
if (delete_dir_entry(buff, block_size, rownr, &empty_space) < 0)
DBUG_RETURN(HA_ERR_WRONG_IN_RECORD); DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
lsn_store(buff, lsn); lsn_store(buff, lsn);
...@@ -4698,7 +4823,7 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn, ...@@ -4698,7 +4823,7 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header) const uchar *header)
{ {
ulonglong page; ulonglong page;
uint record_number; uint rownr;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE], *buff; uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE], *buff;
my_bool res= 1; my_bool res= 1;
...@@ -4707,9 +4832,8 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn, ...@@ -4707,9 +4832,8 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
DBUG_ENTER("_ma_apply_undo_row_insert"); DBUG_ENTER("_ma_apply_undo_row_insert");
page= page_korr(header); page= page_korr(header);
record_number= dirpos_korr(header + PAGE_STORE_SIZE); rownr= dirpos_korr(header + PAGE_STORE_SIZE);
DBUG_PRINT("enter", ("Page: %lu record_number: %u", (ulong) page, DBUG_PRINT("enter", ("Page: %lu rownr: %u", (ulong) page, rownr));
record_number));
if (!(buff= pagecache_read(info->s->pagecache, if (!(buff= pagecache_read(info->s->pagecache,
&info->dfile, page, 0, &info->dfile, page, 0,
...@@ -4722,24 +4846,25 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn, ...@@ -4722,24 +4846,25 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK; page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
push_dynamic(&info->pinned_pages, (void*) &page_link); push_dynamic(&info->pinned_pages, (void*) &page_link);
if (read_row_extent_info(info, buff, record_number)) if (read_row_extent_info(info, buff, rownr))
DBUG_RETURN(1); DBUG_RETURN(1);
if (delete_head_or_tail(info, page, record_number, 1, 1) || if (delete_head_or_tail(info, page, rownr, 1, 1) ||
delete_tails(info, info->cur_row.tail_positions)) delete_tails(info, info->cur_row.tail_positions))
goto err; goto err;
if (info->cur_row.extents && free_full_pages(info, &info->cur_row)) if (info->cur_row.extents && free_full_pages(info, &info->cur_row))
goto err; goto err;
lsn_store(log_data + FILEID_STORE_SIZE, undo_lsn); /* undo_lsn must be first for compression to work */
lsn_store(log_data, undo_lsn);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, LOGREC_CLR_END, if (translog_write_record(&lsn, LOGREC_CLR_END,
info->trn, info, sizeof(log_data), info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array, TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data)) log_data+ FILEID_STORE_SIZE))
goto err; goto err;
info->s->state.state.records--; info->s->state.state.records--;
...@@ -4748,3 +4873,183 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn, ...@@ -4748,3 +4873,183 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
_ma_unpin_all_pages(info, lsn); _ma_unpin_all_pages(info, lsn);
DBUG_RETURN(res); DBUG_RETURN(res);
} }
/* Execute undo of a row delete (insert the row back somewhere) */
my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
const uchar *header, size_t length)
{
uchar *record;
const uchar *null_bits, *field_length_data;
MARIA_SHARE *share= info->s;
MARIA_ROW row;
uint *null_field_lengths;
ulong *blob_lengths;
MARIA_COLUMNDEF *column, *end_column;
DBUG_ENTER("_ma_apply_undo_row_delete");
/*
Use cur row as a base; We need to make a copy as we will change
some buffers to point directly to 'header'
*/
memcpy(&row, &info->cur_row, sizeof(row));
null_field_lengths= row.null_field_lengths;
blob_lengths= row.blob_lengths;
/*
Fill in info->cur_row with information about the row, like in
calc_record_size(), to be used by write_block_record()
*/
row.normal_length= row.char_length= row.varchar_length=
row.blob_length= row.extents_count= row.field_lengths_length= 0;
null_bits= header;
header+= share->base.null_bytes;
row.empty_bits= (uchar*) header;
header+= share->base.pack_bytes;
if (share->base.max_field_lengths)
{
row.field_lengths_length= uint2korr(header);
row.field_lengths= (uchar*) header + 2 ;
header+= 2 + row.field_lengths_length;
}
if (share->base.blobs)
row.blob_length= ma_get_length((uchar**) &header);
/* We need to build up a record (without blobs) in rec_buff */
if (_ma_alloc_buffer(&info->rec_buff, &info->rec_buff_size,
length - row.blob_length))
DBUG_RETURN(1);
record= info->rec_buff;
memcpy(record, null_bits, share->base.null_bytes);
/* Copy field information from header to record */
/* Handle constant length fields that are always present */
for (column= share->columndef,
end_column= column+ share->base.fixed_not_null_fields;
column < end_column;
column++)
{
memcpy(record + column->offset, header, column->length);
header+= column->length;
}
/* Handle NULL fields and CHAR/VARCHAR fields */
field_length_data= row.field_lengths;
for (end_column= share->columndef + share->base.fields;
column < end_column;
column++, null_field_lengths++)
{
if ((record[column->null_pos] & column->null_bit) ||
row.empty_bits[column->empty_pos] & column->empty_bit)
{
if (column->type != FIELD_BLOB)
*null_field_lengths= 0;
else
*blob_lengths++= 0;
if (share->calc_checksum)
bzero(record + column->offset, column->length);
continue;
}
switch ((enum en_fieldtype) column->type) {
case FIELD_CHECK:
case FIELD_NORMAL: /* Fixed length field */
case FIELD_ZERO:
case FIELD_SKIP_PRESPACE: /* Not packed */
case FIELD_SKIP_ZERO: /* Fixed length field */
row.normal_length+= column->length;
*null_field_lengths= column->length;
memcpy(record + column->offset, header, column->length);
header+= column->length;
break;
case FIELD_SKIP_ENDSPACE: /* CHAR */
if (column->length <= 255)
length= (uint) *field_length_data++;
else
{
length= uint2korr(field_length_data);
field_length_data+= 2;
}
row.char_length+= length;
*null_field_lengths= length;
memcpy(record + column->offset, header, length);
if (share->calc_checksum)
bfill(record + column->offset + length, (column->length - length),
' ');
header+= length;
break;
case FIELD_VARCHAR:
{
uint length;
uchar *field_pos= record + column->offset;
/* 256 is correct as this includes the length uchar */
if (column->length <= 256)
{
field_pos[0]= *field_length_data;
length= (uint) *field_length_data++;
}
else
{
field_pos[0]= field_length_data[0];
field_pos[1]= field_length_data[1];
length= uint2korr(field_length_data);
field_length_data+= 2;
}
row.varchar_length+= length;
*null_field_lengths= length;
memcpy(record + column->offset, header, length);
header+= length;
break;
}
case FIELD_BLOB:
{
/* Copy length of blob and pointer to blob data to record */
uchar *field_pos= record + column->offset;
uint size_length= column->length - portable_sizeof_char_ptr;
ulong blob_length= _ma_calc_blob_length(size_length, field_length_data);
memcpy(field_pos, field_length_data, size_length);
field_length_data+= size_length;
memcpy(field_pos + size_length, &header, sizeof(&header));
header+= blob_length;
*blob_lengths++= blob_length;
row.blob_length+= blob_length;
break;
}
default:
DBUG_ASSERT(0);
}
}
row.head_length= (row.base_length +
share->base.fixed_not_null_fields_length +
row.field_lengths_length +
size_to_store_key_length(row.field_lengths_length) +
row.normal_length +
row.char_length + row.varchar_length);
row.total_length= (row.head_length + row.blob_length);
if (row.total_length < share->base.min_row_length)
row.total_length= share->base.min_row_length;
/* Row is now up to date. Time to insert the record */
DBUG_RETURN(allocate_and_write_block_record(info, record, &row, undo_lsn));
}
/* Execute undo of a row update */
my_bool _ma_apply_undo_row_update(MARIA_HA *info __attribute__ ((unused)),
LSN undo_lsn __attribute__ ((unused)),
const uchar *header __attribute__ ((unused)),
size_t length __attribute__ ((unused)))
{
DBUG_ENTER("_ma_apply_undo_row_update");
fprintf(stderr, "Undo of row update is not yet done\n");
exit(1);
DBUG_RETURN(0);
}
...@@ -189,3 +189,7 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info, LSN lsn, ...@@ -189,3 +189,7 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info, LSN lsn,
const uchar *header); const uchar *header);
my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn, my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header); const uchar *header);
my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
const uchar *header, size_t length);
my_bool _ma_apply_undo_row_update(MARIA_HA *info, LSN undo_lsn,
const uchar *header, size_t length);
...@@ -3258,7 +3258,9 @@ my_bool pagecache_write_part(PAGECACHE *pagecache, ...@@ -3258,7 +3258,9 @@ my_bool pagecache_write_part(PAGECACHE *pagecache,
DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE || DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE ||
block->type == PAGECACHE_READ_UNKNOWN_PAGE || block->type == PAGECACHE_READ_UNKNOWN_PAGE ||
block->type == type); block->type == type ||
(block->type == PAGECACHE_PLAIN_PAGE &&
type == PAGECACHE_LSN_PAGE));
block->type= type; block->type= type;
if (make_lock_and_pin(pagecache, block, if (make_lock_and_pin(pagecache, block,
......
...@@ -76,6 +76,8 @@ prototype_redo_exec_hook(UNDO_ROW_UPDATE); ...@@ -76,6 +76,8 @@ prototype_redo_exec_hook(UNDO_ROW_UPDATE);
prototype_redo_exec_hook(UNDO_ROW_PURGE); prototype_redo_exec_hook(UNDO_ROW_PURGE);
prototype_redo_exec_hook(COMMIT); prototype_redo_exec_hook(COMMIT);
prototype_undo_exec_hook(UNDO_ROW_INSERT); prototype_undo_exec_hook(UNDO_ROW_INSERT);
prototype_undo_exec_hook(UNDO_ROW_DELETE);
prototype_undo_exec_hook(UNDO_ROW_UPDATE);
static int run_redo_phase(LSN lsn, my_bool apply); static int run_redo_phase(LSN lsn, my_bool apply);
static uint end_of_redo_phase(my_bool prepare_for_undo_phase); static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
...@@ -977,6 +979,88 @@ prototype_undo_exec_hook(UNDO_ROW_INSERT) ...@@ -977,6 +979,88 @@ prototype_undo_exec_hook(UNDO_ROW_INSERT)
} }
prototype_undo_exec_hook(UNDO_ROW_DELETE)
{
my_bool error;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
return 1;
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
fprintf(tracef, "Failed to read record\n");
return 1;
}
/* Set undo to point to previous undo record */
info->trn= trn;
info->trn->undo_lsn= lsn_korr(rec->header);
/*
For now we skip the page and directory entry. This is to be used
later when we mark rows as deleted.
*/
error= _ma_apply_undo_row_delete(info, rec->lsn,
log_record_buffer.str + LSN_STORE_SIZE +
FILEID_STORE_SIZE + PAGE_STORE_SIZE +
DIRPOS_STORE_SIZE,
rec->record_length -
(LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE));
info->trn= 0;
return error;
}
prototype_undo_exec_hook(UNDO_ROW_UPDATE)
{
my_bool error;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
return 1;
info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
fprintf(tracef, "Failed to read record\n");
return 1;
}
/* Set undo to point to previous undo record */
info->trn= trn;
info->trn->undo_lsn= lsn_korr(rec->header);
/*
For now we skip the page and directory entry. This is to be used
later when we mark rows as deleted.
*/
error= _ma_apply_undo_row_update(info, rec->lsn,
log_record_buffer.str + LSN_STORE_SIZE +
FILEID_STORE_SIZE + PAGE_STORE_SIZE +
DIRPOS_STORE_SIZE,
rec->record_length -
(LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE));
info->trn= 0;
return error;
}
static int run_redo_phase(LSN lsn, my_bool apply) static int run_redo_phase(LSN lsn, my_bool apply)
{ {
/* install hooks for execution */ /* install hooks for execution */
...@@ -1003,6 +1087,8 @@ static int run_redo_phase(LSN lsn, my_bool apply) ...@@ -1003,6 +1087,8 @@ static int run_redo_phase(LSN lsn, my_bool apply)
install_redo_exec_hook(UNDO_ROW_PURGE); install_redo_exec_hook(UNDO_ROW_PURGE);
install_redo_exec_hook(COMMIT); install_redo_exec_hook(COMMIT);
install_undo_exec_hook(UNDO_ROW_INSERT); install_undo_exec_hook(UNDO_ROW_INSERT);
install_undo_exec_hook(UNDO_ROW_DELETE);
install_undo_exec_hook(UNDO_ROW_UPDATE);
current_group_end_lsn= LSN_IMPOSSIBLE; current_group_end_lsn= LSN_IMPOSSIBLE;
......
...@@ -37,8 +37,9 @@ static enum data_file_type record_type= DYNAMIC_RECORD; ...@@ -37,8 +37,9 @@ static enum data_file_type record_type= DYNAMIC_RECORD;
static uint insert_count, update_count, remove_count; static uint insert_count, update_count, remove_count;
static uint pack_keys=0, pack_seg=0, key_length; static uint pack_keys=0, pack_seg=0, key_length;
static uint unique_key=HA_NOSAME; static uint unique_key=HA_NOSAME;
static my_bool pagecacheing, null_fields, silent, skip_update, opt_unique, static uint die_in_middle_of_transaction;
verbose, skip_delete, transactional, die_in_middle_of_transaction; static my_bool pagecacheing, null_fields, silent, skip_update, opt_unique;
static my_bool verbose, skip_delete, transactional;
static MARIA_COLUMNDEF recinfo[4]; static MARIA_COLUMNDEF recinfo[4];
static MARIA_KEYDEF keyinfo[10]; static MARIA_KEYDEF keyinfo[10];
static HA_KEYSEG keyseg[10]; static HA_KEYSEG keyseg[10];
...@@ -94,6 +95,7 @@ static int run_test(const char *filename) ...@@ -94,6 +95,7 @@ static int run_test(const char *filename)
{ {
MARIA_HA *file; MARIA_HA *file;
int i,j,error,deleted,rec_length,uniques=0; int i,j,error,deleted,rec_length,uniques=0;
uint offset_to_key;
ha_rows found,row_count; ha_rows found,row_count;
char record[MAX_REC_LENGTH],key[MAX_REC_LENGTH],read_record[MAX_REC_LENGTH]; char record[MAX_REC_LENGTH],key[MAX_REC_LENGTH],read_record[MAX_REC_LENGTH];
MARIA_UNIQUEDEF uniquedef; MARIA_UNIQUEDEF uniquedef;
...@@ -182,6 +184,10 @@ static int run_test(const char *filename) ...@@ -182,6 +184,10 @@ static int run_test(const char *filename)
else else
uniques=0; uniques=0;
offset_to_key= test(null_fields);
if (key_field == FIELD_BLOB)
offset_to_key+= 2;
if (!silent) if (!silent)
printf("- Creating maria file\n"); printf("- Creating maria file\n");
create_info.max_rows=(ulong) (rec_pointer_size ? create_info.max_rows=(ulong) (rec_pointer_size ?
...@@ -234,7 +240,7 @@ static int run_test(const char *filename) ...@@ -234,7 +240,7 @@ static int run_test(const char *filename)
flags[0]=2; flags[0]=2;
} }
if (die_in_middle_of_transaction) if (die_in_middle_of_transaction == 1)
{ {
/* /*
Ensure we get changed pages and log to disk Ensure we get changed pages and log to disk
...@@ -242,6 +248,7 @@ static int run_test(const char *filename) ...@@ -242,6 +248,7 @@ static int run_test(const char *filename)
*/ */
_ma_flush_table_files(file, MARIA_FLUSH_DATA, FLUSH_RELEASE, _ma_flush_table_files(file, MARIA_FLUSH_DATA, FLUSH_RELEASE,
FLUSH_RELEASE); FLUSH_RELEASE);
printf("Dying on request after insert without maria_close()\n");
exit(1); exit(1);
} }
...@@ -333,14 +340,14 @@ static int run_test(const char *filename) ...@@ -333,14 +340,14 @@ static int run_test(const char *filename)
if (verbose || (flags[j] >= 1 || if (verbose || (flags[j] >= 1 ||
(error && my_errno != HA_ERR_KEY_NOT_FOUND))) (error && my_errno != HA_ERR_KEY_NOT_FOUND)))
printf("key: '%.*s' maria_rkey: %3d errno: %3d\n", printf("key: '%.*s' maria_rkey: %3d errno: %3d\n",
(int) key_length,key+test(null_fields),error,my_errno); (int) key_length,key+offset_to_key,error,my_errno);
} }
else else
{ {
error=maria_delete(file,read_record); error=maria_delete(file,read_record);
if (verbose || error) if (verbose || error)
printf("key: '%.*s' maria_delete: %3d errno: %3d\n", printf("key: '%.*s' maria_delete: %3d errno: %3d\n",
(int) key_length, key+test(null_fields), error, my_errno); (int) key_length, key+offset_to_key, error, my_errno);
if (! error) if (! error)
{ {
deleted++; deleted++;
...@@ -348,6 +355,18 @@ static int run_test(const char *filename) ...@@ -348,6 +355,18 @@ static int run_test(const char *filename)
} }
} }
} }
if (die_in_middle_of_transaction == 2)
{
/*
Ensure we get changed pages and log to disk
As commit record is not done, the undo entries needs to be rolled back.
*/
_ma_flush_table_files(file, MARIA_FLUSH_DATA, FLUSH_RELEASE,
FLUSH_RELEASE);
printf("Dying on request after delete without maria_close()\n");
exit(1);
}
} }
if (!silent) if (!silent)
printf("- Reading rows with key\n"); printf("- Reading rows with key\n");
...@@ -362,7 +381,7 @@ static int run_test(const char *filename) ...@@ -362,7 +381,7 @@ static int run_test(const char *filename)
(error && (flags[i] != 0 || my_errno != HA_ERR_KEY_NOT_FOUND))) (error && (flags[i] != 0 || my_errno != HA_ERR_KEY_NOT_FOUND)))
{ {
printf("key: '%.*s' maria_rkey: %3d errno: %3d record: %s\n", printf("key: '%.*s' maria_rkey: %3d errno: %3d record: %s\n",
(int) key_length,key+test(null_fields),error,my_errno,record+1); (int) key_length,key+offset_to_key,error,my_errno,record+1);
} }
} }
...@@ -661,7 +680,7 @@ static struct my_option my_long_options[] = ...@@ -661,7 +680,7 @@ static struct my_option my_long_options[] =
"Abort hard after doing inserts. Used for testing recovery with undo", "Abort hard after doing inserts. Used for testing recovery with undo",
(uchar**) &die_in_middle_of_transaction, (uchar**) &die_in_middle_of_transaction,
(uchar**) &die_in_middle_of_transaction, (uchar**) &die_in_middle_of_transaction,
0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, 0, GET_INT, OPT_ARG, 0, 0, 0, 0, 0, 0},
{"transactional", 'T', {"transactional", 'T',
"Test in transactional mode. (Only works with block format)", "Test in transactional mode. (Only works with block format)",
(uchar**) &transactional, (uchar**) &transactional, 0, GET_BOOL, NO_ARG, (uchar**) &transactional, (uchar**) &transactional, 0, GET_BOOL, NO_ARG,
...@@ -749,6 +768,12 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -749,6 +768,12 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
case 'K': /* Use key cacheing */ case 'K': /* Use key cacheing */
pagecacheing=1; pagecacheing=1;
break; break;
case 'A':
if (!argument)
die_in_middle_of_transaction= 1;
else
die_in_middle_of_transaction= atoi(argument);
break;
case 'V': case 'V':
printf("test1 Ver 1.2 \n"); printf("test1 Ver 1.2 \n");
exit(0); exit(0);
......
...@@ -369,10 +369,11 @@ typedef struct st_maria_row ...@@ -369,10 +369,11 @@ typedef struct st_maria_row
ulong *blob_lengths; /* Length for each blob */ ulong *blob_lengths; /* Length for each blob */
ulong base_length, normal_length, char_length, varchar_length, blob_length; ulong base_length, normal_length, char_length, varchar_length, blob_length;
ulong head_length, total_length; ulong head_length, total_length;
size_t extents_buffer_length; /* Size of 'extents' buffer */ size_t extents_buffer_length; /* Size of 'extents' buffer */
uint field_lengths_length; /* Length of data in field_lengths */ uint field_lengths_length; /* Length of data in field_lengths */
uint extents_count; /* number of extents in 'extents' */ uint extents_count; /* number of extents in 'extents' */
uint full_page_count, tail_count; /* For maria_chk */ uint full_page_count, tail_count; /* For maria_chk */
uint space_on_head_page;
} MARIA_ROW; } MARIA_ROW;
/* Data to scan row in blocked format */ /* Data to scan row in blocked format */
...@@ -434,6 +435,8 @@ struct st_maria_info ...@@ -434,6 +435,8 @@ struct st_maria_info
ulong packed_length, blob_length; /* Length of found, packed record */ ulong packed_length, blob_length; /* Length of found, packed record */
size_t rec_buff_size; size_t rec_buff_size;
PAGECACHE_FILE dfile; /* The datafile */ PAGECACHE_FILE dfile; /* The datafile */
IO_CACHE rec_cache; /* When cacheing records */
LIST open_list;
uint opt_flag; /* Optim. for space/speed */ uint opt_flag; /* Optim. for space/speed */
uint update; /* If file changed since open */ uint update; /* If file changed since open */
int lastinx; /* Last used index */ int lastinx; /* Last used index */
...@@ -449,8 +452,6 @@ struct st_maria_info ...@@ -449,8 +452,6 @@ struct st_maria_info
uint data_changed; /* Somebody has changed data */ uint data_changed; /* Somebody has changed data */
uint save_update; /* When using KEY_READ */ uint save_update; /* When using KEY_READ */
int save_lastinx; int save_lastinx;
LIST open_list;
IO_CACHE rec_cache; /* When cacheing records */
uint preload_buff_size; /* When preloading indexes */ uint preload_buff_size; /* When preloading indexes */
myf lock_wait; /* is 0 or MY_DONT_WAIT */ myf lock_wait; /* is 0 or MY_DONT_WAIT */
my_bool was_locked; /* Was locked in panic */ my_bool was_locked; /* Was locked in panic */
...@@ -468,6 +469,7 @@ struct st_maria_info ...@@ -468,6 +469,7 @@ struct st_maria_info
THR_LOCK_DATA lock; THR_LOCK_DATA lock;
#endif #endif
uchar *maria_rtree_recursion_state; /* For RTREE */ uchar *maria_rtree_recursion_state; /* For RTREE */
uchar length_buff[5]; /* temp buff to store blob lengths */
int maria_rtree_recursion_depth; int maria_rtree_recursion_depth;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment