Commit f43ca025 authored by Mats Kindahl's avatar Mats Kindahl

BUG#49618: Field length stored incorrectly in binary log

           for InnoDB

The class Field_bit_as_char stores the metadata for the
field incorrecly because bytes_in_rec and bit_len are set
to (field_length + 7 ) / 8 and 0 respectively, while
Field_bit has the correct values field_length / 8 and
field_length % 8.

Solved the problem by re-computing the values for the
metadata based on the field_length instead of using the
bytes_in_rec and bit_len variables.

To handle compatibility with old server, a table map
flag was added to indicate that the bit computation is
exact. If the flag is clear, the slave computes the
number of bytes required to store the bit field and
compares that instead, effectively allowing replication
*without conversion* from any field length that require
the same number of bytes to store.
parent c701fe6a
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
**** Resetting master and slave ****
include/stop_slave.inc
RESET SLAVE;
RESET MASTER;
include/start_slave.inc
SET @saved_slave_type_conversions = @@GLOBAL.SLAVE_TYPE_CONVERSIONS;
SET GLOBAL SLAVE_TYPE_CONVERSIONS = '';
CREATE TABLE t1(b1 BIT(1), b2 BIT(2), b3 BIT(3)) ENGINE=InnoDB;
INSERT INTO t1 VALUES (b'0', b'01', b'101');
Comparing tables master:test.t1 and slave:test.t1
DROP TABLE t1;
SET GLOBAL SLAVE_TYPE_CONVERSIONS = @saved_slave_type_conversions;
--innodb
\ No newline at end of file
--source include/have_binlog_format_row.inc
--source include/have_innodb.inc
--source include/master-slave.inc
#
# BUG#49618: Field length stored incorrectly in binary log for InnoDB
#
source include/reset_master_and_slave.inc;
connection slave;
SET @saved_slave_type_conversions = @@GLOBAL.SLAVE_TYPE_CONVERSIONS;
SET GLOBAL SLAVE_TYPE_CONVERSIONS = '';
connection master;
CREATE TABLE t1(b1 BIT(1), b2 BIT(2), b3 BIT(3)) ENGINE=InnoDB;
INSERT INTO t1 VALUES (b'0', b'01', b'101');
sync_slave_with_master;
let $diff_table_1=master:test.t1;
let $diff_table_2=slave:test.t1;
source include/diff_tables.inc;
connection master;
DROP TABLE t1;
sync_slave_with_master;
connection slave;
SET GLOBAL SLAVE_TYPE_CONVERSIONS = @saved_slave_type_conversions;
...@@ -1407,6 +1407,7 @@ bool Field::send_binary(Protocol *protocol) ...@@ -1407,6 +1407,7 @@ bool Field::send_binary(Protocol *protocol)
type, 0 is returned in <code>*order_var</code>. type, 0 is returned in <code>*order_var</code>.
@param field_metadata Encoded size in field metadata @param field_metadata Encoded size in field metadata
@param mflags Flags from the table map event for the table.
@param order_var Pointer to variable where the order @param order_var Pointer to variable where the order
between the source field and this field between the source field and this field
will be returned. will be returned.
...@@ -1416,6 +1417,7 @@ bool Field::send_binary(Protocol *protocol) ...@@ -1416,6 +1417,7 @@ bool Field::send_binary(Protocol *protocol)
*/ */
bool Field::compatible_field_size(uint field_metadata, bool Field::compatible_field_size(uint field_metadata,
Relay_log_info *rli_arg __attribute__((unused)), Relay_log_info *rli_arg __attribute__((unused)),
uint16 mflags __attribute__((unused)),
int *order_var) int *order_var)
{ {
uint const source_size= pack_length_from_metadata(field_metadata); uint const source_size= pack_length_from_metadata(field_metadata);
...@@ -2950,6 +2952,7 @@ uint Field_new_decimal::pack_length_from_metadata(uint field_metadata) ...@@ -2950,6 +2952,7 @@ uint Field_new_decimal::pack_length_from_metadata(uint field_metadata)
bool Field_new_decimal::compatible_field_size(uint field_metadata, bool Field_new_decimal::compatible_field_size(uint field_metadata,
Relay_log_info * __attribute__((unused)), Relay_log_info * __attribute__((unused)),
uint16 mflags __attribute__((unused)),
int *order_var) int *order_var)
{ {
uint const source_precision= (field_metadata >> 8U) & 0x00ff; uint const source_precision= (field_metadata >> 8U) & 0x00ff;
...@@ -6733,6 +6736,7 @@ check_field_for_37426(const void *param_arg) ...@@ -6733,6 +6736,7 @@ check_field_for_37426(const void *param_arg)
bool bool
Field_string::compatible_field_size(uint field_metadata, Field_string::compatible_field_size(uint field_metadata,
Relay_log_info *rli_arg, Relay_log_info *rli_arg,
uint16 mflags __attribute__((unused)),
int *order_var) int *order_var)
{ {
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
...@@ -6741,7 +6745,7 @@ Field_string::compatible_field_size(uint field_metadata, ...@@ -6741,7 +6745,7 @@ Field_string::compatible_field_size(uint field_metadata,
check_field_for_37426, &check_param)) check_field_for_37426, &check_param))
return FALSE; // Not compatible field sizes return FALSE; // Not compatible field sizes
#endif #endif
return Field::compatible_field_size(field_metadata, rli_arg, order_var); return Field::compatible_field_size(field_metadata, rli_arg, mflags, order_var);
} }
...@@ -9305,8 +9309,13 @@ int Field_bit::do_save_field_metadata(uchar *metadata_ptr) ...@@ -9305,8 +9309,13 @@ int Field_bit::do_save_field_metadata(uchar *metadata_ptr)
DBUG_ENTER("Field_bit::do_save_field_metadata"); DBUG_ENTER("Field_bit::do_save_field_metadata");
DBUG_PRINT("debug", ("bit_len: %d, bytes_in_rec: %d", DBUG_PRINT("debug", ("bit_len: %d, bytes_in_rec: %d",
bit_len, bytes_in_rec)); bit_len, bytes_in_rec));
*metadata_ptr= bit_len; /*
*(metadata_ptr + 1)= bytes_in_rec; Since this class and Field_bit_as_char have different ideas of
what should be stored here, we compute the values of the metadata
explicitly using the field_length.
*/
metadata_ptr[0]= field_length % 8;
metadata_ptr[1]= field_length / 8;
DBUG_RETURN(2); DBUG_RETURN(2);
} }
...@@ -9335,15 +9344,29 @@ uint Field_bit::pack_length_from_metadata(uint field_metadata) ...@@ -9335,15 +9344,29 @@ uint Field_bit::pack_length_from_metadata(uint field_metadata)
bool bool
Field_bit::compatible_field_size(uint field_metadata, Field_bit::compatible_field_size(uint field_metadata,
Relay_log_info * __attribute__((unused)), Relay_log_info * __attribute__((unused)),
uint16 mflags,
int *order_var) int *order_var)
{ {
DBUG_ENTER("Field_bit::compatible_field_size"); DBUG_ENTER("Field_bit::compatible_field_size");
DBUG_ASSERT((field_metadata >> 16) == 0); DBUG_ASSERT((field_metadata >> 16) == 0);
uint const from_bit_len= uint from_bit_len=
8 * (field_metadata >> 8) + (field_metadata & 0xff); 8 * (field_metadata >> 8) + (field_metadata & 0xff);
uint const to_bit_len= max_display_length(); uint to_bit_len= max_display_length();
DBUG_PRINT("debug", ("from_bit_len: %u, to_bit_len: %u", DBUG_PRINT("debug", ("from_bit_len: %u, to_bit_len: %u",
from_bit_len, to_bit_len)); from_bit_len, to_bit_len));
/*
If the bit length exact flag is clear, we are dealing with an old
master, so we allow some less strict behaviour if replicating by
moving both bit lengths to an even multiple of 8.
We do this by computing the number of bytes to store the field
instead, and then compare the result.
*/
if (!(mflags & Table_map_log_event::TM_BIT_LEN_EXACT_F)) {
from_bit_len= (from_bit_len + 7) / 8;
to_bit_len= (to_bit_len + 7) / 8;
}
*order_var= compare(from_bit_len, to_bit_len); *order_var= compare(from_bit_len, to_bit_len);
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
} }
......
...@@ -165,7 +165,7 @@ class Field ...@@ -165,7 +165,7 @@ class Field
*/ */
virtual uint32 pack_length_in_rec() const { return pack_length(); } virtual uint32 pack_length_in_rec() const { return pack_length(); }
virtual bool compatible_field_size(uint metadata, Relay_log_info *rli, virtual bool compatible_field_size(uint metadata, Relay_log_info *rli,
int *order); uint16 mflags, int *order);
virtual uint pack_length_from_metadata(uint field_metadata) virtual uint pack_length_from_metadata(uint field_metadata)
{ {
DBUG_ENTER("Field::pack_length_from_metadata"); DBUG_ENTER("Field::pack_length_from_metadata");
...@@ -805,7 +805,7 @@ class Field_new_decimal :public Field_num { ...@@ -805,7 +805,7 @@ class Field_new_decimal :public Field_num {
uint pack_length_from_metadata(uint field_metadata); uint pack_length_from_metadata(uint field_metadata);
uint row_pack_length() { return pack_length(); } uint row_pack_length() { return pack_length(); }
bool compatible_field_size(uint field_metadata, Relay_log_info *rli, bool compatible_field_size(uint field_metadata, Relay_log_info *rli,
int *order_var); uint16 mflags, int *order_var);
uint is_equal(Create_field *new_field); uint is_equal(Create_field *new_field);
virtual const uchar *unpack(uchar* to, const uchar *from, virtual const uchar *unpack(uchar* to, const uchar *from,
uint param_data, bool low_byte_first); uint param_data, bool low_byte_first);
...@@ -1500,7 +1500,7 @@ class Field_string :public Field_longstr { ...@@ -1500,7 +1500,7 @@ class Field_string :public Field_longstr {
return (((field_metadata >> 4) & 0x300) ^ 0x300) + (field_metadata & 0x00ff); return (((field_metadata >> 4) & 0x300) ^ 0x300) + (field_metadata & 0x00ff);
} }
bool compatible_field_size(uint field_metadata, Relay_log_info *rli, bool compatible_field_size(uint field_metadata, Relay_log_info *rli,
int *order_var); uint16 mflags, int *order_var);
uint row_pack_length() { return field_length; } uint row_pack_length() { return field_length; }
int pack_cmp(const uchar *a,const uchar *b,uint key_length, int pack_cmp(const uchar *a,const uchar *b,uint key_length,
my_bool insert_or_update); my_bool insert_or_update);
...@@ -1964,7 +1964,7 @@ class Field_bit :public Field { ...@@ -1964,7 +1964,7 @@ class Field_bit :public Field {
uint row_pack_length() uint row_pack_length()
{ return (bytes_in_rec + ((bit_len > 0) ? 1 : 0)); } { return (bytes_in_rec + ((bit_len > 0) ? 1 : 0)); }
bool compatible_field_size(uint metadata, Relay_log_info *rli, bool compatible_field_size(uint metadata, Relay_log_info *rli,
int *order_var); uint16 mflags, int *order_var);
void sql_type(String &str) const; void sql_type(String &str) const;
virtual uchar *pack(uchar *to, const uchar *from, virtual uchar *pack(uchar *to, const uchar *from,
uint max_length, bool low_byte_first); uint max_length, bool low_byte_first);
......
...@@ -3846,11 +3846,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) ...@@ -3846,11 +3846,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans)
DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open());
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
Table_map_log_event::flag_set const
flags= Table_map_log_event::TM_NO_FLAGS;
Table_map_log_event Table_map_log_event
the_event(this, table, table->s->table_map_id, is_trans, flags); the_event(this, table, table->s->table_map_id, is_trans);
if (is_trans && binlog_table_maps == 0) if (is_trans && binlog_table_maps == 0)
binlog_start_trans_and_stmt(); binlog_start_trans_and_stmt();
......
...@@ -7852,7 +7852,7 @@ int Table_map_log_event::save_field_metadata() ...@@ -7852,7 +7852,7 @@ int Table_map_log_event::save_field_metadata()
*/ */
#if !defined(MYSQL_CLIENT) #if !defined(MYSQL_CLIENT)
Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
bool is_transactional, uint16 flags) bool is_transactional)
: Log_event(thd, 0, true), : Log_event(thd, 0, true),
m_table(tbl), m_table(tbl),
m_dbnam(tbl->s->db.str), m_dbnam(tbl->s->db.str),
...@@ -7862,7 +7862,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, ...@@ -7862,7 +7862,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
m_colcnt(tbl->s->fields), m_colcnt(tbl->s->fields),
m_memory(NULL), m_memory(NULL),
m_table_id(tid), m_table_id(tid),
m_flags(flags), m_flags(TM_BIT_LEN_EXACT_F),
m_data_size(0), m_data_size(0),
m_field_metadata(0), m_field_metadata(0),
m_field_metadata_size(0), m_field_metadata_size(0),
...@@ -8120,8 +8120,10 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli) ...@@ -8120,8 +8120,10 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
inside Relay_log_info::clear_tables_to_lock() by calling the inside Relay_log_info::clear_tables_to_lock() by calling the
table_def destructor explicitly. table_def destructor explicitly.
*/ */
new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt, new (&table_list->m_tabledef)
m_field_metadata, m_field_metadata_size, m_null_bits); table_def(m_coltype, m_colcnt,
m_field_metadata, m_field_metadata_size,
m_null_bits, m_flags);
table_list->m_tabledef_valid= TRUE; table_list->m_tabledef_valid= TRUE;
/* /*
......
...@@ -3283,16 +3283,14 @@ class Table_map_log_event : public Log_event ...@@ -3283,16 +3283,14 @@ class Table_map_log_event : public Log_event
/* Special constants representing sets of flags */ /* Special constants representing sets of flags */
enum enum
{ {
TM_NO_FLAGS = 0U TM_NO_FLAGS = 0U,
TM_BIT_LEN_EXACT_F = (1U << 0)
}; };
void set_flags(flag_set flag) { m_flags |= flag; }
void clear_flags(flag_set flag) { m_flags &= ~flag; }
flag_set get_flags(flag_set flag) const { return m_flags & flag; } flag_set get_flags(flag_set flag) const { return m_flags & flag; }
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, bool is_transactional);
bool is_transactional, uint16 flags);
#endif #endif
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
Table_map_log_event(const char *buf, uint event_len, Table_map_log_event(const char *buf, uint event_len,
...@@ -3305,7 +3303,7 @@ class Table_map_log_event : public Log_event ...@@ -3305,7 +3303,7 @@ class Table_map_log_event : public Log_event
table_def *create_table_def() table_def *create_table_def()
{ {
return new table_def(m_coltype, m_colcnt, m_field_metadata, return new table_def(m_coltype, m_colcnt, m_field_metadata,
m_field_metadata_size, m_null_bits); m_field_metadata_size, m_null_bits, m_flags);
} }
ulong get_table_id() const { return m_table_id; } ulong get_table_id() const { return m_table_id; }
const char *get_table_name() const { return m_tblnam; } const char *get_table_name() const { return m_tblnam; }
......
...@@ -510,6 +510,9 @@ void show_sql_type(enum_field_types type, uint16 metadata, String *str) ...@@ -510,6 +510,9 @@ void show_sql_type(enum_field_types type, uint16 metadata, String *str)
/** /**
Check the order variable and print errors if the order is not Check the order variable and print errors if the order is not
acceptable according to the current settings. acceptable according to the current settings.
@param order The computed order of the conversion needed.
@param rli The relay log info data structure: for error reporting.
*/ */
bool is_conversion_ok(int order, Relay_log_info *rli) bool is_conversion_ok(int order, Relay_log_info *rli)
{ {
...@@ -562,6 +565,7 @@ bool is_conversion_ok(int order, Relay_log_info *rli) ...@@ -562,6 +565,7 @@ bool is_conversion_ok(int order, Relay_log_info *rli)
@param[in] type Source field type @param[in] type Source field type
@param[in] metadata Source field metadata @param[in] metadata Source field metadata
@param[in] rli Relay log info (for error reporting) @param[in] rli Relay log info (for error reporting)
@param[in] mflags Flags from the table map event
@param[out] order Order between source field and target field @param[out] order Order between source field and target field
@return @c true if conversion is possible according to the current @return @c true if conversion is possible according to the current
...@@ -571,7 +575,8 @@ bool is_conversion_ok(int order, Relay_log_info *rli) ...@@ -571,7 +575,8 @@ bool is_conversion_ok(int order, Relay_log_info *rli)
static bool static bool
can_convert_field_to(Field *field, can_convert_field_to(Field *field,
enum_field_types source_type, uint16 metadata, enum_field_types source_type, uint16 metadata,
Relay_log_info *rli, int *order_var) Relay_log_info *rli, uint16 mflags,
int *order_var)
{ {
DBUG_ENTER("can_convert_field_to"); DBUG_ENTER("can_convert_field_to");
#ifndef DBUG_OFF #ifndef DBUG_OFF
...@@ -588,7 +593,7 @@ can_convert_field_to(Field *field, ...@@ -588,7 +593,7 @@ can_convert_field_to(Field *field,
if (field->real_type() == source_type) if (field->real_type() == source_type)
{ {
DBUG_PRINT("debug", ("Base types are identical, doing field size comparison")); DBUG_PRINT("debug", ("Base types are identical, doing field size comparison"));
if (field->compatible_field_size(metadata, rli, order_var)) if (field->compatible_field_size(metadata, rli, mflags, order_var))
DBUG_RETURN(is_conversion_ok(*order_var, rli)); DBUG_RETURN(is_conversion_ok(*order_var, rli));
else else
DBUG_RETURN(false); DBUG_RETURN(false);
...@@ -765,7 +770,7 @@ table_def::compatible_with(THD *thd, Relay_log_info *rli, ...@@ -765,7 +770,7 @@ table_def::compatible_with(THD *thd, Relay_log_info *rli,
{ {
Field *const field= table->field[col]; Field *const field= table->field[col];
int order; int order;
if (can_convert_field_to(field, type(col), field_metadata(col), rli, &order)) if (can_convert_field_to(field, type(col), field_metadata(col), rli, m_flags, &order))
{ {
DBUG_PRINT("debug", ("Checking column %d -" DBUG_PRINT("debug", ("Checking column %d -"
" field '%s' can be converted - order: %d", " field '%s' can be converted - order: %d",
...@@ -937,9 +942,10 @@ TABLE *table_def::create_conversion_table(THD *thd, Relay_log_info *rli, TABLE * ...@@ -937,9 +942,10 @@ TABLE *table_def::create_conversion_table(THD *thd, Relay_log_info *rli, TABLE *
table_def::table_def(unsigned char *types, ulong size, table_def::table_def(unsigned char *types, ulong size,
uchar *field_metadata, int metadata_size, uchar *field_metadata, int metadata_size,
uchar *null_bitmap) uchar *null_bitmap, uint16 flags)
: m_size(size), m_type(0), m_field_metadata_size(metadata_size), : m_size(size), m_type(0), m_field_metadata_size(metadata_size),
m_field_metadata(0), m_null_bits(0), m_memory(NULL) m_field_metadata(0), m_null_bits(0), m_flags(flags),
m_memory(NULL)
{ {
m_memory= (uchar *)my_multi_malloc(MYF(MY_WME), m_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
&m_type, size, &m_type, size,
......
...@@ -33,12 +33,6 @@ class Relay_log_info; ...@@ -33,12 +33,6 @@ class Relay_log_info;
- Extract and decode table definition data from the table map event - Extract and decode table definition data from the table map event
- Check if table definition in table map is compatible with table - Check if table definition in table map is compatible with table
definition on slave definition on slave
Currently, the only field type data available is an array of the
type operators that are present in the table map event.
@todo Add type operands to this structure to allow detection of
difference between, e.g., BIT(5) and BIT(10).
*/ */
class table_def class table_def
...@@ -54,7 +48,7 @@ class table_def ...@@ -54,7 +48,7 @@ class table_def
@param null_bitmap The bitmap of fields that can be null @param null_bitmap The bitmap of fields that can be null
*/ */
table_def(unsigned char *types, ulong size, uchar *field_metadata, table_def(unsigned char *types, ulong size, uchar *field_metadata,
int metadata_size, uchar *null_bitmap); int metadata_size, uchar *null_bitmap, uint16 flags);
~table_def(); ~table_def();
...@@ -215,6 +209,7 @@ class table_def ...@@ -215,6 +209,7 @@ class table_def
uint m_field_metadata_size; uint m_field_metadata_size;
uint16 *m_field_metadata; uint16 *m_field_metadata;
uchar *m_null_bits; uchar *m_null_bits;
uint16 m_flags; // Table flags
uchar *m_memory; uchar *m_memory;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment