Commit 332f41aa authored by Sergei Golubchik's avatar Sergei Golubchik

don't copy stmt IO_CACHE to trx IO_CACHE at the stmt end

instead use only one (trx) IO_CACHE and truncate it if the
statement is rolled back.

don't use binlog_cache_mngr to accumulate the data,
use binlog_cache_data instead.

(binlog_cache_data owns one IO_CACHE, binlog_cache_mngr owns
two binlog_cache_data's, trx and stmt).
parent 3099a756
......@@ -7255,18 +7255,13 @@ bool handler::check_table_binlog_row_based_internal()
static int binlog_log_row_online_alter(TABLE* table,
const uchar *before_record,
const uchar *after_record,
Log_func *log_func,
bool has_trans)
Log_func *log_func)
{
THD *thd= table->in_use;
if (!table->online_alter_cache)
{
auto *cache_mngr= online_alter_binlog_get_cache_mngr(thd, table);
// Use transaction cache directly, if it is not multi-transaction mode
table->online_alter_cache= binlog_get_cache_data(cache_mngr,
!thd->in_multi_stmt_transaction_mode());
table->online_alter_cache= online_alter_binlog_get_cache_data(thd, table);
trans_register_ha(thd, false, binlog_hton, 0);
if (thd->in_multi_stmt_transaction_mode())
trans_register_ha(thd, true, binlog_hton, 0);
......@@ -7278,7 +7273,7 @@ static int binlog_log_row_online_alter(TABLE* table,
table->rpl_write_set= &table->s->all_set;
int error= (*log_func)(thd, table, table->s->online_alter_binlog,
table->online_alter_cache, has_trans,
table->online_alter_cache, true,
before_record, after_record);
table->rpl_write_set= old_rpl_write_set;
......@@ -7337,7 +7332,7 @@ int handler::binlog_log_row(const uchar *before_record,
#ifdef HAVE_REPLICATION
if (unlikely(!error && table->s->online_alter_binlog))
error= binlog_log_row_online_alter(table, before_record, after_record,
log_func, row_logging_has_trans);
log_func);
#endif // HAVE_REPLICATION
DBUG_RETURN(error);
......
......@@ -276,10 +276,10 @@ void make_default_log_name(char **out, const char* log_ext, bool once)
Helper classes to store non-transactional and transactional data
before copying it to the binary log.
*/
class binlog_cache_data
class binlog_cache_data: public ilist_node<>
{
public:
binlog_cache_data(): m_pending(0), status(0),
binlog_cache_data(): share(0), m_pending(0), status(0),
before_stmt_pos(MY_OFF_T_UNDEF),
incident(FALSE),
saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0),
......@@ -362,6 +362,11 @@ class binlog_cache_data
before_stmt_pos= pos;
}
void store_prev_position()
{
before_stmt_pos= my_b_write_tell(&cache_log);
}
void restore_prev_position()
{
truncate(before_stmt_pos);
......@@ -414,6 +419,7 @@ class binlog_cache_data
*/
IO_CACHE cache_log;
TABLE_SHARE *share; // for online alter table
private:
/*
Pending binrows event. This event is the event where the rows are currently
......@@ -512,16 +518,15 @@ void Log_event_writer::set_incident()
}
class binlog_cache_mngr: public ilist_node<> {
class binlog_cache_mngr {
public:
binlog_cache_mngr(my_off_t param_max_binlog_stmt_cache_size,
my_off_t param_max_binlog_cache_size,
ulong *param_ptr_binlog_stmt_cache_use,
ulong *param_ptr_binlog_stmt_cache_disk_use,
ulong *param_ptr_binlog_cache_use,
ulong *param_ptr_binlog_cache_disk_use,
TABLE_SHARE *share)
: last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0), share(share)
ulong *param_ptr_binlog_cache_disk_use)
: last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0)
{
stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size,
param_ptr_binlog_stmt_cache_use,
......@@ -587,8 +592,6 @@ class binlog_cache_mngr: public ilist_node<> {
//Will be reset when gtid is written into binlog
uchar gtid_flags3;
decltype (rpl_gtid::seq_no) sa_seq_no;
TABLE_SHARE *share;
private:
binlog_cache_mngr& operator=(const binlog_cache_mngr& info);
......@@ -2259,21 +2262,16 @@ static int binlog_commit_flush_xa_prepare(THD *thd, bool all,
#ifdef HAVE_REPLICATION
static void
binlog_online_alter_cleanup(ilist<binlog_cache_mngr> &list,
bool ending_trans)
binlog_online_alter_cleanup(ilist<binlog_cache_data> &list, bool ending_trans)
{
for (auto &cache: list)
{
cache.reset(true, ending_trans);
}
if (ending_trans)
{
auto it= list.begin();
while (it != list.end())
{
auto &cache= *it++;
cache.~binlog_cache_mngr();
my_free(&cache);
cache.reset();
delete &cache;
}
list.clear();
DBUG_ASSERT(list.empty());
......@@ -5980,7 +5978,7 @@ bool stmt_has_updated_non_trans_table(const THD* thd)
binlog_hton, which has internal linkage.
*/
binlog_cache_mngr *binlog_setup_cache_mngr(TABLE_SHARE *share)
binlog_cache_mngr *binlog_setup_cache_mngr()
{
auto *cache_mngr= (binlog_cache_mngr*) my_malloc(key_memory_binlog_cache_mngr,
sizeof(binlog_cache_mngr),
......@@ -6001,8 +5999,7 @@ binlog_cache_mngr *binlog_setup_cache_mngr(TABLE_SHARE *share)
&binlog_stmt_cache_use,
&binlog_stmt_cache_disk_use,
&binlog_cache_use,
&binlog_cache_disk_use,
share);
&binlog_cache_disk_use);
return cache_mngr;
}
......@@ -6015,7 +6012,7 @@ binlog_cache_mngr *THD::binlog_setup_trx_data()
if (!cache_mngr)
{
cache_mngr= binlog_setup_cache_mngr(NULL);
cache_mngr= binlog_setup_cache_mngr();
thd_set_ha_data(this, binlog_hton, cache_mngr);
}
......@@ -6381,9 +6378,27 @@ bool MYSQL_BIN_LOG::write_table_map(THD *thd, TABLE *table, bool with_annotate)
}
binlog_cache_mngr *online_alter_binlog_get_cache_mngr(THD *thd, TABLE *table)
#ifdef HAVE_REPLICATION
binlog_cache_data *binlog_setup_cache_data(TABLE_SHARE *share)
{
ilist<binlog_cache_mngr> &list= thd->online_alter_cache_list;
auto cache= new binlog_cache_data();
if (!cache || open_cached_file(&cache->cache_log, mysql_tmpdir,
LOG_PREFIX, (size_t)binlog_cache_size, MYF(MY_WME)))
{
delete cache;
return NULL;
}
cache->share= share;
cache->set_binlog_cache_info(max_binlog_cache_size, &binlog_cache_use,
&binlog_cache_disk_use);
cache->store_prev_position();
return cache;
}
binlog_cache_data *online_alter_binlog_get_cache_data(THD *thd, TABLE *table)
{
ilist<binlog_cache_data> &list= thd->online_alter_cache_list;
/* we assume it's very rare to have more than one online ALTER running */
for (auto &cache: list)
......@@ -6392,11 +6407,12 @@ binlog_cache_mngr *online_alter_binlog_get_cache_mngr(THD *thd, TABLE *table)
return &cache;
}
auto *new_cache_mngr= binlog_setup_cache_mngr(table->s);
list.push_back(*new_cache_mngr);
auto *new_cache_data= binlog_setup_cache_data(table->s);
list.push_back(*new_cache_data);
return new_cache_mngr;
return new_cache_data;
}
#endif
binlog_cache_mngr *THD::binlog_get_cache_mngr() const
{
......@@ -7654,28 +7670,6 @@ class CacheWriter: public Log_event_writer
bool first;
};
#ifdef HAVE_REPLICATION
static int cache_copy(IO_CACHE *to, IO_CACHE *from)
{
DBUG_ENTER("cache_copy");
if (reinit_io_cache(from, READ_CACHE, 0, 0, 0))
DBUG_RETURN(ER_ERROR_ON_WRITE);
size_t bytes_in_cache= my_b_bytes_in_cache(from);
do
{
my_b_write(to, from->read_pos, bytes_in_cache);
from->read_pos += bytes_in_cache;
bytes_in_cache= my_b_fill(from);
if (from->error || to->error)
DBUG_RETURN(ER_ERROR_ON_WRITE);
} while (bytes_in_cache);
DBUG_RETURN(0);
}
#endif
static int binlog_online_alter_commit(THD *thd, bool all)
{
DBUG_ENTER("binlog_online_alter_commit");
......@@ -7687,28 +7681,21 @@ static int binlog_online_alter_commit(THD *thd, bool all)
bool is_ending_transaction= ending_trans(thd, all);
for (auto &cache_mngr: thd->online_alter_cache_list)
for (auto &cache: thd->online_alter_cache_list)
{
auto *binlog= cache_mngr.share->online_alter_binlog;
auto *binlog= cache.share->online_alter_binlog;
DBUG_ASSERT(binlog);
// do not set STMT_END for last event to leave table open in altering thd
error= binlog_flush_pending_rows_event(thd, false, true, binlog,
is_ending_transaction
? &cache_mngr.trx_cache
: &cache_mngr.stmt_cache);
error= binlog_flush_pending_rows_event(thd, false, true, binlog, &cache);
if (is_ending_transaction)
{
mysql_mutex_lock(binlog->get_log_lock());
error= binlog->write_cache(thd, &cache_mngr.trx_cache.cache_log);
error= binlog->write_cache(thd, &cache.cache_log);
mysql_mutex_unlock(binlog->get_log_lock());
}
else
{
error= cache_copy(&cache_mngr.trx_cache.cache_log,
&cache_mngr.stmt_cache.cache_log);
}
cache.store_prev_position();
if (error)
{
......@@ -7736,12 +7723,16 @@ static void binlog_online_alter_rollback(THD *thd, bool all)
#ifdef HAVE_REPLICATION
bool is_ending_trans= ending_trans(thd, all);
if (!is_ending_trans)
for (auto &cache: thd->online_alter_cache_list)
cache.restore_prev_position();
/*
This is a crucial moment that we are running through
thd->online_alter_cache_list, and not through thd->open_tables to cleanup
stmt cache, though both have it. The reason is that the tables can be closed
to that moment in case of an error.
The same reason applies to the fact we don't store cache_mngr in the table
The same reason applies to the fact we don't store cache in the table
itself -- because it can happen to be not existing.
Still in case if tables are left opened
*/
......
......@@ -1320,7 +1320,7 @@ int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
binlog_cache_data *cache_data);
Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
binlog_cache_mngr *online_alter_binlog_get_cache_mngr(THD *thd, TABLE *table);
binlog_cache_data *online_alter_binlog_get_cache_data(THD *thd, TABLE *table);
binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
......
......@@ -5556,7 +5556,7 @@ class THD: public THD_count, /* this must be first */
Item *sp_prepare_func_item(Item **it_addr, uint cols);
bool sp_eval_expr(Field *result_field, Item **expr_item_ptr);
ilist<binlog_cache_mngr> online_alter_cache_list;
ilist<binlog_cache_data> online_alter_cache_list;
bool sql_parser(LEX *old_lex, LEX *lex,
char *str, uint str_len, bool stmt_prepare_mode);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment