Commit 0b67af5a authored by Sergei Golubchik's avatar Sergei Golubchik

cleanup

no functional changes here
parent a8a22b7a
...@@ -104,8 +104,8 @@ static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, ...@@ -104,8 +104,8 @@ static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
Log_event *end_ev, bool all, bool using_stmt, Log_event *end_ev, bool all, bool using_stmt,
bool using_trx, bool is_ro_1pc); bool using_trx, bool is_ro_1pc);
int binlog_online_alter_commit(THD *thd, bool all); static int binlog_online_alter_commit(THD *thd, bool all);
void binlog_online_alter_rollback(THD *thd, bool all); static void binlog_online_alter_rollback(THD *thd, bool all);
static const LEX_CSTRING write_error_msg= static const LEX_CSTRING write_error_msg=
{ STRING_WITH_LEN("error writing to the binary log") }; { STRING_WITH_LEN("error writing to the binary log") };
...@@ -2271,12 +2271,11 @@ binlog_online_alter_cleanup(ilist<binlog_cache_mngr> &list, ...@@ -2271,12 +2271,11 @@ binlog_online_alter_cleanup(ilist<binlog_cache_mngr> &list,
auto it= list.begin(); auto it= list.begin();
while (it != list.end()) while (it != list.end())
{ {
auto &cache= *it; auto &cache= *it++;
it++;
list.remove(cache);
cache.~binlog_cache_mngr(); cache.~binlog_cache_mngr();
my_free(&cache); my_free(&cache);
} }
list.clear();
DBUG_ASSERT(list.empty()); DBUG_ASSERT(list.empty());
} }
} }
...@@ -2649,18 +2648,13 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) ...@@ -2649,18 +2648,13 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some
non-transactional table. Otherwise, truncate the binlog cache starting non-transactional table. Otherwise, truncate the binlog cache starting
from the SAVEPOINT command. from the SAVEPOINT command.
*/
#ifdef WITH_WSREP For streaming replication, we must replicate savepoint rollback so that
/* for streaming replication, we must replicate savepoint rollback so that slaves can maintain SR transactions
slaves can maintain SR transactions
*/ */
if (unlikely(thd->wsrep_trx().is_streaming() || if (IF_WSREP(thd->wsrep_trx().is_streaming(),0) ||
(trans_has_updated_non_trans_table(thd)) || trans_has_updated_non_trans_table(thd) ||
(thd->variables.option_bits & OPTION_BINLOG_THIS_TRX))) (thd->variables.option_bits & OPTION_BINLOG_THIS_TRX))
#else
if (unlikely(trans_has_updated_non_trans_table(thd) ||
(thd->variables.option_bits & OPTION_BINLOG_THIS_TRX)))
#endif /* WITH_WSREP */
{ {
char buf[1024]; char buf[1024];
String log_query(buf, sizeof(buf), &my_charset_bin); String log_query(buf, sizeof(buf), &my_charset_bin);
...@@ -3801,40 +3795,20 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg, ...@@ -3801,40 +3795,20 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg,
} }
bool Event_log::open(const char *log_name, bool Event_log::open(enum cache_type io_cache_type_arg)
const char *new_name, ulong next_file_number,
enum cache_type io_cache_type_arg)
{ {
bool error= false; bool error= init_io_cache(&log_file, -1, LOG_BIN_IO_SIZE, io_cache_type_arg,
if (log_name || new_name) 0, 0, MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL));
{
error= MYSQL_LOG::open(
#ifdef HAVE_PSI_INTERFACE
0,
#endif
log_name, LOG_NORMAL, new_name, next_file_number, io_cache_type_arg);
}
else
{
#ifdef HAVE_PSI_INTERFACE
/* Keep the key for reopen */
m_log_file_key= 0;
#endif
error= init_io_cache(&log_file, -1, LOG_BIN_IO_SIZE,
io_cache_type_arg, 0, 0,
MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL));
log_state= LOG_OPENED; log_state= LOG_OPENED;
inited= true; inited= true;
}
if (error) if (error)
return error; return error;
longlong bytes_written= write_description_event( longlong bytes_written= write_description_event(
(enum_binlog_checksum_alg)binlog_checksum_options, (enum_binlog_checksum_alg)binlog_checksum_options,
encrypt_binlog, false, false); encrypt_binlog, true, false);
error= bytes_written < 0; return bytes_written < 0;
return error;
} }
longlong longlong
...@@ -6446,11 +6420,7 @@ Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr, ...@@ -6446,11 +6420,7 @@ Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
bool use_trans_cache) bool use_trans_cache)
{ {
DBUG_ASSERT(cache_mngr); DBUG_ASSERT(cache_mngr);
Rows_log_event* rows= NULL; return cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
if (cache_mngr)
rows= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
return rows;
} }
binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr, binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr,
...@@ -6464,14 +6434,14 @@ int binlog_flush_pending_rows_event(THD *thd, bool stmt_end, ...@@ -6464,14 +6434,14 @@ int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
Event_log *bin_log, Event_log *bin_log,
binlog_cache_data *cache_data) binlog_cache_data *cache_data)
{ {
/*
Mark the event as the last event of a statement if the stmt_end
flag is set.
*/
int error= 0; int error= 0;
auto *pending= cache_data->pending(); auto *pending= cache_data->pending();
if (pending) if (pending)
{ {
/*
Mark the event as the last event of a statement if the stmt_end
flag is set.
*/
if (stmt_end) if (stmt_end)
{ {
pending->set_flags(Rows_log_event::STMT_END_F); pending->set_flags(Rows_log_event::STMT_END_F);
...@@ -6519,10 +6489,9 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, binlog_cache_data *cache_data ...@@ -6519,10 +6489,9 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, binlog_cache_data *cache_data
otherwise @c false a non-transactional. otherwise @c false a non-transactional.
*/ */
int int
Event_log::flush_and_set_pending_rows_event(THD *thd, Event_log::flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event,
Rows_log_event* event, binlog_cache_data *cache_data,
binlog_cache_data *cache_data, bool is_transactional)
bool is_transactional)
{ {
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || is_open()); DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || is_open());
...@@ -6580,10 +6549,10 @@ Event_log::flush_and_set_pending_rows_event(THD *thd, ...@@ -6580,10 +6549,10 @@ Event_log::flush_and_set_pending_rows_event(THD *thd,
Rows_log_event* Rows_log_event*
Event_log::prepare_pending_rows_event(THD *thd, TABLE* table, Event_log::prepare_pending_rows_event(THD *thd, TABLE* table,
binlog_cache_data *cache_data, binlog_cache_data *cache_data,
uint32 serv_id, size_t needed, uint32 serv_id, size_t needed,
bool is_transactional, bool is_transactional,
Rows_event_factory event_factory) Rows_event_factory event_factory)
{ {
DBUG_ENTER("MYSQL_BIN_LOG::prepare_pending_rows_event"); DBUG_ENTER("MYSQL_BIN_LOG::prepare_pending_rows_event");
/* Pre-conditions */ /* Pre-conditions */
...@@ -7685,7 +7654,8 @@ class CacheWriter: public Log_event_writer ...@@ -7685,7 +7654,8 @@ class CacheWriter: public Log_event_writer
bool first; bool first;
}; };
int cache_copy(IO_CACHE *to, IO_CACHE *from) #ifdef HAVE_REPLICATION
static int cache_copy(IO_CACHE *to, IO_CACHE *from)
{ {
DBUG_ENTER("cache_copy"); DBUG_ENTER("cache_copy");
if (reinit_io_cache(from, READ_CACHE, 0, 0, 0)) if (reinit_io_cache(from, READ_CACHE, 0, 0, 0))
...@@ -7704,10 +7674,11 @@ int cache_copy(IO_CACHE *to, IO_CACHE *from) ...@@ -7704,10 +7674,11 @@ int cache_copy(IO_CACHE *to, IO_CACHE *from)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
#endif
int binlog_online_alter_commit(THD *thd, bool all) static int binlog_online_alter_commit(THD *thd, bool all)
{ {
DBUG_ENTER("online_alter_commit"); DBUG_ENTER("binlog_online_alter_commit");
int error= 0; int error= 0;
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
...@@ -7721,14 +7692,8 @@ int binlog_online_alter_commit(THD *thd, bool all) ...@@ -7721,14 +7692,8 @@ int binlog_online_alter_commit(THD *thd, bool all)
auto *binlog= cache_mngr.share->online_alter_binlog; auto *binlog= cache_mngr.share->online_alter_binlog;
DBUG_ASSERT(binlog); DBUG_ASSERT(binlog);
error= binlog_flush_pending_rows_event(thd, // do not set STMT_END for last event to leave table open in altering thd
/* error= binlog_flush_pending_rows_event(thd, false, true, binlog,
do not set STMT_END for last event
to leave table open in altering thd
*/
false,
true,
binlog,
is_ending_transaction is_ending_transaction
? &cache_mngr.trx_cache ? &cache_mngr.trx_cache
: &cache_mngr.stmt_cache); : &cache_mngr.stmt_cache);
...@@ -7766,7 +7731,7 @@ int binlog_online_alter_commit(THD *thd, bool all) ...@@ -7766,7 +7731,7 @@ int binlog_online_alter_commit(THD *thd, bool all)
DBUG_RETURN(error); DBUG_RETURN(error);
} }
void binlog_online_alter_rollback(THD *thd, bool all) static void binlog_online_alter_rollback(THD *thd, bool all)
{ {
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
bool is_ending_trans= ending_trans(thd, all); bool is_ending_trans= ending_trans(thd, all);
......
...@@ -419,10 +419,7 @@ class Event_log: public MYSQL_LOG ...@@ -419,10 +419,7 @@ class Event_log: public MYSQL_LOG
MY_MUTEX_INIT_SLOW); MY_MUTEX_INIT_SLOW);
} }
bool open( bool open(enum cache_type io_cache_type_arg);
const char *log_name,
const char *new_name, ulong next_file_number,
enum cache_type io_cache_type_arg);
virtual IO_CACHE *get_log_file() { return &log_file; } virtual IO_CACHE *get_log_file() { return &log_file; }
longlong write_description_event(enum_binlog_checksum_alg checksum_alg, longlong write_description_event(enum_binlog_checksum_alg checksum_alg,
...@@ -445,7 +442,7 @@ class Event_log: public MYSQL_LOG ...@@ -445,7 +442,7 @@ class Event_log: public MYSQL_LOG
TODO should be unnecessary after MDEV-24676 is done TODO should be unnecessary after MDEV-24676 is done
*/ */
class Cache_flip_event_log: public Event_log { class Cache_flip_event_log: public Event_log, public Sql_alloc {
IO_CACHE alt_buf; IO_CACHE alt_buf;
IO_CACHE *current, *alt; IO_CACHE *current, *alt;
public: public:
...@@ -456,7 +453,7 @@ class Cache_flip_event_log: public Event_log { ...@@ -456,7 +453,7 @@ class Cache_flip_event_log: public Event_log {
{ {
log_file.dir= mysql_tmpdir; log_file.dir= mysql_tmpdir;
alt_buf.dir= log_file.dir; alt_buf.dir= log_file.dir;
bool res= Event_log::open(NULL, NULL, 0, io_cache_type_arg); bool res= Event_log::open(io_cache_type_arg);
if (res) if (res)
return res; return res;
......
...@@ -950,8 +950,8 @@ struct rpl_group_info ...@@ -950,8 +950,8 @@ struct rpl_group_info
if (ptr->table == table_arg) if (ptr->table == table_arg)
{ {
auto *rpl_table_list= static_cast<RPL_TABLE_LIST*>(ptr); auto *rpl_table_list= static_cast<RPL_TABLE_LIST*>(ptr);
if (rpl_table_list->m_tabledef_valid) DBUG_ASSERT(rpl_table_list->m_tabledef_valid);
*tabledef_var= &rpl_table_list->m_tabledef; *tabledef_var= &rpl_table_list->m_tabledef;
*conv_table_var= rpl_table_list->m_conv_table; *conv_table_var= rpl_table_list->m_conv_table;
*copy= rpl_table_list->m_online_alter_copy_fields; *copy= rpl_table_list->m_online_alter_copy_fields;
*copy_end= rpl_table_list->m_online_alter_copy_fields_end; *copy_end= rpl_table_list->m_online_alter_copy_fields_end;
......
...@@ -2001,7 +2001,7 @@ bool log_drop_table(THD *thd, const LEX_CSTRING *db_name, ...@@ -2001,7 +2001,7 @@ bool log_drop_table(THD *thd, const LEX_CSTRING *db_name,
in the binary log. We log this for non temporary tables, as the slave in the binary log. We log this for non temporary tables, as the slave
may use a filter to ignore queries for a specific database. may use a filter to ignore queries for a specific database.
*/ */
error= thd->binlog_query(THD::STMT_QUERY_TYPE, error= thd->binlog_query(THD::STMT_QUERY_TYPE,
query.ptr(), query.length(), query.ptr(), query.length(),
FALSE, FALSE, temporary_table, 0) > 0; FALSE, FALSE, temporary_table, 0) > 0;
} }
...@@ -10025,8 +10025,7 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db, ...@@ -10025,8 +10025,7 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
has been already processed. has been already processed.
*/ */
table_list->required_type= TABLE_TYPE_NORMAL; table_list->required_type= TABLE_TYPE_NORMAL;
if (alter_info->requested_lock == Alter_info::ALTER_TABLE_LOCK_SHARED if (alter_info->requested_lock == Alter_info::ALTER_TABLE_LOCK_SHARED
|| alter_info->requested_lock == Alter_info::ALTER_TABLE_LOCK_EXCLUSIVE || alter_info->requested_lock == Alter_info::ALTER_TABLE_LOCK_EXCLUSIVE
|| thd->locked_tables_mode == LTM_LOCK_TABLES || thd->locked_tables_mode == LTM_LOCK_TABLES
...@@ -10132,8 +10131,7 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db, ...@@ -10132,8 +10131,7 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
a new one if needed. a new one if needed.
*/ */
table->s->tdc->flushed= 1; // Force close of all instances table->s->tdc->flushed= 1; // Force close of all instances
if (thd->mdl_context.upgrade_shared_lock(mdl_ticket, if (thd->mdl_context.upgrade_shared_lock(mdl_ticket, MDL_EXCLUSIVE,
MDL_EXCLUSIVE,
thd->variables.lock_wait_timeout)) thd->variables.lock_wait_timeout))
DBUG_RETURN(1); DBUG_RETURN(1);
quick_rm_table(thd, table->file->ht, &table_list->db, quick_rm_table(thd, table->file->ht, &table_list->db,
...@@ -10142,8 +10140,7 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db, ...@@ -10142,8 +10140,7 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
goto end_inplace; goto end_inplace;
} }
if (!if_exists && if (!if_exists &&
(table->file->partition_ht()->flags & (table->file->partition_ht()->flags & HTON_TABLE_MAY_NOT_EXIST_ON_SLAVE))
HTON_TABLE_MAY_NOT_EXIST_ON_SLAVE))
{ {
/* /*
Table is a shared table that may not exist on the slave. Table is a shared table that may not exist on the slave.
...@@ -11582,9 +11579,7 @@ static void online_alter_cleanup_binlog(THD *thd, TABLE_SHARE *s) ...@@ -11582,9 +11579,7 @@ static void online_alter_cleanup_binlog(THD *thd, TABLE_SHARE *s)
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
if (!s->online_alter_binlog) if (!s->online_alter_binlog)
return; return;
// s->online_alter_binlog->reset_logs(thd, false, NULL, 0, 0);
s->online_alter_binlog->cleanup(); s->online_alter_binlog->cleanup();
s->online_alter_binlog->~Cache_flip_event_log();
s->online_alter_binlog= NULL; s->online_alter_binlog= NULL;
#endif #endif
} }
...@@ -11631,9 +11626,7 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to, ...@@ -11631,9 +11626,7 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
if (online) if (online)
{ {
void *buf= alloc_root(thd->mem_root, sizeof (Cache_flip_event_log)); from->s->online_alter_binlog= new (thd->mem_root) Cache_flip_event_log();
from->s->online_alter_binlog= new (buf) Cache_flip_event_log();
if (!from->s->online_alter_binlog) if (!from->s->online_alter_binlog)
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -11641,8 +11634,6 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to, ...@@ -11641,8 +11634,6 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
error= from->s->online_alter_binlog->open(WRITE_CACHE); error= from->s->online_alter_binlog->open(WRITE_CACHE);
DBUG_ASSERT(!error);
if (!error) if (!error)
{ {
/* /*
...@@ -11988,10 +11979,8 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to, ...@@ -11988,10 +11979,8 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
to->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); to->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
if (likely(online && error < 0)) if (online && error < 0)
{ {
Ha_trx_info *trx_info_save= thd->transaction->all.ha_list;
thd->transaction->all.ha_list = NULL;
thd_progress_next_stage(thd); thd_progress_next_stage(thd);
Table_map_log_event table_event(thd, from, from->s->table_map_id, Table_map_log_event table_event(thd, from, from->s->table_map_id,
from->file->has_transactions()); from->file->has_transactions());
...@@ -12028,10 +12017,8 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to, ...@@ -12028,10 +12017,8 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
thd_progress_next_stage(thd); thd_progress_next_stage(thd);
error= online_alter_read_from_binlog(thd, &rgi, binlog); error= online_alter_read_from_binlog(thd, &rgi, binlog);
} }
thd->transaction->all.ha_list = trx_info_save;
} }
else if (unlikely(online)) // error was on copy stage else if (online) // error was on copy stage
{ {
/* /*
We need to issue a barrier to clean up gracefully. We need to issue a barrier to clean up gracefully.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment