Commit 8cdd6613 authored by Sergei Golubchik's avatar Sergei Golubchik

Online alter: always commit for non-trans engines

even if called from binlog_rollback()
parent 5a867d84
...@@ -279,6 +279,62 @@ a b ...@@ -279,6 +279,62 @@ a b
123 NULL 123 NULL
456 NULL 456 NULL
789 NULL 789 NULL
# MYISAM + error
create or replace table t1 (a int primary key) engine=myisam;
insert t1 values (5);
connection con2;
set debug_sync= 'now WAIT_FOR ended';
connection default;
set debug_sync= 'alter_table_copy_end SIGNAL ended WAIT_FOR end';
alter table t1 add b int NULL, algorithm= copy, lock= none;
connection con2;
insert into t1 values (1),(2),(3),(4),(5),(6);
ERROR 23000: Duplicate entry '5' for key 'PRIMARY'
select * from t1;
a
1
2
3
4
5
set debug_sync= 'now SIGNAL end';
connection default;
select * from t1;
a b
5 NULL
1 NULL
2 NULL
3 NULL
4 NULL
# Aria + error
set @@binlog_format=row;
create or replace table t1 (a int primary key) engine=aria;
insert t1 values (5);
connection con2;
set debug_sync= 'now WAIT_FOR ended';
connection default;
set debug_sync= 'alter_table_copy_end SIGNAL ended WAIT_FOR end';
alter table t1 add b int NULL, algorithm= copy, lock= none;
connection con2;
insert into t1 values (1),(2),(3),(4),(5),(6);
ERROR 23000: Duplicate entry '5' for key 'PRIMARY'
select * from t1;
a
1
2
3
4
5
set debug_sync= 'now SIGNAL end';
connection default;
select * from t1;
a b
5 NULL
1 NULL
2 NULL
3 NULL
4 NULL
set @@binlog_format=default;
# Test incompatible changes # Test incompatible changes
create or replace table t1 (a int primary key, b int); create or replace table t1 (a int primary key, b int);
insert t1 values (1, 22); insert t1 values (1, 22);
......
...@@ -347,6 +347,60 @@ set debug_sync= 'now SIGNAL end'; ...@@ -347,6 +347,60 @@ set debug_sync= 'now SIGNAL end';
--reap --reap
select * from t1; select * from t1;
--echo # MYISAM + error
create or replace table t1 (a int primary key) engine=myisam;
insert t1 values (5);
--connection con2
--send
set debug_sync= 'now WAIT_FOR ended';
--connection default
set debug_sync= 'alter_table_copy_end SIGNAL ended WAIT_FOR end';
--send
alter table t1 add b int NULL, algorithm= copy, lock= none;
--connection con2
--reap
--error ER_DUP_ENTRY
insert into t1 values (1),(2),(3),(4),(5),(6);
select * from t1;
set debug_sync= 'now SIGNAL end';
--connection default
--reap
select * from t1;
--echo # Aria + error
set @@binlog_format=row; # otherwise aria upgrades the lock to TL_READ_NO_INSERT
create or replace table t1 (a int primary key) engine=aria;
insert t1 values (5);
--connection con2
--send
set debug_sync= 'now WAIT_FOR ended';
--connection default
set debug_sync= 'alter_table_copy_end SIGNAL ended WAIT_FOR end';
--send
alter table t1 add b int NULL, algorithm= copy, lock= none;
--connection con2
--reap
--error ER_DUP_ENTRY
insert into t1 values (1),(2),(3),(4),(5),(6);
select * from t1;
set debug_sync= 'now SIGNAL end';
--connection default
--reap
select * from t1;
set @@binlog_format=default;
--echo # Test incompatible changes --echo # Test incompatible changes
create or replace table t1 (a int primary key, b int); create or replace table t1 (a int primary key, b int);
insert t1 values (1, 22); insert t1 values (1, 22);
......
...@@ -104,8 +104,7 @@ static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, ...@@ -104,8 +104,7 @@ static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
Log_event *end_ev, bool all, bool using_stmt, Log_event *end_ev, bool all, bool using_stmt,
bool using_trx, bool is_ro_1pc); bool using_trx, bool is_ro_1pc);
static int binlog_online_alter_commit(THD *thd, bool all); static int binlog_online_alter_end_trans(THD *thd, bool all, bool commit);
static void binlog_online_alter_rollback(THD *thd, bool all);
static const LEX_CSTRING write_error_msg= static const LEX_CSTRING write_error_msg=
{ STRING_WITH_LEN("error writing to the binary log") }; { STRING_WITH_LEN("error writing to the binary log") };
...@@ -2297,7 +2296,7 @@ int binlog_commit(THD *thd, bool all, bool ro_1pc) ...@@ -2297,7 +2296,7 @@ int binlog_commit(THD *thd, bool all, bool ro_1pc)
DBUG_ENTER("binlog_commit"); DBUG_ENTER("binlog_commit");
bool is_ending_transaction= ending_trans(thd, all); bool is_ending_transaction= ending_trans(thd, all);
error= binlog_online_alter_commit(thd, all); error= binlog_online_alter_end_trans(thd, all, true);
if (error) if (error)
DBUG_RETURN(error); DBUG_RETURN(error);
...@@ -2408,7 +2407,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) ...@@ -2408,7 +2407,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
bool rollback_online= !thd->online_alter_cache_list.empty(); bool rollback_online= !thd->online_alter_cache_list.empty();
if (rollback_online) if (rollback_online)
binlog_online_alter_rollback(thd, all); binlog_online_alter_end_trans(thd, all, 0);
int error= 0; int error= 0;
binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr(); binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr();
...@@ -7674,12 +7673,11 @@ class CacheWriter: public Log_event_writer ...@@ -7674,12 +7673,11 @@ class CacheWriter: public Log_event_writer
bool first; bool first;
}; };
static int binlog_online_alter_commit(THD *thd, bool all) static int binlog_online_alter_end_trans(THD *thd, bool all, bool commit)
{ {
DBUG_ENTER("binlog_online_alter_commit"); DBUG_ENTER("binlog_online_alter_end_trans");
int error= 0; int error= 0;
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
if (thd->online_alter_cache_list.empty()) if (thd->online_alter_cache_list.empty())
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -7690,16 +7688,23 @@ static int binlog_online_alter_commit(THD *thd, bool all) ...@@ -7690,16 +7688,23 @@ static int binlog_online_alter_commit(THD *thd, bool all)
auto *binlog= cache.share->online_alter_binlog; auto *binlog= cache.share->online_alter_binlog;
DBUG_ASSERT(binlog); DBUG_ASSERT(binlog);
// do not set STMT_END for last event to leave table open in altering thd bool do_commit= commit || !cache.share->db_type()->rollback ||
error= binlog_flush_pending_rows_event(thd, false, true, binlog, &cache); cache.share->db_type()->flags & HTON_NO_ROLLBACK; // Aria
if (is_ending_transaction) if (do_commit)
{ {
mysql_mutex_lock(binlog->get_log_lock()); // do not set STMT_END for last event to leave table open in altering thd
error= binlog->write_cache(thd, &cache.cache_log); error= binlog_flush_pending_rows_event(thd, false, true, binlog, &cache);
mysql_mutex_unlock(binlog->get_log_lock()); if (is_ending_transaction)
{
mysql_mutex_lock(binlog->get_log_lock());
error= binlog->write_cache(thd, &cache.cache_log);
mysql_mutex_unlock(binlog->get_log_lock());
}
else
cache.store_prev_position();
} }
else else if (!is_ending_transaction)
cache.store_prev_position(); cache.restore_prev_position();
if (error) if (error)
{ {
...@@ -7715,35 +7720,11 @@ static int binlog_online_alter_commit(THD *thd, bool all) ...@@ -7715,35 +7720,11 @@ static int binlog_online_alter_commit(THD *thd, bool all)
is_ending_transaction); is_ending_transaction);
for (TABLE *table= thd->open_tables; table; table= table->next) for (TABLE *table= thd->open_tables; table; table= table->next)
{
table->online_alter_cache= NULL; table->online_alter_cache= NULL;
}
#endif // HAVE_REPLICATION #endif // HAVE_REPLICATION
DBUG_RETURN(error); DBUG_RETURN(error);
} }
static void binlog_online_alter_rollback(THD *thd, bool all)
{
#ifdef HAVE_REPLICATION
bool is_ending_trans= ending_trans(thd, all);
if (!is_ending_trans)
for (auto &cache: thd->online_alter_cache_list)
cache.restore_prev_position();
/*
This is a crucial moment that we are running through
thd->online_alter_cache_list, and not through thd->open_tables to cleanup
stmt cache, though both have it. The reason is that the tables can be closed
to that moment in case of an error.
The same reason applies to the fact we don't store cache in the table
itself -- because it can happen to be not existing.
Still in case if tables are left opened
*/
binlog_online_alter_cleanup(thd->online_alter_cache_list, is_ending_trans);
#endif // HAVE_REPLICATION
}
SAVEPOINT** find_savepoint_in_list(THD *thd, LEX_CSTRING name, SAVEPOINT** find_savepoint_in_list(THD *thd, LEX_CSTRING name,
SAVEPOINT ** const list); SAVEPOINT ** const list);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment