Commit ecf47bc2 authored by Nikita Malyavin's avatar Nikita Malyavin

MDEV-16329 [2/5] refactor binlog and cache_mngr

pump up binlog and cache manager to level of binlog_log_row_internal
parent a49e51b5
...@@ -7247,7 +7247,6 @@ int handler::binlog_log_row(TABLE *table, ...@@ -7247,7 +7247,6 @@ int handler::binlog_log_row(TABLE *table,
const uchar *after_record, const uchar *after_record,
Log_func *log_func) Log_func *log_func)
{ {
bool error;
THD *thd= table->in_use; THD *thd= table->in_use;
DBUG_ENTER("binlog_log_row"); DBUG_ENTER("binlog_log_row");
...@@ -7255,8 +7254,21 @@ int handler::binlog_log_row(TABLE *table, ...@@ -7255,8 +7254,21 @@ int handler::binlog_log_row(TABLE *table,
thd->binlog_write_table_maps()) thd->binlog_write_table_maps())
DBUG_RETURN(HA_ERR_RBR_LOGGING_FAILED); DBUG_RETURN(HA_ERR_RBR_LOGGING_FAILED);
error= (*log_func)(thd, table, row_logging_has_trans, DBUG_ASSERT(thd->is_current_stmt_binlog_format_row());
before_record, after_record); DBUG_ASSERT((WSREP_NNULL(thd) && wsrep_emulate_bin_log)
|| mysql_bin_log.is_open());
auto *cache_mngr= thd->binlog_setup_trx_data();
if (cache_mngr == NULL)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
bool is_trans= row_logging_has_trans;
/* Ensure that all events in a GTID group are in the same cache */
if (thd->variables.option_bits & OPTION_GTID_BEGIN)
is_trans= 1;
bool error= (*log_func)(thd, table, &mysql_bin_log, cache_mngr,
is_trans, before_record, after_record);
DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0); DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0);
} }
......
...@@ -653,7 +653,11 @@ given at all. */ ...@@ -653,7 +653,11 @@ given at all. */
#define HA_CREATE_PRINT_ALL_OPTIONS (1UL << 26) #define HA_CREATE_PRINT_ALL_OPTIONS (1UL << 26)
typedef ulonglong alter_table_operations; typedef ulonglong alter_table_operations;
typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*);
class MYSQL_BIN_LOG;
class binlog_cache_mngr;
typedef bool Log_func(THD*, TABLE*, MYSQL_BIN_LOG *, binlog_cache_mngr *, bool,
const uchar*, const uchar*);
/* /*
These flags are set by the parser and describes the type of These flags are set by the parser and describes the type of
...@@ -5585,11 +5589,6 @@ inline const LEX_CSTRING *table_case_name(HA_CREATE_INFO *info, const LEX_CSTRIN ...@@ -5585,11 +5589,6 @@ inline const LEX_CSTRING *table_case_name(HA_CREATE_INFO *info, const LEX_CSTRIN
return ((lower_case_table_names == 2 && info->alias.str) ? &info->alias : name); return ((lower_case_table_names == 2 && info->alias.str) ? &info->alias : name);
} }
typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*);
int binlog_log_row(TABLE* table,
const uchar *before_record,
const uchar *after_record,
Log_func *log_func);
/** /**
@def MYSQL_TABLE_IO_WAIT @def MYSQL_TABLE_IO_WAIT
......
...@@ -2088,7 +2088,7 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) ...@@ -2088,7 +2088,7 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
else else
cache_mngr->trx_cache.restore_prev_position(); cache_mngr->trx_cache.restore_prev_position();
DBUG_ASSERT(thd->binlog_get_pending_rows_event(is_transactional) == NULL); DBUG_ASSERT(cache_mngr->trx_cache.pending() == NULL);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -6282,16 +6282,16 @@ binlog_cache_mngr *THD::binlog_get_cache_mngr() const ...@@ -6282,16 +6282,16 @@ binlog_cache_mngr *THD::binlog_get_cache_mngr() const
is @c true, the pending event is returned from the transactional cache. is @c true, the pending event is returned from the transactional cache.
Otherwise from the non-transactional cache. Otherwise from the non-transactional cache.
@param is_transactional @c true indicates a transactional cache, @param cache_mngr cache manager to return pending row from
@param use_trans_cache @c true indicates a transactional cache,
otherwise @c false a non-transactional. otherwise @c false a non-transactional.
@return @return
The row event if any. The row event if any.
*/ */
Rows_log_event* Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
THD::binlog_get_pending_rows_event(bool is_transactional) const bool use_trans_cache)
{ {
Rows_log_event* rows= NULL; Rows_log_event* rows= NULL;
binlog_cache_mngr *const cache_mngr= binlog_get_cache_mngr();
/* /*
This is less than ideal, but here's the story: If there is no cache_mngr, This is less than ideal, but here's the story: If there is no cache_mngr,
...@@ -6299,13 +6299,34 @@ THD::binlog_get_pending_rows_event(bool is_transactional) const ...@@ -6299,13 +6299,34 @@ THD::binlog_get_pending_rows_event(bool is_transactional) const
is set up there). In that case, we just return NULL. is set up there). In that case, we just return NULL.
*/ */
if (cache_mngr) if (cache_mngr)
rows= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
return rows;
}
int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
bool is_transactional,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool use_trans_cache)
{
/*
Mark the event as the last event of a statement if the stmt_end
flag is set.
*/
int error= 0;
auto *pending= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
if (pending)
{ {
binlog_cache_data *cache_data= if (stmt_end)
cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional)); {
pending->set_flags(Rows_log_event::STMT_END_F);
thd->reset_binlog_for_next_statement();
}
rows= cache_data->pending(); error= bin_log->flush_and_set_pending_rows_event(thd, 0, cache_mngr,
is_transactional);
} }
return (rows); return error;
} }
/** /**
...@@ -6315,18 +6336,18 @@ THD::binlog_get_pending_rows_event(bool is_transactional) const ...@@ -6315,18 +6336,18 @@ THD::binlog_get_pending_rows_event(bool is_transactional) const
into the non-transactional cache. into the non-transactional cache.
@param evt a pointer to the row event. @param evt a pointer to the row event.
@param is_transactional @c true indicates a transactional cache, @param use_trans_cache @c true indicates a transactional cache,
otherwise @c false a non-transactional. otherwise @c false a non-transactional.
*/ */
void void
THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional) THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache)
{ {
binlog_cache_mngr *const cache_mngr= binlog_setup_trx_data(); binlog_cache_mngr *const cache_mngr= binlog_setup_trx_data();
DBUG_ASSERT(cache_mngr); DBUG_ASSERT(cache_mngr);
binlog_cache_data *cache_data= binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional)); cache_mngr->get_binlog_cache_data(use_trans_cache);
cache_data->set_pending(ev); cache_data->set_pending(ev);
} }
...@@ -6375,18 +6396,18 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional) ...@@ -6375,18 +6396,18 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional)
int int
MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
Rows_log_event* event, Rows_log_event* event,
binlog_cache_mngr *cache_mngr,
bool is_transactional) bool is_transactional)
{ {
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()); DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open());
DBUG_PRINT("enter", ("event: %p", event)); DBUG_PRINT("enter", ("event: %p", event));
binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr();
DBUG_ASSERT(cache_mngr); DBUG_ASSERT(cache_mngr);
bool should_use_trans_cache= use_trans_cache(thd, is_transactional);
binlog_cache_data *cache_data= binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(use_trans_cache(thd, is_transactional)); cache_mngr->get_binlog_cache_data(should_use_trans_cache);
DBUG_PRINT("info", ("cache_mngr->pending(): %p", cache_data->pending())); DBUG_PRINT("info", ("cache_mngr->pending(): %p", cache_data->pending()));
...@@ -6415,7 +6436,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, ...@@ -6415,7 +6436,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
delete pending; delete pending;
} }
thd->binlog_set_pending_rows_event(event, is_transactional); thd->binlog_set_pending_rows_event(event, should_use_trans_cache);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
......
...@@ -725,6 +725,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -725,6 +725,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
#if !defined(MYSQL_CLIENT) #if !defined(MYSQL_CLIENT)
int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event, int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event,
binlog_cache_mngr *cache_mngr,
bool is_transactional); bool is_transactional);
int remove_pending_rows_event(THD *thd, bool is_transactional); int remove_pending_rows_event(THD *thd, bool is_transactional);
...@@ -1175,6 +1176,13 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, ...@@ -1175,6 +1176,13 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
void make_default_log_name(char **out, const char* log_ext, bool once); void make_default_log_name(char **out, const char* log_ext, bool once);
void binlog_reset_cache(THD *thd); void binlog_reset_cache(THD *thd);
bool write_annotated_row(THD *thd); bool write_annotated_row(THD *thd);
int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
bool is_transactional,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log; extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
extern handlerton *binlog_hton; extern handlerton *binlog_hton;
......
...@@ -4917,13 +4917,16 @@ class Write_rows_log_event : public Rows_log_event ...@@ -4917,13 +4917,16 @@ class Write_rows_log_event : public Rows_log_event
#endif #endif
#if defined(MYSQL_SERVER) #if defined(MYSQL_SERVER)
static bool binlog_row_logging_function(THD *thd, TABLE *table, static bool binlog_row_logging_function(THD *thd, TABLE *table,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool is_transactional, bool is_transactional,
const uchar *before_record const uchar *before_record
__attribute__((unused)), __attribute__((unused)),
const uchar *after_record) const uchar *after_record)
{ {
DBUG_ASSERT(!table->versioned(VERS_TRX_ID)); DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
return thd->binlog_write_row(table, is_transactional, after_record); return thd->binlog_write_row(table, bin_log, cache_mngr, is_transactional,
after_record);
} }
#endif #endif
...@@ -5000,12 +5003,14 @@ class Update_rows_log_event : public Rows_log_event ...@@ -5000,12 +5003,14 @@ class Update_rows_log_event : public Rows_log_event
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
static bool binlog_row_logging_function(THD *thd, TABLE *table, static bool binlog_row_logging_function(THD *thd, TABLE *table,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool is_transactional, bool is_transactional,
const uchar *before_record, const uchar *before_record,
const uchar *after_record) const uchar *after_record)
{ {
DBUG_ASSERT(!table->versioned(VERS_TRX_ID)); DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
return thd->binlog_update_row(table, is_transactional, return thd->binlog_update_row(table, bin_log, cache_mngr, is_transactional,
before_record, after_record); before_record, after_record);
} }
#endif #endif
...@@ -5089,13 +5094,15 @@ class Delete_rows_log_event : public Rows_log_event ...@@ -5089,13 +5094,15 @@ class Delete_rows_log_event : public Rows_log_event
#endif #endif
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
static bool binlog_row_logging_function(THD *thd, TABLE *table, static bool binlog_row_logging_function(THD *thd, TABLE *table,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool is_transactional, bool is_transactional,
const uchar *before_record, const uchar *before_record,
const uchar *after_record const uchar *after_record
__attribute__((unused))) __attribute__((unused)))
{ {
DBUG_ASSERT(!table->versioned(VERS_TRX_ID)); DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
return thd->binlog_delete_row(table, is_transactional, return thd->binlog_delete_row(table, bin_log, cache_mngr, is_transactional,
before_record); before_record);
} }
#endif #endif
......
...@@ -6894,6 +6894,32 @@ bool THD::binlog_table_should_be_logged(const LEX_CSTRING *db) ...@@ -6894,6 +6894,32 @@ bool THD::binlog_table_should_be_logged(const LEX_CSTRING *db)
binlog_filter->db_ok(db->str))); binlog_filter->db_ok(db->str)));
} }
struct RowsEventFactory
{
int type_code;
Rows_log_event *(*create)(THD*, TABLE*, ulong, bool is_transactional);
};
/**
Creates RowsEventFactory, responsible for creating Rows_log_event descendant.
@tparam RowsEventT is a type which will be constructed by
RowsEventFactory::create.
@return a RowsEventFactory object with type_code equal to RowsEventT::TYPE_CODE
and create containing pointer to a RowsEventT constructor callback.
*/
template<class RowsEventT>
static RowsEventFactory binlog_get_rows_event_creator()
{
return { RowsEventT::TYPE_CODE,
[](THD* thd, TABLE* table, ulong flags, bool is_transactional)
-> Rows_log_event*
{
return new RowsEventT(thd, table, flags, is_transactional);
}
};
}
/* /*
Template member function for ensuring that there is an rows log Template member function for ensuring that there is an rows log
event of the apropriate type before proceeding. event of the apropriate type before proceeding.
...@@ -6915,31 +6941,25 @@ bool THD::binlog_table_should_be_logged(const LEX_CSTRING *db) ...@@ -6915,31 +6941,25 @@ bool THD::binlog_table_should_be_logged(const LEX_CSTRING *db)
If error, NULL. If error, NULL.
*/ */
template <class RowsEventT> Rows_log_event* Rows_log_event*
THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, binlog_prepare_pending_rows_event(THD *thd, TABLE* table,
size_t needed, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
uint32 serv_id, size_t needed,
bool is_transactional, bool is_transactional,
RowsEventT *hint __attribute__((unused))) RowsEventFactory event_factory)
{ {
DBUG_ENTER("binlog_prepare_pending_rows_event"); DBUG_ENTER("binlog_prepare_pending_rows_event");
/* Pre-conditions */ /* Pre-conditions */
DBUG_ASSERT(table->s->table_map_id != ~0UL); DBUG_ASSERT(table->s->table_map_id != ~0UL);
/* Fetch the type code for the RowsEventT template parameter */
int const general_type_code= RowsEventT::TYPE_CODE;
/* Ensure that all events in a GTID group are in the same cache */
if (variables.option_bits & OPTION_GTID_BEGIN)
is_transactional= 1;
/* /*
There is no good place to set up the transactional data, so we There is no good place to set up the transactional data, so we
have to do it here. have to do it here.
*/ */
if (binlog_setup_trx_data() == NULL) Rows_log_event* pending= binlog_get_pending_rows_event(cache_mngr,
DBUG_RETURN(NULL); use_trans_cache(thd,
is_transactional));
Rows_log_event* pending= binlog_get_pending_rows_event(is_transactional);
if (unlikely(pending && !pending->is_valid())) if (unlikely(pending && !pending->is_valid()))
DBUG_RETURN(NULL); DBUG_RETURN(NULL);
...@@ -6957,13 +6977,13 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, ...@@ -6957,13 +6977,13 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
if (!pending || if (!pending ||
pending->server_id != serv_id || pending->server_id != serv_id ||
pending->get_table_id() != table->s->table_map_id || pending->get_table_id() != table->s->table_map_id ||
pending->get_general_type_code() != general_type_code || pending->get_general_type_code() != event_factory.type_code ||
pending->get_data_size() + needed > opt_binlog_rows_event_max_size || pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
pending->read_write_bitmaps_cmp(table) == FALSE) pending->read_write_bitmaps_cmp(table) == FALSE)
{ {
/* Create a new RowsEventT... */ /* Create a new RowsEventT... */
Rows_log_event* const Rows_log_event* const
ev= new RowsEventT(this, table, table->s->table_map_id, ev= event_factory.create(thd, table, table->s->table_map_id,
is_transactional); is_transactional);
if (unlikely(!ev)) if (unlikely(!ev))
DBUG_RETURN(NULL); DBUG_RETURN(NULL);
...@@ -6972,8 +6992,7 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, ...@@ -6972,8 +6992,7 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
flush the pending event and replace it with the newly created flush the pending event and replace it with the newly created
event... event...
*/ */
if (unlikely( if (unlikely(bin_log->flush_and_set_pending_rows_event(thd, ev, cache_mngr,
mysql_bin_log.flush_and_set_pending_rows_event(this, ev,
is_transactional))) is_transactional)))
{ {
delete ev; delete ev;
...@@ -7109,13 +7128,10 @@ CPP_UNNAMED_NS_START ...@@ -7109,13 +7128,10 @@ CPP_UNNAMED_NS_START
CPP_UNNAMED_NS_END CPP_UNNAMED_NS_END
int THD::binlog_write_row(TABLE* table, bool is_trans, int THD::binlog_write_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_trans,
uchar const *record) uchar const *record)
{ {
DBUG_ASSERT(is_current_stmt_binlog_format_row());
DBUG_ASSERT((WSREP_NNULL(this) && wsrep_emulate_bin_log) ||
mysql_bin_log.is_open());
/* /*
Pack records into format for transfer. We are allocating more Pack records into format for transfer. We are allocating more
memory than needed, but that doesn't matter. memory than needed, but that doesn't matter.
...@@ -7129,21 +7145,14 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, ...@@ -7129,21 +7145,14 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
size_t const len= pack_row(table, table->rpl_write_set, row_data, record); size_t const len= pack_row(table, table->rpl_write_set, row_data, record);
/* Ensure that all events in a GTID group are in the same cache */ auto creator= binlog_should_compress(len) ?
if (variables.option_bits & OPTION_GTID_BEGIN) binlog_get_rows_event_creator<Write_rows_compressed_log_event>() :
is_trans= 1; binlog_get_rows_event_creator<Write_rows_log_event>();
Rows_log_event* ev; auto *ev= binlog_prepare_pending_rows_event(this, table,
if (binlog_should_compress(len)) &mysql_bin_log, cache_mngr,
ev = variables.server_id,
binlog_prepare_pending_rows_event(table, variables.server_id, len, is_trans, creator);
len, is_trans,
static_cast<Write_rows_compressed_log_event*>(0));
else
ev =
binlog_prepare_pending_rows_event(table, variables.server_id,
len, is_trans,
static_cast<Write_rows_log_event*>(0));
if (unlikely(ev == 0)) if (unlikely(ev == 0))
return HA_ERR_OUT_OF_MEM; return HA_ERR_OUT_OF_MEM;
...@@ -7151,14 +7160,11 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, ...@@ -7151,14 +7160,11 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
return ev->add_row_data(row_data, len); return ev->add_row_data(row_data, len);
} }
int THD::binlog_update_row(TABLE* table, bool is_trans, int THD::binlog_update_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_trans,
const uchar *before_record, const uchar *before_record,
const uchar *after_record) const uchar *after_record)
{ {
DBUG_ASSERT(is_current_stmt_binlog_format_row());
DBUG_ASSERT((WSREP_NNULL(this) && wsrep_emulate_bin_log) ||
mysql_bin_log.is_open());
/** /**
Save a reference to the original read bitmaps Save a reference to the original read bitmaps
We will need this to restore the bitmaps at the end as We will need this to restore the bitmaps at the end as
...@@ -7191,11 +7197,6 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, ...@@ -7191,11 +7197,6 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
before_record); before_record);
size_t const after_size= pack_row(table, table->rpl_write_set, after_row, size_t const after_size= pack_row(table, table->rpl_write_set, after_row,
after_record); after_record);
/* Ensure that all events in a GTID group are in the same cache */
if (variables.option_bits & OPTION_GTID_BEGIN)
is_trans= 1;
/* /*
Don't print debug messages when running valgrind since they can Don't print debug messages when running valgrind since they can
trigger false warnings. trigger false warnings.
...@@ -7207,17 +7208,14 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, ...@@ -7207,17 +7208,14 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
DBUG_DUMP("after_row", after_row, after_size); DBUG_DUMP("after_row", after_row, after_size);
#endif #endif
Rows_log_event* ev; auto creator= binlog_should_compress(before_size + after_size) ?
if(binlog_should_compress(before_size + after_size)) binlog_get_rows_event_creator<Update_rows_compressed_log_event>() :
ev = binlog_get_rows_event_creator<Update_rows_log_event>();
binlog_prepare_pending_rows_event(table, variables.server_id, auto *ev= binlog_prepare_pending_rows_event(this, table,
before_size + after_size, is_trans, &mysql_bin_log, cache_mngr,
static_cast<Update_rows_compressed_log_event*>(0)); variables.server_id,
else before_size + after_size,
ev = is_trans, creator);
binlog_prepare_pending_rows_event(table, variables.server_id,
before_size + after_size, is_trans,
static_cast<Update_rows_log_event*>(0));
if (unlikely(ev == 0)) if (unlikely(ev == 0))
return HA_ERR_OUT_OF_MEM; return HA_ERR_OUT_OF_MEM;
...@@ -7232,12 +7230,10 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, ...@@ -7232,12 +7230,10 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
} }
int THD::binlog_delete_row(TABLE* table, bool is_trans, int THD::binlog_delete_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_trans,
uchar const *record) uchar const *record)
{ {
DBUG_ASSERT(is_current_stmt_binlog_format_row());
DBUG_ASSERT((WSREP_NNULL(this) && wsrep_emulate_bin_log) ||
mysql_bin_log.is_open());
/** /**
Save a reference to the original read bitmaps Save a reference to the original read bitmaps
We will need this to restore the bitmaps at the end as We will need this to restore the bitmaps at the end as
...@@ -7268,21 +7264,13 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, ...@@ -7268,21 +7264,13 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,
DBUG_DUMP("table->read_set", (uchar*) table->read_set->bitmap, (table->s->fields + 7) / 8); DBUG_DUMP("table->read_set", (uchar*) table->read_set->bitmap, (table->s->fields + 7) / 8);
size_t const len= pack_row(table, table->read_set, row_data, record); size_t const len= pack_row(table, table->read_set, row_data, record);
/* Ensure that all events in a GTID group are in the same cache */ auto creator= binlog_should_compress(len) ?
if (variables.option_bits & OPTION_GTID_BEGIN) binlog_get_rows_event_creator<Delete_rows_compressed_log_event>() :
is_trans= 1; binlog_get_rows_event_creator<Delete_rows_log_event>();
auto *ev= binlog_prepare_pending_rows_event(this, table,
Rows_log_event* ev; &mysql_bin_log, cache_mngr,
if(binlog_should_compress(len)) variables.server_id,
ev = len, is_trans, creator);
binlog_prepare_pending_rows_event(table, variables.server_id,
len, is_trans,
static_cast<Delete_rows_compressed_log_event*>(0));
else
ev =
binlog_prepare_pending_rows_event(table, variables.server_id,
len, is_trans,
static_cast<Delete_rows_log_event*>(0));
if (unlikely(ev == 0)) if (unlikely(ev == 0))
return HA_ERR_OUT_OF_MEM; return HA_ERR_OUT_OF_MEM;
...@@ -7399,22 +7387,14 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) ...@@ -7399,22 +7387,14 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
if (variables.option_bits & OPTION_GTID_BEGIN) if (variables.option_bits & OPTION_GTID_BEGIN)
is_transactional= 1; is_transactional= 1;
/* auto *cache_mngr= binlog_get_cache_mngr();
Mark the event as the last event of a statement if the stmt_end if (!cache_mngr)
flag is set. DBUG_RETURN(0);
*/
int error= 0;
if (Rows_log_event *pending= binlog_get_pending_rows_event(is_transactional))
{
if (stmt_end)
{
pending->set_flags(Rows_log_event::STMT_END_F);
reset_binlog_for_next_statement();
}
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0,
is_transactional);
}
int error=
::binlog_flush_pending_rows_event(this, stmt_end, is_transactional,
&mysql_bin_log, cache_mngr,
use_trans_cache(this, is_transactional));
DBUG_RETURN(error); DBUG_RETURN(error);
} }
......
...@@ -2944,11 +2944,14 @@ class THD: public THD_count, /* this must be first */ ...@@ -2944,11 +2944,14 @@ class THD: public THD_count, /* this must be first */
*/ */
void binlog_start_trans_and_stmt(); void binlog_start_trans_and_stmt();
void binlog_set_stmt_begin(); void binlog_set_stmt_begin();
int binlog_write_row(TABLE* table, bool is_transactional, int binlog_write_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_transactional,
const uchar *buf); const uchar *buf);
int binlog_delete_row(TABLE* table, bool is_transactional, int binlog_delete_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_transactional,
const uchar *buf); const uchar *buf);
int binlog_update_row(TABLE* table, bool is_transactional, int binlog_update_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_transactional,
const uchar *old_data, const uchar *new_data); const uchar *old_data, const uchar *new_data);
bool prepare_handlers_for_update(uint flag); bool prepare_handlers_for_update(uint flag);
bool binlog_write_annotated_row(Log_event_writer *writer); bool binlog_write_annotated_row(Log_event_writer *writer);
...@@ -2963,13 +2966,7 @@ class THD: public THD_count, /* this must be first */ ...@@ -2963,13 +2966,7 @@ class THD: public THD_count, /* this must be first */
Member functions to handle pending event for row-level logging. Member functions to handle pending event for row-level logging.
*/ */
binlog_cache_mngr *binlog_get_cache_mngr() const; binlog_cache_mngr *binlog_get_cache_mngr() const;
template <class RowsEventT> Rows_log_event* void binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache);
binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
size_t needed,
bool is_transactional,
RowsEventT* hint);
Rows_log_event* binlog_get_pending_rows_event(bool is_transactional) const;
void binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional);
inline int binlog_flush_pending_rows_event(bool stmt_end) inline int binlog_flush_pending_rows_event(bool stmt_end)
{ {
return (binlog_flush_pending_rows_event(stmt_end, FALSE) || return (binlog_flush_pending_rows_event(stmt_end, FALSE) ||
...@@ -2981,7 +2978,8 @@ class THD: public THD_count, /* this must be first */ ...@@ -2981,7 +2978,8 @@ class THD: public THD_count, /* this must be first */
bool binlog_need_stmt_format(bool is_transactional) const bool binlog_need_stmt_format(bool is_transactional) const
{ {
return log_current_statement() && return log_current_statement() &&
!binlog_get_pending_rows_event(is_transactional); !binlog_get_pending_rows_event(binlog_get_cache_mngr(),
use_trans_cache(this, is_transactional));
} }
bool binlog_for_noop_dml(bool transactional_table); bool binlog_for_noop_dml(bool transactional_table);
......
...@@ -243,7 +243,8 @@ size_t Wsrep_client_service::bytes_generated() const ...@@ -243,7 +243,8 @@ size_t Wsrep_client_service::bytes_generated() const
if (cache) if (cache)
{ {
size_t pending_rows_event_length= 0; size_t pending_rows_event_length= 0;
if (Rows_log_event* ev= m_thd->binlog_get_pending_rows_event(true)) auto *cache_mngr= m_thd->binlog_get_cache_mngr();
if (auto* ev= binlog_get_pending_rows_event(cache_mngr, true))
{ {
pending_rows_event_length= ev->get_data_size(); pending_rows_event_length= ev->get_data_size();
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment