Commit dededdec authored by unknown's avatar unknown

BUG#13418 (V2): Bit columns should replicate correctly when using RBR


mysql-test/r/rpl_bit_npk.result:
  Updated results
mysql-test/t/disabled.def:
  rpl_bit_npk now works
sql/field.h:
  Field_bit::cmp_binary_offset wrongly used base class method that does not work for Field_bit
  This was discussed with Monty and should be pushed into 5.0 too
sql/log_event.cc:
  Added checks for null bits
  Swapped use of m_after_image (was m_search_record) and table->record[1] to use record[i] 
  in the same way as other MySQL code (i.e. use record[1] for scan data).
  Removed use of cmp_binary in record_compare (it is currently wrong to use that 
  without copying the null bits to the compare data record)
sql/log_event.h:
  Name change to indicate new semantics
parent b9697b20
......@@ -45,25 +45,27 @@ GSuppDf TINYINT,
VNotSupp TINYINT,
x034 TINYINT);
LOCK TABLES test.t1 WRITE;
INSERT INTO test.t1 VALUES (6,1,1,1,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,b'111111',b'111110',b'110101',4,5,5,5,5,5,5,5,5,5,3,2,1);
INSERT INTO test.t1 VALUES (6,1,1,1,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,b'111111',b'111110',b'110101',4,5,5,5,5,5,5,5,5,5,3,NULL,1);
INSERT INTO test.t1 VALUES (1,1,1,1,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,b'111111',b'000000',b'100100',4,5,5,5,5,5,5,5,5,5,3,2,1);
INSERT INTO test.t1 VALUES (2,1,1,1,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,b'000000',b'101010',b'010101',4,5,5,5,5,5,5,5,5,5,3,2,1);
INSERT INTO test.t1 VALUES (3,1,1,1,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,b'101010',b'111111',b'000000',4,5,5,5,5,5,5,5,5,5,3,2,1);
INSERT INTO test.t1 VALUES (4,1,1,1,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,4,5,5,5,5,5,5,5,5,5,3,2,1);
INSERT INTO test.t1 VALUES (4,1,1,1,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,b'0',1,1,4,5,5,5,5,5,5,5,5,5,3,2,1);
INSERT INTO test.t1 VALUES (5,1,1,1,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,4,5,5,5,5,5,5,5,5,5,3,2,1);
INSERT INTO test.t1 VALUES (7,1,1,1,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,4,5,5,5,5,5,5,5,5,5,3,2,1);
INSERT INTO test.t1 VALUES (8,1,1,1,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,4,5,5,5,5,5,5,5,5,5,3,2,1);
UNLOCK TABLES;
UPDATE test.t1 set x034 = 50 where bit3 = b'000000';
UPDATE test.t1 set VNotSupp = 33 where bit1 = b'0';
SELECT oSupp, sSuppD, GSuppDf, VNotSupp, x034 FROM test.t1;
oSupp sSuppD GSuppDf VNotSupp x034
5 5 3 2 1
5 5 3 2 1
5 5 3 2 1
5 5 3 2 1
5 5 3 2 1
5 5 3 2 1
5 5 3 2 1
5 5 3 2 1
5 5 3 NULL 1
5 5 3 2 1
5 5 3 33 1
5 5 3 2 50
5 5 3 33 1
5 5 3 33 1
5 5 3 33 1
5 5 3 33 1
SELECT hex(bit1) from test.t1;
hex(bit1)
3F
......@@ -96,14 +98,14 @@ hex(bit3)
1
SELECT oSupp, sSuppD, GSuppDf, VNotSupp, x034 FROM test.t1;
oSupp sSuppD GSuppDf VNotSupp x034
5 5 3 2 1
5 5 3 2 1
5 5 3 2 1
5 5 3 2 1
5 5 3 2 1
5 5 3 2 1
5 5 3 2 1
5 5 3 2 1
5 5 3 NULL 1
5 5 3 2 1
5 5 3 33 1
5 5 3 2 50
5 5 3 33 1
5 5 3 33 1
5 5 3 33 1
5 5 3 33 1
SELECT hex(bit1) from test.t1;
hex(bit1)
3F
......@@ -134,4 +136,30 @@ hex(bit3)
1
1
1
CREATE TABLE test.t2 (a INT, b BIT(1));
INSERT INTO test.t2 VALUES (1, b'0');
INSERT INTO test.t2 VALUES (1, b'1');
UPDATE test.t2 SET a = 2 WHERE b = b'1';
CREATE TABLE test.t3 (a INT, b INT);
INSERT INTO test.t3 VALUES (1, NULL);
INSERT INTO test.t3 VALUES (1, 0);
UPDATE test.t3 SET a = 2 WHERE b = 0;
SELECT a, hex(b) FROM test.t2;
a hex(b)
1 0
2 1
SELECT * FROM test.t3;
a b
1 NULL
2 0
SELECT a, hex(b) FROM test.t2;
a hex(b)
1 0
2 1
SELECT * FROM test.t3;
a b
1 NULL
2 0
DROP TABLE IF EXISTS test.t1;
DROP TABLE IF EXISTS test.t2;
DROP TABLE IF EXISTS test.t3;
......@@ -20,7 +20,7 @@ ndb_binlog_ddl_multi : Bug#17038 [PATCH PENDING]
ndb_load : Bug#17233
partition_03ndb : Bug#16385
ps_7ndb : dbug assert in RBR mode when executing test suite
rpl_bit_npk : Bug#13418
#rpl_bit_npk : Bug#13418
rpl_ddl : Bug#15963 SBR does not show "Definer" correctly
rpl_ndb_auto_inc : Bug#17086
rpl_ndb_basic : Bug#16228 [IN REVIEW]
......
......@@ -173,8 +173,9 @@ public:
virtual int cmp(const char *,const char *)=0;
virtual int cmp_binary(const char *a,const char *b, uint32 max_length=~0L)
{ return memcmp(a,b,pack_length()); }
int cmp_offset(uint row_offset) { return cmp(ptr,ptr+row_offset); }
int cmp_binary_offset(uint row_offset)
virtual int cmp_offset(uint row_offset)
{ return cmp(ptr,ptr+row_offset); }
virtual int cmp_binary_offset(uint row_offset)
{ return cmp_binary(ptr, ptr+row_offset); };
virtual int key_cmp(const byte *a,const byte *b)
{ return cmp((char*) a,(char*) b); }
......@@ -1317,6 +1318,20 @@ public:
};
/*
Note:
To use Field_bit::cmp_binary() you need to copy the bits stored in
the beginning of the record (the NULL bytes) to each memory you
want to compare (where the arguments point).
This is the reason:
- Field_bit::cmp_binary() is only implemented in the base class
(Field::cmp_binary()).
- Field::cmp_binary() currenly use pack_length() to calculate how
long the data is.
- pack_length() includes size of the bits stored in the NULL bytes
of the record.
*/
class Field_bit :public Field {
public:
uchar *bit_ptr; // position in record where 'uneven' bits store
......@@ -1342,6 +1357,8 @@ public:
my_decimal *val_decimal(my_decimal *);
int cmp(const char *a, const char *b)
{ return cmp_binary(a, b); }
int cmp_binary_offset(uint row_offset)
{ return cmp_offset(row_offset); }
int cmp_max(const char *a, const char *b, uint max_length);
int key_cmp(const byte *a, const byte *b)
{ return cmp_binary((char *) a, (char *) b); }
......
......@@ -6359,18 +6359,27 @@ void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
**************************************************************************/
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
static int record_compare(TABLE *table, char const *a, char const *b)
/*
Compares table->record[0] and table->record[1]
Returns TRUE if different.
*/
static bool record_compare(TABLE *table)
{
for (my_size_t i= 0 ; i < table->s->fields ; ++i)
if (table->s->blob_fields + table->s->varchar_fields == 0)
return cmp_record(table,record[1]);
/* Compare null bits */
if (memcmp(table->null_flags,
table->null_flags+table->s->rec_buff_length,
table->s->null_bytes))
return TRUE; // Diff in NULL value
/* Compare updated fields */
for (Field **ptr=table->field ; *ptr ; ptr++)
{
uint const off= table->field[i]->offset();
uint const res= table->field[i]->cmp_binary(a + off,
b + off);
if (res != 0) {
return res;
}
if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
return TRUE;
}
return 0;
return FALSE;
}
......@@ -6378,15 +6387,12 @@ static int record_compare(TABLE *table, char const *a, char const *b)
Find the row given by 'key', if the table has keys, or else use a table scan
to find (and fetch) the row. If the engine allows random access of the
records, a combination of position() and rnd_pos() will be used.
The 'record_buf' will be used as buffer for records while locating the
correct row.
*/
static int find_and_fetch_row(TABLE *table, byte *key, byte *record_buf)
static int find_and_fetch_row(TABLE *table, byte *key)
{
DBUG_ENTER("find_and_fetch_row(TABLE *table, byte *key, byte *record)");
DBUG_PRINT("enter", ("table=%p, key=%p, record=%p",
table, key, record_buf));
table, key, table->record[1]));
DBUG_ASSERT(table->in_use != NULL);
......@@ -6404,7 +6410,7 @@ static int find_and_fetch_row(TABLE *table, byte *key, byte *record_buf)
DBUG_RETURN(table->file->rnd_pos(table->record[0], table->file->ref));
}
DBUG_ASSERT(record_buf);
DBUG_ASSERT(table->record[1]);
/* We need to retrieve all fields */
table->file->ha_set_all_bits_in_read_set();
......@@ -6412,7 +6418,7 @@ static int find_and_fetch_row(TABLE *table, byte *key, byte *record_buf)
if (table->s->keys > 0)
{
int error;
if ((error= table->file->index_read_idx(record_buf, 0, key,
if ((error= table->file->index_read_idx(table->record[1], 0, key,
table->key_info->key_length,
HA_READ_KEY_EXACT)))
{
......@@ -6437,10 +6443,10 @@ static int find_and_fetch_row(TABLE *table, byte *key, byte *record_buf)
if (table->key_info->flags & HA_NOSAME)
DBUG_RETURN(0);
while (record_compare(table, (const char*)table->record[0], (const char*)record_buf) != 0)
while (record_compare(table))
{
int error;
if ((error= table->file->index_next(record_buf)))
if ((error= table->file->index_next(table->record[1])))
{
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
......@@ -6454,7 +6460,7 @@ static int find_and_fetch_row(TABLE *table, byte *key, byte *record_buf)
int error= 0;
do
{
error= table->file->rnd_next(record_buf);
error= table->file->rnd_next(table->record[1]);
switch (error)
{
case 0:
......@@ -6471,9 +6477,7 @@ static int find_and_fetch_row(TABLE *table, byte *key, byte *record_buf)
DBUG_RETURN(error);
}
}
while (restart_count < 2 &&
record_compare(table, (const char*)table->record[0],
(const char*)record_buf) != 0);
while (restart_count < 2 && record_compare(table));
DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
DBUG_RETURN(error);
......@@ -6493,7 +6497,7 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
bool is_transactional)
: Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional)
#ifdef HAVE_REPLICATION
,m_memory(NULL), m_key(NULL), m_search_record(NULL)
,m_memory(NULL), m_key(NULL), m_after_image(NULL)
#endif
{
}
......@@ -6510,7 +6514,7 @@ Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len,
: Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
#else
: Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event),
m_memory(NULL), m_key(NULL), m_search_record(NULL)
m_memory(NULL), m_key(NULL), m_after_image(NULL)
#endif
{
}
......@@ -6525,7 +6529,7 @@ int Delete_rows_log_event::do_before_row_operations(TABLE *table)
table->s->primary_key < MAX_KEY)
{
/*
We don't need to allocate any memory for m_search_record and
We don't need to allocate any memory for m_after_image and
m_key since they are not used.
*/
return 0;
......@@ -6537,14 +6541,14 @@ int Delete_rows_log_event::do_before_row_operations(TABLE *table)
{
m_memory=
my_multi_malloc(MYF(MY_WME),
&m_search_record, table->s->reclength,
&m_after_image, table->s->reclength,
&m_key, table->key_info->key_length,
NULL);
}
else
{
m_search_record= (byte*)my_malloc(table->s->reclength, MYF(MY_WME));
m_memory= (gptr)m_search_record;
m_after_image= (byte*)my_malloc(table->s->reclength, MYF(MY_WME));
m_memory= (gptr)m_after_image;
m_key= NULL;
}
if (!m_memory)
......@@ -6571,7 +6575,7 @@ int Delete_rows_log_event::do_after_row_operations(TABLE *table, int error)
table->file->ha_index_or_rnd_end();
my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc
m_memory= NULL;
m_search_record= NULL;
m_after_image= NULL;
m_key= NULL;
return error;
......@@ -6609,7 +6613,7 @@ int Delete_rows_log_event::do_exec_row(TABLE *table)
{
DBUG_ASSERT(table != NULL);
int error= find_and_fetch_row(table, m_key, m_search_record);
int error= find_and_fetch_row(table, m_key);
if (error)
return error;
......@@ -6685,7 +6689,7 @@ int Update_rows_log_event::do_before_row_operations(TABLE *table)
table->s->primary_key < MAX_KEY)
{
/*
We don't need to allocate any memory for m_search_record and
We don't need to allocate any memory for m_after_image and
m_key since they are not used.
*/
return 0;
......@@ -6697,14 +6701,14 @@ int Update_rows_log_event::do_before_row_operations(TABLE *table)
{
m_memory=
my_multi_malloc(MYF(MY_WME),
&m_search_record, table->s->reclength,
&m_after_image, table->s->reclength,
&m_key, table->key_info->key_length,
NULL);
}
else
{
m_search_record= (byte*)my_malloc(table->s->reclength, MYF(MY_WME));
m_memory= (gptr)m_search_record;
m_after_image= (byte*)my_malloc(table->s->reclength, MYF(MY_WME));
m_memory= (gptr)m_after_image;
m_key= NULL;
}
if (!m_memory)
......@@ -6732,7 +6736,7 @@ int Update_rows_log_event::do_after_row_operations(TABLE *table, int error)
table->file->ha_index_or_rnd_end();
my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR));
m_memory= NULL;
m_search_record= NULL;
m_after_image= NULL;
m_key= NULL;
return error;
......@@ -6752,8 +6756,8 @@ char const *Update_rows_log_event::do_prepare_row(THD *thd, TABLE *table,
/* record[0] is the before image for the update */
ptr= unpack_row(table, table->record[0], ptr, &m_cols);
DBUG_ASSERT(ptr != NULL);
/* record[1] is the after image for the update */
ptr= unpack_row(table, table->record[1], ptr, &m_cols);
/* m_after_image is the after image for the update */
ptr= unpack_row(table, m_after_image, ptr, &m_cols);
/*
If we will access rows using the random access method, m_key will
......@@ -6773,10 +6777,19 @@ int Update_rows_log_event::do_exec_row(TABLE *table)
{
DBUG_ASSERT(table != NULL);
int error= find_and_fetch_row(table, m_key, m_search_record);
int error= find_and_fetch_row(table, m_key);
if (error)
return error;
/*
This is only a precaution to make sure that the call to
ha_update_row is using record[1].
If this is not needed/required, then we could use m_after_image in
that call instead.
*/
bmove_align(table->record[1], m_after_image,(size_t) table->s->reclength);
/*
Now we should have the right row to update. The record that has
been fetched is guaranteed to be in record[0], so we use that.
......
......@@ -2012,7 +2012,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
gptr m_memory;
byte *m_search_record;
byte *m_after_image;
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
......@@ -2076,7 +2076,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
gptr m_memory;
byte *m_key;
byte *m_search_record;
byte *m_after_image;
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
......@@ -2146,7 +2146,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
gptr m_memory;
byte *m_key;
byte *m_search_record;
byte *m_after_image;
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment