Commit ad8b22f1 authored by Dave Gosselin's avatar Dave Gosselin

MDEV-34687: Attempt to warn on unexpected binlog file modification

Make an effort to emit a warning if we detect that an outside force
has modified the binlog.  This warning will not halt binlog writes.

We rely on file modification times to detect unexpected writes to
the binlog file.  This best-effort approach isn't fool-proof because
we cannot write to the binlog and get both the prior and current
modification times as an atomic operation.

There are four cases in which unexpected modification could occur:
 1. The time from when the we open the file until the first write.
 2. The time between the first and next writes (or between writes
    k and k+1)
 3. The time between the last write and when we close the file.
 4. In the case of 0 writes, the time from when we open the
    file until we close the file.

MariaDB writes binlogs with the help of the IO_CACHE, buffering
application writes together in memory, then later writing them
through to disk.  We need to see through the IO_CACHE to know when
file modifications occur so that we can compare the last known
file modification time to the most recent modification time.
However, not every user of IO_CACHE cares for this protection and
the overhead of extra stat system calls.  To encapsulate this new
behavior to binlogs, we add four new callbacks to the IO_CACHE
which in the default case do nothing.  These new callbacks are
triggered on certain file events:  file open, pre-write, post-write,
and file close.  We extend the IO_CACHE with a new type that holds
a copy of struct stat, containing the last file modification time,
keeping the new field(s) encapsulated away from the original IO_CACHE.
In the case of binlogs, we use instances of this new type wherever
we previously used IO_CACHE and we set the four new callbacks to invoke
the file modification time checks.
parent b68c1000
...@@ -460,6 +460,22 @@ typedef struct st_io_cache /* Used when caching files */ ...@@ -460,6 +460,22 @@ typedef struct st_io_cache /* Used when caching files */
somewhere else somewhere else
*/ */
size_t alloced_buffer; size_t alloced_buffer;
/*
The following four on_*_callback functions are invoked on the file
backing this cache (if configured) when:
*/
/* 1. we open it */
void (*on_open_callback)(struct st_io_cache *);
/* 2. just before we write through the cache to the file */
void (*on_pre_write_callback)(struct st_io_cache *);
/* 3. immediately after we write through the cache to the file*/
void (*on_post_write_callback)(struct st_io_cache *);
/* 4. we close it */
void (*on_close_callback)(struct st_io_cache *);
} IO_CACHE; } IO_CACHE;
typedef int (*qsort2_cmp)(const void *, const void *, const void *); typedef int (*qsort2_cmp)(const void *, const void *, const void *);
......
# MDEV-34687: Attempt to warn on unexpected binlog file modification
call mtr.add_suppression("The binlog file was written unexpectedly by another thread or process and may be corrupt");
FOUND 1 /The binlog file was written unexpectedly/ in mysqld.1.err
# MDEV-34687: Attempt to warn on unexpected binlog file modification
set timestamp=1000000000;
CREATE TABLE t1(word VARCHAR(20));
call mtr.add_suppression("The binlog file was written unexpectedly by another thread or process and may be corrupt");
SELECT SLEEP(5);
SLEEP(5)
0
NOT FOUND /The binlog file was written unexpectedly/ in mysqld.1.err
DROP TABLE t1;
--echo
--echo # MDEV-34687: Attempt to warn on unexpected binlog file modification
--echo
--source include/have_binlog_format_statement.inc
--let MYSQLD_DATADIR= `select @@datadir;`
--exec touch $MYSQLD_DATADIR/master-bin.000001
call mtr.add_suppression("The binlog file was written unexpectedly by another thread or process and may be corrupt");
--let SEARCH_FILE= $MYSQLTEST_VARDIR/log/mysqld.1.err
--let SEARCH_PATTERN=The binlog file was written unexpectedly
--source include/search_pattern_in_file.inc
--echo
--echo # MDEV-34687: Attempt to warn on unexpected binlog file modification
--echo
--source include/have_binlog_format_statement.inc
--let MYSQLD_DATADIR= `select @@datadir;`
set timestamp=1000000000;
CREATE TABLE t1(word VARCHAR(20));
--exec touch $MYSQLD_DATADIR/master-bin.000001
call mtr.add_suppression("The binlog file was written unexpectedly by another thread or process and may be corrupt");
SELECT SLEEP(5);
--let SEARCH_FILE= $MYSQLTEST_VARDIR/log/mysqld.1.err
--let SEARCH_PATTERN=The binlog file was written unexpectedly
--source include/search_pattern_in_file.inc
DROP TABLE t1;
...@@ -63,6 +63,7 @@ my_bool real_open_cached_file(IO_CACHE *cache) ...@@ -63,6 +63,7 @@ my_bool real_open_cached_file(IO_CACHE *cache)
O_BINARY, MYF(MY_WME | MY_TEMPORARY))) >= 0) O_BINARY, MYF(MY_WME | MY_TEMPORARY))) >= 0)
{ {
error=0; error=0;
cache->on_open_callback(cache);
} }
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -74,6 +75,8 @@ void close_cached_file(IO_CACHE *cache) ...@@ -74,6 +75,8 @@ void close_cached_file(IO_CACHE *cache)
if (my_b_inited(cache)) if (my_b_inited(cache))
{ {
File file=cache->file; File file=cache->file;
if (file >= 0)
cache->on_close_callback(cache);
cache->file= -1; /* Don't flush data */ cache->file= -1; /* Don't flush data */
(void) end_io_cache(cache); (void) end_io_cache(cache);
if (file >= 0) if (file >= 0)
......
...@@ -73,11 +73,22 @@ int (*_my_b_encr_read)(IO_CACHE *info,uchar *Buffer,size_t Count)= 0; ...@@ -73,11 +73,22 @@ int (*_my_b_encr_read)(IO_CACHE *info,uchar *Buffer,size_t Count)= 0;
int (*_my_b_encr_write)(IO_CACHE *info,const uchar *Buffer,size_t Count)= 0; int (*_my_b_encr_write)(IO_CACHE *info,const uchar *Buffer,size_t Count)= 0;
/*
This function intentionally does nothing and is analogous to a virtual
method whose default implementation is empty.
*/
void nop_on_operation_callback(struct st_io_cache* io_cache)
{
}
static void static void
init_functions(IO_CACHE* info) init_functions(IO_CACHE* info)
{ {
enum cache_type type= info->type; enum cache_type type= info->type;
info->on_open_callback= nop_on_operation_callback;
info->on_pre_write_callback= nop_on_operation_callback;
info->on_post_write_callback= nop_on_operation_callback;
info->on_close_callback= nop_on_operation_callback;
info->read_function = 0; /* Force a core if used */ info->read_function = 0; /* Force a core if used */
info->write_function = 0; /* Force a core if used */ info->write_function = 0; /* Force a core if used */
switch (type) { switch (type) {
...@@ -1500,6 +1511,7 @@ int _my_b_get(IO_CACHE *info) ...@@ -1500,6 +1511,7 @@ int _my_b_get(IO_CACHE *info)
int _my_b_cache_write(IO_CACHE *info, const uchar *Buffer, size_t Count) int _my_b_cache_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{ {
size_t write_result= 0;
if (Buffer != info->write_buffer) if (Buffer != info->write_buffer)
{ {
Count= IO_ROUND_DN(Count); Count= IO_ROUND_DN(Count);
...@@ -1523,7 +1535,11 @@ int _my_b_cache_write(IO_CACHE *info, const uchar *Buffer, size_t Count) ...@@ -1523,7 +1535,11 @@ int _my_b_cache_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
} }
info->seek_not_done=0; info->seek_not_done=0;
} }
if (mysql_file_write(info->file, Buffer, Count, info->myflags | MY_NABP)) info->on_pre_write_callback(info);
write_result= mysql_file_write(info->file, Buffer,
Count, info->myflags | MY_NABP);
info->on_post_write_callback(info);
if (write_result)
return info->error= -1; return info->error= -1;
info->pos_in_file+= Count; info->pos_in_file+= Count;
...@@ -1565,7 +1581,7 @@ static int _my_b_cache_write_r(IO_CACHE *info, const uchar *Buffer, size_t Count ...@@ -1565,7 +1581,7 @@ static int _my_b_cache_write_r(IO_CACHE *info, const uchar *Buffer, size_t Count
int my_b_append(IO_CACHE *info, const uchar *Buffer, size_t Count) int my_b_append(IO_CACHE *info, const uchar *Buffer, size_t Count)
{ {
size_t rest_length,length; size_t rest_length, length, write_result;
MEM_CHECK_DEFINED(Buffer, Count); MEM_CHECK_DEFINED(Buffer, Count);
...@@ -1592,7 +1608,11 @@ int my_b_append(IO_CACHE *info, const uchar *Buffer, size_t Count) ...@@ -1592,7 +1608,11 @@ int my_b_append(IO_CACHE *info, const uchar *Buffer, size_t Count)
if (Count >= IO_SIZE) if (Count >= IO_SIZE)
{ /* Fill first intern buffer */ { /* Fill first intern buffer */
length= IO_ROUND_DN(Count); length= IO_ROUND_DN(Count);
if (mysql_file_write(info->file,Buffer, length, info->myflags | MY_NABP)) info->on_pre_write_callback(info);
write_result= mysql_file_write(info->file,Buffer,
length, info->myflags | MY_NABP);
info->on_post_write_callback(info);
if (write_result)
{ {
unlock_append_buffer(info); unlock_append_buffer(info);
return info->error= -1; return info->error= -1;
...@@ -1633,7 +1653,7 @@ int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count) ...@@ -1633,7 +1653,7 @@ int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
int my_block_write(IO_CACHE *info, const uchar *Buffer, size_t Count, int my_block_write(IO_CACHE *info, const uchar *Buffer, size_t Count,
my_off_t pos) my_off_t pos)
{ {
size_t length; size_t length, pwrite_result;
int error=0; int error=0;
/* /*
...@@ -1646,12 +1666,22 @@ int my_block_write(IO_CACHE *info, const uchar *Buffer, size_t Count, ...@@ -1646,12 +1666,22 @@ int my_block_write(IO_CACHE *info, const uchar *Buffer, size_t Count,
if (pos < info->pos_in_file) if (pos < info->pos_in_file)
{ {
/* Of no overlap, write everything without buffering */ /* Of no overlap, write everything without buffering */
if (pos + Count <= info->pos_in_file) if (pos + Count <= info->pos_in_file) {
return (int)mysql_file_pwrite(info->file, Buffer, Count, pos, info->on_pre_write_callback(info);
pwrite_result= mysql_file_pwrite(info->file, Buffer,
Count, pos,
info->myflags | MY_NABP); info->myflags | MY_NABP);
info->on_post_write_callback(info);
return (int)pwrite_result;
}
/* Write the part of the block that is before buffer */ /* Write the part of the block that is before buffer */
length= (uint) (info->pos_in_file - pos); length= (uint) (info->pos_in_file - pos);
if (mysql_file_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP)) info->on_pre_write_callback(info);
pwrite_result= mysql_file_pwrite(info->file, Buffer,
length, pos,
info->myflags | MY_NABP);
info->on_post_write_callback(info);
if (pwrite_result)
info->error= error= -1; info->error= error= -1;
Buffer+=length; Buffer+=length;
pos+= length; pos+= length;
...@@ -1691,7 +1721,7 @@ int my_block_write(IO_CACHE *info, const uchar *Buffer, size_t Count, ...@@ -1691,7 +1721,7 @@ int my_block_write(IO_CACHE *info, const uchar *Buffer, size_t Count,
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
{ {
size_t length; size_t length, write_result;
my_bool append_cache= (info->type == SEQ_READ_APPEND); my_bool append_cache= (info->type == SEQ_READ_APPEND);
DBUG_ENTER("my_b_flush_io_cache"); DBUG_ENTER("my_b_flush_io_cache");
DBUG_PRINT("enter", ("cache: %p", info)); DBUG_PRINT("enter", ("cache: %p", info));
...@@ -1712,8 +1742,11 @@ int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) ...@@ -1712,8 +1742,11 @@ int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
{ {
if (append_cache) if (append_cache)
{ {
if (mysql_file_write(info->file, info->write_buffer, length, info->on_pre_write_callback(info);
info->myflags | MY_NABP)) write_result= mysql_file_write(info->file, info->write_buffer, length,
info->myflags | MY_NABP);
info->on_post_write_callback(info);
if (write_result)
{ {
info->error= -1; info->error= -1;
DBUG_RETURN(-1); DBUG_RETURN(-1);
......
...@@ -329,8 +329,11 @@ class binlog_cache_data ...@@ -329,8 +329,11 @@ class binlog_cache_data
truncate(0,1); // Forget what's in cache truncate(0,1); // Forget what's in cache
if (!cache_was_empty) if (!cache_was_empty)
compute_statistics(); compute_statistics();
if (truncate_file) if (truncate_file) {
cache_log.on_pre_write();
my_chsize(cache_log.file, 0, 0, MYF(MY_WME)); my_chsize(cache_log.file, 0, 0, MYF(MY_WME));
cache_log.on_post_write();
}
status= 0; status= 0;
incident= FALSE; incident= FALSE;
...@@ -403,7 +406,7 @@ class binlog_cache_data ...@@ -403,7 +406,7 @@ class binlog_cache_data
/* /*
Cache to store data before copying it to the binary log. Cache to store data before copying it to the binary log.
*/ */
IO_CACHE cache_log; FileEventAwareIOCache cache_log;
private: private:
/* /*
...@@ -483,6 +486,7 @@ class binlog_cache_data ...@@ -483,6 +486,7 @@ class binlog_cache_data
set_pending(0); set_pending(0);
} }
reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, reset_cache); reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, reset_cache);
cache_log.on_cache_init();
cache_log.end_of_file= saved_max_binlog_cache_size; cache_log.end_of_file= saved_max_binlog_cache_size;
} }
...@@ -505,23 +509,40 @@ void Log_event_writer::set_incident() ...@@ -505,23 +509,40 @@ void Log_event_writer::set_incident()
class binlog_cache_mngr { class binlog_cache_mngr {
public: public:
binlog_cache_mngr(my_off_t param_max_binlog_stmt_cache_size, static binlog_cache_mngr* create()
my_off_t param_max_binlog_cache_size,
ulong *param_ptr_binlog_stmt_cache_use,
ulong *param_ptr_binlog_stmt_cache_disk_use,
ulong *param_ptr_binlog_cache_use,
ulong *param_ptr_binlog_cache_disk_use)
: last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0)
{ {
stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size, auto cache_mngr=
param_ptr_binlog_stmt_cache_use, (binlog_cache_mngr*) my_malloc(key_memory_binlog_cache_mngr,
param_ptr_binlog_stmt_cache_disk_use); sizeof(binlog_cache_mngr),
trx_cache.set_binlog_cache_info(param_max_binlog_cache_size, MYF(MY_ZEROFILL));
param_ptr_binlog_cache_use, if (!cache_mngr ||
param_ptr_binlog_cache_disk_use); open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir,
last_commit_pos_file[0]= 0; LOG_PREFIX, (size_t)binlog_stmt_cache_size,
MYF(MY_WME)) ||
open_cached_file(&cache_mngr->trx_cache.cache_log, mysql_tmpdir,
LOG_PREFIX, (size_t)binlog_cache_size,
MYF(MY_WME)))
{
my_free(cache_mngr);
return nullptr;
}
cache_mngr= new (cache_mngr)
binlog_cache_mngr(max_binlog_stmt_cache_size,
max_binlog_cache_size,
&binlog_stmt_cache_use,
&binlog_stmt_cache_disk_use,
&binlog_cache_use,
&binlog_cache_disk_use);
return cache_mngr;
} }
binlog_cache_mngr() = delete;
binlog_cache_mngr(const binlog_cache_mngr&) = delete;
binlog_cache_mngr(binlog_cache_mngr&&) = delete;
binlog_cache_mngr& operator=(const binlog_cache_mngr&) = delete;
binlog_cache_mngr& operator=(binlog_cache_mngr&&) = delete;
void reset(bool do_stmt, bool do_trx) void reset(bool do_stmt, bool do_trx)
{ {
if (do_stmt) if (do_stmt)
...@@ -540,7 +561,7 @@ class binlog_cache_mngr { ...@@ -540,7 +561,7 @@ class binlog_cache_mngr {
return (is_transactional ? &trx_cache : &stmt_cache); return (is_transactional ? &trx_cache : &stmt_cache);
} }
IO_CACHE* get_binlog_cache_log(bool is_transactional) FileEventAwareIOCache* get_binlog_cache_log(bool is_transactional)
{ {
return (is_transactional ? &trx_cache.cache_log : &stmt_cache.cache_log); return (is_transactional ? &trx_cache.cache_log : &stmt_cache.cache_log);
} }
...@@ -575,9 +596,22 @@ class binlog_cache_mngr { ...@@ -575,9 +596,22 @@ class binlog_cache_mngr {
bool delayed_error; bool delayed_error;
private: private:
binlog_cache_mngr(my_off_t param_max_binlog_stmt_cache_size,
binlog_cache_mngr& operator=(const binlog_cache_mngr& info); my_off_t param_max_binlog_cache_size,
binlog_cache_mngr(const binlog_cache_mngr& info); ulong *param_ptr_binlog_stmt_cache_use,
ulong *param_ptr_binlog_stmt_cache_disk_use,
ulong *param_ptr_binlog_cache_use,
ulong *param_ptr_binlog_cache_disk_use)
: last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0)
{
stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size,
param_ptr_binlog_stmt_cache_use,
param_ptr_binlog_stmt_cache_disk_use);
trx_cache.set_binlog_cache_info(param_max_binlog_cache_size,
param_ptr_binlog_cache_use,
param_ptr_binlog_cache_disk_use);
last_commit_pos_file[0]= 0;
}
}; };
bool LOGGER::is_log_table_enabled(uint log_table_type) bool LOGGER::is_log_table_enabled(uint log_table_type)
...@@ -2902,7 +2936,7 @@ bool MYSQL_LOG::open( ...@@ -2902,7 +2936,7 @@ bool MYSQL_LOG::open(
MYF(MY_WME | MY_NABP | MYF(MY_WME | MY_NABP |
((log_type == LOG_BIN) ? MY_WAIT_IF_FULL : 0)))) ((log_type == LOG_BIN) ? MY_WAIT_IF_FULL : 0))))
goto err; goto err;
log_file.on_cache_init();
if (log_type == LOG_NORMAL) if (log_type == LOG_NORMAL)
{ {
char *end; char *end;
...@@ -3613,7 +3647,7 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg, ...@@ -3613,7 +3647,7 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg,
mysql_file_close(index_file_nr, MYF(0)); mysql_file_close(index_file_nr, MYF(0));
return TRUE; return TRUE;
} }
index_file.on_cache_init();
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
/* /*
Sync the index by purging any binary log file that is not registered. Sync the index by purging any binary log file that is not registered.
...@@ -3974,6 +4008,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, ...@@ -3974,6 +4008,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
DBUG_ASSERT(my_b_inited(&index_file) != 0); DBUG_ASSERT(my_b_inited(&index_file) != 0);
reinit_io_cache(&index_file, WRITE_CACHE, reinit_io_cache(&index_file, WRITE_CACHE,
my_b_filelength(&index_file), 0, 0); my_b_filelength(&index_file), 0, 0);
index_file.on_cache_init();
/* /*
As this is a new log file, we write the file name to the index As this is a new log file, we write the file name to the index
file. As every time we write to the index file, we sync it. file. As every time we write to the index file, we sync it.
...@@ -4099,7 +4134,7 @@ int MYSQL_BIN_LOG::raw_get_current_log(LOG_INFO* linfo) ...@@ -4099,7 +4134,7 @@ int MYSQL_BIN_LOG::raw_get_current_log(LOG_INFO* linfo)
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
static bool copy_up_file_and_fill(IO_CACHE *index_file, my_off_t offset) static bool copy_up_file_and_fill(FileEventAwareIOCache *index_file, my_off_t offset)
{ {
int bytes_read; int bytes_read;
my_off_t init_offset= offset; my_off_t init_offset= offset;
...@@ -4128,6 +4163,7 @@ static bool copy_up_file_and_fill(IO_CACHE *index_file, my_off_t offset) ...@@ -4128,6 +4163,7 @@ static bool copy_up_file_and_fill(IO_CACHE *index_file, my_off_t offset)
/* Reset data in old index cache */ /* Reset data in old index cache */
reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 1); reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 1);
index_file->on_cache_init();
DBUG_RETURN(0); DBUG_RETURN(0);
err: err:
...@@ -4192,6 +4228,7 @@ int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name, ...@@ -4192,6 +4228,7 @@ int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name,
/* As the file is flushed, we can't get an error here */ /* As the file is flushed, we can't get an error here */
(void) reinit_io_cache(&index_file, READ_CACHE, (my_off_t) 0, 0, 0); (void) reinit_io_cache(&index_file, READ_CACHE, (my_off_t) 0, 0, 0);
index_file.on_cache_init();
for (;;) for (;;)
{ {
...@@ -4276,6 +4313,7 @@ int MYSQL_BIN_LOG::find_next_log(LOG_INFO* linfo, bool need_lock) ...@@ -4276,6 +4313,7 @@ int MYSQL_BIN_LOG::find_next_log(LOG_INFO* linfo, bool need_lock)
/* As the file is flushed, we can't get an error here */ /* As the file is flushed, we can't get an error here */
(void) reinit_io_cache(&index_file, READ_CACHE, linfo->index_file_offset, 0, (void) reinit_io_cache(&index_file, READ_CACHE, linfo->index_file_offset, 0,
0); 0);
index_file.on_cache_init();
linfo->index_file_start_offset= linfo->index_file_offset; linfo->index_file_start_offset= linfo->index_file_offset;
if ((length=my_b_gets(&index_file, fname, FN_REFLEN)) <= 1) if ((length=my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
...@@ -4886,6 +4924,7 @@ int MYSQL_BIN_LOG::open_purge_index_file(bool destroy) ...@@ -4886,6 +4924,7 @@ int MYSQL_BIN_LOG::open_purge_index_file(bool destroy)
sql_print_error("MYSQL_BIN_LOG::open_purge_index_file failed to open register " sql_print_error("MYSQL_BIN_LOG::open_purge_index_file failed to open register "
" file."); " file.");
} }
purge_index_file.on_cache_init();
} }
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -4962,7 +5001,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space, ...@@ -4962,7 +5001,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space,
"for read"); "for read");
goto err; goto err;
} }
purge_index_file.on_cache_init();
for (;;) for (;;)
{ {
size_t length; size_t length;
...@@ -5800,6 +5839,100 @@ bool stmt_has_updated_non_trans_table(const THD* thd) ...@@ -5800,6 +5839,100 @@ bool stmt_has_updated_non_trans_table(const THD* thd)
return (thd->transaction->stmt.modified_non_trans_table); return (thd->transaction->stmt.modified_non_trans_table);
} }
void fecache_on_open_callback(struct st_io_cache* io_cache)
{
auto pcache= static_cast<FileEventAwareIOCache*>(io_cache);
pcache->on_open();
}
void fecache_on_pre_write_callback(struct st_io_cache* io_cache)
{
auto pcache= static_cast<FileEventAwareIOCache*>(io_cache);
pcache->on_pre_write();
}
void fecache_on_post_write_callback(struct st_io_cache* io_cache)
{
auto pcache = static_cast<FileEventAwareIOCache*>(io_cache);
pcache->on_post_write();
}
void fecache_on_close_callback(struct st_io_cache* io_cache)
{
auto pcache= static_cast<FileEventAwareIOCache*>(io_cache);
pcache->on_close();
}
static void get_tv_times(const struct stat& st, time_t& sec, time_t& nsec)
{
#ifdef __APPLE__
sec= st.st_mtimespec.tv_sec;
nsec= st.st_mtimespec.tv_nsec;
#else
sec= st.st_mtim.tv_sec;
nsec= st.st_mtim.tv_nsec;
#endif
}
void FileEventAwareIOCache::warn_on_mismatch() const
{
struct stat present_time;
fstat(file, &present_time);
time_t cs_sec, cs_nsec, pt_sec, pt_nsec;
get_tv_times(cached_stat, cs_sec, cs_nsec);
get_tv_times(present_time, pt_sec, pt_nsec);
if ((cs_sec != pt_sec) || (cs_nsec != pt_nsec)) {
sql_print_warning("The binlog file was written unexpectedly by another "
"thread or process and may be corrupt");
}
}
void FileEventAwareIOCache::initialize_callbacks()
{
on_open_callback= fecache_on_open_callback;
on_pre_write_callback= fecache_on_pre_write_callback;
on_post_write_callback= fecache_on_post_write_callback;
on_close_callback= fecache_on_close_callback;
}
void FileEventAwareIOCache::refresh_cached_file_stat()
{
fstat(file, &cached_stat);
}
FileEventAwareIOCache::FileEventAwareIOCache()
{
initialize_callbacks();
}
void FileEventAwareIOCache::on_cache_init()
{
initialize_callbacks();
refresh_cached_file_stat();
}
void FileEventAwareIOCache::on_open()
{
refresh_cached_file_stat();
}
void FileEventAwareIOCache::on_pre_write()
{
warn_on_mismatch();
}
void FileEventAwareIOCache::on_post_write()
{
refresh_cached_file_stat();
}
void FileEventAwareIOCache::on_close()
{
warn_on_mismatch();
}
/* /*
These functions are placed in this file since they need access to These functions are placed in this file since they need access to
binlog_hton, which has internal linkage. binlog_hton, which has internal linkage.
...@@ -5814,26 +5947,10 @@ binlog_cache_mngr *THD::binlog_setup_trx_data() ...@@ -5814,26 +5947,10 @@ binlog_cache_mngr *THD::binlog_setup_trx_data()
if (cache_mngr) if (cache_mngr)
DBUG_RETURN(cache_mngr); // Already set up DBUG_RETURN(cache_mngr); // Already set up
cache_mngr= (binlog_cache_mngr*) my_malloc(key_memory_binlog_cache_mngr, cache_mngr= binlog_cache_mngr::create();
sizeof(binlog_cache_mngr), MYF(MY_ZEROFILL)); if (cache_mngr)
if (!cache_mngr ||
open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir,
LOG_PREFIX, (size_t)binlog_stmt_cache_size, MYF(MY_WME)) ||
open_cached_file(&cache_mngr->trx_cache.cache_log, mysql_tmpdir,
LOG_PREFIX, (size_t)binlog_cache_size, MYF(MY_WME)))
{
my_free(cache_mngr);
DBUG_RETURN(0); // Didn't manage to set it up
}
thd_set_ha_data(this, binlog_hton, cache_mngr); thd_set_ha_data(this, binlog_hton, cache_mngr);
cache_mngr= new (cache_mngr)
binlog_cache_mngr(max_binlog_stmt_cache_size,
max_binlog_cache_size,
&binlog_stmt_cache_use,
&binlog_stmt_cache_disk_use,
&binlog_cache_use,
&binlog_cache_disk_use);
DBUG_RETURN(cache_mngr); DBUG_RETURN(cache_mngr);
} }
...@@ -6408,7 +6525,7 @@ int ...@@ -6408,7 +6525,7 @@ int
MYSQL_BIN_LOG::write_state_to_file() MYSQL_BIN_LOG::write_state_to_file()
{ {
File file_no; File file_no;
IO_CACHE cache; FileEventAwareIOCache cache;
char buf[FN_REFLEN]; char buf[FN_REFLEN];
int err; int err;
bool opened= false; bool opened= false;
...@@ -6427,6 +6544,7 @@ MYSQL_BIN_LOG::write_state_to_file() ...@@ -6427,6 +6544,7 @@ MYSQL_BIN_LOG::write_state_to_file()
if ((err= init_io_cache(&cache, file_no, IO_SIZE, WRITE_CACHE, 0, 0, if ((err= init_io_cache(&cache, file_no, IO_SIZE, WRITE_CACHE, 0, 0,
MYF(MY_WME|MY_WAIT_IF_FULL)))) MYF(MY_WME|MY_WAIT_IF_FULL))))
goto err; goto err;
cache.on_cache_init();
log_inited= true; log_inited= true;
if ((err= rpl_global_gtid_binlog_state.write_to_iocache(&cache))) if ((err= rpl_global_gtid_binlog_state.write_to_iocache(&cache)))
goto err; goto err;
...@@ -6461,7 +6579,7 @@ int ...@@ -6461,7 +6579,7 @@ int
MYSQL_BIN_LOG::read_state_from_file() MYSQL_BIN_LOG::read_state_from_file()
{ {
File file_no; File file_no;
IO_CACHE cache; FileEventAwareIOCache cache;
char buf[FN_REFLEN]; char buf[FN_REFLEN];
int err; int err;
bool opened= false; bool opened= false;
...@@ -6492,6 +6610,7 @@ MYSQL_BIN_LOG::read_state_from_file() ...@@ -6492,6 +6610,7 @@ MYSQL_BIN_LOG::read_state_from_file()
if ((err= init_io_cache(&cache, file_no, IO_SIZE, READ_CACHE, 0, 0, if ((err= init_io_cache(&cache, file_no, IO_SIZE, READ_CACHE, 0, 0,
MYF(MY_WME|MY_WAIT_IF_FULL)))) MYF(MY_WME|MY_WAIT_IF_FULL))))
goto err; goto err;
cache.on_cache_init();
log_inited= true; log_inited= true;
if ((err= rpl_global_gtid_binlog_state.read_from_iocache(&cache))) if ((err= rpl_global_gtid_binlog_state.read_from_iocache(&cache)))
goto err; goto err;
...@@ -6683,7 +6802,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) ...@@ -6683,7 +6802,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
DBUG_RETURN(0); DBUG_RETURN(0);
#endif /* HAVE_REPLICATION */ #endif /* HAVE_REPLICATION */
IO_CACHE *file= NULL; FileEventAwareIOCache *file= NULL;
if (direct) if (direct)
{ {
...@@ -7221,7 +7340,7 @@ static int do_delete_gtid_domain(DYNAMIC_ARRAY *domain_drop_lex) ...@@ -7221,7 +7340,7 @@ static int do_delete_gtid_domain(DYNAMIC_ARRAY *domain_drop_lex)
Gtid_list_log_event *glev= NULL; Gtid_list_log_event *glev= NULL;
char buf[FN_REFLEN]; char buf[FN_REFLEN];
File file; File file;
IO_CACHE cache; FileEventAwareIOCache cache;
const char* errmsg= NULL; const char* errmsg= NULL;
char errbuf[MYSQL_ERRMSG_SIZE]= {0}; char errbuf[MYSQL_ERRMSG_SIZE]= {0};
...@@ -7236,6 +7355,7 @@ static int do_delete_gtid_domain(DYNAMIC_ARRAY *domain_drop_lex) ...@@ -7236,6 +7355,7 @@ static int do_delete_gtid_domain(DYNAMIC_ARRAY *domain_drop_lex)
bzero((char*) &cache, sizeof(cache)); bzero((char*) &cache, sizeof(cache));
if ((file= open_binlog(&cache, buf, &errmsg)) == (File) -1) if ((file= open_binlog(&cache, buf, &errmsg)) == (File) -1)
goto end; goto end;
cache.on_cache_init();
errmsg= get_gtid_list_event(&cache, &glev); errmsg= get_gtid_list_event(&cache, &glev);
end_io_cache(&cache); end_io_cache(&cache);
mysql_file_close(file, MYF(MY_WME)); mysql_file_close(file, MYF(MY_WME));
...@@ -7377,13 +7497,14 @@ class CacheWriter: public Log_event_writer ...@@ -7377,13 +7497,14 @@ class CacheWriter: public Log_event_writer
events prior to fill in the binlog cache. events prior to fill in the binlog cache.
*/ */
int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) int MYSQL_BIN_LOG::write_cache(THD *thd, FileEventAwareIOCache *cache)
{ {
DBUG_ENTER("MYSQL_BIN_LOG::write_cache"); DBUG_ENTER("MYSQL_BIN_LOG::write_cache");
mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_log);
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
DBUG_RETURN(ER_ERROR_ON_WRITE); DBUG_RETURN(ER_ERROR_ON_WRITE);
cache->on_cache_init();
size_t length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; size_t length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
size_t val; size_t val;
size_t end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t size_t end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t
...@@ -10621,7 +10742,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, ...@@ -10621,7 +10742,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
char binlog_checkpoint_name[FN_REFLEN]; char binlog_checkpoint_name[FN_REFLEN];
bool binlog_checkpoint_found; bool binlog_checkpoint_found;
bool first_round; bool first_round;
IO_CACHE log; FileEventAwareIOCache log;
File file= -1; File file= -1;
const char *errmsg; const char *errmsg;
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
...@@ -10794,6 +10915,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, ...@@ -10794,6 +10915,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
sql_print_error("%s", errmsg); sql_print_error("%s", errmsg);
goto err2; goto err2;
} }
log.on_cache_init();
/* /*
We do not need to read the Format_description_log_event of other binlog We do not need to read the Format_description_log_event of other binlog
files. It is not possible for a binlog checkpoint to span multiple files. It is not possible for a binlog checkpoint to span multiple
...@@ -10844,7 +10966,7 @@ MYSQL_BIN_LOG::do_binlog_recovery(const char *opt_name, bool do_xa_recovery) ...@@ -10844,7 +10966,7 @@ MYSQL_BIN_LOG::do_binlog_recovery(const char *opt_name, bool do_xa_recovery)
{ {
LOG_INFO log_info; LOG_INFO log_info;
const char *errmsg; const char *errmsg;
IO_CACHE log; FileEventAwareIOCache log;
File file; File file;
Log_event *ev= 0; Log_event *ev= 0;
Format_description_log_event fdle(BINLOG_VERSION); Format_description_log_event fdle(BINLOG_VERSION);
...@@ -10895,6 +11017,7 @@ MYSQL_BIN_LOG::do_binlog_recovery(const char *opt_name, bool do_xa_recovery) ...@@ -10895,6 +11017,7 @@ MYSQL_BIN_LOG::do_binlog_recovery(const char *opt_name, bool do_xa_recovery)
sql_print_error("%s", errmsg); sql_print_error("%s", errmsg);
return 1; return 1;
} }
log.on_cache_init();
if ((ev= Log_event::read_log_event(&log, &fdle, if ((ev= Log_event::read_log_event(&log, &fdle,
opt_master_verify_checksum)) && opt_master_verify_checksum)) &&
......
...@@ -33,6 +33,35 @@ bool ending_trans(THD* thd, const bool all); ...@@ -33,6 +33,35 @@ bool ending_trans(THD* thd, const bool all);
bool ending_single_stmt_trans(THD* thd, const bool all); bool ending_single_stmt_trans(THD* thd, const bool all);
bool trans_has_updated_non_trans_table(const THD* thd); bool trans_has_updated_non_trans_table(const THD* thd);
bool stmt_has_updated_non_trans_table(const THD* thd); bool stmt_has_updated_non_trans_table(const THD* thd);
void fecache_on_open_callback(struct st_io_cache*);
void fecache_on_pre_write_callback(struct st_io_cache*);
void fecache_on_post_write_callback(struct st_io_cache*);
void fecache_on_close_callback(struct st_io_cache*);
/*
Extends IO_CACHE such that we can inspect and cache the
file modification time on each file event. Should we
inspect it and it not match the cached time (saved from
when we last inspected it) then emit a warning.
*/
class FileEventAwareIOCache : public IO_CACHE
{
private:
struct stat cached_stat;
void warn_on_mismatch() const;
void initialize_callbacks();
void refresh_cached_file_stat();
public:
FileEventAwareIOCache();
void on_cache_init();
void on_open();
void on_pre_write();
void on_post_write();
void on_close();
};
/* /*
Transaction Coordinator log - a base abstract class Transaction Coordinator log - a base abstract class
...@@ -333,7 +362,7 @@ class MYSQL_LOG ...@@ -333,7 +362,7 @@ class MYSQL_LOG
char log_file_name[FN_REFLEN]; char log_file_name[FN_REFLEN];
char time_buff[20], db[NAME_LEN + 1]; char time_buff[20], db[NAME_LEN + 1];
bool write_error, inited; bool write_error, inited;
IO_CACHE log_file; FileEventAwareIOCache log_file;
enum_log_type log_type; enum_log_type log_type;
volatile enum_log_state log_state; volatile enum_log_state log_state;
enum cache_type io_cache_type; enum cache_type io_cache_type;
...@@ -507,7 +536,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -507,7 +536,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
mysql_cond_t COND_xid_list; mysql_cond_t COND_xid_list;
mysql_cond_t COND_relay_log_updated, COND_bin_log_updated; mysql_cond_t COND_relay_log_updated, COND_bin_log_updated;
ulonglong bytes_written; ulonglong bytes_written;
IO_CACHE index_file; FileEventAwareIOCache index_file;
char index_file_name[FN_REFLEN]; char index_file_name[FN_REFLEN];
/* /*
purge_file is a temp file used in purge_logs so that the index file purge_file is a temp file used in purge_logs so that the index file
...@@ -515,7 +544,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -515,7 +544,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
recovery. It is created on demand the first time purge_logs is called recovery. It is created on demand the first time purge_logs is called
and then reused for subsequent calls. It is cleaned up in cleanup(). and then reused for subsequent calls. It is cleaned up in cleanup().
*/ */
IO_CACHE purge_index_file; FileEventAwareIOCache purge_index_file;
char purge_index_file_name[FN_REFLEN]; char purge_index_file_name[FN_REFLEN];
/* /*
The max size before rotation (usable only if log_type == LOG_BIN: binary The max size before rotation (usable only if log_type == LOG_BIN: binary
...@@ -830,7 +859,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -830,7 +859,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
bool write_incident_already_locked(THD *thd); bool write_incident_already_locked(THD *thd);
bool write_incident(THD *thd); bool write_incident(THD *thd);
void write_binlog_checkpoint_event_already_locked(const char *name, uint len); void write_binlog_checkpoint_event_already_locked(const char *name, uint len);
int write_cache(THD *thd, IO_CACHE *cache); int write_cache(THD *thd, FileEventAwareIOCache *cache);
void set_write_error(THD *thd, bool is_transactional); void set_write_error(THD *thd, bool is_transactional);
bool check_write_error(THD *thd); bool check_write_error(THD *thd);
bool check_cache_error(THD *thd, binlog_cache_data *cache_data); bool check_cache_error(THD *thd, binlog_cache_data *cache_data);
......
...@@ -932,6 +932,13 @@ class Log_event_writer ...@@ -932,6 +932,13 @@ class Log_event_writer
/* Log_event_writer is updated when ctx is set */ /* Log_event_writer is updated when ctx is set */
int (Log_event_writer::*encrypt_or_write)(const uchar *pos, size_t len); int (Log_event_writer::*encrypt_or_write)(const uchar *pos, size_t len);
public: public:
Log_event_writer() = delete;
Log_event_writer(const Log_event_writer&) = delete;
Log_event_writer(Log_event_writer&&) = delete;
Log_event_writer& operator=(const Log_event_writer&) = delete;
Log_event_writer& operator=(Log_event_writer&&) = delete;
virtual ~Log_event_writer() = default;
ulonglong bytes_written; ulonglong bytes_written;
void *ctx; ///< Encryption context or 0 if no encryption is needed void *ctx; ///< Encryption context or 0 if no encryption is needed
uint checksum_len; uint checksum_len;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment