Commit cb521746 authored by Nikita Malyavin's avatar Nikita Malyavin

online alter: extract the source to a separate file

Move all the functions dedicated to online alter to a newly created
online_alter.cc.

With that, make many functions static and simplify the static functions
naming.

Also, rename binlog_log_row_online_alter -> online_alter_log_row.
parent 830bdfcc
......@@ -218,8 +218,9 @@ MYSQL_ADD_PLUGIN(partition ha_partition.cc STORAGE_ENGINE DEFAULT STATIC_ONLY
RECOMPILE_FOR_EMBEDDED)
MYSQL_ADD_PLUGIN(sql_sequence ha_sequence.cc STORAGE_ENGINE MANDATORY STATIC_ONLY
RECOMPILE_FOR_EMBEDDED)
MYSQL_ADD_PLUGIN(online_alter_log log.cc STORAGE_ENGINE MANDATORY STATIC_ONLY
NOT_EMBEDDED)
MYSQL_ADD_PLUGIN(online_alter_log online_alter.cc STORAGE_ENGINE MANDATORY
STATIC_ONLY NOT_EMBEDDED)
ADD_LIBRARY(sql STATIC ${SQL_SOURCE})
MAYBE_DISABLE_IPO(sql)
......@@ -227,6 +228,7 @@ DTRACE_INSTRUMENT(sql)
TARGET_LINK_LIBRARIES(sql
mysys mysys_ssl dbug strings vio pcre2-8
tpool
online_alter_log
${LIBWRAP} ${LIBCRYPT} ${CMAKE_DL_LIBS} ${CMAKE_THREAD_LIBS_INIT}
${SSL_LIBRARIES}
${LIBSYSTEMD})
......
......@@ -7304,8 +7304,8 @@ int handler::binlog_log_row(const uchar *before_record,
#ifdef HAVE_REPLICATION
if (unlikely(!error && table->s->online_alter_binlog && is_root_handler()))
error= binlog_log_row_online_alter(table, before_record, after_record,
log_func);
error= online_alter_log_row(table, before_record, after_record,
log_func);
#endif // HAVE_REPLICATION
DBUG_RETURN(error);
......
......@@ -1098,8 +1098,6 @@ extern MYSQL_PLUGIN_IMPORT st_plugin_int *hton2plugin[MAX_HA];
struct handlerton;
extern handlerton *online_alter_hton;
#define view_pseudo_hton ((handlerton *)1)
/*
......@@ -1972,9 +1970,7 @@ class Ha_trx_info
bool is_trx_read_write() const
{
DBUG_ASSERT(is_started());
bool result= m_flags & (int) TRX_READ_WRITE;
DBUG_ASSERT(!result || m_ht != online_alter_hton);
return result;
return m_flags & (int) TRX_READ_WRITE;
}
bool is_started() const { return m_ht != NULL; }
/** Mark this transaction read-write if the argument is read-write. */
......
......@@ -59,6 +59,7 @@
#include "sp_rcontext.h"
#include "sp_head.h"
#include "sql_table.h"
#include "log_cache.h"
#include "wsrep_mysqld.h"
#ifdef WITH_WSREP
......@@ -75,9 +76,6 @@
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
#define MAX_TIME_SIZE 32
#define MY_OFF_T_UNDEF (~(my_off_t)0UL)
/* Truncate cache log files bigger than this */
#define CACHE_FILE_TRUNC_SIZE 65536
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
......@@ -104,8 +102,6 @@ static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
Log_event *end_ev, bool all, bool using_stmt,
bool using_trx, bool is_ro_1pc);
static int binlog_online_alter_end_trans(THD *thd, bool all, bool commit);
static const LEX_CSTRING write_error_msg=
{ STRING_WITH_LEN("error writing to the binary log") };
......@@ -275,243 +271,7 @@ void make_default_log_name(char **out, const char* log_ext, bool once)
Helper classes to store non-transactional and transactional data
before copying it to the binary log.
*/
class binlog_cache_data
{
public:
binlog_cache_data(): before_stmt_pos(MY_OFF_T_UNDEF), m_pending(0), status(0),
incident(FALSE), saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0),
ptr_binlog_cache_disk_use(0)
{ }
~binlog_cache_data()
{
DBUG_ASSERT(empty());
close_cached_file(&cache_log);
}
/*
Return 1 if there is no relevant entries in the cache
This is:
- Cache is empty
- There are row or critical (DDL?) events in the cache
The status test is needed to avoid writing entries with only
a table map entry, which would crash in do_apply_event() on the slave
as it assumes that there is always a row entry after a table map.
*/
bool empty() const
{
return (pending() == NULL &&
(my_b_write_tell(&cache_log) == 0 ||
((status & (LOGGED_ROW_EVENT | LOGGED_CRITICAL)) == 0)));
}
Rows_log_event *pending() const
{
return m_pending;
}
void set_pending(Rows_log_event *const pending_arg)
{
m_pending= pending_arg;
}
void set_incident(void)
{
incident= TRUE;
}
bool has_incident(void) const
{
return(incident);
}
void reset()
{
bool cache_was_empty= empty();
bool truncate_file= (cache_log.file != -1 &&
my_b_write_tell(&cache_log) > CACHE_FILE_TRUNC_SIZE);
truncate(0,1); // Forget what's in cache
if (!cache_was_empty)
compute_statistics();
if (truncate_file)
my_chsize(cache_log.file, 0, 0, MYF(MY_WME));
status= 0;
incident= FALSE;
before_stmt_pos= MY_OFF_T_UNDEF;
DBUG_ASSERT(empty());
}
my_off_t get_byte_position() const
{
return my_b_tell(&cache_log);
}
my_off_t get_prev_position() const
{
return(before_stmt_pos);
}
void set_prev_position(my_off_t pos)
{
before_stmt_pos= pos;
}
void restore_prev_position()
{
truncate(before_stmt_pos);
}
void restore_savepoint(my_off_t pos)
{
truncate(pos);
if (pos < before_stmt_pos)
before_stmt_pos= MY_OFF_T_UNDEF;
}
void set_binlog_cache_info(my_off_t param_max_binlog_cache_size,
ulong *param_ptr_binlog_cache_use,
ulong *param_ptr_binlog_cache_disk_use)
{
/*
The assertions guarantee that the set_binlog_cache_info is
called just once and information passed as parameters are
never zero.
This is done while calling the constructor binlog_cache_mngr.
We cannot set information in the constructor binlog_cache_data
because the space for binlog_cache_mngr is allocated through
a placement new.
In the future, we can refactor this and change it to avoid
the set_binlog_info.
*/
DBUG_ASSERT(saved_max_binlog_cache_size == 0);
DBUG_ASSERT(param_max_binlog_cache_size != 0);
DBUG_ASSERT(ptr_binlog_cache_use == 0);
DBUG_ASSERT(param_ptr_binlog_cache_use != 0);
DBUG_ASSERT(ptr_binlog_cache_disk_use == 0);
DBUG_ASSERT(param_ptr_binlog_cache_disk_use != 0);
saved_max_binlog_cache_size= param_max_binlog_cache_size;
ptr_binlog_cache_use= param_ptr_binlog_cache_use;
ptr_binlog_cache_disk_use= param_ptr_binlog_cache_disk_use;
cache_log.end_of_file= saved_max_binlog_cache_size;
}
void add_status(enum_logged_status status_arg)
{
status|= status_arg;
}
/*
Cache to store data before copying it to the binary log.
*/
IO_CACHE cache_log;
protected:
/*
Binlog position before the start of the current statement.
*/
my_off_t before_stmt_pos;
private:
/*
Pending binrows event. This event is the event where the rows are currently
written.
*/
Rows_log_event *m_pending;
/*
Bit flags for what has been writing to cache. Used to
discard logs without any data changes.
see enum_logged_status;
*/
uint32 status;
/*
This indicates that some events did not get into the cache and most likely
it is corrupted.
*/
bool incident;
/**
This function computes binlog cache and disk usage.
*/
void compute_statistics()
{
statistic_increment(*ptr_binlog_cache_use, &LOCK_status);
if (cache_log.disk_writes != 0)
{
#ifdef REAL_STATISTICS
statistic_add(*ptr_binlog_cache_disk_use,
cache_log.disk_writes, &LOCK_status);
#else
statistic_increment(*ptr_binlog_cache_disk_use, &LOCK_status);
#endif
cache_log.disk_writes= 0;
}
}
/*
Stores the values of maximum size of the cache allowed when this cache
is configured. This corresponds to either
. max_binlog_cache_size or max_binlog_stmt_cache_size.
*/
my_off_t saved_max_binlog_cache_size;
/*
Stores a pointer to the status variable that keeps track of the in-memory
cache usage. This corresponds to either
. binlog_cache_use or binlog_stmt_cache_use.
*/
ulong *ptr_binlog_cache_use;
/*
Stores a pointer to the status variable that keeps track of the disk
cache usage. This corresponds to either
. binlog_cache_disk_use or binlog_stmt_cache_disk_use.
*/
ulong *ptr_binlog_cache_disk_use;
/*
It truncates the cache to a certain position. This includes deleting the
pending event.
*/
void truncate(my_off_t pos, bool reset_cache=0)
{
DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos));
cache_log.error=0;
if (pending())
{
delete pending();
set_pending(0);
}
my_bool res= reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, reset_cache);
DBUG_ASSERT(res == 0);
cache_log.end_of_file= saved_max_binlog_cache_size;
}
binlog_cache_data& operator=(const binlog_cache_data& info);
binlog_cache_data(const binlog_cache_data& info);
};
class online_alter_cache_data: public Sql_alloc, public ilist_node<>,
public binlog_cache_data
{
public:
void store_prev_position()
{
before_stmt_pos= my_b_write_tell(&cache_log);
}
handlerton *hton;
Cache_flip_event_log *sink_log;
SAVEPOINT *sv_list;
};
void Log_event_writer::add_status(enum_logged_status status)
{
......@@ -2266,61 +2026,6 @@ static int binlog_commit_flush_xa_prepare(THD *thd, bool all,
return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE));
}
#ifdef HAVE_REPLICATION
int binlog_log_row_online_alter(TABLE* table, const uchar *before_record,
const uchar *after_record, Log_func *log_func)
{
THD *thd= table->in_use;
if (!table->online_alter_cache)
{
table->online_alter_cache= online_alter_binlog_get_cache_data(thd, table);
trans_register_ha(thd, false, online_alter_hton, 0);
if (thd->in_multi_stmt_transaction_mode())
trans_register_ha(thd, true, online_alter_hton, 0);
}
// We need to log all columns for the case if alter table changes primary key
DBUG_ASSERT(!before_record || bitmap_is_set_all(table->read_set));
MY_BITMAP *old_rpl_write_set= table->rpl_write_set;
table->rpl_write_set= &table->s->all_set;
table->online_alter_cache->store_prev_position();
int error= (*log_func)(thd, table, table->s->online_alter_binlog,
table->online_alter_cache,
table->file->has_transactions_and_rollback(),
BINLOG_ROW_IMAGE_FULL,
before_record, after_record);
table->rpl_write_set= old_rpl_write_set;
if (unlikely(error))
{
table->online_alter_cache->restore_prev_position();
return HA_ERR_RBR_LOGGING_FAILED;
}
return 0;
}
static void
binlog_online_alter_cleanup(ilist<online_alter_cache_data> &list, bool ending_trans)
{
if (ending_trans)
{
auto it= list.begin();
while (it != list.end())
{
auto &cache= *it++;
cache.sink_log->release();
cache.reset();
delete &cache;
}
list.clear();
DBUG_ASSERT(list.empty());
}
}
#endif // HAVE_REPLICATION
/**
This function is called once after each statement.
......@@ -6433,54 +6138,6 @@ bool MYSQL_BIN_LOG::write_table_map(THD *thd, TABLE *table, bool with_annotate)
}
#ifdef HAVE_REPLICATION
static online_alter_cache_data *
online_alter_binlog_setup_cache_data(MEM_ROOT *root, TABLE_SHARE *share)
{
static ulong online_alter_cache_use= 0, online_alter_cache_disk_use= 0;
auto cache= new (root) online_alter_cache_data();
if (!cache || open_cached_file(&cache->cache_log, mysql_tmpdir,
LOG_PREFIX, (size_t)binlog_cache_size, MYF(MY_WME)))
{
delete cache;
return NULL;
}
share->online_alter_binlog->acquire();
cache->hton= share->db_type();
cache->sink_log= share->online_alter_binlog;
my_off_t binlog_max_size= SIZE_T_MAX; // maximum possible cache size
DBUG_EXECUTE_IF("online_alter_small_cache", binlog_max_size= 4096;);
cache->set_binlog_cache_info(binlog_max_size,
&online_alter_cache_use,
&online_alter_cache_disk_use);
cache->store_prev_position();
return cache;
}
online_alter_cache_data *online_alter_binlog_get_cache_data(THD *thd, TABLE *table)
{
ilist<online_alter_cache_data> &list= thd->online_alter_cache_list;
/* we assume it's very rare to have more than one online ALTER running */
for (auto &cache: list)
{
if (cache.sink_log == table->s->online_alter_binlog)
return &cache;
}
MEM_ROOT *root= &thd->transaction->mem_root;
auto *new_cache_data= online_alter_binlog_setup_cache_data(root, table->s);
list.push_back(*new_cache_data);
return new_cache_data;
}
#endif
binlog_cache_mngr *THD::binlog_get_cache_mngr() const
{
return (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
......@@ -7737,129 +7394,6 @@ class CacheWriter: public Log_event_writer
bool first;
};
static int binlog_online_alter_end_trans(THD *thd, bool all, bool commit)
{
DBUG_ENTER("binlog_online_alter_end_trans");
int error= 0;
#ifdef HAVE_REPLICATION
if (thd->online_alter_cache_list.empty())
DBUG_RETURN(0);
bool is_ending_transaction= ending_trans(thd, all);
for (auto &cache: thd->online_alter_cache_list)
{
auto *binlog= cache.sink_log;
DBUG_ASSERT(binlog);
bool non_trans= cache.hton->flags & HTON_NO_ROLLBACK // Aria
|| !cache.hton->rollback;
bool do_commit= (commit && is_ending_transaction) || non_trans;
if (commit || non_trans)
{
// Do not set STMT_END for last event to leave table open in altering thd
error= binlog_flush_pending_rows_event(thd, false, true, binlog, &cache);
}
if (do_commit)
{
/*
If the cache wasn't reinited to write, then it remains empty after
the last write.
*/
if (my_b_bytes_in_cache(&cache.cache_log) && likely(!error))
{
DBUG_ASSERT(cache.cache_log.type != READ_CACHE);
mysql_mutex_lock(binlog->get_log_lock());
error= binlog->write_cache_raw(thd, &cache.cache_log);
mysql_mutex_unlock(binlog->get_log_lock());
}
}
else if (!commit) // rollback
{
DBUG_ASSERT(!non_trans);
cache.restore_prev_position();
}
else
{
DBUG_ASSERT(!is_ending_transaction);
cache.store_prev_position();
}
if (error)
{
my_error(ER_ERROR_ON_WRITE, MYF(ME_ERROR_LOG),
binlog->get_name(), errno);
binlog_online_alter_cleanup(thd->online_alter_cache_list,
is_ending_transaction);
DBUG_RETURN(error);
}
}
binlog_online_alter_cleanup(thd->online_alter_cache_list,
is_ending_transaction);
for (TABLE *table= thd->open_tables; table; table= table->next)
table->online_alter_cache= NULL;
#endif // HAVE_REPLICATION
DBUG_RETURN(error);
}
SAVEPOINT** find_savepoint_in_list(THD *thd, LEX_CSTRING name,
SAVEPOINT ** const list);
SAVEPOINT* savepoint_add(THD *thd, LEX_CSTRING name, SAVEPOINT **list,
int (*release_old)(THD*, SAVEPOINT*));
int online_alter_savepoint_set(THD *thd, LEX_CSTRING name)
{
DBUG_ENTER("binlog_online_alter_savepoint");
#ifdef HAVE_REPLICATION
if (thd->online_alter_cache_list.empty())
DBUG_RETURN(0);
if (savepoint_alloc_size < sizeof (SAVEPOINT) + sizeof(my_off_t))
savepoint_alloc_size= sizeof (SAVEPOINT) + sizeof(my_off_t);
for (auto &cache: thd->online_alter_cache_list)
{
if (cache.hton->savepoint_set == NULL)
continue;
SAVEPOINT *sv= savepoint_add(thd, name, &cache.sv_list, NULL);
if(unlikely(sv == NULL))
DBUG_RETURN(1);
my_off_t *pos= (my_off_t*)(sv+1);
*pos= cache.get_byte_position();
sv->prev= cache.sv_list;
cache.sv_list= sv;
}
#endif
DBUG_RETURN(0);
}
int online_alter_savepoint_rollback(THD *thd, LEX_CSTRING name)
{
DBUG_ENTER("online_alter_savepoint_rollback");
#ifdef HAVE_REPLICATION
for (auto &cache: thd->online_alter_cache_list)
{
if (cache.hton->savepoint_set == NULL)
continue;
SAVEPOINT **sv= find_savepoint_in_list(thd, name, &cache.sv_list);
// sv is null if savepoint was set up before online table was modified
my_off_t pos= *sv ? *(my_off_t*)(*sv+1) : 0;
cache.restore_savepoint(pos);
}
#endif
DBUG_RETURN(0);
}
int Event_log::write_cache_raw(THD *thd, IO_CACHE *cache)
{
......@@ -12528,61 +12062,3 @@ void wsrep_register_binlog_handler(THD *thd, bool trx)
}
#endif /* WITH_WSREP */
static int online_alter_close_connection(handlerton *hton, THD *thd)
{
DBUG_ASSERT(thd->online_alter_cache_list.empty());
return 0;
}
handlerton *online_alter_hton;
int online_alter_log_init(void *p)
{
online_alter_hton= (handlerton *)p;
online_alter_hton->db_type= DB_TYPE_ONLINE_ALTER;
online_alter_hton->savepoint_offset= sizeof(my_off_t);
online_alter_hton->close_connection= online_alter_close_connection;
online_alter_hton->savepoint_set= // Done by online_alter_savepoint_set
[](handlerton *, THD *, void *){ return 0; };
online_alter_hton->savepoint_rollback= // Done by online_alter_savepoint_rollback
[](handlerton *, THD *, void *){ return 0; };
online_alter_hton->savepoint_rollback_can_release_mdl=
[](handlerton *hton, THD *thd){ return true; };
online_alter_hton->commit= [](handlerton *, THD *thd, bool all)
{ return binlog_online_alter_end_trans(thd, all, true); };
online_alter_hton->rollback= [](handlerton *, THD *thd, bool all)
{ return binlog_online_alter_end_trans(thd, all, false); };
online_alter_hton->commit_by_xid= [](handlerton *hton, XID *xid)
{ return binlog_online_alter_end_trans(current_thd, true, true); };
online_alter_hton->rollback_by_xid= [](handlerton *hton, XID *xid)
{ return binlog_online_alter_end_trans(current_thd, true, false); };
online_alter_hton->drop_table= [](handlerton *, const char*) { return -1; };
online_alter_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN
| HTON_NO_ROLLBACK;
return 0;
}
struct st_mysql_storage_engine online_alter_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
maria_declare_plugin(online_alter_log)
{
MYSQL_STORAGE_ENGINE_PLUGIN,
&online_alter_storage_engine,
"online_alter_log",
"MariaDB PLC",
"This is a pseudo storage engine to represent the online alter log in a transaction",
PLUGIN_LICENSE_GPL,
online_alter_log_init,
NULL,
0x0100, // 1.0
NULL, // no status vars
NULL, // no sysvars
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
}
maria_declare_plugin_end;
......@@ -1343,15 +1343,13 @@ int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
binlog_cache_data *cache_data);
Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
int binlog_log_row_online_alter(TABLE* table, const uchar *before_record,
const uchar *after_record, Log_func *log_func);
online_alter_cache_data *online_alter_binlog_get_cache_data(THD *thd, TABLE *table);
int online_alter_log_row(TABLE* table, const uchar *before_record,
const uchar *after_record, Log_func *log_func);
binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
extern handlerton *binlog_hton;
extern handlerton *online_alter_hton;
extern LOGGER logger;
extern const char *log_bin_index;
......
/*
Copyright (c) 2023, MariaDB plc
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
*/
#include "log_event.h"
static constexpr my_off_t MY_OFF_T_UNDEF= ~0ULL;
/** Truncate cache log files bigger than this */
static constexpr my_off_t CACHE_FILE_TRUNC_SIZE = 65536;
class binlog_cache_data
{
public:
binlog_cache_data(): before_stmt_pos(MY_OFF_T_UNDEF), m_pending(0), status(0),
incident(FALSE), saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0),
ptr_binlog_cache_disk_use(0)
{ }
~binlog_cache_data()
{
DBUG_ASSERT(empty());
close_cached_file(&cache_log);
}
/*
Return 1 if there is no relevant entries in the cache
This is:
- Cache is empty
- There are row or critical (DDL?) events in the cache
The status test is needed to avoid writing entries with only
a table map entry, which would crash in do_apply_event() on the slave
as it assumes that there is always a row entry after a table map.
*/
bool empty() const
{
return (pending() == NULL &&
(my_b_write_tell(&cache_log) == 0 ||
((status & (LOGGED_ROW_EVENT | LOGGED_CRITICAL)) == 0)));
}
Rows_log_event *pending() const
{
return m_pending;
}
void set_pending(Rows_log_event *const pending_arg)
{
m_pending= pending_arg;
}
void set_incident(void)
{
incident= TRUE;
}
bool has_incident(void) const
{
return(incident);
}
void reset()
{
bool cache_was_empty= empty();
bool truncate_file= (cache_log.file != -1 &&
my_b_write_tell(&cache_log) > CACHE_FILE_TRUNC_SIZE);
truncate(0,1); // Forget what's in cache
if (!cache_was_empty)
compute_statistics();
if (truncate_file)
my_chsize(cache_log.file, 0, 0, MYF(MY_WME));
status= 0;
incident= FALSE;
before_stmt_pos= MY_OFF_T_UNDEF;
DBUG_ASSERT(empty());
}
my_off_t get_byte_position() const
{
return my_b_tell(&cache_log);
}
my_off_t get_prev_position() const
{
return(before_stmt_pos);
}
void set_prev_position(my_off_t pos)
{
before_stmt_pos= pos;
}
void restore_prev_position()
{
truncate(before_stmt_pos);
}
void restore_savepoint(my_off_t pos)
{
truncate(pos);
if (pos < before_stmt_pos)
before_stmt_pos= MY_OFF_T_UNDEF;
}
void set_binlog_cache_info(my_off_t param_max_binlog_cache_size,
ulong *param_ptr_binlog_cache_use,
ulong *param_ptr_binlog_cache_disk_use)
{
/*
The assertions guarantee that the set_binlog_cache_info is
called just once and information passed as parameters are
never zero.
This is done while calling the constructor binlog_cache_mngr.
We cannot set information in the constructor binlog_cache_data
because the space for binlog_cache_mngr is allocated through
a placement new.
In the future, we can refactor this and change it to avoid
the set_binlog_info.
*/
DBUG_ASSERT(saved_max_binlog_cache_size == 0);
DBUG_ASSERT(param_max_binlog_cache_size != 0);
DBUG_ASSERT(ptr_binlog_cache_use == 0);
DBUG_ASSERT(param_ptr_binlog_cache_use != 0);
DBUG_ASSERT(ptr_binlog_cache_disk_use == 0);
DBUG_ASSERT(param_ptr_binlog_cache_disk_use != 0);
saved_max_binlog_cache_size= param_max_binlog_cache_size;
ptr_binlog_cache_use= param_ptr_binlog_cache_use;
ptr_binlog_cache_disk_use= param_ptr_binlog_cache_disk_use;
cache_log.end_of_file= saved_max_binlog_cache_size;
}
void add_status(enum_logged_status status_arg)
{
status|= status_arg;
}
/*
Cache to store data before copying it to the binary log.
*/
IO_CACHE cache_log;
protected:
/*
Binlog position before the start of the current statement.
*/
my_off_t before_stmt_pos;
private:
/*
Pending binrows event. This event is the event where the rows are currently
written.
*/
Rows_log_event *m_pending;
/*
Bit flags for what has been writing to cache. Used to
discard logs without any data changes.
see enum_logged_status;
*/
uint32 status;
/*
This indicates that some events did not get into the cache and most likely
it is corrupted.
*/
bool incident;
/**
This function computes binlog cache and disk usage.
*/
void compute_statistics()
{
statistic_increment(*ptr_binlog_cache_use, &LOCK_status);
if (cache_log.disk_writes != 0)
{
#ifdef REAL_STATISTICS
statistic_add(*ptr_binlog_cache_disk_use,
cache_log.disk_writes, &LOCK_status);
#else
statistic_increment(*ptr_binlog_cache_disk_use, &LOCK_status);
#endif
cache_log.disk_writes= 0;
}
}
/*
Stores the values of maximum size of the cache allowed when this cache
is configured. This corresponds to either
. max_binlog_cache_size or max_binlog_stmt_cache_size.
*/
my_off_t saved_max_binlog_cache_size;
/*
Stores a pointer to the status variable that keeps track of the in-memory
cache usage. This corresponds to either
. binlog_cache_use or binlog_stmt_cache_use.
*/
ulong *ptr_binlog_cache_use;
/*
Stores a pointer to the status variable that keeps track of the disk
cache usage. This corresponds to either
. binlog_cache_disk_use or binlog_stmt_cache_disk_use.
*/
ulong *ptr_binlog_cache_disk_use;
/*
It truncates the cache to a certain position. This includes deleting the
pending event.
*/
void truncate(my_off_t pos, bool reset_cache=0)
{
DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos));
cache_log.error=0;
if (pending())
{
delete pending();
set_pending(0);
}
my_bool res= reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, reset_cache);
DBUG_ASSERT(res == 0);
cache_log.end_of_file= saved_max_binlog_cache_size;
}
binlog_cache_data& operator=(const binlog_cache_data& info);
binlog_cache_data(const binlog_cache_data& info);
};
/*
Copyright (c) 2023, MariaDB plc
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
*/
#include "my_global.h"
#include "handler.h"
#include "sql_class.h"
#include "log_cache.h"
static handlerton *online_alter_hton;
class online_alter_cache_data: public Sql_alloc, public ilist_node<>,
public binlog_cache_data
{
public:
void store_prev_position()
{
before_stmt_pos= my_b_write_tell(&cache_log);
}
handlerton *hton;
Cache_flip_event_log *sink_log;
SAVEPOINT *sv_list;
};
static
online_alter_cache_data *setup_cache_data(MEM_ROOT *root, TABLE_SHARE *share)
{
static ulong online_alter_cache_use= 0, online_alter_cache_disk_use= 0;
auto cache= new (root) online_alter_cache_data();
if (!cache || open_cached_file(&cache->cache_log, mysql_tmpdir, LOG_PREFIX,
(size_t)binlog_cache_size, MYF(MY_WME)))
{
delete cache;
return NULL;
}
share->online_alter_binlog->acquire();
cache->hton= share->db_type();
cache->sink_log= share->online_alter_binlog;
my_off_t binlog_max_size= SIZE_T_MAX; // maximum possible cache size
DBUG_EXECUTE_IF("online_alter_small_cache", binlog_max_size= 4096;);
cache->set_binlog_cache_info(binlog_max_size,
&online_alter_cache_use,
&online_alter_cache_disk_use);
cache->store_prev_position();
return cache;
}
static online_alter_cache_data *get_cache_data(THD *thd, TABLE *table)
{
ilist<online_alter_cache_data> &list= thd->online_alter_cache_list;
/* we assume it's very rare to have more than one online ALTER running */
for (auto &cache: list)
{
if (cache.sink_log == table->s->online_alter_binlog)
return &cache;
}
MEM_ROOT *root= &thd->transaction->mem_root;
auto *new_cache_data= setup_cache_data(root, table->s);
list.push_back(*new_cache_data);
return new_cache_data;
}
int online_alter_log_row(TABLE* table, const uchar *before_record,
const uchar *after_record, Log_func *log_func)
{
THD *thd= table->in_use;
if (!table->online_alter_cache)
{
table->online_alter_cache= get_cache_data(thd, table);
trans_register_ha(thd, false, online_alter_hton, 0);
if (thd->in_multi_stmt_transaction_mode())
trans_register_ha(thd, true, online_alter_hton, 0);
}
// We need to log all columns for the case if alter table changes primary key
DBUG_ASSERT(!before_record || bitmap_is_set_all(table->read_set));
MY_BITMAP *old_rpl_write_set= table->rpl_write_set;
table->rpl_write_set= &table->s->all_set;
table->online_alter_cache->store_prev_position();
int error= (*log_func)(thd, table, table->s->online_alter_binlog,
table->online_alter_cache,
table->file->has_transactions_and_rollback(),
BINLOG_ROW_IMAGE_FULL,
before_record, after_record);
table->rpl_write_set= old_rpl_write_set;
if (unlikely(error))
{
table->online_alter_cache->restore_prev_position();
return HA_ERR_RBR_LOGGING_FAILED;
}
return 0;
}
static void
cleanup_cache_list(ilist<online_alter_cache_data> &list, bool ending_trans)
{
if (ending_trans)
{
auto it= list.begin();
while (it != list.end())
{
auto &cache= *it++;
cache.sink_log->release();
cache.reset();
delete &cache;
}
list.clear();
DBUG_ASSERT(list.empty());
}
}
static
int online_alter_end_trans(handlerton *hton, THD *thd, bool all, bool commit)
{
DBUG_ENTER("online_alter_end_trans");
int error= 0;
if (thd->online_alter_cache_list.empty())
DBUG_RETURN(0);
bool is_ending_transaction= ending_trans(thd, all);
for (auto &cache: thd->online_alter_cache_list)
{
auto *binlog= cache.sink_log;
DBUG_ASSERT(binlog);
bool non_trans= cache.hton->flags & HTON_NO_ROLLBACK // Aria
|| !cache.hton->rollback;
bool do_commit= (commit && is_ending_transaction) || non_trans;
if (commit || non_trans)
{
// Do not set STMT_END for last event to leave table open in altering thd
error= binlog_flush_pending_rows_event(thd, false, true, binlog, &cache);
}
if (do_commit)
{
/*
If the cache wasn't reinited to write, then it remains empty after
the last write.
*/
if (my_b_bytes_in_cache(&cache.cache_log) && likely(!error))
{
DBUG_ASSERT(cache.cache_log.type != READ_CACHE);
mysql_mutex_lock(binlog->get_log_lock());
error= binlog->write_cache_raw(thd, &cache.cache_log);
mysql_mutex_unlock(binlog->get_log_lock());
}
}
else if (!commit) // rollback
{
DBUG_ASSERT(!non_trans);
cache.restore_prev_position();
}
else
{
DBUG_ASSERT(!is_ending_transaction);
cache.store_prev_position();
}
if (error)
{
my_error(ER_ERROR_ON_WRITE, MYF(ME_ERROR_LOG),
binlog->get_name(), errno);
cleanup_cache_list(thd->online_alter_cache_list,
is_ending_transaction);
DBUG_RETURN(error);
}
}
cleanup_cache_list(thd->online_alter_cache_list,
is_ending_transaction);
for (TABLE *table= thd->open_tables; table; table= table->next)
table->online_alter_cache= NULL;
DBUG_RETURN(error);
}
SAVEPOINT** find_savepoint_in_list(THD *thd, LEX_CSTRING name,
SAVEPOINT ** const list);
SAVEPOINT* savepoint_add(THD *thd, LEX_CSTRING name, SAVEPOINT **list,
int (*release_old)(THD*, SAVEPOINT*));
int online_alter_savepoint_set(THD *thd, LEX_CSTRING name)
{
DBUG_ENTER("binlog_online_alter_savepoint");
if (thd->online_alter_cache_list.empty())
DBUG_RETURN(0);
if (savepoint_alloc_size < sizeof (SAVEPOINT) + sizeof(my_off_t))
savepoint_alloc_size= sizeof (SAVEPOINT) + sizeof(my_off_t);
for (auto &cache: thd->online_alter_cache_list)
{
if (cache.hton->savepoint_set == NULL)
continue;
SAVEPOINT *sv= savepoint_add(thd, name, &cache.sv_list, NULL);
if(unlikely(sv == NULL))
DBUG_RETURN(1);
my_off_t *pos= (my_off_t*)(sv+1);
*pos= cache.get_byte_position();
sv->prev= cache.sv_list;
cache.sv_list= sv;
}
DBUG_RETURN(0);
}
int online_alter_savepoint_rollback(THD *thd, LEX_CSTRING name)
{
DBUG_ENTER("online_alter_savepoint_rollback");
for (auto &cache: thd->online_alter_cache_list)
{
if (cache.hton->savepoint_set == NULL)
continue;
SAVEPOINT **sv= find_savepoint_in_list(thd, name, &cache.sv_list);
// sv is null if savepoint was set up before online table was modified
my_off_t pos= *sv ? *(my_off_t*)(*sv+1) : 0;
cache.restore_savepoint(pos);
}
DBUG_RETURN(0);
}
static int online_alter_close_connection(handlerton *hton, THD *thd)
{
DBUG_ASSERT(thd->online_alter_cache_list.empty());
return 0;
}
static int online_alter_log_init(void *p)
{
online_alter_hton= (handlerton *)p;
online_alter_hton->db_type= DB_TYPE_ONLINE_ALTER;
online_alter_hton->savepoint_offset= sizeof(my_off_t);
online_alter_hton->close_connection= online_alter_close_connection;
online_alter_hton->savepoint_set= // Done by online_alter_savepoint_set
[](handlerton *, THD *, void *){ return 0; };
online_alter_hton->savepoint_rollback= // Done by online_alter_savepoint_rollback
[](handlerton *, THD *, void *){ return 0; };
online_alter_hton->savepoint_rollback_can_release_mdl=
[](handlerton *hton, THD *thd){ return true; };
online_alter_hton->commit= [](handlerton *hton, THD *thd, bool all)
{ return online_alter_end_trans(hton, thd, all, true); };
online_alter_hton->rollback= [](handlerton *hton, THD *thd, bool all)
{ return online_alter_end_trans(hton, thd, all, false); };
online_alter_hton->commit_by_xid= [](handlerton *hton, XID *xid)
{ return online_alter_end_trans(hton, current_thd, true, true); };
online_alter_hton->rollback_by_xid= [](handlerton *hton, XID *xid)
{ return online_alter_end_trans(hton, current_thd, true, false); };
online_alter_hton->drop_table= [](handlerton *, const char*) { return -1; };
online_alter_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN
| HTON_NO_ROLLBACK;
return 0;
}
struct st_mysql_storage_engine online_alter_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
maria_declare_plugin(online_alter_log)
{
MYSQL_STORAGE_ENGINE_PLUGIN,
&online_alter_storage_engine,
"online_alter_log",
"MariaDB PLC",
"A pseudo storage engine for the online alter log",
PLUGIN_LICENSE_GPL,
online_alter_log_init,
NULL,
0x0100, // 1.0
NULL, // no status vars
NULL, // no sysvars
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
}
maria_declare_plugin_end;
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment