Commit 60f5f314 authored by Mattias Jonsson's avatar Mattias Jonsson

Bug#14589559: ASSERTION `FILE_ENTRY_BUF[2] == 0'

FAILED IN DEACTIVATE_DDL_LOG_ENTRY

deallocate_ddl_log_entry() can be called without having
locked LOCK_gdl. It uses a global buffer for reading and
writing entries in the ddl_log, and since it is not protected
by any mutex, two concurrent threads can overwrite the
content in the global buffer, so it can be different from
what was read.
Thread a reads from entry 1 into global
buffer, thread b reads from entry 2 into global buffer,
thread a writes from global buffer into entry 1
-> entry 1 is not the content of entry 2.

This is especially bad for replace entries, which uses
two phases, and does not deactivate the whole entry
after the first phase, but increases the phase instead.

Fixed by using thread local storage (stack) instead of global
storage (global buffer).

Also added buffer and size arguments to
read/write_ddl_log_file_entry.

Also only read/write first bytes in entries in
deactivate_ddl_log_entry.

Also fixed the scenario where it will try to recover from a server
compiled with a different value of IO_SIZE (very uncommon!)

updated patch with set_ddl_log_entry_from_buf
and removed read_ddl_log_entry.

Manually tested, no test case included.
parent fe36ad97
...@@ -617,13 +617,6 @@ uint build_tmptable_filename(THD* thd, char *buff, size_t bufflen) ...@@ -617,13 +617,6 @@ uint build_tmptable_filename(THD* thd, char *buff, size_t bufflen)
struct st_global_ddl_log struct st_global_ddl_log
{ {
/*
We need to adjust buffer size to be able to handle downgrades/upgrades
where IO_SIZE has changed. We'll set the buffer size such that we can
handle that the buffer size was upto 4 times bigger in the version
that wrote the DDL log.
*/
char file_entry_buf[4*IO_SIZE];
char file_name_str[FN_REFLEN]; char file_name_str[FN_REFLEN];
char *file_name; char *file_name;
DDL_LOG_MEMORY_ENTRY *first_free; DDL_LOG_MEMORY_ENTRY *first_free;
...@@ -651,51 +644,60 @@ pthread_mutex_t LOCK_gdl; ...@@ -651,51 +644,60 @@ pthread_mutex_t LOCK_gdl;
#define DDL_LOG_NUM_ENTRY_POS 0 #define DDL_LOG_NUM_ENTRY_POS 0
#define DDL_LOG_NAME_LEN_POS 4 #define DDL_LOG_NAME_LEN_POS 4
#define DDL_LOG_IO_SIZE_POS 8 #define DDL_LOG_IO_SIZE_POS 8
#define DDL_LOG_HEADER_SIZE 12
/* /**
Read one entry from ddl log file Read one entry from ddl log file.
SYNOPSIS @param[out] file_entry_buf Buffer to read into
read_ddl_log_file_entry() @param entry_no Entry number to read
entry_no Entry number to read @param size Number of bytes of the entry to read
RETURN VALUES
TRUE Error @return Operation status
FALSE Success @retval true Error
@retval false Success
*/ */
static bool read_ddl_log_file_entry(uint entry_no) static bool read_ddl_log_file_entry(uchar *file_entry_buf,
uint entry_no,
uint size)
{ {
bool error= FALSE; bool error= FALSE;
File file_id= global_ddl_log.file_id; File file_id= global_ddl_log.file_id;
uchar *file_entry_buf= (uchar*)global_ddl_log.file_entry_buf;
uint io_size= global_ddl_log.io_size; uint io_size= global_ddl_log.io_size;
DBUG_ENTER("read_ddl_log_file_entry"); DBUG_ENTER("read_ddl_log_file_entry");
DBUG_ASSERT(io_size >= size);
if (my_pread(file_id, file_entry_buf, io_size, io_size * entry_no, if (my_pread(file_id, file_entry_buf, size, io_size * entry_no,
MYF(MY_WME)) != io_size) MYF(MY_WME)) != size)
error= TRUE; error= TRUE;
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/* /**
Write one entry from ddl log file Write one entry to ddl log file.
SYNOPSIS
write_ddl_log_file_entry() @param file_entry_buf Buffer to write
entry_no Entry number to write @param entry_no Entry number to write
RETURN VALUES @param size Number of bytes of the entry to write
TRUE Error
FALSE Success @return Operation status
@retval true Error
@retval false Success
*/ */
static bool write_ddl_log_file_entry(uint entry_no) static bool write_ddl_log_file_entry(uchar *file_entry_buf,
uint entry_no,
uint size)
{ {
bool error= FALSE; bool error= FALSE;
File file_id= global_ddl_log.file_id; File file_id= global_ddl_log.file_id;
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf; uint io_size= global_ddl_log.io_size;
DBUG_ENTER("write_ddl_log_file_entry"); DBUG_ENTER("write_ddl_log_file_entry");
DBUG_ASSERT(io_size >= size);
if (my_pwrite(file_id, (uchar*)file_entry_buf, if (my_pwrite(file_id, file_entry_buf, size,
IO_SIZE, IO_SIZE * entry_no, MYF(MY_WME)) != IO_SIZE) io_size * entry_no, MYF(MY_WME)) != size)
error= TRUE; error= TRUE;
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -714,17 +716,20 @@ static bool write_ddl_log_header() ...@@ -714,17 +716,20 @@ static bool write_ddl_log_header()
{ {
uint16 const_var; uint16 const_var;
bool error= FALSE; bool error= FALSE;
uchar file_entry_buf[DDL_LOG_HEADER_SIZE];
DBUG_ENTER("write_ddl_log_header"); DBUG_ENTER("write_ddl_log_header");
DBUG_ASSERT((DDL_LOG_NAME_POS + 3 * global_ddl_log.name_len)
<= global_ddl_log.io_size);
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NUM_ENTRY_POS], int4store(&file_entry_buf[DDL_LOG_NUM_ENTRY_POS],
global_ddl_log.num_entries); global_ddl_log.num_entries);
const_var= FN_LEN; const_var= global_ddl_log.name_len;
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_LEN_POS], int4store(&file_entry_buf[DDL_LOG_NAME_LEN_POS],
(ulong) const_var); (ulong) const_var);
const_var= IO_SIZE; const_var= global_ddl_log.io_size;
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_IO_SIZE_POS], int4store(&file_entry_buf[DDL_LOG_IO_SIZE_POS],
(ulong) const_var); (ulong) const_var);
if (write_ddl_log_file_entry(0UL)) if (write_ddl_log_file_entry(file_entry_buf, 0UL, DDL_LOG_HEADER_SIZE))
{ {
sql_print_error("Error writing ddl log header"); sql_print_error("Error writing ddl log header");
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
...@@ -764,17 +769,19 @@ static inline void create_ddl_log_file_name(char *file_name) ...@@ -764,17 +769,19 @@ static inline void create_ddl_log_file_name(char *file_name)
static uint read_ddl_log_header() static uint read_ddl_log_header()
{ {
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf; char file_entry_buf[DDL_LOG_HEADER_SIZE];
char file_name[FN_REFLEN]; char file_name[FN_REFLEN];
uint entry_no; uint entry_no;
bool successful_open= FALSE; bool successful_open= FALSE;
DBUG_ENTER("read_ddl_log_header"); DBUG_ENTER("read_ddl_log_header");
DBUG_ASSERT(global_ddl_log.io_size <= IO_SIZE);
create_ddl_log_file_name(file_name); create_ddl_log_file_name(file_name);
if ((global_ddl_log.file_id= my_open(file_name, if ((global_ddl_log.file_id= my_open(file_name,
O_RDWR | O_BINARY, MYF(0))) >= 0) O_RDWR | O_BINARY, MYF(0))) >= 0)
{ {
if (read_ddl_log_file_entry(0UL)) if (read_ddl_log_file_entry((uchar *) file_entry_buf, 0UL,
DDL_LOG_HEADER_SIZE))
{ {
/* Write message into error log */ /* Write message into error log */
sql_print_error("Failed to read ddl log file in recovery"); sql_print_error("Failed to read ddl log file in recovery");
...@@ -787,8 +794,6 @@ static uint read_ddl_log_header() ...@@ -787,8 +794,6 @@ static uint read_ddl_log_header()
entry_no= uint4korr(&file_entry_buf[DDL_LOG_NUM_ENTRY_POS]); entry_no= uint4korr(&file_entry_buf[DDL_LOG_NUM_ENTRY_POS]);
global_ddl_log.name_len= uint4korr(&file_entry_buf[DDL_LOG_NAME_LEN_POS]); global_ddl_log.name_len= uint4korr(&file_entry_buf[DDL_LOG_NAME_LEN_POS]);
global_ddl_log.io_size= uint4korr(&file_entry_buf[DDL_LOG_IO_SIZE_POS]); global_ddl_log.io_size= uint4korr(&file_entry_buf[DDL_LOG_IO_SIZE_POS]);
DBUG_ASSERT(global_ddl_log.io_size <=
sizeof(global_ddl_log.file_entry_buf));
} }
else else
{ {
...@@ -803,30 +808,21 @@ static uint read_ddl_log_header() ...@@ -803,30 +808,21 @@ static uint read_ddl_log_header()
} }
/* /**
Read a ddl log entry Set ddl log entry struct from buffer
SYNOPSIS @param read_entry Entry number
read_ddl_log_entry() @param file_entry_buf Buffer to use
read_entry Number of entry to read @param ddl_log_entry Entry to be set
out:entry_info Information from entry
RETURN VALUES @note Pointers in ddl_log_entry will point into file_entry_buf!
TRUE Error
FALSE Success
DESCRIPTION
Read a specified entry in the ddl log
*/ */
bool read_ddl_log_entry(uint read_entry, DDL_LOG_ENTRY *ddl_log_entry) static void set_ddl_log_entry_from_buf(uint read_entry,
uchar *file_entry_buf,
DDL_LOG_ENTRY *ddl_log_entry)
{ {
char *file_entry_buf= (char*)&global_ddl_log.file_entry_buf;
uint inx; uint inx;
uchar single_char; uchar single_char;
DBUG_ENTER("read_ddl_log_entry");
if (read_ddl_log_file_entry(read_entry))
{
DBUG_RETURN(TRUE);
}
ddl_log_entry->entry_pos= read_entry; ddl_log_entry->entry_pos= read_entry;
single_char= file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]; single_char= file_entry_buf[DDL_LOG_ENTRY_TYPE_POS];
ddl_log_entry->entry_type= (enum ddl_log_entry_code)single_char; ddl_log_entry->entry_type= (enum ddl_log_entry_code)single_char;
...@@ -834,14 +830,13 @@ bool read_ddl_log_entry(uint read_entry, DDL_LOG_ENTRY *ddl_log_entry) ...@@ -834,14 +830,13 @@ bool read_ddl_log_entry(uint read_entry, DDL_LOG_ENTRY *ddl_log_entry)
ddl_log_entry->action_type= (enum ddl_log_action_code)single_char; ddl_log_entry->action_type= (enum ddl_log_action_code)single_char;
ddl_log_entry->phase= file_entry_buf[DDL_LOG_PHASE_POS]; ddl_log_entry->phase= file_entry_buf[DDL_LOG_PHASE_POS];
ddl_log_entry->next_entry= uint4korr(&file_entry_buf[DDL_LOG_NEXT_ENTRY_POS]); ddl_log_entry->next_entry= uint4korr(&file_entry_buf[DDL_LOG_NEXT_ENTRY_POS]);
ddl_log_entry->name= &file_entry_buf[DDL_LOG_NAME_POS]; ddl_log_entry->name= (char*) &file_entry_buf[DDL_LOG_NAME_POS];
inx= DDL_LOG_NAME_POS + global_ddl_log.name_len; inx= DDL_LOG_NAME_POS + global_ddl_log.name_len;
ddl_log_entry->from_name= &file_entry_buf[inx]; ddl_log_entry->from_name= (char*) &file_entry_buf[inx];
inx+= global_ddl_log.name_len; inx+= global_ddl_log.name_len;
ddl_log_entry->handler_name= &file_entry_buf[inx]; ddl_log_entry->handler_name= (char*) &file_entry_buf[inx];
DBUG_RETURN(FALSE);
} }
/* /*
Initialise ddl log Initialise ddl log
...@@ -1044,6 +1039,7 @@ static bool get_free_ddl_log_entry(DDL_LOG_MEMORY_ENTRY **active_entry, ...@@ -1044,6 +1039,7 @@ static bool get_free_ddl_log_entry(DDL_LOG_MEMORY_ENTRY **active_entry,
DDL_LOG_MEMORY_ENTRY *first_used= global_ddl_log.first_used; DDL_LOG_MEMORY_ENTRY *first_used= global_ddl_log.first_used;
DBUG_ENTER("get_free_ddl_log_entry"); DBUG_ENTER("get_free_ddl_log_entry");
safe_mutex_assert_owner(&LOCK_gdl);
if (global_ddl_log.first_free == NULL) if (global_ddl_log.first_free == NULL)
{ {
if (!(used_entry= (DDL_LOG_MEMORY_ENTRY*)my_malloc( if (!(used_entry= (DDL_LOG_MEMORY_ENTRY*)my_malloc(
...@@ -1100,34 +1096,35 @@ bool write_ddl_log_entry(DDL_LOG_ENTRY *ddl_log_entry, ...@@ -1100,34 +1096,35 @@ bool write_ddl_log_entry(DDL_LOG_ENTRY *ddl_log_entry,
DDL_LOG_MEMORY_ENTRY **active_entry) DDL_LOG_MEMORY_ENTRY **active_entry)
{ {
bool error, write_header; bool error, write_header;
char file_entry_buf[IO_SIZE];
DBUG_ENTER("write_ddl_log_entry"); DBUG_ENTER("write_ddl_log_entry");
if (init_ddl_log()) if (init_ddl_log())
{ {
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
} }
global_ddl_log.file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]=
(char)DDL_LOG_ENTRY_CODE; (char)DDL_LOG_ENTRY_CODE;
global_ddl_log.file_entry_buf[DDL_LOG_ACTION_TYPE_POS]= file_entry_buf[DDL_LOG_ACTION_TYPE_POS]=
(char)ddl_log_entry->action_type; (char)ddl_log_entry->action_type;
global_ddl_log.file_entry_buf[DDL_LOG_PHASE_POS]= 0; file_entry_buf[DDL_LOG_PHASE_POS]= 0;
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NEXT_ENTRY_POS], int4store(&file_entry_buf[DDL_LOG_NEXT_ENTRY_POS],
ddl_log_entry->next_entry); ddl_log_entry->next_entry);
DBUG_ASSERT(strlen(ddl_log_entry->name) < FN_LEN); DBUG_ASSERT(strlen(ddl_log_entry->name) < global_ddl_log.name_len);
strmake(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS], strmake(&file_entry_buf[DDL_LOG_NAME_POS], ddl_log_entry->name,
ddl_log_entry->name, FN_LEN - 1); global_ddl_log.name_len - 1);
if (ddl_log_entry->action_type == DDL_LOG_RENAME_ACTION || if (ddl_log_entry->action_type == DDL_LOG_RENAME_ACTION ||
ddl_log_entry->action_type == DDL_LOG_REPLACE_ACTION) ddl_log_entry->action_type == DDL_LOG_REPLACE_ACTION)
{ {
DBUG_ASSERT(strlen(ddl_log_entry->from_name) < FN_LEN); DBUG_ASSERT(strlen(ddl_log_entry->from_name) < global_ddl_log.name_len);
strmake(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS + FN_LEN], strmake(&file_entry_buf[DDL_LOG_NAME_POS + global_ddl_log.name_len],
ddl_log_entry->from_name, FN_LEN - 1); ddl_log_entry->from_name, global_ddl_log.name_len - 1);
} }
else else
global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS + FN_LEN]= 0; file_entry_buf[DDL_LOG_NAME_POS + global_ddl_log.name_len]= 0;
DBUG_ASSERT(strlen(ddl_log_entry->handler_name) < FN_LEN); DBUG_ASSERT(strlen(ddl_log_entry->handler_name) < global_ddl_log.name_len);
strmake(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS + (2*FN_LEN)], strmake(&file_entry_buf[DDL_LOG_NAME_POS + (2*global_ddl_log.name_len)],
ddl_log_entry->handler_name, FN_LEN - 1); ddl_log_entry->handler_name, global_ddl_log.name_len - 1);
if (get_free_ddl_log_entry(active_entry, &write_header)) if (get_free_ddl_log_entry(active_entry, &write_header))
{ {
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
...@@ -1135,14 +1132,15 @@ bool write_ddl_log_entry(DDL_LOG_ENTRY *ddl_log_entry, ...@@ -1135,14 +1132,15 @@ bool write_ddl_log_entry(DDL_LOG_ENTRY *ddl_log_entry,
error= FALSE; error= FALSE;
DBUG_PRINT("ddl_log", DBUG_PRINT("ddl_log",
("write type %c next %u name '%s' from_name '%s' handler '%s'", ("write type %c next %u name '%s' from_name '%s' handler '%s'",
(char) global_ddl_log.file_entry_buf[DDL_LOG_ACTION_TYPE_POS], (char) file_entry_buf[DDL_LOG_ACTION_TYPE_POS],
ddl_log_entry->next_entry, ddl_log_entry->next_entry,
(char*) &global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS], (char*) &file_entry_buf[DDL_LOG_NAME_POS],
(char*) &global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS (char*) &file_entry_buf[DDL_LOG_NAME_POS +
+ FN_LEN], global_ddl_log.name_len],
(char*) &global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS (char*) &file_entry_buf[DDL_LOG_NAME_POS +
+ (2*FN_LEN)])); (2*global_ddl_log.name_len)]));
if (write_ddl_log_file_entry((*active_entry)->entry_pos)) if (write_ddl_log_file_entry((uchar*) file_entry_buf,
(*active_entry)->entry_pos, IO_SIZE))
{ {
error= TRUE; error= TRUE;
sql_print_error("Failed to write entry_no = %u", sql_print_error("Failed to write entry_no = %u",
...@@ -1192,7 +1190,7 @@ bool write_execute_ddl_log_entry(uint first_entry, ...@@ -1192,7 +1190,7 @@ bool write_execute_ddl_log_entry(uint first_entry,
DDL_LOG_MEMORY_ENTRY **active_entry) DDL_LOG_MEMORY_ENTRY **active_entry)
{ {
bool write_header= FALSE; bool write_header= FALSE;
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf; char file_entry_buf[IO_SIZE];
DBUG_ENTER("write_execute_ddl_log_entry"); DBUG_ENTER("write_execute_ddl_log_entry");
if (init_ddl_log()) if (init_ddl_log())
...@@ -1216,8 +1214,8 @@ bool write_execute_ddl_log_entry(uint first_entry, ...@@ -1216,8 +1214,8 @@ bool write_execute_ddl_log_entry(uint first_entry,
file_entry_buf[DDL_LOG_PHASE_POS]= 0; file_entry_buf[DDL_LOG_PHASE_POS]= 0;
int4store(&file_entry_buf[DDL_LOG_NEXT_ENTRY_POS], first_entry); int4store(&file_entry_buf[DDL_LOG_NEXT_ENTRY_POS], first_entry);
file_entry_buf[DDL_LOG_NAME_POS]= 0; file_entry_buf[DDL_LOG_NAME_POS]= 0;
file_entry_buf[DDL_LOG_NAME_POS + FN_LEN]= 0; file_entry_buf[DDL_LOG_NAME_POS + global_ddl_log.name_len]= 0;
file_entry_buf[DDL_LOG_NAME_POS + 2*FN_LEN]= 0; file_entry_buf[DDL_LOG_NAME_POS + 2*global_ddl_log.name_len]= 0;
if (!(*active_entry)) if (!(*active_entry))
{ {
if (get_free_ddl_log_entry(active_entry, &write_header)) if (get_free_ddl_log_entry(active_entry, &write_header))
...@@ -1225,7 +1223,9 @@ bool write_execute_ddl_log_entry(uint first_entry, ...@@ -1225,7 +1223,9 @@ bool write_execute_ddl_log_entry(uint first_entry,
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
} }
} }
if (write_ddl_log_file_entry((*active_entry)->entry_pos)) if (write_ddl_log_file_entry((uchar*) file_entry_buf,
(*active_entry)->entry_pos,
IO_SIZE))
{ {
sql_print_error("Error writing execute entry in ddl log"); sql_print_error("Error writing execute entry in ddl log");
release_ddl_log_memory_entry(*active_entry); release_ddl_log_memory_entry(*active_entry);
...@@ -1270,10 +1270,16 @@ bool write_execute_ddl_log_entry(uint first_entry, ...@@ -1270,10 +1270,16 @@ bool write_execute_ddl_log_entry(uint first_entry,
bool deactivate_ddl_log_entry(uint entry_no) bool deactivate_ddl_log_entry(uint entry_no)
{ {
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf; uchar file_entry_buf[DDL_LOG_NAME_POS];
DBUG_ENTER("deactivate_ddl_log_entry"); DBUG_ENTER("deactivate_ddl_log_entry");
if (!read_ddl_log_file_entry(entry_no))
/*
Only need to read and write the first bytes of the entry, where
ENTRY_TYPE, ACTION_TYPE and PHASE reside. Using DDL_LOG_NAME_POS
to include all info except for the names.
*/
if (!read_ddl_log_file_entry(file_entry_buf, entry_no, DDL_LOG_NAME_POS))
{ {
if (file_entry_buf[DDL_LOG_ENTRY_TYPE_POS] == DDL_LOG_ENTRY_CODE) if (file_entry_buf[DDL_LOG_ENTRY_TYPE_POS] == DDL_LOG_ENTRY_CODE)
{ {
...@@ -1291,7 +1297,7 @@ bool deactivate_ddl_log_entry(uint entry_no) ...@@ -1291,7 +1297,7 @@ bool deactivate_ddl_log_entry(uint entry_no)
{ {
DBUG_ASSERT(0); DBUG_ASSERT(0);
} }
if (write_ddl_log_file_entry(entry_no)) if (write_ddl_log_file_entry(file_entry_buf, entry_no, DDL_LOG_NAME_POS))
{ {
sql_print_error("Error in deactivating log entry. Position = %u", sql_print_error("Error in deactivating log entry. Position = %u",
entry_no); entry_no);
...@@ -1352,6 +1358,7 @@ void release_ddl_log_memory_entry(DDL_LOG_MEMORY_ENTRY *log_entry) ...@@ -1352,6 +1358,7 @@ void release_ddl_log_memory_entry(DDL_LOG_MEMORY_ENTRY *log_entry)
DDL_LOG_MEMORY_ENTRY *next_log_entry= log_entry->next_log_entry; DDL_LOG_MEMORY_ENTRY *next_log_entry= log_entry->next_log_entry;
DDL_LOG_MEMORY_ENTRY *prev_log_entry= log_entry->prev_log_entry; DDL_LOG_MEMORY_ENTRY *prev_log_entry= log_entry->prev_log_entry;
DBUG_ENTER("release_ddl_log_memory_entry"); DBUG_ENTER("release_ddl_log_memory_entry");
safe_mutex_assert_owner(&LOCK_gdl);
global_ddl_log.first_free= log_entry; global_ddl_log.first_free= log_entry;
log_entry->next_log_entry= first_free; log_entry->next_log_entry= first_free;
...@@ -1381,18 +1388,20 @@ bool execute_ddl_log_entry(THD *thd, uint first_entry) ...@@ -1381,18 +1388,20 @@ bool execute_ddl_log_entry(THD *thd, uint first_entry)
{ {
DDL_LOG_ENTRY ddl_log_entry; DDL_LOG_ENTRY ddl_log_entry;
uint read_entry= first_entry; uint read_entry= first_entry;
uchar file_entry_buf[IO_SIZE];
DBUG_ENTER("execute_ddl_log_entry"); DBUG_ENTER("execute_ddl_log_entry");
pthread_mutex_lock(&LOCK_gdl); pthread_mutex_lock(&LOCK_gdl);
do do
{ {
if (read_ddl_log_entry(read_entry, &ddl_log_entry)) if (read_ddl_log_file_entry(file_entry_buf, read_entry, IO_SIZE))
{ {
/* Write to error log and continue with next log entry */ /* Write to error log and continue with next log entry */
sql_print_error("Failed to read entry = %u from ddl log", sql_print_error("Failed to read entry = %u from ddl log",
read_entry); read_entry);
break; break;
} }
set_ddl_log_entry_from_buf(read_entry, file_entry_buf, &ddl_log_entry);
DBUG_ASSERT(ddl_log_entry.entry_type == DDL_LOG_ENTRY_CODE || DBUG_ASSERT(ddl_log_entry.entry_type == DDL_LOG_ENTRY_CODE ||
ddl_log_entry.entry_type == DDL_IGNORE_LOG_ENTRY_CODE); ddl_log_entry.entry_type == DDL_IGNORE_LOG_ENTRY_CODE);
...@@ -1443,13 +1452,14 @@ void execute_ddl_log_recovery() ...@@ -1443,13 +1452,14 @@ void execute_ddl_log_recovery()
uint num_entries, i; uint num_entries, i;
THD *thd; THD *thd;
DDL_LOG_ENTRY ddl_log_entry; DDL_LOG_ENTRY ddl_log_entry;
uchar *file_entry_buf;
uint io_size;
char file_name[FN_REFLEN]; char file_name[FN_REFLEN];
DBUG_ENTER("execute_ddl_log_recovery"); DBUG_ENTER("execute_ddl_log_recovery");
/* /*
Initialise global_ddl_log struct Initialise global_ddl_log struct
*/ */
bzero(global_ddl_log.file_entry_buf, sizeof(global_ddl_log.file_entry_buf));
global_ddl_log.inited= FALSE; global_ddl_log.inited= FALSE;
global_ddl_log.recovery_phase= TRUE; global_ddl_log.recovery_phase= TRUE;
global_ddl_log.io_size= IO_SIZE; global_ddl_log.io_size= IO_SIZE;
...@@ -1464,14 +1474,23 @@ void execute_ddl_log_recovery() ...@@ -1464,14 +1474,23 @@ void execute_ddl_log_recovery()
thd->store_globals(); thd->store_globals();
num_entries= read_ddl_log_header(); num_entries= read_ddl_log_header();
io_size= global_ddl_log.io_size;
file_entry_buf= (uchar*) my_malloc(io_size, MYF(0));
if (!file_entry_buf)
{
sql_print_error("Failed to allocate buffer for recover ddl log");
DBUG_VOID_RETURN;
}
for (i= 1; i < num_entries + 1; i++) for (i= 1; i < num_entries + 1; i++)
{ {
if (read_ddl_log_entry(i, &ddl_log_entry)) if (read_ddl_log_file_entry(file_entry_buf, i, io_size))
{ {
sql_print_error("Failed to read entry no = %u from ddl log", sql_print_error("Failed to read entry no = %u from ddl log",
i); i);
continue; continue;
} }
set_ddl_log_entry_from_buf(i, file_entry_buf, &ddl_log_entry);
if (ddl_log_entry.entry_type == DDL_LOG_EXECUTE_CODE) if (ddl_log_entry.entry_type == DDL_LOG_EXECUTE_CODE)
{ {
if (execute_ddl_log_entry(thd, ddl_log_entry.next_entry)) if (execute_ddl_log_entry(thd, ddl_log_entry.next_entry))
...@@ -1486,6 +1505,7 @@ void execute_ddl_log_recovery() ...@@ -1486,6 +1505,7 @@ void execute_ddl_log_recovery()
VOID(my_delete(file_name, MYF(0))); VOID(my_delete(file_name, MYF(0)));
global_ddl_log.recovery_phase= FALSE; global_ddl_log.recovery_phase= FALSE;
delete thd; delete thd;
my_free(file_entry_buf, MYF(0));
/* Remember that we don't have a THD */ /* Remember that we don't have a THD */
my_pthread_setspecific_ptr(THR_THD, 0); my_pthread_setspecific_ptr(THR_THD, 0);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -1502,14 +1522,16 @@ void execute_ddl_log_recovery() ...@@ -1502,14 +1522,16 @@ void execute_ddl_log_recovery()
void release_ddl_log() void release_ddl_log()
{ {
DDL_LOG_MEMORY_ENTRY *free_list= global_ddl_log.first_free; DDL_LOG_MEMORY_ENTRY *free_list;
DDL_LOG_MEMORY_ENTRY *used_list= global_ddl_log.first_used; DDL_LOG_MEMORY_ENTRY *used_list;
DBUG_ENTER("release_ddl_log"); DBUG_ENTER("release_ddl_log");
if (!global_ddl_log.do_release) if (!global_ddl_log.do_release)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
pthread_mutex_lock(&LOCK_gdl); pthread_mutex_lock(&LOCK_gdl);
free_list= global_ddl_log.first_free;
used_list= global_ddl_log.first_used;
while (used_list) while (used_list)
{ {
DDL_LOG_MEMORY_ENTRY *tmp= used_list->next_log_entry; DDL_LOG_MEMORY_ENTRY *tmp= used_list->next_log_entry;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment