Commit 8bd5301a authored by Vicențiu Ciorbaru's avatar Vicențiu Ciorbaru

[MDEV-6877] Add binlog writing according to columns

The logging function now writes to the binlog according
to the bitmap columns implied by the binlog_row_image variable.
parent edff3f3f
......@@ -5658,8 +5658,7 @@ static int write_locked_table_maps(THD *thd)
}
typedef bool Log_func(THD*, TABLE*, bool, MY_BITMAP*,
uint, const uchar*, const uchar*);
typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*);
static int binlog_log_row(TABLE* table,
const uchar *before_record,
......@@ -5705,8 +5704,7 @@ static int binlog_log_row(TABLE* table,
*/
bool const has_trans= thd->lex->sql_command == SQLCOM_CREATE_TABLE ||
table->file->has_transactions();
error= (*log_func)(thd, table, has_trans, &cols, table->s->fields,
before_record, after_record);
error= (*log_func)(thd, table, has_trans, before_record, after_record);
}
if (!use_bitbuf)
my_bitmap_free(&cols);
......
......@@ -4528,14 +4528,11 @@ class Write_rows_log_event : public Rows_log_event
#if defined(MYSQL_SERVER)
static bool binlog_row_logging_function(THD *thd, TABLE *table,
bool is_transactional,
MY_BITMAP *cols,
uint fields,
const uchar *before_record
__attribute__((unused)),
const uchar *after_record)
{
return thd->binlog_write_row(table, is_transactional,
cols, fields, after_record);
return thd->binlog_write_row(table, is_transactional, after_record);
}
#endif
......@@ -4596,13 +4593,11 @@ class Update_rows_log_event : public Rows_log_event
#ifdef MYSQL_SERVER
static bool binlog_row_logging_function(THD *thd, TABLE *table,
bool is_transactional,
MY_BITMAP *cols,
uint fields,
const uchar *before_record,
const uchar *after_record)
{
return thd->binlog_update_row(table, is_transactional,
cols, fields, before_record, after_record);
before_record, after_record);
}
#endif
......@@ -4668,14 +4663,12 @@ class Delete_rows_log_event : public Rows_log_event
#ifdef MYSQL_SERVER
static bool binlog_row_logging_function(THD *thd, TABLE *table,
bool is_transactional,
MY_BITMAP *cols,
uint fields,
const uchar *before_record,
const uchar *after_record
__attribute__((unused)))
{
return thd->binlog_delete_row(table, is_transactional,
cols, fields, before_record);
before_record);
}
#endif
......
......@@ -369,14 +369,11 @@ class Write_rows_log_event_old : public Old_rows_log_event
#if !defined(MYSQL_CLIENT)
static bool binlog_row_logging_function(THD *thd, TABLE *table,
bool is_transactional,
MY_BITMAP *cols,
uint fields,
const uchar *before_record
__attribute__((unused)),
const uchar *after_record)
{
return thd->binlog_write_row(table, is_transactional,
cols, fields, after_record);
return thd->binlog_write_row(table, is_transactional, after_record);
}
#endif
......@@ -452,7 +449,7 @@ class Update_rows_log_event_old : public Old_rows_log_event
const uchar *after_record)
{
return thd->binlog_update_row(table, is_transactional,
cols, fields, before_record, after_record);
before_record, after_record);
}
#endif
......@@ -526,8 +523,7 @@ class Delete_rows_log_event_old : public Old_rows_log_event
const uchar *after_record
__attribute__((unused)))
{
return thd->binlog_delete_row(table, is_transactional,
cols, fields, before_record);
return thd->binlog_delete_row(table, is_transactional, before_record);
}
#endif
......
......@@ -6112,9 +6112,8 @@ CPP_UNNAMED_NS_START
CPP_UNNAMED_NS_END
int THD::binlog_write_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, size_t colcnt,
uchar const *record)
int THD::binlog_write_row(TABLE* table, bool is_trans,
uchar const *record)
{
DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
......@@ -6129,7 +6128,7 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
uchar *row_data= memory.slot(0);
size_t const len= pack_row(table, cols, row_data, record);
size_t const len= pack_row(table, table->write_set, row_data, record);
/* Ensure that all events in a GTID group are in the same cache */
if (variables.option_bits & OPTION_GTID_BEGIN)
......@@ -6147,13 +6146,19 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
}
int THD::binlog_update_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, size_t colcnt,
const uchar *before_record,
const uchar *after_record)
{
DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
((WSREP(this) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()));
/**
Save a reference to the original read and write set bitmaps.
We will need this to restore the bitmaps at the end.
*/
MY_BITMAP *old_read_set= table->read_set;
MY_BITMAP *old_write_set= table->write_set;
size_t const before_maxlen = max_row_length(table, before_record);
size_t const after_maxlen = max_row_length(table, after_record);
......@@ -6164,9 +6169,9 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
uchar *before_row= row_data.slot(0);
uchar *after_row= row_data.slot(1);
size_t const before_size= pack_row(table, cols, before_row,
size_t const before_size= pack_row(table, table->read_set, before_row,
before_record);
size_t const after_size= pack_row(table, cols, after_row,
size_t const after_size= pack_row(table, table->write_set, after_row,
after_record);
/* Ensure that all events in a GTID group are in the same cache */
......@@ -6192,19 +6197,37 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
if (unlikely(ev == 0))
return HA_ERR_OUT_OF_MEM;
return
ev->add_row_data(before_row, before_size) ||
ev->add_row_data(after_row, after_size);
int error= ev->add_row_data(before_row, before_size) ||
ev->add_row_data(after_row, after_size);
/* restore read/write set for the rest of execution */
table->column_bitmaps_set_no_signal(old_read_set,
old_write_set);
return error;
}
int THD::binlog_delete_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, size_t colcnt,
uchar const *record)
{
DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
((WSREP(this) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()));
/**
Save a reference to the original read and write set bitmaps.
We will need this to restore the bitmaps at the end.
*/
MY_BITMAP *old_read_set= table->read_set;
MY_BITMAP *old_write_set= table->write_set;
/*
/**
This will remove spurious fields required during execution but
not needed for binlogging. This is done according to the:
binlog-row-image option.
*/
binlog_prepare_row_images(table);
/*
Pack records into format for transfer. We are allocating more
memory than needed, but that doesn't matter.
*/
......@@ -6214,7 +6237,8 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,
uchar *row_data= memory.slot(0);
size_t const len= pack_row(table, cols, row_data, record);
DBUG_DUMP("table->read_set", (uchar*) table->read_set->bitmap, (table->s->fields + 7) / 8);
size_t const len= pack_row(table, table->read_set, row_data, record);
/* Ensure that all events in a GTID group are in the same cache */
if (variables.option_bits & OPTION_GTID_BEGIN)
......@@ -6228,10 +6252,81 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,
if (unlikely(ev == 0))
return HA_ERR_OUT_OF_MEM;
return ev->add_row_data(row_data, len);
int error= ev->add_row_data(row_data, len);
/* restore read/write set for the rest of execution */
table->column_bitmaps_set_no_signal(old_read_set,
old_write_set);
return error;
}
void THD::binlog_prepare_row_images(TABLE *table)
{
DBUG_ENTER("THD::binlog_prepare_row_images");
/**
Remove from read_set spurious columns. The write_set has been
handled before in table->mark_columns_needed_for_update.
*/
DBUG_PRINT_BITSET("debug", "table->read_set (before preparing): %s", table->read_set);
THD *thd= table->in_use;
/**
if there is a primary key in the table (ie, user declared PK or a
non-null unique index) and we dont want to ship the entire image,
and the handler involved supports this.
*/
if (table->s->primary_key < MAX_KEY &&
(thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) &&
!ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW_OPT))
{
/**
Just to be sure that tmp_set is currently not in use as
the read_set already.
*/
DBUG_ASSERT(table->read_set != &table->tmp_set);
bitmap_clear_all(&table->tmp_set);
switch(thd->variables.binlog_row_image)
{
case BINLOG_ROW_IMAGE_MINIMAL:
/* MINIMAL: Mark only PK */
table->mark_columns_used_by_index_no_reset(table->s->primary_key,
&table->tmp_set);
break;
case BINLOG_ROW_IMAGE_NOBLOB:
/**
NOBLOB: Remove unnecessary BLOB fields from read_set
(the ones that are not part of PK).
*/
bitmap_union(&table->tmp_set, table->read_set);
for (Field **ptr=table->field ; *ptr ; ptr++)
{
Field *field= (*ptr);
if ((field->type() == MYSQL_TYPE_BLOB) &&
!(field->flags & PRI_KEY_FLAG))
bitmap_clear_bit(&table->tmp_set, field->field_index);
}
break;
default:
DBUG_ASSERT(0); // impossible.
}
/* set the temporary read_set */
table->column_bitmaps_set_no_signal(&table->tmp_set,
table->write_set);
}
DBUG_PRINT_BITSET("debug", "table->read_set (after preparing): %s", table->read_set);
DBUG_VOID_RETURN;
}
int THD::binlog_remove_pending_rows_event(bool clear_maps,
bool is_transactional)
{
......
......@@ -2127,14 +2127,12 @@ class THD :public Statement,
int binlog_write_table_map(TABLE *table, bool is_transactional,
my_bool *with_annotate= 0);
int binlog_write_row(TABLE* table, bool is_transactional,
MY_BITMAP const* cols, size_t colcnt,
const uchar *buf);
int binlog_delete_row(TABLE* table, bool is_transactional,
MY_BITMAP const* cols, size_t colcnt,
const uchar *buf);
int binlog_update_row(TABLE* table, bool is_transactional,
MY_BITMAP const* cols, size_t colcnt,
const uchar *old_data, const uchar *new_data);
void binlog_prepare_row_images(TABLE* table);
void set_server_id(uint32 sid) { variables.server_id = sid; }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment