Commit ba1b1a5c authored by unknown's avatar unknown

Added applying of undo for updates

Fixed bug in duplicate key handling for block records during repair
All read-row methods now return error number in case of error
Don't calculate checksum for null fields
Fixed bug when running maria_read_log with -o


BUILD/SETUP.sh:
  Added STACK_DIRECTION
BUILD/compile-pentium-debug-max:
  Moved STACK_DIRECTION to SETUP
include/myisam.h:
  Added extra parameter to write_key
storage/maria/ma_blockrec.c:
  Added applying of undo for updates
  Fixed indentation
  Removed some not needed casts
  Fixed wrong logging of CLR record
  Split ma_update_block_record to two functions to be able to reuse it from undo-applying
  Simplify filling of packed fields
  ma_record_block_record) now returns error number on failure
  Sligtly changed log record information for undo-update
storage/maria/ma_check.c:
  Fixed bug in duplicate key handling for block records during repair
storage/maria/ma_checksum.c:
  Don't calculate checksum for null fields
storage/maria/ma_dynrec.c:
  _ma_read_dynamic_reocrd() now returns error number on error
  Rest of the changes are code simplification and indentation fixes
storage/maria/ma_locking.c:
  Added comment
storage/maria/ma_loghandler.c:
  More debugging
  Removed printing of total_record_length as this was always same as record_length
storage/maria/ma_open.c:
  Allocate bitmap for changed fields
storage/maria/ma_packrec.c:
  read_record now returns error number on error
storage/maria/ma_recovery.c:
  Fixed wrong arguments to undo_row_update
storage/maria/ma_statrec.c:
  read_record now returns error number on error (not 1)
  Code simplification
storage/maria/ma_test1.c:
  Added exit possibility after update phase (to test undo of updates)
storage/maria/maria_def.h:
  Include bitmap header file
storage/maria/maria_read_log.c:
  Fixed bug when running with -o
parent fdd6160b
...@@ -170,10 +170,10 @@ max_configs="$SSL_LIBRARY --with-plugins=max --with-embedded-server" ...@@ -170,10 +170,10 @@ max_configs="$SSL_LIBRARY --with-plugins=max --with-embedded-server"
# CPU and platform specific compilation flags. # CPU and platform specific compilation flags.
# #
alpha_cflags="$check_cpu_cflags -Wa,-m$cpu_flag" alpha_cflags="$check_cpu_cflags -Wa,-m$cpu_flag"
amd64_cflags="$check_cpu_cflags" amd64_cflags="$check_cpu_cflags -DSTACK_DIRECTION=-1"
amd64_cxxflags="" # If dropping '--with-big-tables', add here "-DBIG_TABLES" amd64_cxxflags="" # If dropping '--with-big-tables', add here "-DBIG_TABLES"
pentium_cflags="$check_cpu_cflags" pentium_cflags="$check_cpu_cflags -DSTACK_DIRECTION=-1"
pentium64_cflags="$check_cpu_cflags -m64" pentium64_cflags="$check_cpu_cflags -m64 -DSTACK_DIRECTION=-1"
ppc_cflags="$check_cpu_cflags" ppc_cflags="$check_cpu_cflags"
sparc_cflags="" sparc_cflags=""
......
...@@ -4,7 +4,7 @@ path=`dirname $0` ...@@ -4,7 +4,7 @@ path=`dirname $0`
set -- "$@" --with-debug=full set -- "$@" --with-debug=full
. "$path/SETUP.sh" . "$path/SETUP.sh"
extra_flags="$pentium_cflags $debug_cflags -DSTACK_DIRECTION=-1" extra_flags="$pentium_cflags $debug_cflags"
extra_configs="$pentium_configs $debug_configs $max_configs $error_inject --with-experimental-collations" extra_configs="$pentium_configs $debug_configs $max_configs $error_inject --with-experimental-collations"
. "$path/FINISH.sh" . "$path/FINISH.sh"
...@@ -364,7 +364,7 @@ typedef struct st_mi_sort_param ...@@ -364,7 +364,7 @@ typedef struct st_mi_sort_param
NEAR int (*write_keys)(struct st_mi_sort_param *, register uchar **, NEAR int (*write_keys)(struct st_mi_sort_param *, register uchar **,
uint , struct st_buffpek *, IO_CACHE *); uint , struct st_buffpek *, IO_CACHE *);
NEAR uint (*read_to_buffer)(IO_CACHE *,struct st_buffpek *, uint); NEAR uint (*read_to_buffer)(IO_CACHE *,struct st_buffpek *, uint);
NEAR int (*write_key)(struct st_mi_sort_param *, IO_CACHE *,char *, NEAR int (*write_key)(struct st_mi_sort_param *, IO_CACHE *,uchar *,
uint, uint); uint, uint);
} MI_SORT_PARAM; } MI_SORT_PARAM;
......
...@@ -677,6 +677,46 @@ static my_bool check_if_zero(uchar *pos, uint length) ...@@ -677,6 +677,46 @@ static my_bool check_if_zero(uchar *pos, uint length)
} }
/*
@brief Copy not changed fields from 'from' to 'to'
@notes
Assumption is that most fields are not changed!
(Which is why we don't test if all bits are set for some bytes in bitmap)
*/
void copy_not_changed_fields(MARIA_HA *info, MY_BITMAP *changed_fields,
uchar *to, uchar *from)
{
MARIA_COLUMNDEF *column, *end_column;
uchar *bitmap= (uchar*) changed_fields->bitmap;
MARIA_SHARE *share= info->s;
uint bit= 1;
for (column= share->columndef, end_column= column+ share->base.fields;
column < end_column; column++)
{
if (!(*bitmap & bit))
{
uint field_length= column->length;
if (column->type == FIELD_VARCHAR)
{
if (column->fill_length == 1)
field_length= (uint) from[column->offset] + 1;
else
field_length= uint2korr(from + column->offset) + 2;
}
memcpy(to + column->offset, from + column->offset, field_length);
}
if ((bit= (bit << 1)) == 256)
{
bitmap++;
bit= 1;
}
}
}
/* /*
Unpin all pinned pages Unpin all pinned pages
...@@ -878,7 +918,7 @@ static void calc_record_size(MARIA_HA *info, const uchar *record, ...@@ -878,7 +918,7 @@ static void calc_record_size(MARIA_HA *info, const uchar *record,
*blob_lengths++= 0; *blob_lengths++= 0;
continue; continue;
} }
switch ((enum en_fieldtype) column->type) { switch (column->type) {
case FIELD_CHECK: case FIELD_CHECK:
case FIELD_NORMAL: /* Fixed length field */ case FIELD_NORMAL: /* Fixed length field */
case FIELD_ZERO: case FIELD_ZERO:
...@@ -1321,8 +1361,8 @@ static my_bool write_tail(MARIA_HA *info, ...@@ -1321,8 +1361,8 @@ static my_bool write_tail(MARIA_HA *info,
LSN lsn; LSN lsn;
/* Log REDO changes of tail page */ /* Log REDO changes of tail page */
page_store(log_data+ FILEID_STORE_SIZE, block->page); page_store(log_data + FILEID_STORE_SIZE, block->page);
dirpos_store(log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE, dirpos_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE,
row_pos.rownr); row_pos.rownr);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
...@@ -1804,7 +1844,7 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -1804,7 +1844,7 @@ static my_bool write_block_record(MARIA_HA *info,
continue; continue;
field_pos= record + column->offset; field_pos= record + column->offset;
switch ((enum en_fieldtype) column->type) { switch (column->type) {
case FIELD_NORMAL: /* Fixed length field */ case FIELD_NORMAL: /* Fixed length field */
case FIELD_SKIP_PRESPACE: case FIELD_SKIP_PRESPACE:
case FIELD_SKIP_ZERO: /* Fixed length field */ case FIELD_SKIP_ZERO: /* Fixed length field */
...@@ -2295,7 +2335,7 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -2295,7 +2335,7 @@ static my_bool write_block_record(MARIA_HA *info,
if (translog_write_record(&lsn, LOGREC_CLR_END, if (translog_write_record(&lsn, LOGREC_CLR_END,
info->trn, info, sizeof(log_data), info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array, TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data+ FILEID_STORE_SIZE)) log_data + LSN_STORE_SIZE))
goto disk_err; goto disk_err;
} }
else else
...@@ -2305,9 +2345,9 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -2305,9 +2345,9 @@ static my_bool write_block_record(MARIA_HA *info,
/* LOGREC_UNDO_ROW_INSERT & LOGREC_UNDO_ROW_INSERT share same header */ /* LOGREC_UNDO_ROW_INSERT & LOGREC_UNDO_ROW_INSERT share same header */
lsn_store(log_data, info->trn->undo_lsn); lsn_store(log_data, info->trn->undo_lsn);
page_store(log_data+ LSN_STORE_SIZE + FILEID_STORE_SIZE, page_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE,
head_block->page); head_block->page);
dirpos_store(log_data+ LSN_STORE_SIZE + FILEID_STORE_SIZE + dirpos_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE, PAGE_STORE_SIZE,
row_pos->rownr); row_pos->rownr);
...@@ -2329,12 +2369,13 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -2329,12 +2369,13 @@ static my_bool write_block_record(MARIA_HA *info,
size_t row_length; size_t row_length;
uint row_parts_count; uint row_parts_count;
row_length= fill_update_undo_parts(info, old_record, record, row_length= fill_update_undo_parts(info, old_record, record,
info->log_row_parts + log_array +
TRANSLOG_INTERNAL_PARTS + 1, TRANSLOG_INTERNAL_PARTS + 1,
&row_parts_count); &row_parts_count);
if (translog_write_record(&lsn, LOGREC_UNDO_ROW_UPDATE, info->trn, if (translog_write_record(&lsn, LOGREC_UNDO_ROW_UPDATE, info->trn,
info, sizeof(log_data) + row_length, info, sizeof(log_data) + row_length,
TRANSLOG_INTERNAL_PARTS + 1 + row_parts_count, TRANSLOG_INTERNAL_PARTS + 1 +
row_parts_count,
log_array, log_data + LSN_STORE_SIZE)) log_array, log_data + LSN_STORE_SIZE))
goto disk_err; goto disk_err;
} }
...@@ -2601,8 +2642,11 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info) ...@@ -2601,8 +2642,11 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info)
for rows split into many extents. for rows split into many extents.
*/ */
my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, static my_bool _ma_update_block_record2(MARIA_HA *info,
const uchar *oldrec, const uchar *record) MARIA_RECORD_POS record_pos,
const uchar *oldrec,
const uchar *record,
LSN undo_lsn)
{ {
MARIA_BITMAP_BLOCKS *blocks= &info->cur_row.insert_blocks; MARIA_BITMAP_BLOCKS *blocks= &info->cur_row.insert_blocks;
uchar *buff; uchar *buff;
...@@ -2614,9 +2658,14 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, ...@@ -2614,9 +2658,14 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
ulonglong page; ulonglong page;
struct st_row_pos_info row_pos; struct st_row_pos_info row_pos;
MARIA_SHARE *share= info->s; MARIA_SHARE *share= info->s;
DBUG_ENTER("_ma_update_block_record"); DBUG_ENTER("_ma_update_block_record2");
DBUG_PRINT("enter", ("rowid: %lu", (long) record_pos)); DBUG_PRINT("enter", ("rowid: %lu", (long) record_pos));
#ifdef ENABLE_IF_PROBLEM_WITH_UPDATE
DBUG_DUMP("oldrec", oldrec, share->base.reclength);
DBUG_DUMP("newrec", record, share->base.reclength);
#endif
calc_record_size(info, record, new_row); calc_record_size(info, record, new_row);
page= ma_recordpos_to_page(record_pos); page= ma_recordpos_to_page(record_pos);
...@@ -2669,11 +2718,12 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, ...@@ -2669,11 +2718,12 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
if (cur_row->extents_count && free_full_pages(info, cur_row)) if (cur_row->extents_count && free_full_pages(info, cur_row))
goto err; goto err;
DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks, DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks,
1, &row_pos, 0)); 1, &row_pos, undo_lsn));
} }
/* /*
Allocate all size in block for record Allocate all size in block for record
QQ: Need to improve this to do compact if we can fit one more blob into TODO:
Need to improve this to do compact if we can fit one more blob into
the head page the head page
*/ */
head_length= uint2korr(dir + 2); head_length= uint2korr(dir + 2);
...@@ -2702,7 +2752,7 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, ...@@ -2702,7 +2752,7 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
row_pos.data= buff + uint2korr(dir); row_pos.data= buff + uint2korr(dir);
row_pos.length= head_length; row_pos.length= head_length;
DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks, 1, DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks, 1,
&row_pos, 0)); &row_pos, undo_lsn));
err: err:
_ma_unpin_all_pages(info, 0); _ma_unpin_all_pages(info, 0);
...@@ -2710,6 +2760,16 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, ...@@ -2710,6 +2760,16 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
} }
/* Wrapper for _ma_update_block_record2() used by ma_update() */
my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
const uchar *orig_rec, const uchar *new_rec)
{
return _ma_update_block_record2(info, record_pos, orig_rec, new_rec, 0);
}
/* /*
Delete a directory entry Delete a directory entry
...@@ -2848,8 +2908,8 @@ static my_bool delete_head_or_tail(MARIA_HA *info, ...@@ -2848,8 +2908,8 @@ static my_bool delete_head_or_tail(MARIA_HA *info,
if (info->s->now_transactional) if (info->s->now_transactional)
{ {
/* Log REDO data */ /* Log REDO data */
page_store(log_data+ FILEID_STORE_SIZE, page); page_store(log_data + FILEID_STORE_SIZE, page);
dirpos_store(log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE, dirpos_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE,
record_number); record_number);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
...@@ -2877,7 +2937,7 @@ static my_bool delete_head_or_tail(MARIA_HA *info, ...@@ -2877,7 +2937,7 @@ static my_bool delete_head_or_tail(MARIA_HA *info,
PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE]; PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
pagerange_store(log_data + FILEID_STORE_SIZE, 1); pagerange_store(log_data + FILEID_STORE_SIZE, 1);
page_store(log_data+ FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE, page); page_store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE, page);
pagerange_store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + pagerange_store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
PAGE_STORE_SIZE, 1); PAGE_STORE_SIZE, 1);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
...@@ -2975,8 +3035,8 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record) ...@@ -2975,8 +3035,8 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record)
/* Write UNDO record */ /* Write UNDO record */
lsn_store(log_data, info->trn->undo_lsn); lsn_store(log_data, info->trn->undo_lsn);
page_store(log_data+ LSN_STORE_SIZE + FILEID_STORE_SIZE, page); page_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, page);
dirpos_store(log_data+ LSN_STORE_SIZE + FILEID_STORE_SIZE + dirpos_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE, record_number); PAGE_STORE_SIZE, record_number);
info->log_row_parts[TRANSLOG_INTERNAL_PARTS].str= (char*) log_data; info->log_row_parts[TRANSLOG_INTERNAL_PARTS].str= (char*) log_data;
...@@ -3421,16 +3481,14 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -3421,16 +3481,14 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
for (end_column= share->columndef + share->base.fields; for (end_column= share->columndef + share->base.fields;
column < end_column; column++) column < end_column; column++)
{ {
enum en_fieldtype type= (enum en_fieldtype) column->type; enum en_fieldtype type= column->type;
uchar *field_pos= record + column->offset; uchar *field_pos= record + column->offset;
/* First check if field is present in record */ /* First check if field is present in record */
if ((record[column->null_pos] & column->null_bit) || if ((record[column->null_pos] & column->null_bit) ||
(cur_row->empty_bits[column->empty_pos] & column->empty_bit)) (cur_row->empty_bits[column->empty_pos] & column->empty_bit))
{ {
if (type == FIELD_SKIP_ENDSPACE) bfill(record + column->offset, column->fill_length,
bfill(record + column->offset, column->length, ' '); type == FIELD_SKIP_ENDSPACE ? ' ' : 0);
else
bzero(record + column->offset, column->fill_length);
continue; continue;
} }
switch (type) { switch (type) {
...@@ -3586,7 +3644,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record, ...@@ -3586,7 +3644,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
This function is a simpler version of _ma_read_block_record2() This function is a simpler version of _ma_read_block_record2()
The data about the used pages is stored in info->cur_row. The data about the used pages is stored in info->cur_row.
@return @return Status
@retval 0 ok @retval 0 ok
@retval 1 Error. my_errno contains error number @retval 1 Error. my_errno contains error number
*/ */
...@@ -3679,11 +3737,14 @@ static my_bool read_row_extent_info(MARIA_HA *info, uchar *buff, ...@@ -3679,11 +3737,14 @@ static my_bool read_row_extent_info(MARIA_HA *info, uchar *buff,
/* /*
Read a record based on record position Read a record based on record position
SYNOPSIS @fn _ma_read_block_record()
_ma_read_block_record() @param info Maria handler
info Maria handler @param record Store record here
record Store record here @param record_pos Record position
record_pos Record position
@return Status
@retval 0 ok
@retval # Error number
*/ */
int _ma_read_block_record(MARIA_HA *info, uchar *record, int _ma_read_block_record(MARIA_HA *info, uchar *record,
...@@ -3702,13 +3763,13 @@ int _ma_read_block_record(MARIA_HA *info, uchar *record, ...@@ -3702,13 +3763,13 @@ int _ma_read_block_record(MARIA_HA *info, uchar *record,
&info->dfile, ma_recordpos_to_page(record_pos), 0, &info->dfile, ma_recordpos_to_page(record_pos), 0,
info->buff, info->s->page_type, info->buff, info->s->page_type,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0))) PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
DBUG_RETURN(1); DBUG_RETURN(my_errno);
DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == HEAD_PAGE); DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == HEAD_PAGE);
if (!(data= get_record_position(buff, block_size, offset, &end_of_data))) if (!(data= get_record_position(buff, block_size, offset, &end_of_data)))
{ {
my_errno= HA_ERR_WRONG_IN_RECORD; /* File crashed */
DBUG_PRINT("error", ("Wrong directory entry in data block")); DBUG_PRINT("error", ("Wrong directory entry in data block"));
DBUG_RETURN(1); my_errno= HA_ERR_WRONG_IN_RECORD; /* File crashed */
DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
} }
DBUG_RETURN(_ma_read_block_record2(info, record, data, end_of_data)); DBUG_RETURN(_ma_read_block_record2(info, record, data, end_of_data));
} }
...@@ -4204,7 +4265,7 @@ static size_t fill_insert_undo_parts(MARIA_HA *info, const uchar *record, ...@@ -4204,7 +4265,7 @@ static size_t fill_insert_undo_parts(MARIA_HA *info, const uchar *record,
column_pos= record+ column->offset; column_pos= record+ column->offset;
column_length= column->length; column_length= column->length;
switch ((enum en_fieldtype) column->type) { switch (column->type) {
case FIELD_CHECK: case FIELD_CHECK:
case FIELD_NORMAL: /* Fixed length field */ case FIELD_NORMAL: /* Fixed length field */
case FIELD_ZERO: case FIELD_ZERO:
...@@ -4288,7 +4349,7 @@ static size_t fill_insert_undo_parts(MARIA_HA *info, const uchar *record, ...@@ -4288,7 +4349,7 @@ static size_t fill_insert_undo_parts(MARIA_HA *info, const uchar *record,
Fields are stored in same order as the field array. Fields are stored in same order as the field array.
Number of changed fields (packed) Offset to changed field data (packed)
For each changed field For each changed field
Fieldnumber (packed) Fieldnumber (packed)
...@@ -4323,7 +4384,7 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec, ...@@ -4323,7 +4384,7 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
uchar *old_field_lengths= old_row->field_lengths; uchar *old_field_lengths= old_row->field_lengths;
uchar *new_field_lengths= new_row->field_lengths; uchar *new_field_lengths= new_row->field_lengths;
size_t row_length= 0; size_t row_length= 0;
uint field_count= 0; uint field_lengths;
LEX_STRING *start_log_parts; LEX_STRING *start_log_parts;
my_bool new_column_is_empty; my_bool new_column_is_empty;
DBUG_ENTER("fill_update_undo_parts"); DBUG_ENTER("fill_update_undo_parts");
...@@ -4341,7 +4402,6 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec, ...@@ -4341,7 +4402,6 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
{ {
/* Store changed null bits */ /* Store changed null bits */
*field_data++= (uchar) 255; /* Special case */ *field_data++= (uchar) 255; /* Special case */
field_count++;
log_parts->str= (char*) oldrec; log_parts->str= (char*) oldrec;
log_parts->length= share->base.null_bytes; log_parts->length= share->base.null_bytes;
row_length= log_parts->length; row_length= log_parts->length;
...@@ -4359,10 +4419,9 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec, ...@@ -4359,10 +4419,9 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
{ {
field_data= ma_store_length(field_data, field_data= ma_store_length(field_data,
(uint) (column - share->columndef)); (uint) (column - share->columndef));
field_count++;
log_parts->str= (char*) oldrec + column->offset; log_parts->str= (char*) oldrec + column->offset;
log_parts->length= column->length; log_parts->length= column->length;
row_length+= log_parts->length; row_length+= column->length;
log_parts++; log_parts++;
} }
} }
...@@ -4394,7 +4453,6 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec, ...@@ -4394,7 +4453,6 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
field_data= ma_store_length(field_data, field_data= ma_store_length(field_data,
(uint) (column - share->columndef)); (uint) (column - share->columndef));
field_data= ma_store_length(field_data, 0); field_data= ma_store_length(field_data, 0);
field_count++;
continue; continue;
} }
/* /*
...@@ -4409,7 +4467,7 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec, ...@@ -4409,7 +4467,7 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
new_column_pos= newrec + column->offset; new_column_pos= newrec + column->offset;
old_column_length= new_column_length= column->length; old_column_length= new_column_length= column->length;
switch ((enum en_fieldtype) column->type) { switch (column->type) {
case FIELD_CHECK: case FIELD_CHECK:
case FIELD_NORMAL: /* Fixed length field */ case FIELD_NORMAL: /* Fixed length field */
case FIELD_ZERO: case FIELD_ZERO:
...@@ -4418,6 +4476,8 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec, ...@@ -4418,6 +4476,8 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
break; break;
case FIELD_VARCHAR: case FIELD_VARCHAR:
new_column_length--; /* Skip length prefix */ new_column_length--; /* Skip length prefix */
old_column_pos+= column->fill_length;
new_column_pos+= column->fill_length;
/* Fall through */ /* Fall through */
case FIELD_SKIP_ENDSPACE: /* CHAR */ case FIELD_SKIP_ENDSPACE: /* CHAR */
{ {
...@@ -4465,22 +4525,22 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec, ...@@ -4465,22 +4525,22 @@ static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
field_data= ma_store_length(field_data, field_data= ma_store_length(field_data,
(uint) (column - share->columndef)); (uint) (column - share->columndef));
field_data= ma_store_length(field_data, old_column_length); field_data= ma_store_length(field_data, old_column_length);
field_count++;
log_parts->str= (char*) old_column_pos; log_parts->str= (char*) old_column_pos;
log_parts->length= old_column_length; log_parts->length= old_column_length;
row_length+= log_parts->length; row_length+= old_column_length;
log_parts++; log_parts++;
} }
} }
*log_parts_count= (log_parts - start_log_parts); *log_parts_count= (log_parts - start_log_parts);
/* Store number of fields before the field/field_lengths */ /* Store length of field length data before the field/field_lengths */
field_lengths= (field_data - start_field_data);
start_log_parts->str= ((char*) start_log_parts->str= ((char*)
(start_field_data - (start_field_data -
ma_calc_length_for_store_length(field_count))); ma_calc_length_for_store_length(field_lengths)));
ma_store_length(start_log_parts->str, field_count); ma_store_length(start_log_parts->str, field_lengths);
start_log_parts->length= (size_t) ((char*) field_data - start_log_parts->length= (size_t) ((char*) field_data -
start_log_parts->str); start_log_parts->str);
row_length+= start_log_parts->length; row_length+= start_log_parts->length;
...@@ -4864,7 +4924,7 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn, ...@@ -4864,7 +4924,7 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
if (translog_write_record(&lsn, LOGREC_CLR_END, if (translog_write_record(&lsn, LOGREC_CLR_END,
info->trn, info, sizeof(log_data), info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array, TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data+ FILEID_STORE_SIZE)) log_data + FILEID_STORE_SIZE))
goto err; goto err;
info->s->state.state.records--; info->s->state.state.records--;
...@@ -4952,10 +5012,11 @@ my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn, ...@@ -4952,10 +5012,11 @@ my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
else else
*blob_lengths++= 0; *blob_lengths++= 0;
if (share->calc_checksum) if (share->calc_checksum)
bzero(record + column->offset, column->length); bfill(record + column->offset, column->fill_length,
column->type == FIELD_SKIP_ENDSPACE ? ' ' : 0);
continue; continue;
} }
switch ((enum en_fieldtype) column->type) { switch (column->type) {
case FIELD_CHECK: case FIELD_CHECK:
case FIELD_NORMAL: /* Fixed length field */ case FIELD_NORMAL: /* Fixed length field */
case FIELD_ZERO: case FIELD_ZERO:
...@@ -5041,15 +5102,147 @@ my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn, ...@@ -5041,15 +5102,147 @@ my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
} }
/* Execute undo of a row update */ /*
Execute undo of a row update
@fn _ma_apply_undo_row_update()
@return Operation status
@retval 0 OK
@retval 1 Error
*/
my_bool _ma_apply_undo_row_update(MARIA_HA *info __attribute__ ((unused)), my_bool _ma_apply_undo_row_update(MARIA_HA *info, LSN undo_lsn,
LSN undo_lsn __attribute__ ((unused)), const uchar *header,
const uchar *header __attribute__ ((unused)), size_t header_length __attribute__((unused)))
size_t length __attribute__ ((unused)))
{ {
ulonglong page;
uint rownr, field_length_header;
MARIA_SHARE *share= info->s;
const uchar *field_length_data, *field_length_data_end;
uchar *current_record, *orig_record;
int error= 1;
MARIA_RECORD_POS record_pos;
DBUG_ENTER("_ma_apply_undo_row_update"); DBUG_ENTER("_ma_apply_undo_row_update");
fprintf(stderr, "Undo of row update is not yet done\n");
exit(1); page= page_korr(header);
DBUG_RETURN(0); rownr= dirpos_korr(header + PAGE_STORE_SIZE);
record_pos= ma_recordpos(page, rownr);
DBUG_PRINT("enter", ("Page: %lu rownr: %u", (ulong) page, rownr));
/*
Set header to point to old field values, generated by
fill_update_undo_parts()
*/
header+= PAGE_STORE_SIZE + DIRPOS_STORE_SIZE;
field_length_header= ma_get_length((uchar**) &header);
field_length_data= header;
header+= field_length_header;
field_length_data_end= header;
/* Allocate buffer for current row & original row */
if (!(current_record= my_malloc(share->base.reclength * 2, MYF(MY_WME))))
DBUG_RETURN(1);
orig_record= current_record+ share->base.reclength;
/* Read current record */
if (_ma_read_block_record(info, current_record, record_pos))
goto err;
if (*field_length_data == 255)
{
/* Bitmap changed */
field_length_data++;
memcpy(orig_record, header, share->base.null_bytes);
header+= share->base.null_bytes;
}
else
memcpy(orig_record, current_record, share->base.null_bytes);
bitmap_clear_all(&info->changed_fields);
while (field_length_data < field_length_data_end)
{
uint field_nr= ma_get_length((uchar**) &field_length_data), field_length;
MARIA_COLUMNDEF *column= share->columndef + field_nr;
uchar *orig_field_pos= orig_record + column->offset;
bitmap_set_bit(&info->changed_fields, field_nr);
if (field_nr >= share->base.fixed_not_null_fields)
{
if (!(field_length= ma_get_length((uchar**) &field_length_data)))
{
/* Null field or empty field */
bfill(orig_field_pos, column->fill_length,
column->type == FIELD_SKIP_ENDSPACE ? ' ' : 0);
continue;
}
}
else
field_length= column->length;
switch (column->type) {
case FIELD_CHECK:
case FIELD_NORMAL: /* Fixed length field */
case FIELD_ZERO:
case FIELD_SKIP_PRESPACE: /* Not packed */
memcpy(orig_field_pos, header, column->length);
header+= column->length;
break;
case FIELD_SKIP_ZERO: /* Number */
case FIELD_SKIP_ENDSPACE: /* CHAR */
{
uint diff;
memcpy(orig_field_pos, header, field_length);
if ((diff= (column->length - field_length)))
bfill(orig_field_pos + column->length - diff, diff,
column->type == FIELD_SKIP_ENDSPACE ? ' ' : 0);
header+= field_length;
}
break;
case FIELD_VARCHAR:
if (column->length <= 256)
{
*orig_field_pos++= (uchar) field_length;
}
else
{
int2store(orig_field_pos, field_length);
orig_field_pos+= 2;
}
memcpy(orig_field_pos, header, field_length);
header+= field_length;
break;
case FIELD_BLOB:
{
uint size_length= column->length - portable_sizeof_char_ptr;
_ma_store_blob_length(orig_field_pos, size_length, field_length);
memcpy_fixed(orig_field_pos + size_length, &header, sizeof(header));
header+= field_length;
break;
}
default:
DBUG_ASSERT(0);
}
}
copy_not_changed_fields(info, &info->changed_fields,
orig_record, current_record);
if (share->calc_checksum)
{
info->cur_row.checksum= (*share->calc_checksum)(info, orig_record);
info->state->checksum+= (info->cur_row.checksum -
(*share->calc_checksum)(info, current_record));
}
/*
Now records are up to date, execute the update to original values
*/
if (_ma_update_block_record2(info, record_pos, current_record, orig_record,
undo_lsn))
goto err;
error= 0;
err:
my_free(current_record, MYF(0));
DBUG_RETURN(error);
} }
...@@ -2165,9 +2165,19 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info, ...@@ -2165,9 +2165,19 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
param->error_printed=1; param->error_printed=1;
goto err; goto err;
} }
continue; /* purecov: begin tested */
if (block_record)
{
sort_info.new_info->state->records--;
if ((*sort_info.new_info->s->write_record_abort)(sort_info.new_info))
{
_ma_check_print_error(param,"Couldn't delete duplicate row");
goto err;
}
continue;
}
/* purecov: end */
} }
if (!block_record && _ma_sort_write_record(&sort_param)) if (!block_record && _ma_sort_write_record(&sort_param))
goto err; goto err;
} }
......
...@@ -31,6 +31,9 @@ ha_checksum _ma_checksum(MARIA_HA *info, const uchar *record) ...@@ -31,6 +31,9 @@ ha_checksum _ma_checksum(MARIA_HA *info, const uchar *record)
const uchar *pos= record + column->offset; const uchar *pos= record + column->offset;
ulong length; ulong length;
if (record[column->null_pos] & column->null_bit)
continue; /* Null field */
switch (column->type) { switch (column->type) {
case FIELD_BLOB: case FIELD_BLOB:
{ {
...@@ -45,12 +48,12 @@ ha_checksum _ma_checksum(MARIA_HA *info, const uchar *record) ...@@ -45,12 +48,12 @@ ha_checksum _ma_checksum(MARIA_HA *info, const uchar *record)
} }
case FIELD_VARCHAR: case FIELD_VARCHAR:
{ {
uint pack_length= HA_VARCHAR_PACKLENGTH(column->length-1); uint pack_length= column->fill_length;
if (pack_length == 1) if (pack_length == 1)
length= (ulong) *(uchar*) pos; length= (ulong) *(uchar*) pos;
else else
length= uint2korr(pos); length= uint2korr(pos);
pos+= pack_length; pos+= pack_length; /* Skip length information */
break; break;
} }
default: default:
......
...@@ -1138,10 +1138,14 @@ my_bool _ma_rec_check(MARIA_HA *info,const uchar *record, uchar *rec_buff, ...@@ -1138,10 +1138,14 @@ my_bool _ma_rec_check(MARIA_HA *info,const uchar *record, uchar *rec_buff,
} }
/*
@brief Unpacks a record
/* Unpacks a record */ @return Recordlength
/* Returns -1 and my_errno =HA_ERR_RECORD_DELETED if reclength isn't */ @retval >0 ok
/* right. Returns reclength (>0) if ok */ @retval MY_FILE_ERROR (== -1) Error.
my_errno is set to HA_ERR_WRONG_IN_RECORD
*/
ulong _ma_rec_unpack(register MARIA_HA *info, register uchar *to, uchar *from, ulong _ma_rec_unpack(register MARIA_HA *info, register uchar *to, uchar *from,
ulong found_length) ulong found_length)
...@@ -1369,9 +1373,10 @@ void _ma_store_blob_length(uchar *pos,uint pack_length,uint length) ...@@ -1369,9 +1373,10 @@ void _ma_store_blob_length(uchar *pos,uint pack_length,uint length)
part of the record. part of the record.
RETURN RETURN
0 OK 0 OK
1 Error # Error number
*/ */
int _ma_read_dynamic_record(MARIA_HA *info, uchar *buf, int _ma_read_dynamic_record(MARIA_HA *info, uchar *buf,
MARIA_RECORD_POS filepos) MARIA_RECORD_POS filepos)
{ {
...@@ -1379,103 +1384,102 @@ int _ma_read_dynamic_record(MARIA_HA *info, uchar *buf, ...@@ -1379,103 +1384,102 @@ int _ma_read_dynamic_record(MARIA_HA *info, uchar *buf,
uint b_type; uint b_type;
MARIA_BLOCK_INFO block_info; MARIA_BLOCK_INFO block_info;
File file; File file;
uchar *to;
uint left_length;
DBUG_ENTER("_ma_read_dynamic_record"); DBUG_ENTER("_ma_read_dynamic_record");
if (filepos != HA_OFFSET_ERROR) if (filepos == HA_OFFSET_ERROR)
goto err;
LINT_INIT(to);
LINT_INIT(left_length);
file= info->dfile.file;
block_of_record= 0; /* First block of record is numbered as zero. */
block_info.second_read= 0;
do
{ {
uchar *to; /* A corrupted table can have wrong pointers. (Bug# 19835) */
uint left_length; if (filepos == HA_OFFSET_ERROR)
goto panic;
LINT_INIT(to); if (info->opt_flag & WRITE_CACHE_USED &&
LINT_INIT(left_length); (info->rec_cache.pos_in_file < filepos +
file= info->dfile.file; MARIA_BLOCK_INFO_HEADER_LENGTH) &&
block_of_record= 0; /* First block of record is numbered as zero. */ flush_io_cache(&info->rec_cache))
block_info.second_read= 0; goto err;
do info->rec_cache.seek_not_done=1;
{ if ((b_type= _ma_get_block_info(&block_info, file, filepos)) &
/* A corrupted table can have wrong pointers. (Bug# 19835) */ (BLOCK_DELETED | BLOCK_ERROR | BLOCK_SYNC_ERROR |
if (filepos == HA_OFFSET_ERROR) BLOCK_FATAL_ERROR))
{
if (b_type & (BLOCK_SYNC_ERROR | BLOCK_DELETED))
my_errno=HA_ERR_RECORD_DELETED;
goto err;
}
if (block_of_record++ == 0) /* First block */
{
if (block_info.rec_len > (uint) info->s->base.max_pack_length)
goto panic; goto panic;
if (info->opt_flag & WRITE_CACHE_USED && if (info->s->base.blobs)
(info->rec_cache.pos_in_file < filepos +
MARIA_BLOCK_INFO_HEADER_LENGTH) &&
flush_io_cache(&info->rec_cache))
goto err;
info->rec_cache.seek_not_done=1;
if ((b_type= _ma_get_block_info(&block_info, file, filepos)) &
(BLOCK_DELETED | BLOCK_ERROR | BLOCK_SYNC_ERROR |
BLOCK_FATAL_ERROR))
{
if (b_type & (BLOCK_SYNC_ERROR | BLOCK_DELETED))
my_errno=HA_ERR_RECORD_DELETED;
goto err;
}
if (block_of_record++ == 0) /* First block */
{
if (block_info.rec_len > (uint) info->s->base.max_pack_length)
goto panic;
if (info->s->base.blobs)
{
if (_ma_alloc_buffer(&info->rec_buff, &info->rec_buff_size,
block_info.rec_len +
info->s->base.extra_rec_buff_size))
goto err;
}
to= info->rec_buff;
left_length=block_info.rec_len;
}
if (left_length < block_info.data_len || ! block_info.data_len)
goto panic; /* Wrong linked record */
/* copy information that is already read */
{ {
uint offset= (uint) (block_info.filepos - filepos); if (_ma_alloc_buffer(&info->rec_buff, &info->rec_buff_size,
uint prefetch_len= (sizeof(block_info.header) - offset); block_info.rec_len +
filepos+= sizeof(block_info.header); info->s->base.extra_rec_buff_size))
goto err;
if (prefetch_len > block_info.data_len)
prefetch_len= block_info.data_len;
if (prefetch_len)
{
memcpy((uchar*) to, block_info.header + offset, prefetch_len);
block_info.data_len-= prefetch_len;
left_length-= prefetch_len;
to+= prefetch_len;
}
} }
/* read rest of record from file */ to= info->rec_buff;
if (block_info.data_len) left_length=block_info.rec_len;
}
if (left_length < block_info.data_len || ! block_info.data_len)
goto panic; /* Wrong linked record */
/* copy information that is already read */
{
uint offset= (uint) (block_info.filepos - filepos);
uint prefetch_len= (sizeof(block_info.header) - offset);
filepos+= sizeof(block_info.header);
if (prefetch_len > block_info.data_len)
prefetch_len= block_info.data_len;
if (prefetch_len)
{ {
if (info->opt_flag & WRITE_CACHE_USED && memcpy((uchar*) to, block_info.header + offset, prefetch_len);
info->rec_cache.pos_in_file < filepos + block_info.data_len && block_info.data_len-= prefetch_len;
flush_io_cache(&info->rec_cache)) left_length-= prefetch_len;
goto err; to+= prefetch_len;
/*
What a pity that this method is not called 'file_pread' and that
there is no equivalent without seeking. We are at the right
position already. :(
*/
if (info->s->file_read(info, (uchar*) to, block_info.data_len,
filepos, MYF(MY_NABP)))
goto panic;
left_length-=block_info.data_len;
to+=block_info.data_len;
} }
filepos= block_info.next_filepos; }
} while (left_length); /* read rest of record from file */
if (block_info.data_len)
{
if (info->opt_flag & WRITE_CACHE_USED &&
info->rec_cache.pos_in_file < filepos + block_info.data_len &&
flush_io_cache(&info->rec_cache))
goto err;
/*
What a pity that this method is not called 'file_pread' and that
there is no equivalent without seeking. We are at the right
position already. :(
*/
if (info->s->file_read(info, (uchar*) to, block_info.data_len,
filepos, MYF(MY_NABP)))
goto panic;
left_length-=block_info.data_len;
to+=block_info.data_len;
}
filepos= block_info.next_filepos;
} while (left_length);
info->update|= HA_STATE_AKTIV; /* We have a aktive record */ info->update|= HA_STATE_AKTIV; /* We have a aktive record */
fast_ma_writeinfo(info); fast_ma_writeinfo(info);
DBUG_RETURN(_ma_rec_unpack(info,buf,info->rec_buff,block_info.rec_len) != DBUG_RETURN(_ma_rec_unpack(info,buf,info->rec_buff,block_info.rec_len) !=
MY_FILE_ERROR ? 0 : 1); MY_FILE_ERROR ? 0 : my_errno);
}
err:
fast_ma_writeinfo(info); fast_ma_writeinfo(info);
DBUG_RETURN(1); /* Wrong data to read */ DBUG_RETURN(my_errno);
panic: panic:
my_errno=HA_ERR_WRONG_IN_RECORD; my_errno=HA_ERR_WRONG_IN_RECORD;
err: goto err;
VOID(_ma_writeinfo(info,0));
DBUG_RETURN(1);
} }
/* compare unique constraint between stored rows */ /* compare unique constraint between stored rows */
...@@ -1655,7 +1659,7 @@ static my_bool _ma_cmp_buffer(File file, const uchar *buff, my_off_t filepos, ...@@ -1655,7 +1659,7 @@ static my_bool _ma_cmp_buffer(File file, const uchar *buff, my_off_t filepos,
RETURN RETURN
0 OK 0 OK
!= 0 Error != 0 Error number
*/ */
int _ma_read_rnd_dynamic_record(MARIA_HA *info, int _ma_read_rnd_dynamic_record(MARIA_HA *info,
...@@ -1663,7 +1667,7 @@ int _ma_read_rnd_dynamic_record(MARIA_HA *info, ...@@ -1663,7 +1667,7 @@ int _ma_read_rnd_dynamic_record(MARIA_HA *info,
MARIA_RECORD_POS filepos, MARIA_RECORD_POS filepos,
my_bool skip_deleted_blocks) my_bool skip_deleted_blocks)
{ {
int block_of_record, info_read, save_errno; int block_of_record, info_read;
uint left_len,b_type; uint left_len,b_type;
uchar *to; uchar *to;
MARIA_BLOCK_INFO block_info; MARIA_BLOCK_INFO block_info;
...@@ -1827,9 +1831,8 @@ int _ma_read_rnd_dynamic_record(MARIA_HA *info, ...@@ -1827,9 +1831,8 @@ int _ma_read_rnd_dynamic_record(MARIA_HA *info,
panic: panic:
my_errno=HA_ERR_WRONG_IN_RECORD; /* Something is fatal wrong */ my_errno=HA_ERR_WRONG_IN_RECORD; /* Something is fatal wrong */
err: err:
save_errno=my_errno; fast_ma_writeinfo(info);
VOID(_ma_writeinfo(info,0)); DBUG_RETURN(my_errno);
DBUG_RETURN(my_errno=save_errno);
} }
......
...@@ -387,6 +387,9 @@ int _ma_readinfo(register MARIA_HA *info, int lock_type, int check_keybuffer) ...@@ -387,6 +387,9 @@ int _ma_readinfo(register MARIA_HA *info, int lock_type, int check_keybuffer)
/* /*
Every isam-function that uppdates the isam-database MUST end with this Every isam-function that uppdates the isam-database MUST end with this
request request
NOTES
my_errno is not changed if this succeeds!
*/ */
int _ma_writeinfo(register MARIA_HA *info, uint operation) int _ma_writeinfo(register MARIA_HA *info, uint operation)
......
...@@ -4901,8 +4901,8 @@ my_bool translog_write_record(LSN *lsn, ...@@ -4901,8 +4901,8 @@ my_bool translog_write_record(LSN *lsn,
int rc; int rc;
uint short_trid= trn->short_id; uint short_trid= trn->short_id;
DBUG_ENTER("translog_write_record"); DBUG_ENTER("translog_write_record");
DBUG_PRINT("enter", ("type: %u ShortTrID: %u", DBUG_PRINT("enter", ("type: %u ShortTrID: %u rec_len: %lu",
(uint) type, (uint)short_trid)); (uint) type, (uint) short_trid, (ulong) rec_len));
if (tbl_info) if (tbl_info)
{ {
...@@ -4995,9 +4995,7 @@ my_bool translog_write_record(LSN *lsn, ...@@ -4995,9 +4995,7 @@ my_bool translog_write_record(LSN *lsn,
be add be add
*/ */
parts.total_record_length= parts.record_length; parts.total_record_length= parts.record_length;
DBUG_PRINT("info", ("record length: %lu %lu", DBUG_PRINT("info", ("record length: %lu", (ulong) parts.record_length));
(ulong) parts.record_length,
(ulong) parts.total_record_length));
/* process this parts */ /* process this parts */
if (!(rc= (log_record_type_descriptor[type].prewrite_hook && if (!(rc= (log_record_type_descriptor[type].prewrite_hook &&
......
...@@ -94,6 +94,7 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode, ...@@ -94,6 +94,7 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode,
int save_errno; int save_errno;
uint errpos; uint errpos;
MARIA_HA info,*m_info; MARIA_HA info,*m_info;
my_bitmap_map *changed_fields_bitmap;
DBUG_ENTER("maria_clone_internal"); DBUG_ENTER("maria_clone_internal");
errpos= 0; errpos= 0;
...@@ -120,6 +121,8 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode, ...@@ -120,6 +121,8 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode,
&info.first_mbr_key, share->base.max_key_length, &info.first_mbr_key, share->base.max_key_length,
&info.maria_rtree_recursion_state, &info.maria_rtree_recursion_state,
share->have_rtree ? 1024 : 0, share->have_rtree ? 1024 : 0,
&changed_fields_bitmap,
bitmap_buffer_size(share->base.fields),
NullS)) NullS))
goto err; goto err;
errpos= 6; errpos= 6;
...@@ -144,6 +147,8 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode, ...@@ -144,6 +147,8 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode,
info.errkey= -1; info.errkey= -1;
info.page_changed=1; info.page_changed=1;
info.keyread_buff= info.buff + share->base.max_key_block_length; info.keyread_buff= info.buff + share->base.max_key_block_length;
bitmap_init(&info.changed_fields, changed_fields_bitmap,
share->base.fields, 0);
if ((*share->init)(&info)) if ((*share->init)(&info))
goto err; goto err;
......
...@@ -728,8 +728,8 @@ static uint find_longest_bitstream(uint16 *table, uint16 *end) ...@@ -728,8 +728,8 @@ static uint find_longest_bitstream(uint16 *table, uint16 *end)
buf RETURN The buffer to receive the record. buf RETURN The buffer to receive the record.
RETURN RETURN
0 on success 0 On success
HA_ERR_WRONG_IN_RECORD or -1 on error # Error number
*/ */
int _ma_read_pack_record(MARIA_HA *info, uchar *buf, MARIA_RECORD_POS filepos) int _ma_read_pack_record(MARIA_HA *info, uchar *buf, MARIA_RECORD_POS filepos)
...@@ -739,7 +739,7 @@ int _ma_read_pack_record(MARIA_HA *info, uchar *buf, MARIA_RECORD_POS filepos) ...@@ -739,7 +739,7 @@ int _ma_read_pack_record(MARIA_HA *info, uchar *buf, MARIA_RECORD_POS filepos)
DBUG_ENTER("maria_read_pack_record"); DBUG_ENTER("maria_read_pack_record");
if (filepos == HA_OFFSET_ERROR) if (filepos == HA_OFFSET_ERROR)
DBUG_RETURN(-1); /* _search() didn't find record */ DBUG_RETURN(my_errno); /* _search() didn't find record */
file= info->dfile.file; file= info->dfile.file;
if (_ma_pack_get_block_info(info, &info->bit_buff, &block_info, if (_ma_pack_get_block_info(info, &info->bit_buff, &block_info,
...@@ -755,7 +755,7 @@ int _ma_read_pack_record(MARIA_HA *info, uchar *buf, MARIA_RECORD_POS filepos) ...@@ -755,7 +755,7 @@ int _ma_read_pack_record(MARIA_HA *info, uchar *buf, MARIA_RECORD_POS filepos)
panic: panic:
my_errno=HA_ERR_WRONG_IN_RECORD; my_errno=HA_ERR_WRONG_IN_RECORD;
err: err:
DBUG_RETURN(-1); DBUG_RETURN(my_errno);
} }
...@@ -1598,14 +1598,14 @@ static int _ma_read_mempack_record(MARIA_HA *info, uchar *buf, ...@@ -1598,14 +1598,14 @@ static int _ma_read_mempack_record(MARIA_HA *info, uchar *buf,
DBUG_ENTER("maria_read_mempack_record"); DBUG_ENTER("maria_read_mempack_record");
if (filepos == HA_OFFSET_ERROR) if (filepos == HA_OFFSET_ERROR)
DBUG_RETURN(-1); /* _search() didn't find record */ DBUG_RETURN(my_errno); /* _search() didn't find record */
if (!(pos= (uchar*) _ma_mempack_get_block_info(info, &info->bit_buff, if (!(pos= (uchar*) _ma_mempack_get_block_info(info, &info->bit_buff,
&block_info, &info->rec_buff, &block_info, &info->rec_buff,
&info->rec_buff_size, &info->rec_buff_size,
(uchar*) share->file_map+ (uchar*) share->file_map+
filepos))) filepos)))
DBUG_RETURN(-1); DBUG_RETURN(my_errno);
DBUG_RETURN(_ma_pack_rec_unpack(info, &info->bit_buff, buf, DBUG_RETURN(_ma_pack_rec_unpack(info, &info->bit_buff, buf,
pos, block_info.rec_len)); pos, block_info.rec_len));
} }
......
...@@ -1045,17 +1045,11 @@ prototype_undo_exec_hook(UNDO_ROW_UPDATE) ...@@ -1045,17 +1045,11 @@ prototype_undo_exec_hook(UNDO_ROW_UPDATE)
info->trn= trn; info->trn= trn;
info->trn->undo_lsn= lsn_korr(rec->header); info->trn->undo_lsn= lsn_korr(rec->header);
/*
For now we skip the page and directory entry. This is to be used
later when we mark rows as deleted.
*/
error= _ma_apply_undo_row_update(info, rec->lsn, error= _ma_apply_undo_row_update(info, rec->lsn,
log_record_buffer.str + LSN_STORE_SIZE + log_record_buffer.str + LSN_STORE_SIZE +
FILEID_STORE_SIZE + PAGE_STORE_SIZE + FILEID_STORE_SIZE,
DIRPOS_STORE_SIZE,
rec->record_length - rec->record_length -
(LSN_STORE_SIZE + FILEID_STORE_SIZE + (LSN_STORE_SIZE + FILEID_STORE_SIZE));
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE));
info->trn= 0; info->trn= 0;
return error; return error;
} }
......
...@@ -183,26 +183,25 @@ int _ma_read_static_record(register MARIA_HA *info, register uchar *record, ...@@ -183,26 +183,25 @@ int _ma_read_static_record(register MARIA_HA *info, register uchar *record,
if (info->opt_flag & WRITE_CACHE_USED && if (info->opt_flag & WRITE_CACHE_USED &&
info->rec_cache.pos_in_file <= pos && info->rec_cache.pos_in_file <= pos &&
flush_io_cache(&info->rec_cache)) flush_io_cache(&info->rec_cache))
return(-1); return(my_errno);
info->rec_cache.seek_not_done=1; /* We have done a seek */ info->rec_cache.seek_not_done=1; /* We have done a seek */
error=info->s->file_read(info,(char*) record,info->s->base.reclength, error=info->s->file_read(info,(char*) record,info->s->base.reclength,
pos, MYF(MY_NABP)) != 0; pos, MYF(MY_NABP));
fast_ma_writeinfo(info);
if (! error) if (! error)
{ {
fast_ma_writeinfo(info);
if (!*record) if (!*record)
{ {
my_errno=HA_ERR_RECORD_DELETED; /* Record is deleted */
return(1); /* Record is deleted */ return ((my_errno=HA_ERR_RECORD_DELETED));
} }
info->update|= HA_STATE_AKTIV; /* Record is read */ info->update|= HA_STATE_AKTIV; /* Record is read */
return(0); return(0);
} }
return(-1); /* Error on read */
} }
fast_ma_writeinfo(info); /* No such record */ fast_ma_writeinfo(info); /* No such record */
return(-1); return(my_errno);
} }
...@@ -264,13 +263,7 @@ int _ma_read_rnd_static_record(MARIA_HA *info, uchar *buf, ...@@ -264,13 +263,7 @@ int _ma_read_rnd_static_record(MARIA_HA *info, uchar *buf,
if (! cache_read) /* No cacheing */ if (! cache_read) /* No cacheing */
{ {
if ((error= _ma_read_static_record(info, buf, filepos))) error= _ma_read_static_record(info, buf, filepos);
{
if (error > 0)
error=my_errno=HA_ERR_RECORD_DELETED;
else
error=my_errno;
}
DBUG_RETURN(error); DBUG_RETURN(error);
} }
......
...@@ -252,6 +252,9 @@ static int run_test(const char *filename) ...@@ -252,6 +252,9 @@ static int run_test(const char *filename)
exit(1); exit(1);
} }
if (maria_commit(file) || maria_begin(file))
goto err;
if (!skip_update) if (!skip_update)
{ {
if (opt_unique) if (opt_unique)
...@@ -289,7 +292,7 @@ static int run_test(const char *filename) ...@@ -289,7 +292,7 @@ static int run_test(const char *filename)
found=0; found=0;
while ((error= maria_scan(file,read_record)) == 0) while ((error= maria_scan(file,read_record)) == 0)
{ {
if (update_count-- == 0) { VOID(maria_close(file)) ; exit(0) ; } if (--update_count == 0) { VOID(maria_close(file)) ; exit(0) ; }
memcpy(record,read_record,rec_length); memcpy(record,read_record,rec_length);
update_record(record); update_record(record);
if (maria_update(file,read_record,record)) if (maria_update(file,read_record,record))
...@@ -304,6 +307,18 @@ static int run_test(const char *filename) ...@@ -304,6 +307,18 @@ static int run_test(const char *filename)
maria_scan_end(file); maria_scan_end(file);
} }
if (die_in_middle_of_transaction == 2)
{
/*
Ensure we get changed pages and log to disk
As commit record is not done, the undo entries needs to be rolled back.
*/
_ma_flush_table_files(file, MARIA_FLUSH_DATA, FLUSH_RELEASE,
FLUSH_RELEASE);
printf("Dying on request after update without maria_close()\n");
exit(1);
}
if (!silent) if (!silent)
printf("- Reopening file\n"); printf("- Reopening file\n");
if (maria_commit(file)) if (maria_commit(file))
...@@ -356,7 +371,7 @@ static int run_test(const char *filename) ...@@ -356,7 +371,7 @@ static int run_test(const char *filename)
} }
} }
if (die_in_middle_of_transaction == 2) if (die_in_middle_of_transaction == 3)
{ {
/* /*
Ensure we get changed pages and log to disk Ensure we get changed pages and log to disk
......
...@@ -16,8 +16,9 @@ ...@@ -16,8 +16,9 @@
/* This file is included by all internal maria files */ /* This file is included by all internal maria files */
#include "maria.h" /* Structs & some defines */ #include "maria.h" /* Structs & some defines */
#include "myisampack.h" /* packing of keys */ #include <myisampack.h> /* packing of keys */
#include <my_tree.h> #include <my_tree.h>
#include <my_bitmap.h>
#ifdef THREAD #ifdef THREAD
#include <my_pthread.h> #include <my_pthread.h>
#include <thr_lock.h> #include <thr_lock.h>
...@@ -437,6 +438,7 @@ struct st_maria_info ...@@ -437,6 +438,7 @@ struct st_maria_info
PAGECACHE_FILE dfile; /* The datafile */ PAGECACHE_FILE dfile; /* The datafile */
IO_CACHE rec_cache; /* When cacheing records */ IO_CACHE rec_cache; /* When cacheing records */
LIST open_list; LIST open_list;
MY_BITMAP changed_fields;
uint opt_flag; /* Optim. for space/speed */ uint opt_flag; /* Optim. for space/speed */
uint update; /* If file changed since open */ uint update; /* If file changed since open */
int lastinx; /* Last used index */ int lastinx; /* Last used index */
......
...@@ -93,7 +93,8 @@ int main(int argc, char **argv) ...@@ -93,7 +93,8 @@ int main(int argc, char **argv)
*/ */
fprintf(stdout, "TRACE of the last maria_read_log\n"); fprintf(stdout, "TRACE of the last maria_read_log\n");
if (maria_apply_log(lsn, opt_display_and_apply, stdout, TRUE)) if (maria_apply_log(lsn, opt_display_and_apply, stdout,
opt_display_and_apply))
goto err; goto err;
fprintf(stdout, "%s: SUCCESS\n", my_progname); fprintf(stdout, "%s: SUCCESS\n", my_progname);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment