Commit c1aacc17 authored by unknown's avatar unknown

Fixes problem with getting not LSN address gotten from

horizon addres.


storage/maria/ma_loghandler.c:
  New function to get correct LSN from chunk address.
storage/maria/ma_loghandler.h:
  New function to get correct LSN from chunk address.
parent 46aed8a6
...@@ -6594,6 +6594,49 @@ static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected) ...@@ -6594,6 +6594,49 @@ static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected)
} }
/**
@brief returns the most close LSN higher the given chunk address
@param addr the chunk address to start from
@param horizon the horizon if it is known or LSN_IMPOSSIBLE
@retval LSN_ERROR Error
@retval LSN_IMPOSSIBLE no LSNs after the address
@retval # LSN of the most close LSN higher the given chunk address
*/
LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon)
{
uint chunk_type;
TRANSLOG_SCANNER_DATA scanner;
DBUG_ENTER("translog_next_LSN");
if (horizon == LSN_IMPOSSIBLE)
horizon= translog_get_horizon();
if (addr == horizon)
DBUG_RETURN(LSN_IMPOSSIBLE);
translog_init_scanner(addr, 0, &scanner);
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
while (chunk_type != TRANSLOG_CHUNK_LSN &&
chunk_type != TRANSLOG_CHUNK_FIXED &&
scanner.page[scanner.page_offset] != 0)
{
if (translog_get_next_chunk(&scanner))
DBUG_RETURN(LSN_ERROR);
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
}
if (scanner.page[scanner.page_offset] == 0)
DBUG_RETURN(LSN_IMPOSSIBLE); /* reached page filler */
DBUG_RETURN(scanner.page_addr + scanner.page_offset);
}
/** /**
@brief returns the LSN of the first record starting in this log @brief returns the LSN of the first record starting in this log
...@@ -6607,10 +6650,8 @@ LSN translog_first_lsn_in_log() ...@@ -6607,10 +6650,8 @@ LSN translog_first_lsn_in_log()
TRANSLOG_ADDRESS addr, horizon= translog_get_horizon(); TRANSLOG_ADDRESS addr, horizon= translog_get_horizon();
TRANSLOG_VALIDATOR_DATA data; TRANSLOG_VALIDATOR_DATA data;
uint file; uint file;
uint chunk_type;
uint16 chunk_offset; uint16 chunk_offset;
uchar *page; uchar *page;
TRANSLOG_SCANNER_DATA scanner;
DBUG_ENTER("translog_first_lsn_in_log"); DBUG_ENTER("translog_first_lsn_in_log");
DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)", LSN_IN_PARTS(addr))); DBUG_PRINT("info", ("Horizon: (%lu,0x%lx)", LSN_IN_PARTS(addr)));
DBUG_ASSERT(translog_inited == 1); DBUG_ASSERT(translog_inited == 1);
...@@ -6623,30 +6664,15 @@ LSN translog_first_lsn_in_log() ...@@ -6623,30 +6664,15 @@ LSN translog_first_lsn_in_log()
addr= MAKE_LSN(file, TRANSLOG_PAGE_SIZE); /* the first page of the file */ addr= MAKE_LSN(file, TRANSLOG_PAGE_SIZE); /* the first page of the file */
data.addr= &addr; data.addr= &addr;
if ((page= translog_get_page(&data, scanner.buffer)) == NULL || {
uchar buffer[TRANSLOG_PAGE_SIZE];
if ((page= translog_get_page(&data, buffer)) == NULL ||
(chunk_offset= translog_get_first_chunk_offset(page)) == 0) (chunk_offset= translog_get_first_chunk_offset(page)) == 0)
DBUG_RETURN(LSN_ERROR); DBUG_RETURN(LSN_ERROR);
}
addr+= chunk_offset; addr+= chunk_offset;
if (addr == horizon)
DBUG_RETURN(LSN_IMPOSSIBLE);
translog_init_scanner(addr, 0, &scanner);
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE; DBUG_RETURN(translog_next_LSN(addr, horizon));
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
while (chunk_type != TRANSLOG_CHUNK_LSN &&
chunk_type != TRANSLOG_CHUNK_FIXED &&
scanner.page[scanner.page_offset] != 0)
{
if (translog_get_next_chunk(&scanner))
DBUG_RETURN(LSN_ERROR);
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
}
if (scanner.page[scanner.page_offset] == 0)
DBUG_RETURN(LSN_IMPOSSIBLE); /* reached page filler */
DBUG_RETURN(scanner.page_addr + scanner.page_offset);
} }
......
...@@ -274,6 +274,7 @@ extern my_bool translog_inited; ...@@ -274,6 +274,7 @@ extern my_bool translog_inited;
extern LSN translog_first_lsn_in_log(); extern LSN translog_first_lsn_in_log();
extern LSN translog_first_theoretical_lsn(); extern LSN translog_first_theoretical_lsn();
extern LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon);
/* record parts descriptor */ /* record parts descriptor */
struct st_translog_parts struct st_translog_parts
......
...@@ -212,6 +212,13 @@ int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file, ...@@ -212,6 +212,13 @@ int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file,
from_lsn= parse_checkpoint_record(last_checkpoint_lsn); from_lsn= parse_checkpoint_record(last_checkpoint_lsn);
if (from_lsn == LSN_IMPOSSIBLE) if (from_lsn == LSN_IMPOSSIBLE)
goto err; goto err;
from_lsn= translog_next_LSN(from_lsn, LSN_IMPOSSIBLE);
if (from_lsn == LSN_ERROR)
goto err;
/*
from_lsn LSN_IMPOSSIBLE will be correctly processed
by run_redo_phase()
*/
} }
} }
...@@ -1260,9 +1267,10 @@ static int run_redo_phase(LSN lsn, my_bool apply) ...@@ -1260,9 +1267,10 @@ static int run_redo_phase(LSN lsn, my_bool apply)
TRANSLOG_HEADER_BUFFER rec; TRANSLOG_HEADER_BUFFER rec;
if (unlikely(lsn == translog_get_horizon())) if (unlikely(lsn == LSN_IMPOSSIBLE || lsn == translog_get_horizon()))
{ {
fprintf(tracef, "Cannot find a first record, empty log, nothing to do.\n"); fprintf(tracef, "checkpoint address refers to the log end log or "
"log is empty, nothing to do.\n");
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment