Commit 99ed5070 authored by unknown's avatar unknown

Merge romeo.(none):/home/bkroot/mysql-5.1-new-rpl

into  romeo.(none):/home/bk/b20265-mysql-5.1-new-rpl


sql/sql_class.h:
  Auto merged
sql/log.cc:
  Manual merge
sql/sql_insert.cc:
  Manual merge
parents 4c962720 89f45796
...@@ -359,15 +359,6 @@ show binlog events from 102; ...@@ -359,15 +359,6 @@ show binlog events from 102;
Log_name Pos Event_type Server_id End_log_pos Info Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Table_map 1 # table_id: # (test.t1)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
master-bin.000001 # Query 1 # use `test`; BEGIN
master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` (
`a` int(11) NOT NULL DEFAULT '0',
`b` int(11) DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB
master-bin.000001 # Table_map 1 # table_id: # (test.t2)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
master-bin.000001 # Xid 1 # COMMIT /* xid= */
master-bin.000001 # Query 1 # use `test`; DROP TABLE if exists t2 master-bin.000001 # Query 1 # use `test`; DROP TABLE if exists t2
master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Table_map 1 # table_id: # (test.t1)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
...@@ -375,15 +366,6 @@ master-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS t2 ...@@ -375,15 +366,6 @@ master-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS t2
master-bin.000001 # Query 1 # use `test`; CREATE TABLE t2 (a int, b int, primary key (a)) engine=innodb master-bin.000001 # Query 1 # use `test`; CREATE TABLE t2 (a int, b int, primary key (a)) engine=innodb
master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Table_map 1 # table_id: # (test.t1)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
master-bin.000001 # Query 1 # use `test`; BEGIN
master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` (
`a` int(11) NOT NULL DEFAULT '0',
`b` int(11) DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB
master-bin.000001 # Table_map 1 # table_id: # (test.t2)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
master-bin.000001 # Xid 1 # COMMIT /* xid= */
master-bin.000001 # Query 1 # use `test`; TRUNCATE table t2 master-bin.000001 # Query 1 # use `test`; TRUNCATE table t2
master-bin.000001 # Xid 1 # COMMIT /* xid= */ master-bin.000001 # Xid 1 # COMMIT /* xid= */
master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Table_map 1 # table_id: # (test.t1)
......
This diff is collapsed.
...@@ -930,6 +930,7 @@ class THD :public Statement, ...@@ -930,6 +930,7 @@ class THD :public Statement,
/* /*
Public interface to write RBR events to the binlog Public interface to write RBR events to the binlog
*/ */
void binlog_start_trans_and_stmt();
int binlog_write_table_map(TABLE *table, bool is_transactional); int binlog_write_table_map(TABLE *table, bool is_transactional);
int binlog_write_row(TABLE* table, bool is_transactional, int binlog_write_row(TABLE* table, bool is_transactional,
MY_BITMAP const* cols, my_size_t colcnt, MY_BITMAP const* cols, my_size_t colcnt,
......
...@@ -1266,7 +1266,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) ...@@ -1266,7 +1266,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
if (thd->lex->current_select) if (thd->lex->current_select)
thd->lex->current_select->no_error= 0; // Give error thd->lex->current_select->no_error= 0; // Give error
table->file->print_error(error,MYF(0)); table->file->print_error(error,MYF(0));
before_trg_err: before_trg_err:
table->file->restore_auto_increment(prev_insert_id); table->file->restore_auto_increment(prev_insert_id);
if (key) if (key)
...@@ -2014,6 +2014,10 @@ pthread_handler_t handle_delayed_insert(void *arg) ...@@ -2014,6 +2014,10 @@ pthread_handler_t handle_delayed_insert(void *arg)
rolled back. We only need to roll back a potential statement rolled back. We only need to roll back a potential statement
transaction, since real transactions are rolled back in transaction, since real transactions are rolled back in
close_thread_tables(). close_thread_tables().
TODO: This is not true any more, table maps are generated on the
first call to ha_*_row() instead. Remove code that are used to
cover for the case outlined above.
*/ */
ha_rollback_stmt(thd); ha_rollback_stmt(thd);
...@@ -2357,6 +2361,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) ...@@ -2357,6 +2361,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
DBUG_ENTER("select_insert::prepare"); DBUG_ENTER("select_insert::prepare");
unit= u; unit= u;
/* /*
Since table in which we are going to insert is added to the first Since table in which we are going to insert is added to the first
select, LEX::current_select should point to the first select while select, LEX::current_select should point to the first select while
...@@ -2586,56 +2591,54 @@ void select_insert::send_error(uint errcode,const char *err) ...@@ -2586,56 +2591,54 @@ void select_insert::send_error(uint errcode,const char *err)
if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error) if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error)
my_message(errcode, err, MYF(0)); my_message(errcode, err, MYF(0));
if (!table) /*
If the creation of the table failed (due to a syntax error, for
example), no table will have been opened and therefore 'table'
will be NULL. In that case, we still need to execute the rollback
and the end of the function to truncate the binary log, but we can
skip all the intermediate steps.
*/
if (table)
{ {
/* /*
This can only happen when using CREATE ... SELECT and the table was not If we are not in prelocked mode, we end the bulk insert started
created becasue of an syntax error before.
*/ */
DBUG_VOID_RETURN; if (!thd->prelocked_mode)
} table->file->ha_end_bulk_insert();
if (!thd->prelocked_mode)
table->file->ha_end_bulk_insert(); /*
/* If at least one row has been inserted/modified and will stay in
If at least one row has been inserted/modified and will stay in the table the table (the table doesn't have transactions) we must write to
(the table doesn't have transactions) we must write to the binlog (and the binlog (and the error code will make the slave stop).
the error code will make the slave stop).
For many errors (example: we got a duplicate key error while
For many errors (example: we got a duplicate key error while inserting into a MyISAM table), no row will be added to the table,
inserting into a MyISAM table), no row will be added to the table, so passing the error to the slave will not help since there will
so passing the error to the slave will not help since there will be an error code mismatch (the inserts will succeed on the slave
be an error code mismatch (the inserts will succeed on the slave with no error).
with no error).
If table creation failed, the number of rows modified will also be
If we are using row-based replication we have two cases where this zero, so no check for that is made.
code is executed: replication of CREATE-SELECT and replication of */
INSERT-SELECT. if (info.copied || info.deleted || info.updated)
When replicating a CREATE-SELECT statement, we shall not write the
events to the binary log and should thus not set
OPTION_STATUS_NO_TRANS_UPDATE.
When replicating INSERT-SELECT, we shall not write the events to
the binary log for transactional table, but shall write all events
if there is one or more writes to non-transactional tables. In
this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a
write to a non-transactional table, otherwise it is cleared.
*/
if (info.copied || info.deleted || info.updated)
{
if (!table->file->has_transactions())
{ {
if (mysql_bin_log.is_open()) DBUG_ASSERT(table != NULL);
if (!table->file->has_transactions())
{ {
thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, if (mysql_bin_log.is_open())
table->file->has_transactions(), FALSE); {
thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
table->file->has_transactions(), FALSE);
}
if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table &&
!can_rollback_data())
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
query_cache_invalidate3(thd, table, 1);
} }
if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table &&
!can_rollback_data())
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
query_cache_invalidate3(thd, table, 1);
} }
} }
ha_rollback_stmt(thd); ha_rollback_stmt(thd);
table->file->ha_release_auto_increment(); table->file->ha_release_auto_increment();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -2645,8 +2648,11 @@ void select_insert::send_error(uint errcode,const char *err) ...@@ -2645,8 +2648,11 @@ void select_insert::send_error(uint errcode,const char *err)
bool select_insert::send_eof() bool select_insert::send_eof()
{ {
int error,error2; int error,error2;
bool const trans_table= table->file->has_transactions();
ulonglong id; ulonglong id;
DBUG_ENTER("select_insert::send_eof"); DBUG_ENTER("select_insert::send_eof");
DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
trans_table, table->file->table_type()));
error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0; error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
...@@ -2666,9 +2672,8 @@ bool select_insert::send_eof() ...@@ -2666,9 +2672,8 @@ bool select_insert::send_eof()
are not logged in RBR) are not logged in RBR)
- We are using statement based replication - We are using statement based replication
*/ */
if (!table->file->has_transactions() && if (!trans_table &&
(!table->s->tmp_table || (!table->s->tmp_table || !thd->current_stmt_binlog_row_based))
!thd->current_stmt_binlog_row_based))
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
} }
...@@ -2684,11 +2689,22 @@ bool select_insert::send_eof() ...@@ -2684,11 +2689,22 @@ bool select_insert::send_eof()
thd->clear_error(); thd->clear_error();
thd->binlog_query(THD::ROW_QUERY_TYPE, thd->binlog_query(THD::ROW_QUERY_TYPE,
thd->query, thd->query_length, thd->query, thd->query_length,
table->file->has_transactions(), FALSE); trans_table, FALSE);
}
/*
We will call ha_autocommit_or_rollback() also for
non-transactional tables under row-based replication: there might
be events in the binary logs transaction, and we need to write
them to the binary log.
*/
if (trans_table || thd->current_stmt_binlog_row_based)
{
int const error2= ha_autocommit_or_rollback(thd, error);
if (error2 && !error)
error=error2;
} }
if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error)
error=error2;
table->file->ha_release_auto_increment(); table->file->ha_release_auto_increment();
if (error) if (error)
{ {
table->file->print_error(error,MYF(0)); table->file->print_error(error,MYF(0));
...@@ -2885,14 +2901,19 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) ...@@ -2885,14 +2901,19 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
class MY_HOOKS : public TABLEOP_HOOKS { class MY_HOOKS : public TABLEOP_HOOKS {
public: public:
MY_HOOKS(select_create *x) : ptr(x) { } MY_HOOKS(select_create *x) : ptr(x) { }
private:
virtual void do_prelock(TABLE **tables, uint count) virtual void do_prelock(TABLE **tables, uint count)
{ {
if (ptr->get_thd()->current_stmt_binlog_row_based && TABLE const *const table = *tables;
!(ptr->get_create_info()->options & HA_LEX_CREATE_TMP_TABLE)) if (ptr->get_thd()->current_stmt_binlog_row_based &&
ptr->binlog_show_create_table(tables, count); table->s->tmp_table == NO_TMP_TABLE &&
!ptr->get_create_info()->table_existed)
{
ptr->binlog_show_create_table(tables, count);
}
} }
private:
select_create *ptr; select_create *ptr;
}; };
...@@ -2901,6 +2922,20 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) ...@@ -2901,6 +2922,20 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
#endif #endif
unit= u; unit= u;
#ifdef HAVE_ROW_BASED_REPLICATION
/*
Start a statement transaction before the create if we are creating
a non-temporary table and are using row-based replication for the
statement.
*/
if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
thd->current_stmt_binlog_row_based)
{
thd->binlog_start_trans_and_stmt();
}
#endif
if (!(table= create_table_from_items(thd, create_info, create_table, if (!(table= create_table_from_items(thd, create_info, create_table,
extra_fields, keys, &values, extra_fields, keys, &values,
&thd->extra_lock, hook_ptr))) &thd->extra_lock, hook_ptr)))
...@@ -3048,8 +3083,17 @@ void select_create::abort() ...@@ -3048,8 +3083,17 @@ void select_create::abort()
table->s->version= 0; table->s->version= 0;
hash_delete(&open_cache,(byte*) table); hash_delete(&open_cache,(byte*) table);
if (!create_info->table_existed) if (!create_info->table_existed)
{
quick_rm_table(table_type, create_table->db, quick_rm_table(table_type, create_table->db,
create_table->table_name, 0); create_table->table_name, 0);
/*
We roll back the statement, including truncating the
transaction cache of the binary log, if the statement
failed.
*/
if (thd->current_stmt_binlog_row_based)
ha_rollback_stmt(thd);
}
/* Tell threads waiting for refresh that something has happened */ /* Tell threads waiting for refresh that something has happened */
if (version != refresh_version) if (version != refresh_version)
broadcast_refresh(); broadcast_refresh();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment