Commit ee499431 authored by Nikita Malyavin's avatar Nikita Malyavin

MDEV-16329 [2/5] refactor binlog and cache_mngr

pump up binlog and cache manager to level of binlog_log_row_internal
parent 4a1ef202
...@@ -7239,7 +7239,6 @@ int handler::binlog_log_row(TABLE *table, ...@@ -7239,7 +7239,6 @@ int handler::binlog_log_row(TABLE *table,
const uchar *after_record, const uchar *after_record,
Log_func *log_func) Log_func *log_func)
{ {
bool error;
THD *thd= table->in_use; THD *thd= table->in_use;
DBUG_ENTER("binlog_log_row"); DBUG_ENTER("binlog_log_row");
...@@ -7247,8 +7246,21 @@ int handler::binlog_log_row(TABLE *table, ...@@ -7247,8 +7246,21 @@ int handler::binlog_log_row(TABLE *table,
thd->binlog_write_table_maps()) thd->binlog_write_table_maps())
DBUG_RETURN(HA_ERR_RBR_LOGGING_FAILED); DBUG_RETURN(HA_ERR_RBR_LOGGING_FAILED);
error= (*log_func)(thd, table, row_logging_has_trans, DBUG_ASSERT(thd->is_current_stmt_binlog_format_row());
before_record, after_record); DBUG_ASSERT((WSREP_NNULL(thd) && wsrep_emulate_bin_log)
|| mysql_bin_log.is_open());
auto *cache_mngr= thd->binlog_setup_trx_data();
if (cache_mngr == NULL)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
bool is_trans= row_logging_has_trans;
/* Ensure that all events in a GTID group are in the same cache */
if (thd->variables.option_bits & OPTION_GTID_BEGIN)
is_trans= 1;
bool error= (*log_func)(thd, table, &mysql_bin_log, cache_mngr,
is_trans, before_record, after_record);
DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0); DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0);
} }
......
...@@ -651,7 +651,11 @@ given at all. */ ...@@ -651,7 +651,11 @@ given at all. */
#define HA_CREATE_PRINT_ALL_OPTIONS (1UL << 26) #define HA_CREATE_PRINT_ALL_OPTIONS (1UL << 26)
typedef ulonglong alter_table_operations; typedef ulonglong alter_table_operations;
typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*);
class MYSQL_BIN_LOG;
class binlog_cache_mngr;
typedef bool Log_func(THD*, TABLE*, MYSQL_BIN_LOG *, binlog_cache_mngr *, bool,
const uchar*, const uchar*);
/* /*
These flags are set by the parser and describes the type of These flags are set by the parser and describes the type of
...@@ -5539,11 +5543,6 @@ inline const LEX_CSTRING *table_case_name(HA_CREATE_INFO *info, const LEX_CSTRIN ...@@ -5539,11 +5543,6 @@ inline const LEX_CSTRING *table_case_name(HA_CREATE_INFO *info, const LEX_CSTRIN
return ((lower_case_table_names == 2 && info->alias.str) ? &info->alias : name); return ((lower_case_table_names == 2 && info->alias.str) ? &info->alias : name);
} }
typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*);
int binlog_log_row(TABLE* table,
const uchar *before_record,
const uchar *after_record,
Log_func *log_func);
/** /**
@def MYSQL_TABLE_IO_WAIT @def MYSQL_TABLE_IO_WAIT
......
...@@ -2088,7 +2088,7 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) ...@@ -2088,7 +2088,7 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
else else
cache_mngr->trx_cache.restore_prev_position(); cache_mngr->trx_cache.restore_prev_position();
DBUG_ASSERT(thd->binlog_get_pending_rows_event(is_transactional) == NULL); DBUG_ASSERT(cache_mngr->trx_cache.pending() == NULL);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -6282,16 +6282,16 @@ binlog_cache_mngr *THD::binlog_get_cache_mngr() const ...@@ -6282,16 +6282,16 @@ binlog_cache_mngr *THD::binlog_get_cache_mngr() const
is @c true, the pending event is returned from the transactional cache. is @c true, the pending event is returned from the transactional cache.
Otherwise from the non-transactional cache. Otherwise from the non-transactional cache.
@param is_transactional @c true indicates a transactional cache, @param cache_mngr cache manager to return pending row from
@param use_trans_cache @c true indicates a transactional cache,
otherwise @c false a non-transactional. otherwise @c false a non-transactional.
@return @return
The row event if any. The row event if any.
*/ */
Rows_log_event* Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
THD::binlog_get_pending_rows_event(bool is_transactional) const bool use_trans_cache)
{ {
Rows_log_event* rows= NULL; Rows_log_event* rows= NULL;
binlog_cache_mngr *const cache_mngr= binlog_get_cache_mngr();
/* /*
This is less than ideal, but here's the story: If there is no cache_mngr, This is less than ideal, but here's the story: If there is no cache_mngr,
...@@ -6299,13 +6299,34 @@ THD::binlog_get_pending_rows_event(bool is_transactional) const ...@@ -6299,13 +6299,34 @@ THD::binlog_get_pending_rows_event(bool is_transactional) const
is set up there). In that case, we just return NULL. is set up there). In that case, we just return NULL.
*/ */
if (cache_mngr) if (cache_mngr)
rows= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
return rows;
}
int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
bool is_transactional,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool use_trans_cache)
{
/*
Mark the event as the last event of a statement if the stmt_end
flag is set.
*/
int error= 0;
auto *pending= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
if (pending)
{ {
binlog_cache_data *cache_data= if (stmt_end)
cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional)); {
pending->set_flags(Rows_log_event::STMT_END_F);
thd->reset_binlog_for_next_statement();
}
rows= cache_data->pending(); error= bin_log->flush_and_set_pending_rows_event(thd, 0, cache_mngr,
is_transactional);
} }
return (rows); return error;
} }
/** /**
...@@ -6315,18 +6336,18 @@ THD::binlog_get_pending_rows_event(bool is_transactional) const ...@@ -6315,18 +6336,18 @@ THD::binlog_get_pending_rows_event(bool is_transactional) const
into the non-transactional cache. into the non-transactional cache.
@param evt a pointer to the row event. @param evt a pointer to the row event.
@param is_transactional @c true indicates a transactional cache, @param use_trans_cache @c true indicates a transactional cache,
otherwise @c false a non-transactional. otherwise @c false a non-transactional.
*/ */
void void
THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional) THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache)
{ {
binlog_cache_mngr *const cache_mngr= binlog_setup_trx_data(); binlog_cache_mngr *const cache_mngr= binlog_setup_trx_data();
DBUG_ASSERT(cache_mngr); DBUG_ASSERT(cache_mngr);
binlog_cache_data *cache_data= binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional)); cache_mngr->get_binlog_cache_data(use_trans_cache);
cache_data->set_pending(ev); cache_data->set_pending(ev);
} }
...@@ -6375,18 +6396,18 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional) ...@@ -6375,18 +6396,18 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional)
int int
MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
Rows_log_event* event, Rows_log_event* event,
binlog_cache_mngr *cache_mngr,
bool is_transactional) bool is_transactional)
{ {
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()); DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open());
DBUG_PRINT("enter", ("event: %p", event)); DBUG_PRINT("enter", ("event: %p", event));
binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr();
DBUG_ASSERT(cache_mngr); DBUG_ASSERT(cache_mngr);
bool should_use_trans_cache= use_trans_cache(thd, is_transactional);
binlog_cache_data *cache_data= binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(use_trans_cache(thd, is_transactional)); cache_mngr->get_binlog_cache_data(should_use_trans_cache);
DBUG_PRINT("info", ("cache_mngr->pending(): %p", cache_data->pending())); DBUG_PRINT("info", ("cache_mngr->pending(): %p", cache_data->pending()));
...@@ -6415,7 +6436,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, ...@@ -6415,7 +6436,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
delete pending; delete pending;
} }
thd->binlog_set_pending_rows_event(event, is_transactional); thd->binlog_set_pending_rows_event(event, should_use_trans_cache);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
......
...@@ -725,6 +725,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -725,6 +725,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
#if !defined(MYSQL_CLIENT) #if !defined(MYSQL_CLIENT)
int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event, int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event,
binlog_cache_mngr *cache_mngr,
bool is_transactional); bool is_transactional);
int remove_pending_rows_event(THD *thd, bool is_transactional); int remove_pending_rows_event(THD *thd, bool is_transactional);
...@@ -1175,6 +1176,13 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, ...@@ -1175,6 +1176,13 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
void make_default_log_name(char **out, const char* log_ext, bool once); void make_default_log_name(char **out, const char* log_ext, bool once);
void binlog_reset_cache(THD *thd); void binlog_reset_cache(THD *thd);
bool write_annotated_row(THD *thd); bool write_annotated_row(THD *thd);
int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
bool is_transactional,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log; extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
extern handlerton *binlog_hton; extern handlerton *binlog_hton;
......
...@@ -4872,13 +4872,16 @@ class Write_rows_log_event : public Rows_log_event ...@@ -4872,13 +4872,16 @@ class Write_rows_log_event : public Rows_log_event
#endif #endif
#if defined(MYSQL_SERVER) #if defined(MYSQL_SERVER)
static bool binlog_row_logging_function(THD *thd, TABLE *table, static bool binlog_row_logging_function(THD *thd, TABLE *table,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool is_transactional, bool is_transactional,
const uchar *before_record const uchar *before_record
__attribute__((unused)), __attribute__((unused)),
const uchar *after_record) const uchar *after_record)
{ {
DBUG_ASSERT(!table->versioned(VERS_TRX_ID)); DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
return thd->binlog_write_row(table, is_transactional, after_record); return thd->binlog_write_row(table, bin_log, cache_mngr, is_transactional,
after_record);
} }
#endif #endif
...@@ -4955,12 +4958,14 @@ class Update_rows_log_event : public Rows_log_event ...@@ -4955,12 +4958,14 @@ class Update_rows_log_event : public Rows_log_event
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
static bool binlog_row_logging_function(THD *thd, TABLE *table, static bool binlog_row_logging_function(THD *thd, TABLE *table,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool is_transactional, bool is_transactional,
const uchar *before_record, const uchar *before_record,
const uchar *after_record) const uchar *after_record)
{ {
DBUG_ASSERT(!table->versioned(VERS_TRX_ID)); DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
return thd->binlog_update_row(table, is_transactional, return thd->binlog_update_row(table, bin_log, cache_mngr, is_transactional,
before_record, after_record); before_record, after_record);
} }
#endif #endif
...@@ -5044,13 +5049,15 @@ class Delete_rows_log_event : public Rows_log_event ...@@ -5044,13 +5049,15 @@ class Delete_rows_log_event : public Rows_log_event
#endif #endif
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
static bool binlog_row_logging_function(THD *thd, TABLE *table, static bool binlog_row_logging_function(THD *thd, TABLE *table,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool is_transactional, bool is_transactional,
const uchar *before_record, const uchar *before_record,
const uchar *after_record const uchar *after_record
__attribute__((unused))) __attribute__((unused)))
{ {
DBUG_ASSERT(!table->versioned(VERS_TRX_ID)); DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
return thd->binlog_delete_row(table, is_transactional, return thd->binlog_delete_row(table, bin_log, cache_mngr, is_transactional,
before_record); before_record);
} }
#endif #endif
......
This diff is collapsed.
...@@ -2944,11 +2944,14 @@ class THD: public THD_count, /* this must be first */ ...@@ -2944,11 +2944,14 @@ class THD: public THD_count, /* this must be first */
*/ */
void binlog_start_trans_and_stmt(); void binlog_start_trans_and_stmt();
void binlog_set_stmt_begin(); void binlog_set_stmt_begin();
int binlog_write_row(TABLE* table, bool is_transactional, int binlog_write_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_transactional,
const uchar *buf); const uchar *buf);
int binlog_delete_row(TABLE* table, bool is_transactional, int binlog_delete_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_transactional,
const uchar *buf); const uchar *buf);
int binlog_update_row(TABLE* table, bool is_transactional, int binlog_update_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_transactional,
const uchar *old_data, const uchar *new_data); const uchar *old_data, const uchar *new_data);
bool prepare_handlers_for_update(uint flag); bool prepare_handlers_for_update(uint flag);
bool binlog_write_annotated_row(Log_event_writer *writer); bool binlog_write_annotated_row(Log_event_writer *writer);
...@@ -2963,13 +2966,7 @@ class THD: public THD_count, /* this must be first */ ...@@ -2963,13 +2966,7 @@ class THD: public THD_count, /* this must be first */
Member functions to handle pending event for row-level logging. Member functions to handle pending event for row-level logging.
*/ */
binlog_cache_mngr *binlog_get_cache_mngr() const; binlog_cache_mngr *binlog_get_cache_mngr() const;
template <class RowsEventT> Rows_log_event* void binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache);
binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
size_t needed,
bool is_transactional,
RowsEventT* hint);
Rows_log_event* binlog_get_pending_rows_event(bool is_transactional) const;
void binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional);
inline int binlog_flush_pending_rows_event(bool stmt_end) inline int binlog_flush_pending_rows_event(bool stmt_end)
{ {
return (binlog_flush_pending_rows_event(stmt_end, FALSE) || return (binlog_flush_pending_rows_event(stmt_end, FALSE) ||
...@@ -2981,7 +2978,8 @@ class THD: public THD_count, /* this must be first */ ...@@ -2981,7 +2978,8 @@ class THD: public THD_count, /* this must be first */
bool binlog_need_stmt_format(bool is_transactional) const bool binlog_need_stmt_format(bool is_transactional) const
{ {
return log_current_statement() && return log_current_statement() &&
!binlog_get_pending_rows_event(is_transactional); !binlog_get_pending_rows_event(binlog_get_cache_mngr(),
use_trans_cache(this, is_transactional));
} }
bool binlog_for_noop_dml(bool transactional_table); bool binlog_for_noop_dml(bool transactional_table);
......
...@@ -243,7 +243,8 @@ size_t Wsrep_client_service::bytes_generated() const ...@@ -243,7 +243,8 @@ size_t Wsrep_client_service::bytes_generated() const
if (cache) if (cache)
{ {
size_t pending_rows_event_length= 0; size_t pending_rows_event_length= 0;
if (Rows_log_event* ev= m_thd->binlog_get_pending_rows_event(true)) auto *cache_mngr= m_thd->binlog_get_cache_mngr();
if (auto* ev= binlog_get_pending_rows_event(cache_mngr, true))
{ {
pending_rows_event_length= ev->get_data_size(); pending_rows_event_length= ev->get_data_size();
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment