Commit 13d53bf6 authored by unknown's avatar unknown

Merge some changes from sql directory in 5.1 tree

Changed format for REDO_INSERT_ROWS_BLOBS
Fixed several bugs in handling of big blobs
Added redo_free_head_or_tail() & redo_insert_row_blobs()
Added uuid to control file
maria_checks now verifies that not used part of bitmap is 0
REDO_PURGE_BLOCKS -> REDO_FREE_BLOCKS
Added REDO_FREE_HEAD_OR_TAIL
Fixes problem when trying to read block outside of file during REDO


include/my_global.h:
  STACK_DIRECTION is already set by configure
mysql-test/r/maria.result:
  Updated results
mysql-test/t/maria.test:
  Test shrinking of VARCHAR
mysys/my_realloc.c:
  Fixed indentation
mysys/safemalloc.c:
  Fixed indentation
sql/filesort.cc:
  Removed some casts
sql/mysqld.cc:
  Added missing setting of myisam_stats_method_str
sql/uniques.cc:
  Removed some casts
storage/maria/ma_bitmap.c:
  Added printing of bitmap (for debugging)
  Renamed _ma_print_bitmap() -> _ma_print_bitmap_changes()
  Added _ma_set_full_page_bits()
  Fixed bug in ma_bitmap_find_new_place() (affecting updates) when using big files
storage/maria/ma_blockrec.c:
  Changed format for REDO_INSERT_ROWS_BLOBS
  Fixed several bugs in handling of big blobs
  Added code to fix some cases where redo when using blobs didn't produce idenital .MAD files as normal usage
  REDO_FREE_ROW_BLOCKS doesn't anymore change pages; We only mark things free in bitmap
  Remove TAIL and filler extents from REDO_FREE_BLOCKS log entry. (Fixed some asserts)
  REDO_PURGE_BLOCKS -> REDO_FREE_BLOCKS
  Delete tails in update. (Fixed bug when doing update that shrinks blob/varchar length)
  Fixed bug when doing insert in block outside of file size.
  Added redo_free_head_or_tail() & redo_insert_row_blobs()
  Added pagecache_unlock_by_link() when read fails.
  Much more comments, DBUG and ASSERT entries
storage/maria/ma_blockrec.h:
  Prototypes of new functions
  Define of SUB_RANGE_SIZE & BLOCK_FILLER_SIZE
storage/maria/ma_check.c:
  Verify that not used part of bitmap is 0
storage/maria/ma_control_file.c:
  Added uuid to control file
storage/maria/ma_loghandler.c:
  REDO_PURGE_BLOCKS -> REDO_FREE_BLOCKS
  Added REDO_FREE_HEAD_OR_TAIL
storage/maria/ma_loghandler.h:
  REDO_PURGE_BLOCKS -> REDO_FREE_BLOCKS
  Added REDO_FREE_HEAD_OR_TAIL
storage/maria/ma_pagecache.c:
  If we write full block, remove error flag for block.
  (Fixes problem when trying to read block outside of file)
storage/maria/ma_recovery.c:
  REDO_PURGE_BLOCKS -> REDO_FREE_BLOCKS
  Added REDO_FREE_HEAD_OR_TAIL
storage/maria/ma_test1.c:
  Allow option after 'b' to be compatible with ma_test2
  (This is just to simplify test scripts like ma_test_recovery)
storage/maria/ma_test2.c:
  Default size of blob is now 1000 instead of 1
storage/maria/ma_test_all.sh:
  Added test for bigger blobs
storage/maria/ma_test_recovery.expected:
  Updated results
storage/maria/ma_test_recovery:
  Added test for bigger blobs
parent df30832d
......@@ -88,13 +88,6 @@
#endif
#endif /* _WIN32... */
/*
STACK_DIRECTION was removed from 5.1 and then that was merged into Maria;
then it was added back into 5.1 but not yet merged into Maria.
When merge done, remove this.
*/
#define STACK_DIRECTION -1
/* Make it easier to add conditionl code for windows */
#ifdef __WIN__
#define IF_WIN(A,B) (A)
......
......@@ -679,10 +679,18 @@ test.t1 3235292310
checksum table t1 extended;
Table Checksum
test.t1 3235292310
alter table t1 engine=myisam;
alter table t1 row_format=fixed;
checksum table t1;
Table Checksum
test.t1 3235292310
alter table t1 row_format=dynamic;
checksum table t1;
Table Checksum
test.t1 4183529555
alter table t1 engine=myisam;
checksum table t1;
Table Checksum
test.t1 4183529555
drop table t1;
show variables like 'maria_stats_method';
Variable_name Value
......@@ -1856,7 +1864,7 @@ t1 CREATE TABLE `t1` (
drop table t1;
create table t1 (a int) row_format=dynamic transactional=1;
Warnings:
Note 1475 Row format set to PAGE because of TRANSACTIONAL=1 option
Note 1477 Row format set to PAGE because of TRANSACTIONAL=1 option
show create table t1;
Table Create Table
t1 CREATE TABLE `t1` (
......@@ -1870,7 +1878,7 @@ t1 CREATE TABLE `t1` (
) ENGINE=MARIA DEFAULT CHARSET=latin1 ROW_FORMAT=PAGE TRANSACTIONAL=1
alter table t1 row_format=DYNAMIC;
Warnings:
Note 1475 Row format set to PAGE because of TRANSACTIONAL=1 option
Note 1477 Row format set to PAGE because of TRANSACTIONAL=1 option
show create table t1;
Table Create Table
t1 CREATE TABLE `t1` (
......@@ -1966,6 +1974,21 @@ check table t1;
Table Op Msg_type Msg_text
test.t1 check status OK
drop table t1;
CREATE TABLE t1 (a int, b int, v varchar(60000)) checksum=1 engine=maria;
insert into t1 values (1,1,"aaa"),(1,2,null);
checksum table t1;
Table Checksum
test.t1 1112804611
lock table t1 write;
insert into t1 values (1,3,repeat('c',30000)),(4,4,repeat('a',30000));
update t1 set v="row5" where b=4;
delete from t1 where b=3;
select a, b, length(v) from t1;
a b length(v)
1 1 3
1 2 NULL
4 4 4
drop table t1;
CREATE TABLE t1 (
auto int(5) unsigned NOT NULL auto_increment,
string char(10) default "hello",
......
......@@ -622,7 +622,6 @@ update t1 set c2='A B' where c1=2;
check table t1;
drop table t1;
#
# Test CHECKSUM TABLE
#
......@@ -646,9 +645,9 @@ INSERT INTO t1 VALUES (11,91);
check table t1 extended;
checksum table t1;
checksum table t1 extended;
alter table t1 row_format=static;
alter table t1 row_format=fixed;
checksum table t1;
alter table t1 row_format=packed;
alter table t1 row_format=dynamic;
checksum table t1;
alter table t1 engine=myisam;
checksum table t1;
......@@ -1231,6 +1230,20 @@ update t1 set c=repeat('a',8192*2) where a between 200 and 202;
check table t1;
drop table t1;
#
# Test where we shrink varchar
#
CREATE TABLE t1 (a int, b int, v varchar(60000)) checksum=1 engine=maria;
insert into t1 values (1,1,"aaa"),(1,2,null);
checksum table t1;
lock table t1 write;
insert into t1 values (1,3,repeat('c',30000)),(4,4,repeat('a',30000));
update t1 set v="row5" where b=4;
delete from t1 where b=3;
select a, b, length(v) from t1;
drop table t1;
#
# Test tail pages for blobs
#
......
......@@ -32,6 +32,7 @@
@note if size==0 realloc() may return NULL; my_realloc() treats this as an
error which is not the intention of realloc()
*/
void* my_realloc(void* oldpoint, size_t size, myf my_flags)
{
void *point;
......
......@@ -436,7 +436,6 @@ void TERMINATE(FILE *file, uint flag)
This is usefull to call from withing a debugger
*/
void sf_malloc_report_allocated(void *memory)
{
struct st_irem *irem;
......
......@@ -1169,7 +1169,7 @@ int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
DBUG_RETURN(1); /* purecov: inspected */
for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
{
buffpek->base= (uchar*) strpos;
buffpek->base= strpos;
buffpek->max_keys= maxcount;
strpos+= (uint) (error= (int) read_to_buffer(from_file, buffpek,
rec_length));
......@@ -1296,12 +1296,12 @@ int merge_buffers(SORTPARAM *param, IO_CACHE *from_file,
else
{
register uchar *end;
strpos= (uchar*) buffpek->key+offset;
strpos= buffpek->key+offset;
for (end= strpos+buffpek->mem_count*rec_length ;
strpos != end ;
strpos+= rec_length)
{
if (my_b_write(to_file, (uchar *) strpos, res_length))
if (my_b_write(to_file, strpos, res_length))
{
error=1; goto err;
}
......
......@@ -7755,6 +7755,7 @@ mysqld_get_one_option(int optid,
int method;
LINT_INIT(method_conv);
myisam_stats_method_str= argument;
method= find_type_or_exit(argument, &myisam_stats_method_typelib,
opt->name);
switch (method-1) {
......
......@@ -443,7 +443,7 @@ static bool merge_walk(uchar *merge_buffer, ulong merge_buffer_size,
*/
for (top= begin; top != end; ++top)
{
top->base= (uchar*) (merge_buffer + (top - begin) * piece_size);
top->base= merge_buffer + (top - begin) * piece_size;
top->max_keys= max_key_count_per_piece;
bytes_read= read_to_buffer(file, top, key_length);
if (bytes_read == (uint) (-1))
......
......@@ -448,7 +448,7 @@ const char *bits_to_txt[]=
"tail 00-40 % full", "tail 40-80 % full", "tail/blob full"
};
static void _ma_print_bitmap(MARIA_FILE_BITMAP *bitmap)
static void _ma_print_bitmap_changes(MARIA_FILE_BITMAP *bitmap)
{
uchar *pos, *end, *org_pos;
ulong page;
......@@ -487,6 +487,46 @@ static void _ma_print_bitmap(MARIA_FILE_BITMAP *bitmap)
memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size);
}
/* Print content of bitmap for debugging */
void _ma_print_bitmap(MARIA_FILE_BITMAP *bitmap, uchar *data,
ulonglong page)
{
uchar *pos, *end;
char llbuff[22];
end= bitmap->map + bitmap->used_size;
DBUG_LOCK_FILE;
fprintf(DBUG_FILE,"\nDump of bitmap page at %s\n", llstr(page, llbuff));
page++; /* Skip bitmap page */
for (pos= data, end= pos + bitmap->total_size;
pos < end ;
pos+= 6)
{
ulonglong bits= uint6korr(pos); /* 6 bytes = 6*8/3= 16 patterns */
/*
Test if there is any changes in the next 16 bitmaps (to not have to
loop through all bits if we know they are the same)
*/
if (bits)
{
uint i;
for (i= 0; i < 16 ; i++, bits>>= 3)
{
if (bits & 7)
fprintf(DBUG_FILE, "Page: %8s %s\n", llstr(page+i, llbuff),
bits_to_txt[bits & 7]);
}
}
page+= 16;
}
fputc('\n', DBUG_FILE);
DBUG_UNLOCK_FILE;
}
#endif /* DBUG_OFF */
......@@ -698,7 +738,7 @@ static void fill_block(MARIA_FILE_BITMAP *bitmap,
tmp= (tmp & ~(7 << offset)) | (fill_pattern << offset);
int2store(data, tmp);
bitmap->changed= 1;
DBUG_EXECUTE("bitmap", _ma_print_bitmap(bitmap););
DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
}
......@@ -1037,7 +1077,7 @@ static ulong allocate_full_pages(MARIA_FILE_BITMAP *bitmap,
int6store(best_data, best_prefix_bits);
if (!(best_area_size-= best_prefix_area_size))
{
DBUG_EXECUTE("bitmap", _ma_print_bitmap(bitmap););
DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
DBUG_RETURN(block->page_count);
}
best_data+= 6;
......@@ -1055,7 +1095,7 @@ static ulong allocate_full_pages(MARIA_FILE_BITMAP *bitmap,
if (data_end < best_data)
bitmap->used_size= (uint) (best_data - bitmap->map);
bitmap->changed= 1;
DBUG_EXECUTE("bitmap", _ma_print_bitmap(bitmap););
DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
DBUG_RETURN(block->page_count);
}
......@@ -1318,7 +1358,7 @@ static void use_head(MARIA_HA *info, ulonglong page, uint size,
tmp= (tmp & ~(7 << offset)) | (FULL_HEAD_PAGE << offset);
int2store(data, tmp);
bitmap->changed= 1;
DBUG_EXECUTE("bitmap", _ma_print_bitmap(bitmap););
DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
}
......@@ -1564,6 +1604,7 @@ my_bool _ma_bitmap_find_new_place(MARIA_HA *info, MARIA_ROW *row,
my_bool res= 1;
uint full_page_size, position;
uint head_length, row_length, rest_length, extents_length;
ulonglong bitmap_page;
DBUG_ENTER("_ma_bitmap_find_new_place");
blocks->count= 0;
......@@ -1572,9 +1613,11 @@ my_bool _ma_bitmap_find_new_place(MARIA_HA *info, MARIA_ROW *row,
info->bitmap_blocks.elements= ELEMENTS_RESERVED_FOR_MAIN_PART;
pthread_mutex_lock(&share->bitmap.bitmap_lock);
if (share->bitmap.page != page / share->bitmap.pages_covered &&
_ma_change_bitmap_page(info, &share->bitmap,
page / share->bitmap.pages_covered))
bitmap_page= page / share->bitmap.pages_covered;
bitmap_page*= share->bitmap.pages_covered;
if (share->bitmap.page != bitmap_page &&
_ma_change_bitmap_page(info, &share->bitmap, bitmap_page))
goto abort;
/*
......@@ -1673,7 +1716,7 @@ static my_bool set_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
int2store(data, tmp);
bitmap->changed= 1;
DBUG_EXECUTE("bitmap", _ma_print_bitmap(bitmap););
DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
if (fill_pattern != 3 && fill_pattern != 7)
set_if_smaller(info->s->state.first_bitmap_with_space, bitmap_page);
/*
......@@ -1800,7 +1843,81 @@ my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
}
set_if_smaller(info->s->state.first_bitmap_with_space, bitmap_page);
bitmap->changed= 1;
DBUG_EXECUTE("bitmap", _ma_print_bitmap(bitmap););
DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
DBUG_RETURN(0);
}
/*
Set all pages in a region as used
SYNOPSIS
_ma_set_full_page_bits()
info Maria handler
bitmap Bitmap handler
page Start page
page_count Number of pages
NOTES
We assume that all pages in region is covered by same bitmap
One must have a lock on info->s->bitmap.bitmap_lock
RETURN
0 ok
1 Error (when reading bitmap)
*/
my_bool _ma_set_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
ulonglong page, uint page_count)
{
ulonglong bitmap_page;
uint offset, bit_start, bit_count, tmp;
uchar *data;
DBUG_ENTER("_ma_set_full_page_bits");
DBUG_PRINT("enter", ("page: %lu page_count: %u", (ulong) page, page_count));
safe_mutex_assert_owner(&info->s->bitmap.bitmap_lock);
bitmap_page= page - page % bitmap->pages_covered;
if (bitmap_page != bitmap->page &&
_ma_change_bitmap_page(info, bitmap, bitmap_page))
DBUG_RETURN(1);
/* Find page number from start of bitmap */
page= page - bitmap->page - 1;
/* Set bits from 'page * 3' -> '(page + page_count) * 3' */
bit_start= page * 3;
bit_count= page_count * 3;
data= bitmap->map + bit_start / 8;
offset= bit_start & 7;
tmp= (255 << offset); /* Bits to keep */
if (bit_count + offset < 8)
{
/* Only set bits between 'offset' and 'offset+bit_count-1' */
tmp^= (255 << (offset + bit_count));
}
*data|= tmp;
if ((int) (bit_count-= (8 - offset)) > 0)
{
uint fill;
data++;
/*
-1 is here to avoid one 'if' statement and to let the following code
handle the last byte
*/
if ((fill= (bit_count - 1) / 8))
{
bfill(data, fill, 255);
data+= fill;
}
bit_count-= fill * 8; /* Bits left to set */
tmp= (1 << bit_count) - 1;
*data|= tmp;
}
bitmap->changed= 1;
DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap););
DBUG_RETURN(0);
}
......@@ -1930,16 +2047,17 @@ my_bool _ma_bitmap_free_full_pages(MARIA_HA *info, const uchar *extents,
DBUG_ENTER("_ma_bitmap_free_full_pages");
pthread_mutex_lock(&info->s->bitmap.bitmap_lock);
for (; count--; extents += ROW_EXTENT_SIZE)
for (; count--; extents+= ROW_EXTENT_SIZE)
{
ulonglong page= uint5korr(extents);
uint page_count= uint2korr(extents + ROW_EXTENT_PAGE_SIZE);
if (!(page_count & TAIL_BIT))
{
if (page == 0 && page_count == 0)
continue; /* Not used extent */
if (pagecache_delete_pages(info->s->pagecache, &info->dfile, page,
page_count, PAGECACHE_LOCK_WRITE, 1))
DBUG_RETURN(1);
if (_ma_reset_full_page_bits(info, &info->s->bitmap, page, page_count))
page_count, PAGECACHE_LOCK_WRITE, 1) ||
_ma_reset_full_page_bits(info, &info->s->bitmap, page, page_count))
{
pthread_mutex_unlock(&info->s->bitmap.bitmap_lock);
DBUG_RETURN(1);
......
......@@ -342,7 +342,8 @@ static void _ma_print_directory(uchar *buff, uint block_size);
static void compact_page(uchar *buff, uint block_size, uint rownr,
my_bool extend_block);
static uchar *store_page_range(uchar *to, MARIA_BITMAP_BLOCK *block,
uint block_size, ulong length);
uint block_size, ulong length,
uint *tot_ranges);
static size_t fill_insert_undo_parts(MARIA_HA *info, const uchar *record,
LEX_STRING *log_parts,
uint *log_parts_count);
......@@ -733,7 +734,7 @@ static my_bool extend_area_on_page(uchar *buff, uchar *dir,
Used mainly to detect rows with wrong extent information
*/
static my_bool check_if_zero(uchar *pos, uint length)
my_bool _ma_check_if_zero(uchar *pos, uint length)
{
uchar *end;
for (end= pos+ length; pos != end ; pos++)
......@@ -1288,12 +1289,13 @@ static void make_empty_page(MARIA_HA *info, uchar *buff, uint page_type)
bzero(buff, PAGE_HEADER_SIZE);
#ifndef DONT_ZERO_PAGE_BLOCKS
#if !defined(DONT_ZERO_PAGE_BLOCKS) || defined(HAVE_purify)
/*
We zero the rest of the block to avoid getting old memory information
to disk and to allow the file to be compressed better if archived.
The code does not assume the block is zeroed.
*/
if (page_type != BLOB_PAGE)
bzero(buff+ PAGE_HEADER_SIZE, block_size - PAGE_HEADER_SIZE -
DIR_ENTRY_SIZE - PAGE_SUFFIX_SIZE);
#endif
......@@ -1596,6 +1598,13 @@ static my_bool write_full_pages(MARIA_HA *info,
copy_length= min(data_size, length);
memcpy(buff + LSN_SIZE + PAGE_TYPE_SIZE, data, copy_length);
length-= copy_length;
#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
if (copy_length != data_size)
bzero(buff + block_size - PAGE_SUFFIX_SIZE - (data_size - copy_length),
(data_size - copy_length));
#endif
if (!(info->s->options & HA_OPTION_PAGE_CHECKSUM))
bfill(buff + block_size - KEYPAGE_CHECKSUM_SIZE,
KEYPAGE_CHECKSUM_SIZE, (uchar) 255);
......@@ -1627,19 +1636,40 @@ static my_bool write_full_pages(MARIA_HA *info,
length Length of data to be written
Normally this is full pages, except for the last
tail block that may only partly fit the last page.
tot_ranges Add here the number of ranges used
NOTES
The format of one entry is:
Ranges SUB_RANGE_SIZE
Empty bytes at end of last byte BLOCK_FILLER_SIZE
For each range
Page number PAGE_STORE_SIZE
Number of pages PAGERANGE_STORE_SIZE
RETURN
# end position for 'to'
*/
static uchar *store_page_range(uchar *to, MARIA_BITMAP_BLOCK *block,
uint block_size, ulong length)
uint block_size, ulong length,
uint *tot_ranges)
{
uint data_size= FULL_PAGE_SIZE(block_size);
ulong pages_left= (length + data_size -1) / data_size;
uint page_count;
uint page_count, ranges, empty_space;
uchar *to_start;
DBUG_ENTER("store_page_range");
to_start= to;
to+= SUB_RANGE_SIZE;
/* Store number of unused bytes at last page */
empty_space= pages_left * data_size - length;
int2store(to, empty_space);
to+= BLOCK_FILLER_SIZE;
ranges= 0;
do
{
ulonglong page;
......@@ -1653,7 +1683,12 @@ static uchar *store_page_range(uchar *to, MARIA_BITMAP_BLOCK *block,
to+= PAGE_STORE_SIZE;
pagerange_store(to, page_count);
to+= PAGERANGE_STORE_SIZE;
ranges++;
} while ((pages_left-= page_count));
/* Store number of ranges for this block */
int2store(to_start, ranges);
(*tot_ranges)+= ranges;
DBUG_RETURN(to);
}
......@@ -1710,6 +1745,10 @@ static void store_extent_info(uchar *to,
/*
Free regions of pages with logging
NOTES
We are removing filler events and tail page events from
row->extents to get smaller log.
RETURN
0 ok
1 error
......@@ -1720,20 +1759,78 @@ static my_bool free_full_pages(MARIA_HA *info, MARIA_ROW *row)
uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
LSN lsn;
size_t extents_length= row->extents_count * ROW_EXTENT_SIZE;
size_t extents_length;
uchar *extents= row->extents;
DBUG_ENTER("free_full_pages");
if (info->s->now_transactional)
{
/* Compact events by removing filler and tail events */
uchar *start= extents;
uchar *new_block= 0;
uchar *end;
for (end= extents + row->extents_count * ROW_EXTENT_SIZE ;
extents < end ;
extents+= ROW_EXTENT_SIZE)
{
uint page_count= uint2korr(extents + ROW_EXTENT_PAGE_SIZE);
if (! (page_count & TAIL_BIT) && page_count != 0)
{
/* Found correct extent */
if (!new_block)
new_block= extents; /* First extent in range */
continue;
}
/* Found extent to remove, move everything found up */
if (new_block)
{
if (new_block == start)
start= extents;
else
{
size_t length= (size_t) (extents - new_block);
memmove(start, new_block, length);
start+= length;
}
}
new_block= 0;
}
if (new_block)
{
if (new_block == start)
start= extents; /* Nothing to delete */
else
{
/* Move rest down */
size_t length= (size_t) (extents - new_block);
memmove(start, new_block, length);
start+= length;
}
}
if (!unlikely(extents_length= (start - row->extents)))
{
/*
No ranges. This happens in the rear case when we have a allocated
place for a blob on a tail page but it did fit into the main page.
*/
DBUG_RETURN(0);
}
row->extents_count= extents_length / ROW_EXTENT_SIZE;
pagerange_store(log_data + FILEID_STORE_SIZE,
row->extents_count);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= row->extents;
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= extents_length;
if (translog_write_record(&lsn, LOGREC_REDO_PURGE_BLOCKS, info->trn,
if (translog_write_record(&lsn, LOGREC_REDO_FREE_BLOCKS, info->trn,
info, sizeof(log_data) + extents_length,
TRANSLOG_INTERNAL_PARTS + 2, log_array,
log_data, NULL))
DBUG_RETURN(1);
}
DBUG_RETURN(_ma_bitmap_free_full_pages(info, row->extents,
row->extents_count));
......@@ -1776,7 +1873,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, LOGREC_REDO_PURGE_BLOCKS,
if (translog_write_record(&lsn, LOGREC_REDO_FREE_BLOCKS,
info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data, NULL))
......@@ -2343,14 +2440,16 @@ static my_bool write_block_record(MARIA_HA *info,
We write all here to be able to generate the UNDO record early
so that we can write the LSN for the UNDO record to all full pages.
*/
uchar tmp_log_data[FILEID_STORE_SIZE + LSN_STORE_SIZE + PAGE_STORE_SIZE +
ROW_EXTENT_SIZE * ROW_EXTENTS_ON_STACK];
uchar tmp_log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
(ROW_EXTENT_SIZE + BLOCK_FILLER_SIZE + SUB_RANGE_SIZE) *
ROW_EXTENTS_ON_STACK];
uchar *log_data, *log_pos;
LEX_STRING tmp_log_array[TRANSLOG_INTERNAL_PARTS + 2 +
ROW_EXTENTS_ON_STACK];
LEX_STRING *log_array_pos, *log_array;
int error;
ulong log_entry_length= 0;
uint ext_length, extents= 0, sub_extents= 0;
/* If few extents, then allocate things on stack to avoid a malloc call */
if (bitmap_blocks->count < ROW_EXTENTS_ON_STACK)
......@@ -2364,23 +2463,27 @@ static my_bool write_block_record(MARIA_HA *info,
(uint) ((bitmap_blocks->count +
TRANSLOG_INTERNAL_PARTS + 2) *
sizeof(*log_array)),
&log_data, bitmap_blocks->count * ROW_EXTENT_SIZE,
&log_data, FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
bitmap_blocks->count * (ROW_EXTENT_SIZE +
BLOCK_FILLER_SIZE +
SUB_RANGE_SIZE),
NullS))
goto disk_err;
}
log_pos= log_data + FILEID_STORE_SIZE;
log_pos= log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE * 2;
log_array_pos= log_array+ TRANSLOG_INTERNAL_PARTS+1;
if (tmp_data_used)
{
/* Full head pages */
/* Full head page */
size_t data_length= (ulong) (tmp_data - info->rec_buff);
log_pos= store_page_range(log_pos, head_block+1, block_size,
data_length);
data_length, &extents);
log_array_pos->str= (char*) info->rec_buff;
log_array_pos->length= data_length;
log_entry_length+= data_length;
log_array_pos++;
sub_extents++;
}
if (blob_full_pages_exists)
{
......@@ -2398,6 +2501,10 @@ static my_bool write_block_record(MARIA_HA *info,
continue;
length= tmp_column->length - portable_sizeof_char_ptr;
blob_length= *tmp_blob_lengths;
/*
If last part of blog was on tail page, change blob_length to
reflect this
*/
if (tmp_block[tmp_block->sub_blocks - 1].used & BLOCKUSED_TAIL)
blob_length-= (blob_length % FULL_PAGE_SIZE(block_size));
if (blob_length)
......@@ -2408,19 +2515,23 @@ static my_bool write_block_record(MARIA_HA *info,
log_array_pos->length= blob_length;
log_entry_length+= blob_length;
log_array_pos++;
sub_extents++;
log_pos= store_page_range(log_pos, tmp_block, block_size,
blob_length);
blob_length, &extents);
tmp_block+= tmp_block->sub_blocks;
}
}
}
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (size_t) (log_pos -
log_data);
log_entry_length+= (log_pos - log_data);
ext_length= (uint) (log_pos - log_data);
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= ext_length;
pagerange_store(log_data+ FILEID_STORE_SIZE, extents);
pagerange_store(log_data+ FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE,
sub_extents);
log_entry_length+= ext_length;
/* trn->rec_lsn is already set earlier in this function */
error= translog_write_record(&lsn, LOGREC_REDO_INSERT_ROW_BLOBS,
info->trn, info, log_entry_length,
......@@ -2847,7 +2958,7 @@ static my_bool _ma_update_block_record2(MARIA_HA *info,
DBUG_ASSERT(share->pagecache->block_size == block_size);
if (!(buff= pagecache_read(share->pagecache,
&info->dfile, (my_off_t) page, 0,
&info->dfile, (pgcache_page_no_t) page, 0,
info->buff, share->page_type,
PAGECACHE_LOCK_WRITE, &page_link.link)))
DBUG_RETURN(1);
......@@ -2890,6 +3001,9 @@ static my_bool _ma_update_block_record2(MARIA_HA *info,
/* Update cur_row, if someone calls update at once again */
cur_row->head_length= new_row->total_length;
if (*cur_row->tail_positions &&
delete_tails(info, cur_row->tail_positions))
goto err;
if (cur_row->extents_count && free_full_pages(info, cur_row))
goto err;
res= write_block_record(info, oldrec, record, new_row, blocks,
......@@ -3152,16 +3266,12 @@ static my_bool delete_head_or_tail(MARIA_HA *info,
{
if (info->s->now_transactional)
{
uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE];
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
pagerange_store(log_data + FILEID_STORE_SIZE, 1);
page_store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE, page);
pagerange_store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
PAGE_STORE_SIZE, 1);
page_store(log_data + FILEID_STORE_SIZE, page);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, LOGREC_REDO_PURGE_BLOCKS,
if (translog_write_record(&lsn, LOGREC_REDO_FREE_HEAD_OR_TAIL,
info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data, NULL))
......@@ -3521,15 +3631,15 @@ static my_bool read_long_data(MARIA_HA *info, uchar *to, ulong length,
uchar **data, uchar **end_of_data)
{
DBUG_ENTER("read_long_data");
DBUG_PRINT("enter", ("length: %lu", length));
DBUG_PRINT("enter", ("length: %lu left_length: %u",
length, (uint) (*end_of_data - *data)));
DBUG_ASSERT(*data <= *end_of_data);
/*
Fields are never split in middle. This means that if length > rest-of-data
we should start reading from the next extent. The reason we may have
data left on the page is that there fixed part of the row was less than
min_row_length and in this case the head block was extended to
min_row_length.
data left on the page is that if the fixed part of the row was less than
min_row_length the head block was extended to min_row_length.
This may change in the future, which is why we have the loop written
the way it's written.
......@@ -3545,6 +3655,7 @@ static my_bool read_long_data(MARIA_HA *info, uchar *to, ulong length,
{
memcpy(to, *data, length);
(*data)+= length;
DBUG_PRINT("info", ("left_length: %u", left_length - (uint) length));
DBUG_RETURN(0);
}
memcpy(to, *data, left_length);
......@@ -3816,7 +3927,7 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
/*
After we have read one extent, then each blob is in it's own extent
*/
if (extent.first_extent && (ulong) (end_of_data - data) < blob_length)
if (!extent.first_extent || (ulong) (end_of_data - data) < blob_length)
end_of_data= data; /* Force read of next extent */
if (read_long_data(info, blob_buffer, blob_length, &extent, &data,
......@@ -3842,10 +3953,17 @@ int _ma_read_block_record2(MARIA_HA *info, uchar *record,
if (extent.page_count)
goto err;
if (extent.extent_count > 1)
if (check_if_zero(extent.extent + ROW_EXTENT_SIZE,
{
if (_ma_check_if_zero(extent.extent + ROW_EXTENT_SIZE,
(extent.extent_count-1) * ROW_EXTENT_SIZE))
{
DBUG_PRINT("error", ("Data in extent is not zero"));
DBUG_DUMP("extent", extent.extent + ROW_EXTENT_SIZE,
(extent.extent_count-1) * ROW_EXTENT_SIZE);
goto err;
}
}
}
else
{
DBUG_PRINT("info", ("Row read"));
......@@ -5014,13 +5132,12 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint rownr, empty_space;
uint block_size= share->block_size;
uint rec_offset;
uchar *buff= info->keyread_buff, *dir;
uchar *buff, *dir;
MARIA_PINNED_PAGE page_link;
enum pagecache_page_lock unlock_method;
enum pagecache_page_pin unpin_method;
DBUG_ENTER("_ma_apply_redo_insert_row_head_or_tail");
info->keyread_buff_used= 1;
page= page_korr(header);
rownr= dirpos_korr(header+PAGE_STORE_SIZE);
......@@ -5040,12 +5157,15 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
DBUG_ASSERT(rownr == 0);
if (rownr != 0)
goto err;
unlock_method= PAGECACHE_LOCK_LEFT_UNLOCKED;
unpin_method= PAGECACHE_PIN_LEFT_UNPINNED;
buff= info->keyread_buff;
info->keyread_buff_used= 1;
make_empty_page(info, buff, page_type);
empty_space= (block_size - PAGE_OVERHEAD_SIZE);
rec_offset= PAGE_HEADER_SIZE;
dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE;
unlock_method= PAGECACHE_LOCK_LEFT_UNLOCKED;
unpin_method= PAGECACHE_PIN_LEFT_UNPINNED;
}
else
{
......@@ -5053,8 +5173,21 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
page, 0, 0,
PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
&page_link.link)))
{
if (my_errno != -1) /* If not read outside of file */
{
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
DBUG_RETURN(my_errno);
if (lsn_korr(buff) >= lsn) /* Test if already applied */
}
/* Create new page */
buff= info->keyread_buff;
info->keyread_buff_used= 1;
buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
}
else if (lsn_korr(buff) >= lsn) /* Test if already applied */
{
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
......@@ -5213,7 +5346,13 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
page, 0, 0,
PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
&page_link.link)))
{
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
DBUG_RETURN(my_errno);
}
if (lsn_korr(buff) >= lsn)
{
......@@ -5268,51 +5407,94 @@ err:
/**
@brief Apply LOGREC_REDO_PURGE_BLOCKS
@brief Apply LOGREC_REDO_FREE_BLOCKS
@param info Maria handler
@param header Header (without FILEID)
@note It marks the page free in the bitmap, and sets the directory's count
to 0.
@note It marks the pages free in the bitmap
@return Operation status
@retval 0 OK
@retval !=0 Error
@retval 1 Error
*/
uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
LSN lsn, const uchar *header)
uint _ma_apply_redo_free_blocks(MARIA_HA *info,
LSN lsn __attribute__((unused)),
const uchar *header)
{
MARIA_SHARE *share= info->s;
ulonglong page;
uint page_range, ranges;
uint res= 0;
uchar *buff= info->keyread_buff;
uint ranges;
DBUG_ENTER("_ma_apply_redo_purge_blocks");
info->keyread_buff_used= 1;
ranges= pagerange_korr(header);
header+= PAGERANGE_STORE_SIZE;
DBUG_ASSERT(ranges > 0);
while (ranges--)
{
uint i;
page= page_korr(header);
my_bool res;
uint page_range;
ulonglong page, start_page;
start_page= page= page_korr(header);
header+= PAGE_STORE_SIZE;
page_range= pagerange_korr(header);
header+= PAGERANGE_STORE_SIZE;
for (i= 0; i < page_range ; i++)
{
DBUG_PRINT("info", ("page: %lu pages: %u", (long) page, page_range));
DBUG_ASSERT((page_range & TAIL_BIT) == 0);
/** @todo leave bitmap lock to the bitmap code... */
pthread_mutex_lock(&share->bitmap.bitmap_lock);
res= _ma_reset_full_page_bits(info, &share->bitmap, start_page,
page_range);
pthread_mutex_unlock(&share->bitmap.bitmap_lock);
if (res)
DBUG_RETURN(res);
}
DBUG_RETURN(0);
}
/**
@brief Apply LOGREC_REDO_FREE_HEAD_OR_TAIL
@param info Maria handler
@param header Header (without FILEID)
@note It marks the page free in the bitmap, and sets the directory's count
to 0.
@return Operation status
@retval 0 OK
@retval 1 Error
*/
uint _ma_apply_redo_free_head_or_tail(MARIA_HA *info, LSN lsn,
const uchar *header)
{
MARIA_SHARE *share= info->s;
uchar *buff;
ulonglong page;
MARIA_PINNED_PAGE page_link;
my_bool res;
DBUG_ENTER("_ma_apply_redo_free_head_or_tail");
page= page_korr(header);
if (!(buff= pagecache_read(share->pagecache,
&info->dfile,
page+i, 0,
buff, PAGECACHE_PLAIN_PAGE,
page, 0, 0,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_WRITE, &page_link.link)))
DBUG_RETURN(my_errno);
{
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
DBUG_RETURN(1);
}
if (lsn_korr(buff) >= lsn)
{
/* Already applied */
......@@ -5320,8 +5502,9 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
continue;
}
else
{
buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
{
......@@ -5334,22 +5517,164 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
#endif
lsn_store(buff, lsn);
if (pagecache_write(share->pagecache,
&info->dfile, page+i, 0,
&info->dfile, page, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_WRITE_UNLOCK, PAGECACHE_UNPIN,
PAGECACHE_WRITE_DELAY, 0))
DBUG_RETURN(1);
}
/** @todo leave bitmap lock to the bitmap code... */
pthread_mutex_lock(&share->bitmap.bitmap_lock);
res= _ma_reset_full_page_bits(info, &share->bitmap, page, 1);
pthread_mutex_unlock(&share->bitmap.bitmap_lock);
if (res)
DBUG_RETURN(res);
DBUG_RETURN(0);
}
/**
@brief Apply LOGREC_REDO_INSERT_ROW_BLOBS
@param info Maria handler
@param header Header (without FILEID)
@note Write full pages (full head & blob pages)
@return Operation status
@retval 0 OK
@retval !=0 Error
*/
uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
LSN lsn, const uchar *header)
{
MARIA_SHARE *share= info->s;
const uchar *data;
uint data_size= FULL_PAGE_SIZE(info->s->block_size);
uint blob_count, ranges;
DBUG_ENTER("_ma_apply_redo_insert_row_blobs");
ranges= pagerange_korr(header);
header+= PAGERANGE_STORE_SIZE;
blob_count= pagerange_korr(header);
header+= PAGERANGE_STORE_SIZE;
DBUG_ASSERT(ranges >= blob_count);
data= (header + ranges * ROW_EXTENT_SIZE +
blob_count * (SUB_RANGE_SIZE + BLOCK_FILLER_SIZE));
while (blob_count--)
{
uint sub_ranges, empty_space;
sub_ranges= uint2korr(header);
header+= SUB_RANGE_SIZE;
empty_space= uint2korr(header);
header+= BLOCK_FILLER_SIZE;
DBUG_ASSERT(sub_ranges <= blob_count + 1 && empty_space < data_size);
while (sub_ranges--)
{
uint i;
uint res;
uint page_range;
ulonglong page, start_page;
uchar *buff;
start_page= page= page_korr(header);
header+= PAGE_STORE_SIZE;
page_range= pagerange_korr(header);
header+= PAGERANGE_STORE_SIZE;
for (i= page_range; i-- > 0 ; page++)
{
MARIA_PINNED_PAGE page_link;
enum pagecache_page_lock unlock_method;
enum pagecache_page_pin unpin_method;
uint length;
if ((page * info->s->block_size) > info->state->data_file_length)
{
/* New page or half written page at end of file */
info->state->data_file_length= page * info->s->block_size;
buff= info->keyread_buff;
info->keyread_buff_used= 1;
make_empty_page(info, buff, BLOB_PAGE);
unlock_method= PAGECACHE_LOCK_LEFT_UNLOCKED;
unpin_method= PAGECACHE_PIN_LEFT_UNPINNED;
}
else
{
if (!(buff= pagecache_read(share->pagecache,
&info->dfile,
page, 0, 0,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_WRITE, &page_link.link)))
{
if (my_errno != -1) /* If not read outside of file */
{
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
DBUG_RETURN(my_errno);
}
/* Physical file was too short; Create new page */
buff= info->keyread_buff;
info->keyread_buff_used= 1;
make_empty_page(info, buff, BLOB_PAGE);
}
else
{
if (lsn_korr(buff) >= lsn)
{
/* Already applied */
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
continue;
}
}
unlock_method= PAGECACHE_LOCK_WRITE_UNLOCK;
unpin_method= PAGECACHE_UNPIN;
}
lsn_store(buff, lsn);
buff[PAGE_TYPE_OFFSET]= BLOB_PAGE;
length= data_size;
if (i == 0 && sub_ranges == 0)
{
/* Last page may be only partly filled. */
length-= empty_space;
#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
bzero(buff + info->s->block_size - PAGE_SUFFIX_SIZE - empty_space,
empty_space);
#endif
}
memcpy(buff+ PAGE_TYPE_OFFSET + 1, data, length);
data+= length;
if (pagecache_write(share->pagecache,
&info->dfile, page, 0,
buff, PAGECACHE_PLAIN_PAGE,
unlock_method, unpin_method,
PAGECACHE_WRITE_DELAY, 0))
DBUG_RETURN(my_errno);
}
/** @todo leave bitmap lock to the bitmap code... */
pthread_mutex_lock(&share->bitmap.bitmap_lock);
res= _ma_reset_full_page_bits(info, &share->bitmap, page, page_range);
res= _ma_set_full_page_bits(info, &share->bitmap, start_page,
page_range);
pthread_mutex_unlock(&share->bitmap.bitmap_lock);
if (res)
DBUG_RETURN(res);
}
}
DBUG_RETURN(0);
}
/****************************************************************************
Applying of UNDO entries
****************************************************************************/
......@@ -5383,7 +5708,6 @@ my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
&page_link.link)))
DBUG_RETURN(1);
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
push_dynamic(&info->pinned_pages, (void*) &page_link);
......
......@@ -35,6 +35,8 @@
#define ROW_EXTENT_PAGE_SIZE 5
#define ROW_EXTENT_COUNT_SIZE 2
#define SUB_RANGE_SIZE 2
#define BLOCK_FILLER_SIZE 2
#define ROW_EXTENT_SIZE (ROW_EXTENT_PAGE_SIZE + ROW_EXTENT_COUNT_SIZE)
#define TAIL_BIT 0x8000 /* Bit in page_count to signify tail */
/* Number of extents reserved MARIA_BITMAP_BLOCKS to store head part */
......@@ -139,6 +141,7 @@ my_bool _ma_once_init_block_record(MARIA_SHARE *share, File dfile);
my_bool _ma_once_end_block_record(MARIA_SHARE *share);
my_bool _ma_init_block_record(MARIA_HA *info);
void _ma_end_block_record(MARIA_HA *info);
my_bool _ma_check_if_zero(uchar *pos, uint length);
my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS pos,
const uchar *oldrec, const uchar *newrec);
......@@ -174,6 +177,8 @@ my_bool _ma_bitmap_set(MARIA_HA *info, ulonglong pos, my_bool head,
uint empty_space);
my_bool _ma_reset_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
ulonglong page, uint page_count);
my_bool _ma_set_full_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap,
ulonglong page, uint page_count);
uint _ma_free_size_to_head_pattern(MARIA_FILE_BITMAP *bitmap, uint size);
my_bool _ma_bitmap_find_new_place(MARIA_HA *info, MARIA_ROW *new_row,
ulonglong page, uint free_size,
......@@ -187,6 +192,11 @@ my_bool _ma_check_if_right_bitmap_type(MARIA_HA *info,
uint *bitmap_pattern);
void _ma_bitmap_delete_all(MARIA_SHARE *share);
int _ma_bitmap_create_first(MARIA_SHARE *share);
#ifndef DBUG_OFF
void _ma_print_bitmap(MARIA_FILE_BITMAP *bitmap, uchar *data,
ulonglong page);
#endif
uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint page_type,
const uchar *header,
......@@ -195,8 +205,12 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint page_type,
const uchar *header);
uint _ma_apply_redo_purge_blocks(MARIA_HA *info, LSN lsn,
uint _ma_apply_redo_free_blocks(MARIA_HA *info, LSN lsn,
const uchar *header);
uint _ma_apply_redo_free_head_or_tail(MARIA_HA *info, LSN lsn,
const uchar *header);
uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
LSN lsn, const uchar *header);
my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
const uchar *header);
my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
......
......@@ -1740,6 +1740,7 @@ static int check_block_record(HA_CHECK *param, MARIA_HA *info, int extend,
switch ((enum en_page_type) page_type) {
case UNALLOCATED_PAGE:
case MAX_PAGE_TYPE:
default:
DBUG_ASSERT(0); /* Impossible */
break;
case HEAD_PAGE:
......@@ -1800,6 +1801,32 @@ static int check_block_record(HA_CHECK *param, MARIA_HA *info, int extend,
goto err;
}
/* Verify that rest of bitmap is zero */
if ((pos / block_size) % info->s->bitmap.pages_covered)
{
/* Not at end of bitmap */
uint bitmap_pattern;
offset_page= (((pos / block_size) % info->s->bitmap.pages_covered) -1) * 3;
offset= offset_page & 7;
data= bitmap_buff + offset_page / 8;
bitmap_pattern= uint2korr(data);
if (((bitmap_pattern >> offset)) ||
(data + 2 < bitmap_buff + info->s->bitmap.total_size &&
_ma_check_if_zero(data+2, bitmap_buff + info->s->bitmap.total_size -
data - 2)))
{
ulonglong bitmap_page;
bitmap_page= pos / block_size / info->s->bitmap.pages_covered;
bitmap_page*= info->s->bitmap.pages_covered;
_ma_check_print_error(param, "Bitmap at %s has pages reserved outside of data file length",
llstr(bitmap_page, llbuff));
DBUG_EXECUTE("bitmap", _ma_print_bitmap(&info->s->bitmap, bitmap_buff,
bitmap_page););
}
}
_ma_scan_end_block_record(info);
if (full_page_count != param->full_page_count)
......
......@@ -32,7 +32,9 @@
#define CONTROL_FILE_MAGIC_STRING "\xfe\xfe\xc\1MACF"
#define CONTROL_FILE_MAGIC_STRING_OFFSET 0
#define CONTROL_FILE_MAGIC_STRING_SIZE (sizeof(CONTROL_FILE_MAGIC_STRING)-1)
#define CONTROL_FILE_CHECKSUM_OFFSET (CONTROL_FILE_MAGIC_STRING_OFFSET + CONTROL_FILE_MAGIC_STRING_SIZE)
#define CONTROL_FILE_UUID_OFFSET (CONTROL_FILE_MAGIC_STRING_OFFSET + CONTROL_FILE_MAGIC_STRING_SIZE)
#define CONTROL_FILE_UUID_SIZE MY_UUID_SIZE
#define CONTROL_FILE_CHECKSUM_OFFSET (CONTROL_FILE_UUID_OFFSET + CONTROL_FILE_UUID_SIZE)
#define CONTROL_FILE_CHECKSUM_SIZE 4
#define CONTROL_FILE_LSN_OFFSET (CONTROL_FILE_CHECKSUM_OFFSET + CONTROL_FILE_CHECKSUM_SIZE)
#define CONTROL_FILE_LSN_SIZE LSN_STORE_SIZE
......@@ -122,6 +124,10 @@ CONTROL_FILE_ERROR ma_control_file_create_or_open()
open_flags, MYF(MY_SYNC_DIR))) < 0)
DBUG_RETURN(CONTROL_FILE_UNKNOWN_ERROR);
/* Create unique uuid for the control file */
my_uuid_init((ulong) &buffer, (ulong) &maria_uuid);
my_uuid(maria_uuid);
/*
To be safer we should make sure that there are no logs or data/index
files around (indeed it could be that the control file alone was deleted
......@@ -190,6 +196,9 @@ CONTROL_FILE_ERROR ma_control_file_create_or_open()
error= CONTROL_FILE_BAD_MAGIC_STRING;
goto err;
}
memcpy(maria_uuid, buffer + CONTROL_FILE_UUID_OFFSET,
CONTROL_FILE_UUID_SIZE);
if (my_checksum(0, buffer + CONTROL_FILE_LSN_OFFSET,
CONTROL_FILE_SIZE - CONTROL_FILE_LSN_OFFSET) !=
uint4korr(buffer + CONTROL_FILE_CHECKSUM_OFFSET))
......@@ -252,6 +261,8 @@ int ma_control_file_write_and_force(const LSN checkpoint_lsn, uint32 logno,
memcpy(buffer + CONTROL_FILE_MAGIC_STRING_OFFSET,
CONTROL_FILE_MAGIC_STRING, CONTROL_FILE_MAGIC_STRING_SIZE);
memcpy(buffer + CONTROL_FILE_UUID_OFFSET, maria_uuid,
CONTROL_FILE_UUID_SIZE);
if (objs_to_write == CONTROL_FILE_UPDATE_ONLY_LSN)
update_checkpoint_lsn= TRUE;
......
......@@ -399,11 +399,18 @@ static LOG_DESC INIT_LOGREC_REDO_PURGE_ROW_TAIL=
NULL, write_hook_for_redo, NULL, 0,
"redo_purge_row_tail", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_PURGE_BLOCKS=
static LOG_DESC INIT_LOGREC_REDO_FREE_BLOCKS=
{LOGRECTYPE_VARIABLE_LENGTH, 0,
FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE,
NULL, write_hook_for_redo, NULL, 0,
"redo_purge_blocks", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
"redo_free_blocks", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_FREE_HEAD_OR_TAIL=
{LOGRECTYPE_FIXEDLENGTH,
FILEID_STORE_SIZE + PAGE_STORE_SIZE,
FILEID_STORE_SIZE + PAGE_STORE_SIZE,
NULL, write_hook_for_redo, NULL, 0,
"redo_free_head_or_tail", LOGREC_NOT_LAST_IN_GROUP, NULL, NULL};
/* not yet used; for when we have versioning */
static LOG_DESC INIT_LOGREC_REDO_DELETE_ROW=
......@@ -528,8 +535,10 @@ static void loghandler_init()
INIT_LOGREC_REDO_PURGE_ROW_HEAD;
log_record_type_descriptor[LOGREC_REDO_PURGE_ROW_TAIL]=
INIT_LOGREC_REDO_PURGE_ROW_TAIL;
log_record_type_descriptor[LOGREC_REDO_PURGE_BLOCKS]=
INIT_LOGREC_REDO_PURGE_BLOCKS;
log_record_type_descriptor[LOGREC_REDO_FREE_BLOCKS]=
INIT_LOGREC_REDO_FREE_BLOCKS;
log_record_type_descriptor[LOGREC_REDO_FREE_HEAD_OR_TAIL]=
INIT_LOGREC_REDO_FREE_HEAD_OR_TAIL;
log_record_type_descriptor[LOGREC_REDO_DELETE_ROW]=
INIT_LOGREC_REDO_DELETE_ROW;
log_record_type_descriptor[LOGREC_REDO_UPDATE_ROW_HEAD]=
......
......@@ -103,7 +103,8 @@ enum translog_record_type
LOGREC_REDO_INSERT_ROW_BLOBS,
LOGREC_REDO_PURGE_ROW_HEAD,
LOGREC_REDO_PURGE_ROW_TAIL,
LOGREC_REDO_PURGE_BLOCKS,
LOGREC_REDO_FREE_BLOCKS,
LOGREC_REDO_FREE_HEAD_OR_TAIL,
LOGREC_REDO_DELETE_ROW,
LOGREC_REDO_UPDATE_ROW_HEAD,
LOGREC_REDO_INDEX,
......
......@@ -3338,14 +3338,14 @@ restart:
if (! (block->status & PCBLOCK_CHANGED))
link_to_changed_list(pagecache, block);
if (! (block->status & PCBLOCK_ERROR))
{
if (!(size & 511))
bmove512(block->buffer + offset, buff, size);
else
memcpy(block->buffer + offset, buff, size);
block->status|= PCBLOCK_READ;
}
/* Page is correct again if we made a full write in it */
if (size == pagecache->block_size)
block->status&= ~PCBLOCK_ERROR;
}
if (need_lock_change)
......
......@@ -74,9 +74,11 @@ prototype_redo_exec_hook(REDO_DROP_TABLE);
prototype_redo_exec_hook(FILE_ID);
prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD);
prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL);
prototype_redo_exec_hook(REDO_PURGE_BLOCKS);
prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
prototype_redo_exec_hook(REDO_FREE_BLOCKS);
prototype_redo_exec_hook(REDO_DELETE_ALL);
prototype_redo_exec_hook(UNDO_ROW_INSERT);
prototype_redo_exec_hook(UNDO_ROW_DELETE);
......@@ -1137,6 +1139,33 @@ end:
}
prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS)
{
int error= 1;
uchar *buff;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
return 0;
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
log_record_buffer.str, NULL) !=
rec->record_length)
{
tprint(tracef, "Failed to read record\n");
goto end;
}
buff= log_record_buffer.str;
if (_ma_apply_redo_insert_row_blobs(info, current_group_end_lsn,
buff + FILEID_STORE_SIZE))
goto end;
error= 0;
end:
return error;
}
prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD)
{
int error= 1;
......@@ -1169,7 +1198,7 @@ end:
}
prototype_redo_exec_hook(REDO_PURGE_BLOCKS)
prototype_redo_exec_hook(REDO_FREE_BLOCKS)
{
int error= 1;
uchar *buff;
......@@ -1188,7 +1217,7 @@ prototype_redo_exec_hook(REDO_PURGE_BLOCKS)
}
buff= log_record_buffer.str;
if (_ma_apply_redo_purge_blocks(info, current_group_end_lsn,
if (_ma_apply_redo_free_blocks(info, current_group_end_lsn,
buff + FILEID_STORE_SIZE))
goto end;
error= 0;
......@@ -1197,6 +1226,22 @@ end:
}
prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
return 0;
if (_ma_apply_redo_free_head_or_tail(info, current_group_end_lsn,
rec->header + FILEID_STORE_SIZE))
goto end;
error= 0;
end:
return error;
}
prototype_redo_exec_hook(REDO_DELETE_ALL)
{
int error= 1;
......@@ -1556,9 +1601,11 @@ static int run_redo_phase(LSN lsn, my_bool apply)
install_redo_exec_hook(FILE_ID);
install_redo_exec_hook(REDO_INSERT_ROW_HEAD);
install_redo_exec_hook(REDO_INSERT_ROW_TAIL);
install_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
install_redo_exec_hook(REDO_PURGE_ROW_HEAD);
install_redo_exec_hook(REDO_PURGE_ROW_TAIL);
install_redo_exec_hook(REDO_PURGE_BLOCKS);
install_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
install_redo_exec_hook(REDO_FREE_BLOCKS);
install_redo_exec_hook(REDO_DELETE_ALL);
install_redo_exec_hook(UNDO_ROW_INSERT);
install_redo_exec_hook(UNDO_ROW_DELETE);
......@@ -1888,7 +1935,7 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
page= page_korr(rec->header + FILEID_STORE_SIZE);
/**
@todo RECOVERY BUG
- for REDO_PURGE_BLOCKS, page is not at this pos
- for REDO_FREE_BLOCKS, page is not at this pos
- for DELETE_ALL, record ends here! buffer overrun!
Solution: caller should pass a param enum { i_am_about_data_file,
i_am_about_index_file, none }.
......
......@@ -33,6 +33,7 @@ static int rec_pointer_size=0, flags[50], testflag;
static int key_field=FIELD_SKIP_PRESPACE,extra_field=FIELD_SKIP_ENDSPACE;
static int key_type=HA_KEYTYPE_NUM;
static int create_flag=0;
static ulong blob_length;
static enum data_file_type record_type= DYNAMIC_RECORD;
static uint insert_count, update_count, remove_count;
......@@ -690,7 +691,8 @@ static struct my_option my_long_options[] =
{"key-binary-pack", 'B', "Undocumented",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{"key-blob", 'b', "Undocumented",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
(uchar**) &blob_length, (uchar**) &blob_length,
0, GET_ULONG, OPT_ARG, 0, 0, 0, 0, 0, 0},
{"key-cache", 'K', "Undocumented", (uchar**) &pagecacheing,
(uchar**) &pagecacheing, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"key-length", 'k', "Undocumented", (uchar**) &key_length,
......
......@@ -1005,7 +1005,7 @@ static void get_options(int argc, char **argv)
pack_type= HA_BINARY_PACK_KEY;
break;
case 'b':
use_blob= 1;
use_blob= 1000;
if (*++pos)
use_blob= atol(pos);
break;
......
......@@ -119,6 +119,8 @@ run_tests()
$maria_path/maria_chk$suffix -sm test2
$maria_path/ma_test2$suffix $silent -m10000 -e16384 -E16384 -K -L $row_type
$maria_path/maria_chk$suffix -sm test2
$maria_path/ma_test2$suffix $silent -M -T -c -b65000
$maria_path/maria_chk$suffix -se test2
}
run_repair_tests()
......
......@@ -96,7 +96,7 @@ echo "Testing the REDO PHASE ALONE"
# identical to the saved original.
# Does not test the index file as we don't have logging for it yet.
set -- "ma_test1 $silent -M -T -c" "ma_test2 $silent -L -K -W -P -M -T -c" "ma_test2 $silent -M -T -c -b"
set -- "ma_test1 $silent -M -T -c" "ma_test2 $silent -L -K -W -P -M -T -c" "ma_test2 $silent -M -T -c -b65000"
while [ $# != 0 ]
do
prog=$1
......
......@@ -44,23 +44,23 @@ Differences in maria_chk -dvv, recovery not yet perfect !
---
> Datafile length: 114688 Keyfile length: 8192
========DIFF END=======
TEST WITH ma_test2 -s -M -T -c -b
TEST WITH ma_test2 -s -M -T -c -b65000
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
11c11
< Datafile length: 114688 Keyfile length: 155648
< Datafile length: 2531328 Keyfile length: 155648
---
> Datafile length: 114688 Keyfile length: 8192
> Datafile length: 2531328 Keyfile length: 8192
========DIFF END=======
testing idempotency
applying log
Differences in maria_chk -dvv, recovery not yet perfect !
========DIFF START=======
11c11
< Datafile length: 114688 Keyfile length: 155648
< Datafile length: 2531328 Keyfile length: 155648
---
> Datafile length: 114688 Keyfile length: 8192
> Datafile length: 2531328 Keyfile length: 8192
========DIFF END=======
Testing the REDO AND UNDO PHASE
TEST WITH ma_test1 -s -M -T -c -N --testflag=1 (commit at end)
......@@ -623,7 +623,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
11c11
< Datafile length: 8192 Keyfile length: 8192
---
> Datafile length: 114688 Keyfile length: 212992
> Datafile length: 155648 Keyfile length: 212992
========DIFF END=======
testing idempotency
applying log
......@@ -636,7 +636,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
11c11
< Datafile length: 8192 Keyfile length: 8192
---
> Datafile length: 114688 Keyfile length: 212992
> Datafile length: 155648 Keyfile length: 212992
========DIFF END=======
testing applying of CLRs to recreate table
applying log
......@@ -649,7 +649,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
11c11
< Datafile length: 8192 Keyfile length: 8192
---
> Datafile length: 114688 Keyfile length: 8192
> Datafile length: 155648 Keyfile length: 8192
========DIFF END=======
TEST WITH ma_test1 -s -M -T -c -N -b --testflag=1 (commit at end)
TEST WITH ma_test1 -s -M -T -c -N -b --testflag=2 --test-undo=2 (additional aborted work)
......@@ -770,7 +770,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
11c11
< Datafile length: 8192 Keyfile length: 8192
---
> Datafile length: 114688 Keyfile length: 212992
> Datafile length: 155648 Keyfile length: 212992
========DIFF END=======
testing idempotency
applying log
......@@ -783,7 +783,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
11c11
< Datafile length: 8192 Keyfile length: 8192
---
> Datafile length: 114688 Keyfile length: 212992
> Datafile length: 155648 Keyfile length: 212992
========DIFF END=======
testing applying of CLRs to recreate table
applying log
......@@ -796,7 +796,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
11c11
< Datafile length: 8192 Keyfile length: 8192
---
> Datafile length: 114688 Keyfile length: 8192
> Datafile length: 155648 Keyfile length: 8192
========DIFF END=======
TEST WITH ma_test1 -s -M -T -c -N -b --testflag=1 (commit at end)
TEST WITH ma_test1 -s -M -T -c -N -b --testflag=2 --test-undo=3 (additional aborted work)
......@@ -917,7 +917,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
11c11
< Datafile length: 8192 Keyfile length: 8192
---
> Datafile length: 114688 Keyfile length: 212992
> Datafile length: 155648 Keyfile length: 212992
========DIFF END=======
testing idempotency
applying log
......@@ -930,7 +930,7 @@ Differences in maria_chk -dvv, recovery not yet perfect !
11c11
< Datafile length: 8192 Keyfile length: 8192
---
> Datafile length: 114688 Keyfile length: 212992
> Datafile length: 155648 Keyfile length: 212992
========DIFF END=======
testing applying of CLRs to recreate table
applying log
......@@ -943,5 +943,5 @@ Differences in maria_chk -dvv, recovery not yet perfect !
11c11
< Datafile length: 8192 Keyfile length: 8192
---
> Datafile length: 114688 Keyfile length: 8192
> Datafile length: 155648 Keyfile length: 8192
========DIFF END=======
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment