Commit 19b75b6c authored by unknown's avatar unknown

Fixes problem with getting not LSN address gotten from

horizon addres.


storage/maria/ma_loghandler.c:
  New function to get correct LSN from chunk address.
storage/maria/ma_loghandler.h:
  New function to get correct LSN from chunk address.
parent f77e2969
......@@ -5679,7 +5679,7 @@ int translog_read_record_header_scan(TRANSLOG_SCANNER_DATA *scanner,
/**
@brief Read record header and some fixed part of the next record (the part
depend on record type).
@param scanner data for scanning if lsn is NULL scanner data
will be used for continue scanning.
The scanner can be NULL.
......@@ -6594,6 +6594,49 @@ static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected)
}
/**
@brief returns the most close LSN higher the given chunk address
@param addr the chunk address to start from
@param horizon the horizon if it is known or LSN_IMPOSSIBLE
@retval LSN_ERROR Error
@retval LSN_IMPOSSIBLE no LSNs after the address
@retval # LSN of the most close LSN higher the given chunk address
*/
LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon)
{
uint chunk_type;
TRANSLOG_SCANNER_DATA scanner;
DBUG_ENTER("translog_next_LSN");
if (horizon == LSN_IMPOSSIBLE)
horizon= translog_get_horizon();
if (addr == horizon)
DBUG_RETURN(LSN_IMPOSSIBLE);
translog_init_scanner(addr, 0, &scanner);
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
while (chunk_type != TRANSLOG_CHUNK_LSN &&
chunk_type != TRANSLOG_CHUNK_FIXED &&
scanner.page[scanner.page_offset] != 0)
{
if (translog_get_next_chunk(&scanner))
DBUG_RETURN(LSN_ERROR);
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
}
if (scanner.page[scanner.page_offset] == 0)
DBUG_RETURN(LSN_IMPOSSIBLE); /* reached page filler */
DBUG_RETURN(scanner.page_addr + scanner.page_offset);
}
/**
@brief returns the LSN of the first record starting in this log
......@@ -6607,10 +6650,8 @@ LSN translog_first_lsn_in_log()
TRANSLOG_ADDRESS addr, horizon= translog_get_horizon();
TRANSLOG_VALIDATOR_DATA data;
uint file;
uint chunk_type;
uint16 chunk_offset;
uchar *page;
TRANSLOG_SCANNER_DATA scanner;
DBUG_ENTER("translog_first_lsn_in_log");
DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)", LSN_IN_PARTS(addr)));
DBUG_ASSERT(translog_inited == 1);
......@@ -6623,30 +6664,15 @@ LSN translog_first_lsn_in_log()
addr= MAKE_LSN(file, TRANSLOG_PAGE_SIZE); /* the first page of the file */
data.addr= &addr;
if ((page= translog_get_page(&data, scanner.buffer)) == NULL ||
(chunk_offset= translog_get_first_chunk_offset(page)) == 0)
DBUG_RETURN(LSN_ERROR);
addr+= chunk_offset;
if (addr == horizon)
DBUG_RETURN(LSN_IMPOSSIBLE);
translog_init_scanner(addr, 0, &scanner);
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
while (chunk_type != TRANSLOG_CHUNK_LSN &&
chunk_type != TRANSLOG_CHUNK_FIXED &&
scanner.page[scanner.page_offset] != 0)
{
if (translog_get_next_chunk(&scanner))
uchar buffer[TRANSLOG_PAGE_SIZE];
if ((page= translog_get_page(&data, buffer)) == NULL ||
(chunk_offset= translog_get_first_chunk_offset(page)) == 0)
DBUG_RETURN(LSN_ERROR);
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
}
if (scanner.page[scanner.page_offset] == 0)
DBUG_RETURN(LSN_IMPOSSIBLE); /* reached page filler */
DBUG_RETURN(scanner.page_addr + scanner.page_offset);
addr+= chunk_offset;
DBUG_RETURN(translog_next_LSN(addr, horizon));
}
......
......@@ -274,6 +274,7 @@ extern my_bool translog_inited;
extern LSN translog_first_lsn_in_log();
extern LSN translog_first_theoretical_lsn();
extern LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon);
/* record parts descriptor */
struct st_translog_parts
......
......@@ -212,6 +212,13 @@ int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file,
from_lsn= parse_checkpoint_record(last_checkpoint_lsn);
if (from_lsn == LSN_IMPOSSIBLE)
goto err;
from_lsn= translog_next_LSN(from_lsn, LSN_IMPOSSIBLE);
if (from_lsn == LSN_ERROR)
goto err;
/*
from_lsn LSN_IMPOSSIBLE will be correctly processed
by run_redo_phase()
*/
}
}
......@@ -1260,9 +1267,10 @@ static int run_redo_phase(LSN lsn, my_bool apply)
TRANSLOG_HEADER_BUFFER rec;
if (unlikely(lsn == translog_get_horizon()))
if (unlikely(lsn == LSN_IMPOSSIBLE || lsn == translog_get_horizon()))
{
fprintf(tracef, "Cannot find a first record, empty log, nothing to do.\n");
fprintf(tracef, "checkpoint address refers to the log end log or "
"log is empty, nothing to do.\n");
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment