Commit e914e469 authored by unknown's avatar unknown

Remove SAFE_MODE for opt_range as it disables UPDATE to use keys

REDO optimization (Bascily avoid moving blocks from/to pagecache)
More command line arguments to maria_read_log
Fixed recovery bug when recreating table


sql/opt_range.cc:
  Remove SAFE_MODE for opt_range as it disables UPDATE to use keys
storage/maria/ma_blockrec.c:
  REDO optimization
  Use new interface for pagecache_reads to avoid copying page buffers
storage/maria/ma_loghandler.c:
  Patch from Sanja:
  - Added new parameter to translog_get_page to use direct links to pagecache
  - Changed scanner to be able to use direct links
  
  This avoids a lot of calls to bmove512() in page cache.
storage/maria/ma_loghandler.h:
  Added direct link to pagecache objects
storage/maria/ma_open.c:
  Added const to parameter
  Added missing braces
storage/maria/ma_pagecache.c:
  From Sanja:
  - Added direct links to pagecache (from pagecache_read())
    Dirrect link means that on pagecache_read we get back a pointer to the pagecache buffer
  
  
  From Monty:
  - Fixed arguments to init_page_cache to handle big page caches
  - Fixed compiler warnings
  - Replaced PAGECACHE_PAGE_LINK with PAGECACHE_BLOCK_LINK * to catch errors
storage/maria/ma_pagecache.h:
  Changed block numbers from int to long to be able to handle big page caches
  Changed some PAGECACHE_PAGE_LINK to PAGECACHE_BLOCK_LINK
storage/maria/ma_recovery.c:
  Fixed recovery bug when recreating table (table was kept open)
  Moved some variables to function start (portability)
  Added space to some print messages
storage/maria/maria_chk.c:
  key_buffer_size -> page_buffer_size
storage/maria/maria_def.h:
  Changed default page_buffer_size to 10M
storage/maria/maria_read_log.c:
  Added more startup options:
  --version
  --undo (apply undo)
  --page_cache_size (to run with big cache sizes)
  --silent (to not get any output from --apply)
storage/maria/unittest/ma_control_file-t.c:
  Fixed compiler warning
storage/maria/unittest/ma_test_loghandler-t.c:
  Added new argument to translog_init_scanner()
storage/maria/unittest/ma_test_loghandler_multigroup-t.c:
  Added new argument to translog_init_scanner()
storage/maria/unittest/ma_test_loghandler_multithread-t.c:
  Added new argument to translog_init_scanner()
parent 4c90a51d
......@@ -2130,9 +2130,6 @@ int SQL_SELECT::test_quick_select(THD *thd, key_map keys_to_use,
quick=0;
needed_reg.clear_all();
quick_keys.clear_all();
if ((specialflag & SPECIAL_SAFE_MODE) && ! force_quick_range ||
!limit)
DBUG_RETURN(0); /* purecov: inspected */
if (keys_to_use.is_clear_all())
DBUG_RETURN(0);
records= head->file->stats.records;
......
......@@ -4609,6 +4609,9 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint block_size= share->block_size;
uint rec_offset;
uchar *buff= info->keyread_buff, *dir;
MARIA_PINNED_PAGE page_link;
enum pagecache_page_lock unlock_method;
enum pagecache_page_pin unpin_method;
DBUG_ENTER("_ma_apply_redo_insert_row_head_or_tail");
info->keyread_buff_used= 1;
......@@ -4635,26 +4638,31 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
empty_space= (block_size - PAGE_OVERHEAD_SIZE);
rec_offset= PAGE_HEADER_SIZE;
dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE;
unlock_method= PAGECACHE_LOCK_LEFT_UNLOCKED;
unpin_method= PAGECACHE_PIN_LEFT_UNPINNED;
}
else
{
uint max_entry;
if (!(buff= pagecache_read(share->pagecache,
&info->dfile,
page, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
if (!(buff= pagecache_read(share->pagecache, &info->dfile,
page, 0, 0,
PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
&page_link.link)))
DBUG_RETURN(my_errno);
if (lsn_korr(buff) >= lsn)
if (lsn_korr(buff) >= lsn) /* Test if already applied */
{
/* Already applied */
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
/* Fix bitmap, just in case */
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
DBUG_RETURN(my_errno);
DBUG_RETURN(0);
}
unlock_method= PAGECACHE_LOCK_WRITE_UNLOCK;
unpin_method= PAGECACHE_UNPIN;
max_entry= (uint) ((uchar*) buff)[DIR_COUNT_OFFSET];
if (((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != page_type))
......@@ -4725,8 +4733,7 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
if (pagecache_write(share->pagecache,
&info->dfile, page, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED,
unlock_method, unpin_method,
PAGECACHE_WRITE_DELAY, 0))
DBUG_RETURN(my_errno);
......@@ -4747,6 +4754,11 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
DBUG_RETURN(0);
err:
if (unlock_method == PAGECACHE_LOCK_WRITE_UNLOCK)
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
}
......@@ -4778,6 +4790,8 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
uint rownr, empty_space;
uint block_size= share->block_size;
uchar *buff= info->keyread_buff;
int result;
MARIA_PINNED_PAGE page_link;
DBUG_ENTER("_ma_apply_redo_purge_row_head_or_tail");
page= page_korr(header);
......@@ -4788,11 +4802,10 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
info->keyread_buff_used= 1;
if (!(buff= pagecache_read(share->pagecache,
&info->dfile,
page, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
if (!(buff= pagecache_read(share->pagecache, &info->dfile,
page, 0, 0,
PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
&page_link.link)))
DBUG_RETURN(my_errno);
if (lsn_korr(buff) >= lsn)
......@@ -4802,6 +4815,11 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
Note that in case the page is not anymore a head or tail page
a future redo will fix the bitmap.
*/
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == page_type)
{
empty_space= uint2korr(buff+EMPTY_SPACE_OFFSET);
......@@ -4815,22 +4833,30 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == (uchar) page_type);
if (delete_dir_entry(buff, block_size, rownr, &empty_space) < 0)
DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
goto err;
lsn_store(buff, lsn);
result= 0;
if (pagecache_write(share->pagecache,
&info->dfile, page, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_LOCK_WRITE_UNLOCK, PAGECACHE_UNPIN,
PAGECACHE_WRITE_DELAY, 0))
DBUG_RETURN(my_errno);
result= my_errno;
/* This will work even if the page was marked as UNALLOCATED_PAGE */
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
DBUG_RETURN(my_errno);
result= my_errno;
DBUG_RETURN(result);
err:
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
DBUG_RETURN(0);
}
......@@ -4872,16 +4898,21 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
for (i= 0; i < page_range ; i++)
{
MARIA_PINNED_PAGE page_link;
if (!(buff= pagecache_read(share->pagecache,
&info->dfile,
page+i, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
PAGECACHE_LOCK_WRITE, &page_link.link)))
DBUG_RETURN(my_errno);
if (lsn_korr(buff) >= lsn)
{
/* Already applied */
pagecache_unlock_by_link(share->pagecache, page_link.link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
LSN_IMPOSSIBLE);
continue;
}
buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
......@@ -4889,8 +4920,7 @@ uint _ma_apply_redo_purge_blocks(MARIA_HA *info,
if (pagecache_write(share->pagecache,
&info->dfile, page+i, 0,
buff, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_LOCK_WRITE_UNLOCK, PAGECACHE_UNPIN,
PAGECACHE_WRITE_DELAY, 0))
DBUG_RETURN(my_errno);
}
......
......@@ -2296,21 +2296,21 @@ my_bool translog_unlock()
}
/*
Get log page by file number and offset of the beginning of the page
/**
@brief Get log page by file number and offset of the beginning of the page
SYNOPSIS
translog_get_page()
data validator data, which contains the page address
buffer buffer for page placing
@param data validator data, which contains the page address
@param buffer buffer for page placing
(might not be used in some cache implementations)
@param direct_link if it is not NULL then caller can accept direct
link to the page cache
RETURN
NULL - Error
# pointer to the page cache which should be used to read this page
@retval NULL Error
@retval # pointer to the page cache which should be used to read this page
*/
static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer)
static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
PAGECACHE_PAGE_LINK *direct_link)
{
TRANSLOG_ADDRESS addr= *(data->addr), in_buffers;
uint cache_index;
......@@ -2324,6 +2324,9 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer)
/* it is really page address */
DBUG_ASSERT(LSN_OFFSET(addr) % TRANSLOG_PAGE_SIZE == 0);
if (direct_link)
*direct_link= NULL;
in_buffers= translog_only_in_buffers();
DBUG_PRINT("info", ("in_buffers: (%lu,0x%lx)",
LSN_IN_PARTS(in_buffers)));
......@@ -2336,7 +2339,9 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer)
if (cmp_translog_addr(addr, in_buffers) >= 0)
{
uint16 buffer_no= log_descriptor.bc.buffer_no;
#ifndef DBUG_OFF
uint16 buffer_start= buffer_no;
#endif
struct st_translog_buffer *buffer_unlock= log_descriptor.bc.buffer;
struct st_translog_buffer *curr_buffer= log_descriptor.bc.buffer;
for (;;)
......@@ -2437,13 +2442,23 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer)
}
file.file= log_descriptor.log_file_num[cache_index];
buffer= (uchar*)
buffer=
(uchar*) (direct_link ?
pagecache_valid_read(log_descriptor.pagecache, &file,
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
3, NULL,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_READ, direct_link,
&translog_page_validator, (uchar*) data) :
pagecache_valid_read(log_descriptor.pagecache, &file,
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
3, (char*) buffer,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0,
&translog_page_validator, (uchar*) data);
PAGECACHE_LOCK_LEFT_UNLOCKED, direct_link,
&translog_page_validator, (uchar*) data));
DBUG_PRINT("info", ("Direct link is assigned to : 0x%lx * 0x%lx",
(ulong) direct_link,
(ulong)(direct_link ? *direct_link : NULL)));
}
else
{
......@@ -2468,6 +2483,24 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer)
DBUG_RETURN(buffer);
}
/**
@brief free direct log page link
@param direct_link the direct log page link to be freed
*/
static void translog_free_link(PAGECACHE_PAGE_LINK *direct_link)
{
DBUG_ENTER("translog_free_link");
DBUG_PRINT("info", ("Direct link: 0x%lx",
(ulong) direct_link));
if (direct_link)
pagecache_unlock_by_link(log_descriptor.pagecache, direct_link,
PAGECACHE_LOCK_READ_UNLOCK, PAGECACHE_UNPIN,
LSN_IMPOSSIBLE, LSN_IMPOSSIBLE);
DBUG_VOID_RETURN;
}
/*
Finds last page of the given log file
......@@ -2796,7 +2829,7 @@ my_bool translog_init(const char *directory,
TRANSLOG_VALIDATOR_DATA data;
uchar buffer[TRANSLOG_PAGE_SIZE], *page;
data.addr= &current_page;
if ((page= translog_get_page(&data, buffer)) == NULL)
if ((page= translog_get_page(&data, buffer, NULL)) == NULL)
DBUG_RETURN(1);
if (data.was_recovered)
{
......@@ -2848,7 +2881,7 @@ my_bool translog_init(const char *directory,
/* continue old log */
DBUG_ASSERT(LSN_FILE_NO(last_valid_page)==
LSN_FILE_NO(log_descriptor.horizon));
if ((page= translog_get_page(&data, buffer)) == NULL ||
if ((page= translog_get_page(&data, buffer, NULL)) == NULL ||
(chunk_offset= translog_get_first_chunk_offset(page)) == 0)
DBUG_RETURN(1);
......@@ -5153,24 +5186,54 @@ static my_bool translog_scanner_set_last_page(TRANSLOG_SCANNER_DATA
}
/*
Initialize reader scanner
/**
@brief Get page from page cache according to requested method
SYNOPSIS
translog_init_scanner()
lsn LSN with which it have to be inited
fixed_horizon true if it is OK do not read records which was written
@param scanner The scanner data
@return operation status
@retval 0 OK
@retval 1 Error
*/
static my_bool
translog_scanner_get_page(TRANSLOG_SCANNER_DATA *scanner)
{
TRANSLOG_VALIDATOR_DATA data;
DBUG_ENTER("translog_scanner_get_page");
data.addr= &scanner->page_addr;
data.was_recovered= 0;
DBUG_RETURN((scanner->page=
translog_get_page(&data, scanner->buffer,
(scanner->use_direct_link ?
&scanner->direct_link :
NULL))) ==
NULL);
}
/**
@brief Initialize reader scanner.
@param lsn LSN with which it have to be inited
@param fixed_horizon true if it is OK do not read records which was written
after scanning beginning
scanner scanner which have to be inited
@param scanner scanner which have to be inited
@param use_direct prefer using direct lings from page handler
where it is possible.
RETURN
0 OK
1 Error
@note If direct link was used translog_destroy_scanner should be
called after it using
@return status of the operation
@retval 0 OK
@retval 1 Error
*/
my_bool translog_init_scanner(LSN lsn,
my_bool fixed_horizon,
struct st_translog_scanner_data *scanner)
TRANSLOG_SCANNER_DATA *scanner,
my_bool use_direct)
{
TRANSLOG_VALIDATOR_DATA data;
DBUG_ENTER("translog_init_scanner");
......@@ -5184,6 +5247,8 @@ my_bool translog_init_scanner(LSN lsn,
scanner->page_offset= LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE;
scanner->fixed_horizon= fixed_horizon;
scanner->use_direct_link= use_direct;
scanner->direct_link= NULL;
scanner->horizon= translog_get_horizon();
DBUG_PRINT("info", ("horizon: (0x%lu,0x%lx)",
......@@ -5198,12 +5263,24 @@ my_bool translog_init_scanner(LSN lsn,
if (translog_scanner_set_last_page(scanner))
DBUG_RETURN(1);
if ((scanner->page= translog_get_page(&data, scanner->buffer)) == NULL)
if (translog_scanner_get_page(scanner))
DBUG_RETURN(1);
DBUG_RETURN(0);
}
/**
@brief Destroy scanner object;
@param scanner The scanner object to destroy
*/
void translog_destroy_scanner(TRANSLOG_SCANNER_DATA *scanner)
{
translog_free_link(scanner->direct_link);
}
/*
Checks End of the Log
......@@ -5298,7 +5375,6 @@ static my_bool translog_scanner_eof(TRANSLOG_SCANNER_DATA *scanner)
scanner->last_file_page);
}
/*
Move scanner to the next chunk
......@@ -5315,7 +5391,6 @@ static my_bool
translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner)
{
uint16 len;
TRANSLOG_VALIDATOR_DATA data;
DBUG_ENTER("translog_get_next_chunk");
if ((len= translog_get_total_chunk_length(scanner->page,
......@@ -5331,6 +5406,8 @@ translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner)
}
if (translog_scanner_eop(scanner))
{
/* before reading next page we should unpin current one if it was pinned */
translog_free_link(scanner->direct_link);
if (translog_scanner_eof(scanner))
{
DBUG_PRINT("info", ("horizon: (%lu,0x%lx) pageaddr: (%lu,0x%lx)",
......@@ -5350,9 +5427,7 @@ translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner)
scanner->page_addr+= TRANSLOG_PAGE_SIZE; /* offset increased */
}
data.addr= &scanner->page_addr;
data.was_recovered= 0;
if ((scanner->page= translog_get_page(&data, scanner->buffer)) == NULL)
if (translog_scanner_get_page(scanner))
DBUG_RETURN(1);
scanner->page_offset= translog_get_first_chunk_offset(scanner->page);
......@@ -5482,7 +5557,7 @@ translog_variable_length_header(uchar *page, translog_size_t page_offset,
{
DBUG_PRINT("info", ("use internal scanner for header reading"));
scanner= &internal_scanner;
if (translog_init_scanner(buff->lsn, 1, scanner))
if (translog_init_scanner(buff->lsn, 1, scanner, 0))
DBUG_RETURN(RECHEADER_READ_ERROR);
}
if (translog_get_next_chunk(scanner))
......@@ -5502,13 +5577,15 @@ translog_variable_length_header(uchar *page, translog_size_t page_offset,
}
base_lsn= buff->groups[0].addr;
translog_init_scanner(base_lsn, 1, scanner);
translog_init_scanner(base_lsn, 1, scanner, scanner == &internal_scanner);
/* first group chunk is always chunk type 2 */
page= scanner->page;
page_offset= scanner->page_offset;
src= page + page_offset + 1;
page_rest= TRANSLOG_PAGE_SIZE - (src - page);
body_len= page_rest;
if (scanner == &internal_scanner)
translog_destroy_scanner(scanner);
}
if (lsns)
{
......@@ -5615,6 +5692,7 @@ int translog_read_record_header(LSN lsn, TRANSLOG_HEADER_BUFFER *buff)
{
uchar buffer[TRANSLOG_PAGE_SIZE], *page;
translog_size_t res, page_offset= LSN_OFFSET(lsn) % TRANSLOG_PAGE_SIZE;
PAGECACHE_PAGE_LINK direct_link;
TRANSLOG_ADDRESS addr;
TRANSLOG_VALIDATOR_DATA data;
DBUG_ENTER("translog_read_record_header");
......@@ -5628,8 +5706,10 @@ int translog_read_record_header(LSN lsn, TRANSLOG_HEADER_BUFFER *buff)
data.was_recovered= 0;
addr= lsn;
addr-= page_offset; /* offset decreasing */
res= (!(page= translog_get_page(&data, buffer))) ? RECHEADER_READ_ERROR :
res= (!(page= translog_get_page(&data, buffer, &direct_link))) ?
RECHEADER_READ_ERROR :
translog_read_record_header_from_buffer(page, page_offset, buff, 0);
translog_free_link(direct_link);
DBUG_RETURN(res);
}
......@@ -5774,8 +5854,9 @@ static my_bool translog_record_read_next_chunk(struct st_translog_reader_data
data->current_group++;
data->current_chunk= 0;
DBUG_PRINT("info", ("skip to group: #%u", data->current_group));
translog_destroy_scanner(&data->scanner);
translog_init_scanner(data->header.groups[data->current_group].addr,
1, &data->scanner);
1, &data->scanner, 1);
}
else
{
......@@ -5794,7 +5875,8 @@ static my_bool translog_record_read_next_chunk(struct st_translog_reader_data
DBUG_ASSERT(data->header.groups_no - 1 == data->current_group);
DBUG_ASSERT(data->header.lsn ==
data->scanner.page_addr + data->scanner.page_offset);
translog_init_scanner(data->header.chunk0_data_addr, 1, &data->scanner);
translog_destroy_scanner(&data->scanner);
translog_init_scanner(data->header.chunk0_data_addr, 1, &data->scanner, 1);
data->chunk_size= data->header.chunk0_data_len;
data->body_offset= data->scanner.page_offset;
data->current_offset= new_current_offset;
......@@ -5844,7 +5926,7 @@ static my_bool translog_init_reader_data(LSN lsn,
{
int read_header;
DBUG_ENTER("translog_init_reader_data");
if (translog_init_scanner(lsn, 1, &data->scanner) ||
if (translog_init_scanner(lsn, 1, &data->scanner, 1) ||
((read_header=
translog_read_record_header_scan(&data->scanner, &data->header, 1))
== RECHEADER_READ_ERROR))
......@@ -5865,6 +5947,16 @@ static my_bool translog_init_reader_data(LSN lsn,
}
/**
@brief Destroy reader data object
*/
static void translog_destroy_reader_data(struct st_translog_reader_data *data)
{
translog_destroy_scanner(&data->scanner);
}
/*
Read a part of the record.
......@@ -5924,7 +6016,10 @@ translog_size_t translog_read_record(LSN lsn,
memcpy(buffer, data->header.header + offset, len);
length-= len;
if (length == 0)
{
translog_destroy_reader_data(data);
DBUG_RETURN(requested_length);
}
offset+= len;
buffer+= len;
DBUG_PRINT("info",
......@@ -5952,7 +6047,10 @@ translog_size_t translog_read_record(LSN lsn,
(offset - data->current_offset), len);
length-= len;
if (length == 0)
{
translog_destroy_reader_data(data);
DBUG_RETURN(requested_length);
}
offset+= len;
buffer+= len;
DBUG_PRINT("info",
......@@ -5961,8 +6059,11 @@ translog_size_t translog_read_record(LSN lsn,
(ulong) length));
}
if (translog_record_read_next_chunk(data))
{
translog_destroy_reader_data(data);
DBUG_RETURN(requested_length - length);
}
}
}
......@@ -6624,6 +6725,7 @@ LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon)
{
uint chunk_type;
TRANSLOG_SCANNER_DATA scanner;
LSN result;
DBUG_ENTER("translog_next_LSN");
if (horizon == LSN_IMPOSSIBLE)
......@@ -6632,7 +6734,7 @@ LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon)
if (addr == horizon)
DBUG_RETURN(LSN_IMPOSSIBLE);
translog_init_scanner(addr, 0, &scanner);
translog_init_scanner(addr, 0, &scanner, 1);
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
......@@ -6647,9 +6749,13 @@ LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon)
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
}
if (scanner.page[scanner.page_offset] == 0)
DBUG_RETURN(LSN_IMPOSSIBLE); /* reached page filler */
DBUG_RETURN(scanner.page_addr + scanner.page_offset);
result= LSN_IMPOSSIBLE; /* reached page filler */
else
result= scanner.page_addr + scanner.page_offset;
translog_destroy_scanner(&scanner);
DBUG_RETURN(result);
}
/**
......@@ -6681,7 +6787,7 @@ LSN translog_first_lsn_in_log()
data.addr= &addr;
{
uchar buffer[TRANSLOG_PAGE_SIZE];
if ((page= translog_get_page(&data, buffer)) == NULL ||
if ((page= translog_get_page(&data, buffer, NULL)) == NULL ||
(chunk_offset= translog_get_first_chunk_offset(page)) == 0)
DBUG_RETURN(LSN_ERROR);
}
......@@ -6719,7 +6825,7 @@ LSN translog_first_theoretical_lsn()
addr= MAKE_LSN(1, TRANSLOG_PAGE_SIZE); /* the first page of the file */
data.addr= &addr;
if ((page= translog_get_page(&data, buffer)) == NULL)
if ((page= translog_get_page(&data, buffer, NULL)) == NULL)
DBUG_RETURN(LSN_ERROR);
DBUG_RETURN(MAKE_LSN(1, TRANSLOG_PAGE_SIZE +
......
......@@ -182,10 +182,14 @@ typedef struct st_translog_scanner_data
TRANSLOG_ADDRESS horizon;
TRANSLOG_ADDRESS last_file_page; /* Last page on in this file */
uchar *page; /* page content pointer */
/* direct link on the current page or NULL if it is not supported/requested */
PAGECACHE_PAGE_LINK direct_link;
/* offset of the chunk in the page */
translog_size_t page_offset;
/* set horizon only once at init */
my_bool fixed_horizon;
/* try to get direct link on the page if it is possible */
my_bool use_direct_link;
} TRANSLOG_SCANNER_DATA;
......@@ -245,7 +249,9 @@ extern my_bool translog_flush(LSN lsn);
extern my_bool translog_init_scanner(LSN lsn,
my_bool fixed_horizon,
struct st_translog_scanner_data *scanner);
struct st_translog_scanner_data *scanner,
my_bool use_direct_link);
extern void translog_destroy_scanner(TRANSLOG_SCANNER_DATA *scanner);
extern int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner,
TRANSLOG_HEADER_BUFFER *buff);
......
......@@ -58,7 +58,7 @@ if (pos > end_pos) \
** In MySQL the server will handle version issues.
******************************************************************************/
MARIA_HA *_ma_test_if_reopen(char *filename)
MARIA_HA *_ma_test_if_reopen(const char *filename)
{
LIST *pos;
......@@ -1001,7 +1001,9 @@ uint _ma_state_info_write(MARIA_SHARE *share, uint pWrite)
if (pWrite & 4)
pthread_mutex_lock(&share->intern_lock);
else if (maria_multi_threaded)
{
safe_mutex_assert_owner(&share->intern_lock);
}
if (share->base.born_transactional && translog_inited &&
!maria_in_recovery)
{
......
......@@ -96,7 +96,7 @@
#define PCBLOCK_INFO(B) \
DBUG_PRINT("info", \
("block: 0x%lx file: %lu page: %lu s: %0x hshL: 0x%lx req: %u/%u " \
"wrlocks: %u", \
"wrlocks: %u pins: %u", \
(ulong)(B), \
(ulong)((B)->hash_link ? \
(B)->hash_link->file.file : \
......@@ -110,7 +110,8 @@
(uint)((B)->hash_link ? \
(B)->hash_link->requests : \
0), \
block->wlocks))
block->wlocks, \
(uint)(B)->pins))
/* TODO: put it to my_static.c */
my_bool my_disable_flush_pagecache_blocks= 0;
......@@ -457,8 +458,10 @@ static my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block,
#define FLUSH_CACHE 2000 /* sort this many blocks at once */
static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block);
#ifndef DBUG_OFF
static void test_key_cache(PAGECACHE *pagecache,
const char *where, my_bool lock);
#endif
#define PAGECACHE_HASH(p, f, pos) (((ulong) (pos) + \
(ulong) (f).file) & (p->hash_entries-1))
......@@ -655,11 +658,11 @@ static inline uint next_power(uint value)
*/
int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
ulong init_pagecache(PAGECACHE *pagecache, size_t use_mem,
uint division_limit, uint age_threshold,
uint block_size)
{
uint blocks, hash_links, length;
ulong blocks, hash_links, length;
int error;
DBUG_ENTER("init_pagecache");
DBUG_ASSERT(block_size >= 512);
......@@ -689,7 +692,7 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
block_size));
DBUG_ASSERT(((uint)(1 << pagecache->shift)) == block_size);
blocks= (int) (use_mem / (sizeof(PAGECACHE_BLOCK_LINK) +
blocks= (ulong) (use_mem / (sizeof(PAGECACHE_BLOCK_LINK) +
2 * sizeof(PAGECACHE_HASH_LINK) +
sizeof(PAGECACHE_HASH_LINK*) *
5/4 + block_size));
......@@ -714,7 +717,7 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
ALIGN_SIZE(hash_links * sizeof(PAGECACHE_HASH_LINK)) +
ALIGN_SIZE(sizeof(PAGECACHE_HASH_LINK*) *
pagecache->hash_entries))) +
(((ulong) blocks) << pagecache->shift) > use_mem)
(blocks << pagecache->shift) > use_mem)
blocks--;
/* Allocate memory for cache page buffers */
if ((pagecache->block_mem=
......@@ -726,8 +729,7 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
For each block 2 hash links are allocated
*/
if ((pagecache->block_root=
(PAGECACHE_BLOCK_LINK*) my_malloc((uint) length,
MYF(0))))
(PAGECACHE_BLOCK_LINK*) my_malloc((size_t) length, MYF(0))))
break;
my_large_free(pagecache->block_mem, MYF(0));
pagecache->block_mem= 0;
......@@ -739,8 +741,8 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
}
blocks= blocks / 4*3;
}
pagecache->blocks_unused= (ulong) blocks;
pagecache->disk_blocks= (int) blocks;
pagecache->blocks_unused= blocks;
pagecache->disk_blocks= (long) blocks;
pagecache->hash_links= hash_links;
pagecache->hash_root=
(PAGECACHE_HASH_LINK**) ((char*) pagecache->block_root +
......@@ -782,8 +784,8 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
pagecache->waiting_for_hash_link.last_thread= NULL;
pagecache->waiting_for_block.last_thread= NULL;
DBUG_PRINT("exit",
("disk_blocks: %d block_root: 0x%lx hash_entries: %d\
hash_root: 0x%lx hash_links: %d hash_link_root: 0x%lx",
("disk_blocks: %ld block_root: 0x%lx hash_entries: %ld\
hash_root: 0x%lx hash_links: %ld hash_link_root: 0x%lx",
pagecache->disk_blocks, (long) pagecache->block_root,
pagecache->hash_entries, (long) pagecache->hash_root,
pagecache->hash_links, (long) pagecache->hash_link_root));
......@@ -796,7 +798,7 @@ int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
}
pagecache->blocks= pagecache->disk_blocks > 0 ? pagecache->disk_blocks : 0;
DBUG_RETURN((int) pagecache->disk_blocks);
DBUG_RETURN((ulong) pagecache->disk_blocks);
err:
error= my_errno;
......@@ -887,11 +889,11 @@ static int flush_all_key_blocks(PAGECACHE *pagecache)
So we disable it for now.
*/
#if NOT_USED /* keep disabled until code is fixed see above !! */
int resize_pagecache(PAGECACHE *pagecache,
ulong resize_pagecache(PAGECACHE *pagecache,
size_t use_mem, uint division_limit,
uint age_threshold)
{
int blocks;
ulong blocks;
#ifdef THREAD
struct st_my_thread_var *thread;
WQUEUE *wqueue;
......@@ -1282,8 +1284,10 @@ static void unlink_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block)
DBUG_ENTER("unlink_block");
DBUG_PRINT("unlink_block", ("unlink 0x%lx", (ulong)block));
if (block->next_used == block)
{
/* The list contains only one member */
pagecache->used_last= pagecache->used_ins= NULL;
}
else
{
block->next_used->prev_used= block->prev_used;
......@@ -2661,13 +2665,12 @@ void pagecache_unpin(PAGECACHE *pagecache,
*/
void pagecache_unlock_by_link(PAGECACHE *pagecache,
PAGECACHE_PAGE_LINK *link,
PAGECACHE_BLOCK_LINK *block,
enum pagecache_page_lock lock,
enum pagecache_page_pin pin,
LSN first_REDO_LSN_for_page,
LSN lsn)
{
PAGECACHE_BLOCK_LINK *block= (PAGECACHE_BLOCK_LINK *)link;
DBUG_ENTER("pagecache_unlock_by_link");
DBUG_PRINT("enter", ("block: 0x%lx fd: %u page: %lu %s %s",
(ulong) block,
......@@ -2820,8 +2823,9 @@ void pagecache_unpin_by_link(PAGECACHE *pagecache,
Pin will be chosen according to lock parameter (see lock_to_pin)
*/
static enum pagecache_page_pin lock_to_pin[]=
static enum pagecache_page_pin lock_to_pin[2][8]=
{
{
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
......@@ -2830,6 +2834,17 @@ static enum pagecache_page_pin lock_to_pin[]=
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ_UNLOCK*/,
PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_TO_READ*/
},
{
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
PAGECACHE_PIN /*PAGECACHE_LOCK_READ*/,
PAGECACHE_PIN /*PAGECACHE_LOCK_WRITE*/,
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ_UNLOCK*/,
PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_WRITE_TO_READ*/
}
};
uchar *pagecache_valid_read(PAGECACHE *pagecache,
......@@ -2839,24 +2854,27 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache,
uchar *buff,
enum pagecache_page_type type,
enum pagecache_page_lock lock,
PAGECACHE_PAGE_LINK *link,
PAGECACHE_BLOCK_LINK **link,
pagecache_disk_read_validator validator,
uchar* validator_data)
{
int error= 0;
enum pagecache_page_pin pin= lock_to_pin[lock];
PAGECACHE_PAGE_LINK fake_link;
enum pagecache_page_pin pin= lock_to_pin[test(buff==0)][lock];
PAGECACHE_BLOCK_LINK *fake_link;
DBUG_ENTER("pagecache_valid_read");
DBUG_PRINT("enter", ("fd: %u page: %lu level: %u t:%s %s %s",
(uint) file->file, (ulong) pageno, level,
DBUG_PRINT("enter", ("fd: %u page: %lu buffer: 0x%lx level: %u "
"t:%s %s %s",
(uint) file->file, (ulong) pageno,
(ulong) buff, level,
page_cache_page_type_str[type],
page_cache_page_lock_str[lock],
page_cache_page_pin_str[pin]));
DBUG_ASSERT(buff != 0 || (buff == 0 && (pin == PAGECACHE_PIN ||
pin == PAGECACHE_PIN_LEFT_PINNED)));
if (!link)
link= &fake_link;
else
*link= 0;
*link= 0; /* Catch errors */
restart:
......@@ -2910,7 +2928,12 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache,
goto restart;
}
if (! ((status= block->status) & PCBLOCK_ERROR))
status= block->status;
if (!buff)
buff= block->buffer;
else
{
if (!(status & PCBLOCK_ERROR))
{
#if !defined(SERIALIZED_READ_FROM_CACHE)
pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
......@@ -2924,6 +2947,7 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache,
pagecache_pthread_mutex_lock(&pagecache->cache_lock);
#endif
}
}
remove_reader(block);
/*
......@@ -2934,7 +2958,7 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache,
if (pin == PAGECACHE_PIN_LEFT_UNPINNED || pin == PAGECACHE_UNPIN)
unreg_request(pagecache, block, 1);
else
*link= (PAGECACHE_PAGE_LINK)block;
*link= block;
dec_counter_for_resize_op(pagecache);
......@@ -2983,7 +3007,7 @@ my_bool pagecache_delete(PAGECACHE *pagecache,
my_bool flush)
{
int error= 0;
enum pagecache_page_pin pin= lock_to_pin[lock];
enum pagecache_page_pin pin= lock_to_pin[0][lock];
DBUG_ENTER("pagecache_delete");
DBUG_PRINT("enter", ("fd: %u page: %lu %s %s",
(uint) file->file, (ulong) pageno,
......@@ -3830,7 +3854,8 @@ int flush_pagecache_blocks(PAGECACHE *pagecache,
0 on success (always because it can't fail)
*/
int reset_pagecache_counters(const char *name, PAGECACHE *pagecache)
int reset_pagecache_counters(const char *name __attribute__((unused)),
PAGECACHE *pagecache)
{
DBUG_ENTER("reset_pagecache_counters");
if (!pagecache->inited)
......
......@@ -73,8 +73,6 @@ enum pagecache_write_mode
PAGECACHE_WRITE_DONE
};
typedef void *PAGECACHE_PAGE_LINK;
/* file descriptor for Maria */
typedef struct st_pagecache_file
{
......@@ -93,6 +91,8 @@ typedef struct st_pagecache_page PAGECACHE_PAGE;
struct st_pagecache_hash_link;
typedef struct st_pagecache_hash_link PAGECACHE_HASH_LINK;
typedef PAGECACHE_BLOCK_LINK * PAGECACHE_PAGE_LINK; /* To be removed */
#include <wqueue.h>
typedef my_bool (*pagecache_disk_read_validator)(uchar *page, uchar *data);
......@@ -106,25 +106,22 @@ typedef my_bool (*pagecache_disk_read_validator)(uchar *page, uchar *data);
typedef struct st_pagecache
{
my_bool inited;
my_bool resize_in_flush; /* true during flush of resize operation */
my_bool can_be_used; /* usage of cache for read/write is allowed */
uint shift; /* block size = 2 ^ shift */
size_t mem_size; /* specified size of the cache memory */
uint32 block_size; /* size of the page buffer of a cache block */
ulong min_warm_blocks; /* min number of warm blocks; */
ulong age_threshold; /* age threshold for hot blocks */
ulonglong time; /* total number of block link operations */
uint hash_entries; /* max number of entries in the hash table */
int hash_links; /* max number of hash links */
int hash_links_used; /* number of hash links taken from free links pool */
int disk_blocks; /* max number of blocks in the cache */
ulong hash_entries; /* max number of entries in the hash table */
long hash_links; /* max number of hash links */
long hash_links_used; /* number of hash links taken from free links pool */
long disk_blocks; /* max number of blocks in the cache */
ulong blocks_used; /* maximum number of concurrently used blocks */
ulong blocks_unused; /* number of currently unused blocks */
ulong blocks_changed; /* number of currently dirty blocks */
ulong warm_blocks; /* number of blocks in warm sub-chain */
ulong cnt_for_resize_op; /* counter to block resize operation */
ulong blocks_available; /* number of blocks available in the LRU chain */
long blocks; /* max number of blocks in the cache */
uint32 block_size; /* size of the page buffer of a cache block */
PAGECACHE_HASH_LINK **hash_root;/* arr. of entries into hash table buckets */
PAGECACHE_HASH_LINK *hash_link_root;/* memory for hash table links */
PAGECACHE_HASH_LINK *free_hash_list;/* list of free hash links */
......@@ -159,17 +156,20 @@ typedef struct st_pagecache
ulonglong global_cache_r_requests;/* number of read requests (read hits) */
ulonglong global_cache_read; /* number of reads from files to cache */
int blocks; /* max number of blocks in the cache */
uint shift; /* block size = 2 ^ shift */
my_bool inited;
my_bool resize_in_flush; /* true during flush of resize operation */
my_bool can_be_used; /* usage of cache for read/write is allowed */
my_bool in_init; /* Set to 1 in MySQL during init/resize */
} PAGECACHE;
/* The default key cache */
extern PAGECACHE dflt_pagecache_var, *dflt_pagecache;
extern int init_pagecache(PAGECACHE *pagecache, size_t use_mem,
extern ulong init_pagecache(PAGECACHE *pagecache, size_t use_mem,
uint division_limit, uint age_threshold,
uint block_size);
extern int resize_pagecache(PAGECACHE *pagecache,
extern ulong resize_pagecache(PAGECACHE *pagecache,
size_t use_mem, uint division_limit,
uint age_threshold);
extern void change_pagecache_param(PAGECACHE *pagecache, uint division_limit,
......@@ -185,7 +185,7 @@ extern uchar *pagecache_valid_read(PAGECACHE *pagecache,
uchar *buff,
enum pagecache_page_type type,
enum pagecache_page_lock lock,
PAGECACHE_PAGE_LINK *link,
PAGECACHE_BLOCK_LINK **link,
pagecache_disk_read_validator validator,
uchar* validator_data);
......@@ -218,7 +218,7 @@ extern void pagecache_unlock(PAGECACHE *pagecache,
LSN first_REDO_LSN_for_page,
LSN lsn);
extern void pagecache_unlock_by_link(PAGECACHE *pagecache,
PAGECACHE_PAGE_LINK *link,
PAGECACHE_BLOCK_LINK *block,
enum pagecache_page_lock lock,
enum pagecache_page_pin pin,
LSN first_REDO_LSN_for_page,
......
......@@ -105,6 +105,7 @@ static int new_table(uint16 sid, const char *name,
static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
struct st_dirty_page *dirty_page);
static int close_all_tables(void);
static my_bool close_one_table(const char *name, LSN addr);
static void print_redo_phase_progress(TRANSLOG_ADDRESS addr);
/** @brief global [out] buffer for translog_read_record(); never shrinks */
......@@ -415,6 +416,8 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
uint flags;
int error= 1, create_mode= O_RDWR | O_TRUNC;
MARIA_HA *info= NULL;
uint kfile_size_before_extension, keystart;
if (skip_DDLs)
{
tprint(tracef, "we skip DDLs\n");
......@@ -431,6 +434,12 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
}
name= log_record_buffer.str;
tprint(tracef, "Table '%s'", name);
if (close_one_table(name, rec->lsn))
{
tprint(tracef, " got error %d on close\n", my_errno);
ALERT_USER();
goto end;
}
/* we try hard to get create_rename_lsn, to avoid mistakes if possible */
info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
if (info)
......@@ -474,7 +483,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
info= NULL;
}
else /* one or two files absent, or header corrupted... */
tprint(tracef, "can't be opened, probably does not exist");
tprint(tracef, " can't be opened, probably does not exist");
/* if does not exist, or is older, overwrite it */
/** @todo symlinks */
ptr= name + strlen(name) + 1;
......@@ -490,13 +499,13 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
MYF(MY_WME|create_flag))) < 0)
{
tprint(tracef, "Failed to create index file\n");
tprint(tracef, " Failed to create index file\n");
goto end;
}
ptr++;
uint kfile_size_before_extension= uint2korr(ptr);
kfile_size_before_extension= uint2korr(ptr);
ptr+= 2;
uint keystart= uint2korr(ptr);
keystart= uint2korr(ptr);
ptr+= 2;
/* set create_rename_lsn (for maria_read_log to be idempotent) */
lsn_store(ptr + sizeof(info->s->state.header) + 2, rec->lsn);
......@@ -507,7 +516,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) ||
my_chsize(kfile, keystart, 0, MYF(MY_WME)))
{
tprint(tracef, "Failed to write to index file\n");
tprint(tracef, " Failed to write to index file\n");
goto end;
}
if (!(flags & HA_DONT_TOUCH_DATA))
......@@ -521,7 +530,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
MYF(MY_WME | create_flag))) < 0) ||
my_close(dfile, MYF(MY_WME)))
{
tprint(tracef, "Failed to create data file\n");
tprint(tracef, " Failed to create data file\n");
goto end;
}
/*
......@@ -533,7 +542,7 @@ prototype_redo_exec_hook(REDO_CREATE_TABLE)
if (((info= maria_open(name, O_RDONLY, 0)) == NULL) ||
_ma_initialize_data_file(info->s, info->dfile.file))
{
tprint(tracef, "Failed to open new table or write to data file\n");
tprint(tracef, " Failed to open new table or write to data file\n");
goto end;
}
}
......@@ -1436,6 +1445,11 @@ prototype_undo_exec_hook(UNDO_ROW_UPDATE)
static int run_redo_phase(LSN lsn, my_bool apply)
{
TRANSLOG_HEADER_BUFFER rec;
struct st_translog_scanner_data scanner;
int len;
uint i;
/* install hooks for execution */
#define install_redo_exec_hook(R) \
log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
......@@ -1467,8 +1481,6 @@ static int run_redo_phase(LSN lsn, my_bool apply)
current_group_end_lsn= LSN_IMPOSSIBLE;
TRANSLOG_HEADER_BUFFER rec;
if (unlikely(lsn == LSN_IMPOSSIBLE || lsn == translog_get_horizon()))
{
tprint(tracef, "checkpoint address refers to the log end log or "
......@@ -1476,7 +1488,7 @@ static int run_redo_phase(LSN lsn, my_bool apply)
return 0;
}
int len= translog_read_record_header(lsn, &rec);
len= translog_read_record_header(lsn, &rec);
/** @todo EOF should be detected */
if (len == RECHEADER_READ_ERROR)
......@@ -1484,13 +1496,11 @@ static int run_redo_phase(LSN lsn, my_bool apply)
tprint(tracef, "Failed to read header of the first record.\n");
return 1;
}
struct st_translog_scanner_data scanner;
if (translog_init_scanner(lsn, 1, &scanner))
if (translog_init_scanner(lsn, 1, &scanner, 0))
{
tprint(tracef, "Scanner init failed\n");
return 1;
}
uint i;
for (i= 1;;i++)
{
uint16 sid= rec.short_trid;
......@@ -1533,7 +1543,7 @@ static int run_redo_phase(LSN lsn, my_bool apply)
tprint(tracef, "Cannot find record where it should be\n");
return 1;
}
if (translog_init_scanner(rec2.lsn, 1, &scanner2))
if (translog_init_scanner(rec2.lsn, 1, &scanner2, 0))
{
tprint(tracef, "Scanner2 init failed\n");
return 1;
......@@ -1607,6 +1617,7 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
{
uint sid, unfinished= 0;
char llbuf[22];
LSN addr;
hash_free(&all_dirty_pages);
/*
......@@ -1667,7 +1678,7 @@ static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
The UNDO phase uses some normal run-time code of ROLLBACK: generates log
records, etc; prepare tables for that
*/
LSN addr= translog_get_horizon();
addr= translog_get_horizon();
for (sid= 0; sid <= SHARE_ID_MAX; sid++)
{
MARIA_HA *info= all_tables[sid].info;
......@@ -2070,6 +2081,42 @@ static int close_all_tables(void)
return error;
}
/* Close one table during redo phase */
static my_bool close_one_table(const char *open_name, LSN addr)
{
my_bool res= 0;
LIST *pos;
/* There are no other threads using the tables, so we don't need any locks */
for (pos=maria_open_list ; pos ;)
{
MARIA_HA *info= (MARIA_HA*) pos->data;
MARIA_SHARE *share= info->s;
pos= pos->next;
if (!strcmp(share->open_file_name, open_name))
{
struct st_table_for_recovery *internal_table, *end;
for (internal_table= all_tables, end= internal_table + SHARE_ID_MAX + 1;
internal_table < end ;
internal_table++)
{
if (internal_table->info == info)
{
internal_table->info= 0;
break;
}
}
prepare_table_for_close(info, addr);
if (maria_close(info))
res= 1;
}
}
return res;
}
static void print_redo_phase_progress(TRANSLOG_ADDRESS addr)
{
static int end_logno= FILENO_IMPOSSIBLE, end_offset, percentage_printed= 0;
......
......@@ -154,7 +154,7 @@ int main(int argc, char **argv)
enum options_mc {
OPT_CHARSETS_DIR=256, OPT_SET_COLLATION,OPT_START_CHECK_POS,
OPT_CORRECT_CHECKSUM, OPT_KEY_BUFFER_SIZE,
OPT_CORRECT_CHECKSUM, OPT_PAGE_BUFFER_SIZE,
OPT_KEY_CACHE_BLOCK_SIZE, OPT_MARIA_BLOCK_SIZE,
OPT_READ_BUFFER_SIZE, OPT_WRITE_BUFFER_SIZE, OPT_SORT_BUFFER_SIZE,
OPT_SORT_KEY_BLOCKS, OPT_DECODE_BITS, OPT_FT_MIN_WORD_LEN,
......@@ -296,7 +296,7 @@ static struct my_option my_long_options[] =
{"wait", 'w',
"Wait if table is locked.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{ "key_buffer_size", OPT_KEY_BUFFER_SIZE, "",
{ "page_buffer_size", OPT_PAGE_BUFFER_SIZE, "",
(uchar**) &check_param.use_buffers, (uchar**) &check_param.use_buffers, 0,
GET_ULONG, REQUIRED_ARG, (long) USE_BUFFER_INIT, (long) MALLOC_OVERHEAD,
(long) ~0L, (long) MALLOC_OVERHEAD, (long) IO_SIZE, 0},
......
......@@ -842,7 +842,7 @@ typedef struct st_maria_block_info
#define UPDATE_AUTO_INC 8
#define UPDATE_OPEN_COUNT 16
#define USE_BUFFER_INIT (((1024L*512L-MALLOC_OVERHEAD)/IO_SIZE)*IO_SIZE)
#define USE_BUFFER_INIT (((1024L*1024L*10-MALLOC_OVERHEAD)/8192)*8192)
#define READ_BUFFER_INIT (1024L*256L-MALLOC_OVERHEAD)
#define SORT_BUFFER_INIT (2048L*1024L-MALLOC_OVERHEAD)
#define MIN_SORT_BUFFER (4096-MALLOC_OVERHEAD)
......@@ -906,7 +906,7 @@ my_bool _ma_check_status(void *param);
void _ma_reset_status(MARIA_HA *maria);
#include "ma_commit.h"
extern MARIA_HA *_ma_test_if_reopen(char *filename);
extern MARIA_HA *_ma_test_if_reopen(const char *filename);
my_bool _ma_check_table_is_closed(const char *name, const char *where);
int _ma_open_datafile(MARIA_HA *info, MARIA_SHARE *share, File file_to_dup);
int _ma_open_keyfile(MARIA_SHARE *share);
......
......@@ -17,7 +17,6 @@
#include "ma_recovery.h"
#include <my_getopt.h>
#define PCACHE_SIZE (1024*1024*10)
#define LOG_FLAGS 0
#define LOG_FILE_SIZE (1024L*1024L)
......@@ -30,7 +29,8 @@ const char *default_dbug_option= "d:t:i:O,\\maria_read_log.trace";
const char *default_dbug_option= "d:t:i:o,/tmp/maria_read_log.trace";
#endif
#endif /* DBUG_OFF */
static my_bool opt_only_display, opt_display_and_apply;
static my_bool opt_only_display, opt_apply, opt_apply_undo, opt_silent;
static ulong opt_page_buffer_size;
int main(int argc, char **argv)
{
......@@ -63,7 +63,7 @@ int main(int argc, char **argv)
}
/* same page cache for log and data; assumes same page size... */
DBUG_ASSERT(maria_block_size == TRANSLOG_PAGE_SIZE);
if (init_pagecache(maria_pagecache, PCACHE_SIZE, 0, 0,
if (init_pagecache(maria_pagecache, opt_page_buffer_size, 0, 0,
TRANSLOG_PAGE_SIZE) == 0)
{
fprintf(stderr, "Got error in init_pagecache() (errno: %d)\n", errno);
......@@ -100,8 +100,8 @@ int main(int argc, char **argv)
LSN_IN_PARTS(lsn));
fprintf(stdout, "TRACE of the last maria_read_log\n");
if (maria_apply_log(lsn, opt_display_and_apply, stdout,
opt_display_and_apply, FALSE))
if (maria_apply_log(lsn, opt_apply, opt_silent ? NULL : stdout,
opt_apply_undo, FALSE))
goto err;
fprintf(stdout, "%s: SUCCESS\n", my_progname);
......@@ -121,19 +121,32 @@ int main(int argc, char **argv)
static struct my_option my_long_options[] =
{
{"apply", 'a',
"Apply log to tables. Will display a lot of information if not run with --silent",
(uchar **) &opt_apply, (uchar **) &opt_apply, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
#ifndef DBUG_OFF
{"debug", '#', "Output debug log. Often the argument is 'd:t:o,filename'.",
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
#endif
{"help", '?', "Display this help and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{"only-display", 'o', "display brief info about records's header",
(uchar **) &opt_only_display, (uchar **) &opt_only_display, 0, GET_BOOL,
NO_ARG,0, 0, 0, 0, 0, 0},
{"display-and-apply", 'a',
"like --only-display but displays more info and modifies tables",
(uchar **) &opt_display_and_apply, (uchar **) &opt_display_and_apply, 0,
{ "page_buffer_size", 'P', "",
(uchar**) &opt_page_buffer_size, (uchar**) &opt_page_buffer_size, 0,
GET_ULONG, REQUIRED_ARG, (long) USE_BUFFER_INIT,
(long) MALLOC_OVERHEAD, (long) ~(ulong) 0, (long) MALLOC_OVERHEAD,
(long) IO_SIZE, 0},
{"silent", 's', "Print less information during apply/undo phase",
(uchar **) &opt_silent, (uchar **) &opt_silent, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
#ifndef DBUG_OFF
{"debug", '#', "Output debug log. Often this is 'd:t:o,filename'.",
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
#endif
{"undo", 'u', "Apply undos to tables. (disable with --disable-undo)",
(uchar **) &opt_apply_undo, (uchar **) &opt_apply_undo, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0},
{"version", 'V', "Print version and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
......@@ -141,7 +154,7 @@ static struct my_option my_long_options[] =
static void print_version(void)
{
VOID(printf("%s Ver 1.0 for %s on %s\n",
VOID(printf("%s Ver 1.1 for %s on %s\n",
my_progname, SYSTEM_TYPE, MACHINE_TYPE));
NETWARE_SET_SCREEN_MODE(1);
}
......@@ -174,6 +187,9 @@ get_one_option(int optid __attribute__((unused)),
case '?':
usage();
exit(0);
case 'V':
print_version();
exit(0);
#ifndef DBUG_OFF
case '#':
DBUG_SET_INITIAL(argument ? argument : default_dbug_option);
......@@ -192,7 +208,10 @@ static void get_options(int *argc,char ***argv)
if ((ho_error=handle_options(argc, argv, my_long_options, get_one_option)))
exit(ho_error);
if ((opt_only_display + opt_display_and_apply) != 1)
if (opt_apply_undo)
opt_apply= 1;
if ((opt_only_display + opt_apply) != 1)
{
usage();
exit(1);
......
......@@ -407,7 +407,7 @@ static void version()
static my_bool
get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
char *argument)
char *argument __attribute__((unused)))
{
switch(optid) {
case 'V':
......
......@@ -363,7 +363,7 @@ int main(int argc __attribute__((unused)), char *argv[])
read_ok(&rec);
translog_free_record_header(&rec);
lsn= first_lsn;
if (translog_init_scanner(first_lsn, 1, &scanner))
if (translog_init_scanner(first_lsn, 1, &scanner, 0))
{
fprintf(stderr, "scanner init failed\n");
goto err;
......
......@@ -378,7 +378,7 @@ int main(int argc __attribute__((unused)), char *argv[])
ok(1, "read record");
translog_free_record_header(&rec);
lsn= first_lsn;
if (translog_init_scanner(first_lsn, 1, &scanner))
if (translog_init_scanner(first_lsn, 1, &scanner, 0))
{
fprintf(stderr, "scanner init failed\n");
goto err;
......
......@@ -373,7 +373,7 @@ int main(int argc __attribute__((unused)),
bzero(indeces, sizeof(indeces));
if (translog_init_scanner(first_lsn, 1, &scanner))
if (translog_init_scanner(first_lsn, 1, &scanner, 0))
{
fprintf(stderr, "scanner init failed\n");
goto err;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment