Commit de3178fd authored by unknown's avatar unknown

Checking that the very last record is fully written

  on the loghandler start.
Variable definition moved because it is C programm.


storage/maria/ma_loghandler.c:
  Checking that the very last record is fully written
    on the loghandler start.
storage/maria/ma_recovery.c:
  Variable definition moved because it is C programm.
parent 694c722b
...@@ -236,6 +236,8 @@ static my_atomic_rwlock_t LOCK_id_to_share; ...@@ -236,6 +236,8 @@ static my_atomic_rwlock_t LOCK_id_to_share;
static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr); static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr);
static my_bool translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner);
/* /*
Initialize log_record_type_descriptors Initialize log_record_type_descriptors
...@@ -2657,6 +2659,59 @@ static uint16 translog_get_chunk_header_length(uchar *page, uint16 offset) ...@@ -2657,6 +2659,59 @@ static uint16 translog_get_chunk_header_length(uchar *page, uint16 offset)
} }
/**
@brief Truncate the log to the given address
@param addr new horizon
@retval 0 OK
@retval 1 Error
*/
static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr)
{
uint32 next_page_offset, page_rest;
uint32 i;
File fd;
char path[FN_REFLEN], page[TRANSLOG_PAGE_SIZE];
DBUG_ENTER("translog_truncate_log");
translog_lock();
/* TODO: write warning to the client */
DBUG_PRINT("warning", ("removing all records from (%lx,0x%lx) "
"till (%lx,0x%lx)",
LSN_IN_PARTS(addr),
LSN_IN_PARTS(log_descriptor.horizon)));
DBUG_ASSERT(cmp_translog_addr(addr, log_descriptor.horizon) < 0);
/* remove files between the address and horizon */
for (i= LSN_FILE_NO(addr) + 1; i <= LSN_FILE_NO(log_descriptor.horizon); i++)
if (my_delete(translog_filename_by_fileno(i, path), MYF(MY_WME)))
{
translog_unlock();
DBUG_RETURN(1);
}
/* truncate the last file up to the last page */
next_page_offset= LSN_OFFSET(addr);
next_page_offset= (next_page_offset -
((next_page_offset - 1) % TRANSLOG_PAGE_SIZE + 1) +
TRANSLOG_PAGE_SIZE);
page_rest= next_page_offset - LSN_OFFSET(addr);
memset(page, TRANSLOG_FILLER, TRANSLOG_PAGE_SIZE);
if ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 ||
my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) ||
(page_rest && my_pwrite(fd, page, page_rest, LSN_OFFSET(addr),
log_write_flags)) ||
my_sync(fd, MYF(MY_WME)) ||
my_close(fd, MYF(MY_WME)))
{
translog_unlock();
DBUG_RETURN(1);
}
log_descriptor.horizon= addr;
translog_unlock();
DBUG_RETURN(0);
}
/* /*
Initialize transaction log Initialize transaction log
...@@ -3012,7 +3067,143 @@ my_bool translog_init(const char *directory, ...@@ -3012,7 +3067,143 @@ my_bool translog_init(const char *directory,
if (unlikely(!id_to_share)) if (unlikely(!id_to_share))
DBUG_RETURN(1); DBUG_RETURN(1);
id_to_share--; /* min id is 1 */ id_to_share--; /* min id is 1 */
translog_inited= 1; translog_inited= 1;
/* Check the last LSN record integrity */
if (logs_found)
{
TRANSLOG_SCANNER_DATA scanner;
TRANSLOG_ADDRESS page_addr;
LSN last_lsn= LSN_IMPOSSIBLE;
/*
take very last page address and try to find LSN record on it
if it fail take address of previous page and so on
*/
page_addr= (log_descriptor.horizon -
((log_descriptor.horizon - 1) % TRANSLOG_PAGE_SIZE + 1));
if (translog_init_scanner(page_addr, 1, &scanner, 1))
DBUG_RETURN(1);
scanner.page_offset= page_overhead[scanner.page[TRANSLOG_PAGE_FLAGS]];
for (;;)
{
uint chunk_type;
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
while (chunk_type != TRANSLOG_CHUNK_LSN &&
chunk_type != TRANSLOG_CHUNK_FIXED &&
scanner.page != END_OF_LOG &&
scanner.page[scanner.page_offset] != TRANSLOG_FILLER &&
scanner.page_addr == page_addr)
{
if (translog_get_next_chunk(&scanner))
{
translog_destroy_scanner(&scanner);
DBUG_RETURN(1);
}
if (scanner.page != END_OF_LOG)
{
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
}
}
if (chunk_type == TRANSLOG_CHUNK_LSN ||
chunk_type == TRANSLOG_CHUNK_FIXED)
{
last_lsn= scanner.page_addr + scanner.page_offset;
if (translog_get_next_chunk(&scanner))
{
translog_destroy_scanner(&scanner);
DBUG_RETURN(1);
}
if (scanner.page == END_OF_LOG)
break; /* it was the last record */
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
continue; /* try to find other record on this page */
}
if (last_lsn != LSN_IMPOSSIBLE)
break; /* there is no more records on the page */
/* We have to make step back */
if (unlikely(LSN_OFFSET(page_addr) == TRANSLOG_PAGE_SIZE))
{
uint32 file_no= LSN_FILE_NO(page_addr);
bool last_page_ok;
/* it is beginning of the current file */
if (unlikely(file_no == 1))
{
/*
It is beginning of the log => there is no LSNs in the log =>
There is no harm in leaving it "as-is".
*/
DBUG_RETURN(0);
}
file_no--;
page_addr= MAKE_LSN(file_no, TRANSLOG_PAGE_SIZE);
translog_get_last_page_addr(&page_addr, &last_page_ok);
/* page should be OK as it is not the last file */
DBUG_ASSERT(last_page_ok);
}
else
{
page_addr-= TRANSLOG_PAGE_SIZE;
}
translog_destroy_scanner(&scanner);
if (translog_init_scanner(page_addr, 1, &scanner, 1))
DBUG_RETURN(1);
scanner.page_offset= page_overhead[scanner.page[TRANSLOG_PAGE_FLAGS]];
}
translog_destroy_scanner(&scanner);
/* Now scanner points to the last LSN chunk, lets check it */
{
TRANSLOG_HEADER_BUFFER rec;
translog_size_t rec_len;
int len;
uchar buffer[1];
DBUG_PRINT("info", ("going to check the last found record (%lu,0x%lx)",
LSN_IN_PARTS(last_lsn)));
len=
translog_read_record_header(last_lsn, &rec);
if (unlikely (len == RECHEADER_READ_ERROR ||
len == RECHEADER_READ_EOF))
{
DBUG_PRINT("error", ("unexpected end of log or record during "
"reading record header: (%lu,0x%lx) len: %d",
LSN_IN_PARTS(last_lsn), len));
if (translog_truncate_log(last_lsn))
DBUG_RETURN(1);
}
else
{
DBUG_ASSERT(last_lsn == rec.lsn);
if (likely(rec.record_length != 0))
{
/*
Reading the last byte of record will trigger scanning all
record chunks for now
*/
rec_len= translog_read_record(rec.lsn, rec.record_length - 1, 1,
buffer, NULL);
if (rec_len != 1)
{
DBUG_PRINT("error", ("unexpected end of log or record during "
"reading record body: (%lu,0x%lx) len: %d",
LSN_IN_PARTS(rec.lsn),
len));
if (translog_truncate_log(last_lsn))
DBUG_RETURN(1);
}
}
}
}
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
......
...@@ -1689,8 +1689,9 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply) ...@@ -1689,8 +1689,9 @@ static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
display_record_position(log_desc2, &rec2, 0); display_record_position(log_desc2, &rec2, 0);
if (apply == MARIA_LOG_CHECK) if (apply == MARIA_LOG_CHECK)
{ {
translog_size_t read_len;
enlarge_buffer(&rec2); enlarge_buffer(&rec2);
translog_size_t read_len= read_len=
translog_read_record(rec2.lsn, 0, rec2.record_length, translog_read_record(rec2.lsn, 0, rec2.record_length,
log_record_buffer.str, NULL); log_record_buffer.str, NULL);
if (read_len != rec2.record_length) if (read_len != rec2.record_length)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment