Commit db39f3a8 authored by unknown's avatar unknown

BUG#27779 (Slave cannot read old rows log events):

Taking code from before BUG#22583 and incorporating as events to be able
to read old events. Also incorporating old pack and unpack functions
into patch.


client/Makefile.am:
  Adding files log_event_old.{h,cc} and rpl_record_old.{h,cc}
client/mysqlbinlog.cc:
  Adding log_event_old.cc.
libmysqld/Makefile.am:
  Adding files log_event_old.{h,cc} and rpl_record_old.{h,cc}
sql/CMakeLists.txt:
  Adding files log_event_old.{h,cc} and rpl_record_old.{h,cc}
sql/Makefile.am:
  Adding files log_event_old.{h,cc} and rpl_record_old.{h,cc}
sql/log_event.cc:
  Adding code to read pre-GA rows events.
sql/log_event.h:
  Refactoring to support inheritance and including "old" events definitions.
sql/log_event_old.cc:
  New BitKeeper file ``sql/log_event_old.cc''
sql/log_event_old.h:
  New BitKeeper file ``sql/log_event_old.h''
sql/rpl_record_old.cc:
  New BitKeeper file ``sql/rpl_record_old.cc''
sql/rpl_record_old.h:
  New BitKeeper file ``sql/rpl_record_old.h''
parent 9c267381
...@@ -96,7 +96,9 @@ DEFS = -DUNDEF_THREADS_HACK \ ...@@ -96,7 +96,9 @@ DEFS = -DUNDEF_THREADS_HACK \
-DDATADIR="\"$(localstatedir)\"" -DDATADIR="\"$(localstatedir)\""
sql_src=log_event.h mysql_priv.h rpl_constants.h \ sql_src=log_event.h mysql_priv.h rpl_constants.h \
log_event.cc my_decimal.h my_decimal.cc log_event.cc my_decimal.h my_decimal.cc \
log_event_old.h log_event_old.cc \
rpl_record_old.h rpl_record_old.cc
strings_src=decimal.c strings_src=decimal.c
link_sources: link_sources:
......
...@@ -1600,10 +1600,12 @@ int main(int argc, char** argv) ...@@ -1600,10 +1600,12 @@ int main(int argc, char** argv)
#include "decimal.c" #include "decimal.c"
#include "my_decimal.cpp" #include "my_decimal.cpp"
#include "log_event.cpp" #include "log_event.cpp"
#include "log_event_old.cpp"
#else #else
#include "my_decimal.h" #include "my_decimal.h"
#include "decimal.c" #include "decimal.c"
#include "my_decimal.cc" #include "my_decimal.cc"
#include "log_event.cc" #include "log_event.cc"
#include "log_event_old.cc"
#endif #endif
...@@ -53,7 +53,9 @@ sqlsources = derror.cc field.cc field_conv.cc strfunc.cc filesort.cc \ ...@@ -53,7 +53,9 @@ sqlsources = derror.cc field.cc field_conv.cc strfunc.cc filesort.cc \
item_func.cc item_strfunc.cc item_sum.cc item_timefunc.cc \ item_func.cc item_strfunc.cc item_sum.cc item_timefunc.cc \
item_geofunc.cc item_subselect.cc item_row.cc\ item_geofunc.cc item_subselect.cc item_row.cc\
item_xmlfunc.cc \ item_xmlfunc.cc \
key.cc lock.cc log.cc log_event.cc sql_state.c \ key.cc lock.cc log.cc sql_state.c \
log_event.cc \
log_event_old.cc rpl_record_old.cc \
protocol.cc net_serv.cc opt_range.cc \ protocol.cc net_serv.cc opt_range.cc \
opt_sum.cc procedure.cc records.cc sql_acl.cc \ opt_sum.cc procedure.cc records.cc sql_acl.cc \
sql_load.cc discover.cc sql_locale.cc \ sql_load.cc discover.cc sql_locale.cc \
......
...@@ -48,6 +48,7 @@ ADD_EXECUTABLE(mysqld ../sql-common/client.c derror.cc des_key_file.cc ...@@ -48,6 +48,7 @@ ADD_EXECUTABLE(mysqld ../sql-common/client.c derror.cc des_key_file.cc
item_create.cc item_func.cc item_geofunc.cc item_row.cc item_create.cc item_func.cc item_geofunc.cc item_row.cc
item_strfunc.cc item_subselect.cc item_sum.cc item_timefunc.cc item_strfunc.cc item_subselect.cc item_sum.cc item_timefunc.cc
key.cc log.cc lock.cc log_event.cc message.rc key.cc log.cc lock.cc log_event.cc message.rc
log_event_old.cc rpl_record_old.cc
message.h mf_iocache.cc my_decimal.cc ../sql-common/my_time.c message.h mf_iocache.cc my_decimal.cc ../sql-common/my_time.c
mysqld.cc net_serv.cc mysqld.cc net_serv.cc
nt_servc.cc nt_servc.h opt_range.cc opt_range.h opt_sum.cc nt_servc.cc nt_servc.h opt_range.cc opt_range.h opt_sum.cc
......
...@@ -59,6 +59,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ ...@@ -59,6 +59,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
lex.h lex_symbol.h sql_acl.h sql_crypt.h \ lex.h lex_symbol.h sql_acl.h sql_crypt.h \
log_event.h sql_repl.h slave.h rpl_filter.h \ log_event.h sql_repl.h slave.h rpl_filter.h \
rpl_injector.h \ rpl_injector.h \
log_event_old.h rpl_record_old.h \
stacktrace.h sql_sort.h sql_cache.h set_var.h \ stacktrace.h sql_sort.h sql_cache.h set_var.h \
spatial.h gstream.h client_settings.h tzfile.h \ spatial.h gstream.h client_settings.h tzfile.h \
tztime.h my_decimal.h\ tztime.h my_decimal.h\
...@@ -88,6 +89,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ ...@@ -88,6 +89,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
procedure.cc sql_test.cc \ procedure.cc sql_test.cc \
log.cc log_event.cc init.cc derror.cc sql_acl.cc \ log.cc log_event.cc init.cc derror.cc sql_acl.cc \
unireg.cc des_key_file.cc \ unireg.cc des_key_file.cc \
log_event_old.cc rpl_record_old.cc \
discover.cc time.cc opt_range.cc opt_sum.cc \ discover.cc time.cc opt_range.cc opt_sum.cc \
records.cc filesort.cc handler.cc \ records.cc filesort.cc handler.cc \
ha_partition.cc \ ha_partition.cc \
......
...@@ -995,6 +995,15 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, ...@@ -995,6 +995,15 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
ev = new Format_description_log_event(buf, event_len, description_event); ev = new Format_description_log_event(buf, event_len, description_event);
break; break;
#if defined(HAVE_REPLICATION) #if defined(HAVE_REPLICATION)
case PRE_GA_WRITE_ROWS_EVENT:
ev = new Write_rows_log_event_old(buf, event_len, description_event);
break;
case PRE_GA_UPDATE_ROWS_EVENT:
ev = new Update_rows_log_event_old(buf, event_len, description_event);
break;
case PRE_GA_DELETE_ROWS_EVENT:
ev = new Delete_rows_log_event_old(buf, event_len, description_event);
break;
case WRITE_ROWS_EVENT: case WRITE_ROWS_EVENT:
ev = new Write_rows_log_event(buf, event_len, description_event); ev = new Write_rows_log_event(buf, event_len, description_event);
break; break;
......
...@@ -2427,7 +2427,7 @@ class Update_rows_log_event : public Rows_log_event ...@@ -2427,7 +2427,7 @@ class Update_rows_log_event : public Rows_log_event
return Rows_log_event::is_valid() && m_cols_ai.bitmap; return Rows_log_event::is_valid() && m_cols_ai.bitmap;
} }
private: protected:
virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; } virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
...@@ -2498,7 +2498,7 @@ class Delete_rows_log_event : public Rows_log_event ...@@ -2498,7 +2498,7 @@ class Delete_rows_log_event : public Rows_log_event
} }
#endif #endif
private: protected:
virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; } virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
...@@ -2519,6 +2519,8 @@ class Delete_rows_log_event : public Rows_log_event ...@@ -2519,6 +2519,8 @@ class Delete_rows_log_event : public Rows_log_event
}; };
#include "log_event_old.h"
/** /**
Class representing an incident, an occurance out of the ordinary, Class representing an incident, an occurance out of the ordinary,
that happened on the master. that happened on the master.
......
#include "mysql_priv.h"
#include "log_event_old.h"
#include "rpl_record_old.h"
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
Write_rows_log_event_old::do_prepare_row(THD *thd, RELAY_LOG_INFO *rli,
TABLE *table,
char const *row_start,
char const **row_end)
{
DBUG_ASSERT(table != NULL);
DBUG_ASSERT(row_start && row_end);
int error;
error= unpack_row_old(rli, table, m_width, table->record[0],
row_start, &m_cols, row_end, &m_master_reclength,
table->write_set, PRE_GA_WRITE_ROWS_EVENT);
bitmap_copy(table->read_set, table->write_set);
return error;
}
int
Delete_rows_log_event_old::do_prepare_row(THD *thd, RELAY_LOG_INFO *rli,
TABLE *table,
char const *row_start,
char const **row_end)
{
int error;
DBUG_ASSERT(row_start && row_end);
/*
This assertion actually checks that there is at least as many
columns on the slave as on the master.
*/
DBUG_ASSERT(table->s->fields >= m_width);
error= unpack_row_old(rli, table, m_width, table->record[0],
row_start, &m_cols, row_end, &m_master_reclength,
table->read_set, PRE_GA_DELETE_ROWS_EVENT);
/*
If we will access rows using the random access method, m_key will
be set to NULL, so we do not need to make a key copy in that case.
*/
if (m_key)
{
KEY *const key_info= table->key_info;
key_copy(m_key, table->record[0], key_info, 0);
}
return error;
}
int Update_rows_log_event_old::do_prepare_row(THD *thd, RELAY_LOG_INFO *rli,
TABLE *table,
char const *row_start,
char const **row_end)
{
int error;
DBUG_ASSERT(row_start && row_end);
/*
This assertion actually checks that there is at least as many
columns on the slave as on the master.
*/
DBUG_ASSERT(table->s->fields >= m_width);
/* record[0] is the before image for the update */
error= unpack_row_old(rli, table, m_width, table->record[0],
row_start, &m_cols, row_end, &m_master_reclength,
table->read_set, PRE_GA_UPDATE_ROWS_EVENT);
row_start = *row_end;
/* m_after_image is the after image for the update */
error= unpack_row_old(rli, table, m_width, m_after_image,
row_start, &m_cols, row_end, &m_master_reclength,
table->write_set, PRE_GA_UPDATE_ROWS_EVENT);
DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
DBUG_DUMP("m_after_image", m_after_image, table->s->reclength);
/*
If we will access rows using the random access method, m_key will
be set to NULL, so we do not need to make a key copy in that case.
*/
if (m_key)
{
KEY *const key_info= table->key_info;
key_copy(m_key, table->record[0], key_info, 0);
}
return error;
}
#endif
#ifndef LOG_EVENT_OLD_H
#define LOG_EVENT_OLD_H
/*
Need to include this file at the proper position of log_event.h
*/
class Write_rows_log_event_old : public Write_rows_log_event
{
public:
enum
{
/* Support interface to THD::binlog_prepare_pending_rows_event */
TYPE_CODE = PRE_GA_WRITE_ROWS_EVENT
};
#if defined(HAVE_REPLICATION)
Write_rows_log_event_old(const char *buf, uint event_len,
const Format_description_log_event *descr)
: Write_rows_log_event(buf, event_len, descr)
{
}
#endif
private:
virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*,
char const *row_start, char const **row_end);
#endif
};
class Update_rows_log_event_old : public Update_rows_log_event
{
public:
enum
{
/* Support interface to THD::binlog_prepare_pending_rows_event */
TYPE_CODE = PRE_GA_UPDATE_ROWS_EVENT
};
#if defined(HAVE_REPLICATION)
Update_rows_log_event_old(const char *buf, uint event_len,
const Format_description_log_event *descr)
: Update_rows_log_event(buf, event_len, descr)
{
}
#endif
private:
virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*,
char const *row_start, char const **row_end);
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
};
class Delete_rows_log_event_old : public Delete_rows_log_event
{
public:
enum
{
/* Support interface to THD::binlog_prepare_pending_rows_event */
TYPE_CODE = PRE_GA_DELETE_ROWS_EVENT
};
#if defined(HAVE_REPLICATION)
Delete_rows_log_event_old(const char *buf, uint event_len,
const Format_description_log_event *descr)
: Delete_rows_log_event(buf, event_len, descr)
{
}
#endif
private:
virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*,
char const *row_start, char const **row_end);
#endif
};
#endif
#include "mysql_priv.h"
#include "rpl_record_old.h"
my_size_t
pack_row_old(THD *thd, TABLE *table, MY_BITMAP const* cols,
byte *row_data, const byte *record)
{
Field **p_field= table->field, *field;
int n_null_bytes= table->s->null_bytes;
byte *ptr;
uint i;
my_ptrdiff_t const rec_offset= record - table->record[0];
my_ptrdiff_t const def_offset= table->s->default_values - table->record[0];
memcpy(row_data, record, n_null_bytes);
ptr= row_data+n_null_bytes;
for (i= 0 ; (field= *p_field) ; i++, p_field++)
{
if (bitmap_is_set(cols,i))
{
my_ptrdiff_t const offset=
field->is_null(rec_offset) ? def_offset : rec_offset;
field->move_field_offset(offset);
ptr= (byte*)field->pack((char *) ptr, field->ptr);
field->move_field_offset(-offset);
}
}
return (static_cast<my_size_t>(ptr - row_data));
}
/*
Unpack a row into a record.
SYNOPSIS
unpack_row()
rli Relay log info
table Table to unpack into
colcnt Number of columns to read from record
record Record where the data should be unpacked
row Packed row data
cols Pointer to columns data to fill in
row_end Pointer to variable that will hold the value of the
one-after-end position for the row
master_reclength
Pointer to variable that will be set to the length of the
record on the master side
rw_set Pointer to bitmap that holds either the read_set or the
write_set of the table
DESCRIPTION
The row is assumed to only consist of the fields for which the
bitset represented by 'arr' and 'bits'; the other parts of the
record are left alone.
At most 'colcnt' columns are read: if the table is larger than
that, the remaining fields are not filled in.
RETURN VALUE
Error code, or zero if no error. The following error codes can
be returned:
ER_NO_DEFAULT_FOR_FIELD
Returned if one of the fields existing on the slave but not on
the master does not have a default value (and isn't nullable)
*/
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
unpack_row_old(RELAY_LOG_INFO *rli,
TABLE *table, uint const colcnt, byte *record,
char const *row, MY_BITMAP const *cols,
char const **row_end, ulong *master_reclength,
MY_BITMAP* const rw_set, Log_event_type const event_type)
{
DBUG_ASSERT(record && row);
my_ptrdiff_t const offset= record - (byte*) table->record[0];
my_size_t master_null_bytes= table->s->null_bytes;
if (colcnt != table->s->fields)
{
Field **fptr= &table->field[colcnt-1];
do
master_null_bytes= (*fptr)->last_null_byte();
while (master_null_bytes == Field::LAST_NULL_BYTE_UNDEF &&
fptr-- > table->field);
/*
If master_null_bytes is LAST_NULL_BYTE_UNDEF (0) at this time,
there were no nullable fields nor BIT fields at all in the
columns that are common to the master and the slave. In that
case, there is only one null byte holding the X bit.
OBSERVE! There might still be nullable columns following the
common columns, so table->s->null_bytes might be greater than 1.
*/
if (master_null_bytes == Field::LAST_NULL_BYTE_UNDEF)
master_null_bytes= 1;
}
DBUG_ASSERT(master_null_bytes <= table->s->null_bytes);
memcpy(record, row, master_null_bytes); // [1]
int error= 0;
bitmap_set_all(rw_set);
Field **const begin_ptr = table->field;
Field **field_ptr;
char const *ptr= row + master_null_bytes;
Field **const end_ptr= begin_ptr + colcnt;
for (field_ptr= begin_ptr ; field_ptr < end_ptr ; ++field_ptr)
{
Field *const f= *field_ptr;
if (bitmap_is_set(cols, field_ptr - begin_ptr))
{
f->move_field_offset(offset);
ptr= f->unpack(f->ptr, ptr);
f->move_field_offset(-offset);
/* Field...::unpack() cannot return 0 */
DBUG_ASSERT(ptr != NULL);
}
else
bitmap_clear_bit(rw_set, field_ptr - begin_ptr);
}
*row_end = ptr;
if (master_reclength)
{
if (*field_ptr)
*master_reclength = (*field_ptr)->ptr - (char*) table->record[0];
else
*master_reclength = table->s->reclength;
}
/*
Set properties for remaining columns, if there are any. We let the
corresponding bit in the write_set be set, to write the value if
it was not there already. We iterate over all remaining columns,
even if there were an error, to get as many error messages as
possible. We are still able to return a pointer to the next row,
so redo that.
This generation of error messages is only relevant when inserting
new rows.
*/
for ( ; *field_ptr ; ++field_ptr)
{
uint32 const mask= NOT_NULL_FLAG | NO_DEFAULT_VALUE_FLAG;
DBUG_PRINT("debug", ("flags = 0x%x, mask = 0x%x, flags & mask = 0x%x",
(*field_ptr)->flags, mask,
(*field_ptr)->flags & mask));
if (event_type == WRITE_ROWS_EVENT &&
((*field_ptr)->flags & mask) == mask)
{
slave_print_msg(ERROR_LEVEL, rli, ER_NO_DEFAULT_FOR_FIELD,
"Field `%s` of table `%s`.`%s` "
"has no default value and cannot be NULL",
(*field_ptr)->field_name, table->s->db.str,
table->s->table_name.str);
error = ER_NO_DEFAULT_FOR_FIELD;
}
else
(*field_ptr)->set_default();
}
return error;
}
#endif
#ifndef RPL_RECORD_OLD_H
#define RPL_RECORD_OLD_H
#ifndef MYSQL_CLIENT
my_size_t pack_row_old(THD *thd, TABLE *table, MY_BITMAP const* cols,
byte *row_data, const byte *record);
#ifdef HAVE_REPLICATION
int unpack_row_old(RELAY_LOG_INFO *rli,
TABLE *table, uint const colcnt, byte *record,
char const *row, MY_BITMAP const *cols,
char const **row_end, ulong *master_reclength,
MY_BITMAP* const rw_set,
Log_event_type const event_type);
#endif
#endif
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment