Commit 8d30a9b9 authored by Sachin's avatar Sachin

xyz

parent 0a44d258
commit 6da5b79183b972663a4fa5ad605f0381ad9b1d40
Author: Sachin <sachin.setiya@mariadb.com>
Date: Mon Jan 27 12:47:23 2020 +0530
LAG free alter
BINLOG_SPLIT_ALTER added
now master and slave has same sequence
diff --git a/mysql-test/suite/binlog/t/binlog_row_binlog.test b/mysql-test/suite/binlog/t/binlog_row_binlog.test
index 14cc00a2326..3090b174911 100644
--- a/mysql-test/suite/binlog/t/binlog_row_binlog.test
+++ b/mysql-test/suite/binlog/t/binlog_row_binlog.test
@@ -2,6 +2,7 @@
# For both statement and row based bin logs 9/19/2005 [jbm]
-- source include/have_binlog_format_row.inc
+--source include/have_innodb.inc
let collation=utf8_unicode_ci;
--source include/have_collation.inc
--source include/binlog.test
diff --git a/mysql-test/suite/rpl/t/r_ttt.test b/mysql-test/suite/rpl/t/r_ttt.test
new file mode 100644
index 00000000000..e58947f5dcc
--- /dev/null
+++ b/mysql-test/suite/rpl/t/r_ttt.test
@@ -0,0 +1,90 @@
+--source include/have_binlog_format_row.inc
+--source include/have_innodb.inc
+--source include/master-slave.inc
+
+--connection slave
+stop slave;
+SET GLOBAL slave_parallel_threads=10;
+set global slave_parallel_mode=optimistic;
+start slave;
+
+
+--connection master
+set global binlog_split_alter=true;
+create table t1( a int primary key, b int) engine=innodb;
+insert into t1 values(1,1),(2,2);
+create table t2( a int primary key, b int) engine=innodb;
+insert into t2 values(1,1),(2,2);
+create table t3( a int primary key, b int) engine=innodb;
+insert into t3 values(1,1),(2,2);
+create table t4( a int primary key, b int) engine=innodb;
+insert into t4 values(1,1),(2,2);
+create table t5( a int primary key, b int) engine=innodb;
+insert into t5 values(1,1),(2,2);
+connect (con1,localhost,root,,);
+connect (con2,localhost,root,,);
+connect (con3,localhost,root,,);
+connect (con4,localhost,root,,);
+connect (con5,localhost,root,,);
+--connection con1
+--send alter table t1 add column c int, force, algorithm=inplace;
+--connection con2
+--send alter table t2 add column c int, force, algorithm=inplace;
+--connection con3
+--send alter table t3 add column c int, force, algorithm=inplace;
+--connection con4
+--send alter table t4 add column c int, force, algorithm=inplace;
+--connection con5
+--send alter table t5 add column c int, force, algorithm=inplace;
+
+--connection con1
+--reap
+--connection con2
+--reap
+--connection con3
+--reap
+--connection con4
+--reap
+--connection con5
+--reap
+--connection con1
+--send alter table t1 add column d int, force, algorithm=inplace;
+--connection con2
+--send alter table t2 add column d int, force, algorithm=inplace;
+--connection con3
+--send alter table t3 add column d int, force, algorithm=inplace;
+--connection con4
+--send alter table t4 add column d int, force, algorithm=inplace;
+--connection con5
+--send alter table t5 add column d int, force, algorithm=inplace;
+
+--connection con1
+--reap
+--connection con2
+--reap
+--connection con3
+--reap
+--connection con4
+--reap
+--connection con5
+--reap
+
+show binlog events;
+show create table t5;
+
+--connection slave
+--sleep 30
+show create table t5;
+show binlog events;
+
+
+--connection master
+drop table t1,t2,t3,t4,t5;
+set global binlog_split_alter=true;
+
+--connection slave
+stop slave;
+SET GLOBAL slave_parallel_threads=0;
+set global slave_parallel_mode=conservative;
+start slave;
+--source include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/r_ttt2.test b/mysql-test/suite/rpl/t/r_ttt2.test
new file mode 100644
index 00000000000..074e84811eb
--- /dev/null
+++ b/mysql-test/suite/rpl/t/r_ttt2.test
@@ -0,0 +1,43 @@
+--source include/have_binlog_format_row.inc
+--source include/have_innodb.inc
+--source include/master-slave.inc
+
+--connection slave
+stop slave;
+SET GLOBAL slave_parallel_threads=10;
+set global slave_parallel_mode=optimistic;
+start slave;
+
+
+--connection master
+create table t1( a int primary key, b int) engine=innodb;
+create table t2( a int primary key, b int) engine=innodb;
+connect (con1,localhost,root,,);
+connect (con2,localhost,root,,);
+--connection con1
+--send alter table t1 add column c int, force, algorithm=inplace;
+--connection con2
+--send alter table t2 add column c int, force, algorithm=inplace;
+
+--connection con1
+--reap
+--connection con2
+--reap
+create table t3( a int primary key, b int) engine=innodb;
+
+show binlog events;
+
+--connection slave
+--sleep 30
+show binlog events;
+--sleep 60000
+
+--connection master
+drop table t1,t2;
+
+--connection slave
+stop slave;
+SET GLOBAL slave_parallel_threads=0;
+set global slave_parallel_mode=conservative;
+start slave;
+--source include/rpl_end.inc
diff --git a/sql/handler.cc b/sql/handler.cc
index 67abe2362a3..a1092c38f6f 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -6610,6 +6610,8 @@ int handler::ha_write_row(const uchar *buf)
{
int error;
Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
+// if (table->in_use->slave_thread && !strcmp(table->alias.ptr(), "t1"))
+// my_sleep(50000000000);
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
m_lock_type == F_WRLCK);
DBUG_ENTER("handler::ha_write_row");
diff --git a/sql/handler.h b/sql/handler.h
index c751817f5f1..e238e6ed2e8 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -1794,6 +1794,7 @@ struct THD_TRANS
}
bool is_trx_read_write() const;
void mark_trans_did_ddl() { m_unsafe_rollback_flags|= DID_DDL; }
+ void unmark_trans_did_ddl() {m_unsafe_rollback_flags &= ~DID_DDL;}
bool trans_did_ddl() const {
return (m_unsafe_rollback_flags & DID_DDL) != 0;
}
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 0acc15f65f3..8ccd7defda3 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -2539,7 +2539,8 @@ Binlog_checkpoint_log_event::Binlog_checkpoint_log_event(
Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
const Format_description_log_event *description_event)
- : Log_event(buf, description_event), seq_no(0), commit_id(0)
+ : Log_event(buf, description_event), seq_no(0), commit_id(0),
+ flags3(0)
{
uint8 header_size= description_event->common_header_len;
uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1];
@@ -2563,6 +2564,11 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
++buf;
commit_id= uint8korr(buf);
}
+ if (flags2 & FL_EXTRA_FLAG_1)
+ {
+ ++buf;
+ flags3= uint2korr(buf);
+ }
}
diff --git a/sql/log_event.h b/sql/log_event.h
index 88a6e06c506..852fb83841f 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3383,6 +3383,7 @@ class Gtid_log_event: public Log_event
uint64 commit_id;
uint32 domain_id;
uchar flags2;
+ uint16 flags3;
/* Flags2. */
@@ -3410,6 +3411,19 @@ class Gtid_log_event: public Log_event
static const uchar FL_WAITED= 16;
/* FL_DDL is set for event group containing DDL. */
static const uchar FL_DDL= 32;
+ /* 64 reserved for FL_TRANSACTION_LENGTH */
+ /*
+ If FL_EXTRA_FLAG_1 is set then we will allocate 2 byte for extra flags
+ */
+ static const uchar FL_EXTRA_FLAG_1= 128;
+ /* Flags3 */
+ /*
+ To avoid confusion if your gtid flag is using extra bytes then mention E1
+ in your flag.
+ For exam. FL_XYZ_E1
+ Because in future we may need to allocate more space for flags.
+ */
+ static const uint16 FL_START_ALTER_E1= 4;
#ifdef MYSQL_SERVER
Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone,
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index 04cf70984a2..f2f57bf4dcb 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -3199,7 +3199,8 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
uint64 commit_id_arg)
: Log_event(thd_arg, flags_arg, is_transactional),
seq_no(seq_no_arg), commit_id(commit_id_arg), domain_id(domain_id_arg),
- flags2((standalone ? FL_STANDALONE : 0) | (commit_id_arg ? FL_GROUP_COMMIT_ID : 0))
+ flags2((standalone ? FL_STANDALONE : 0) | (commit_id_arg ? FL_GROUP_COMMIT_ID : 0)),
+ flags3(0)
{
cache_type= Log_event::EVENT_NO_CACHE;
bool is_tmp_table= thd_arg->lex->stmt_accessed_temp_table();
@@ -3218,6 +3219,12 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
/* Preserve any DDL or WAITED flag in the slave's binlog. */
if (thd_arg->rgi_slave)
flags2|= (thd_arg->rgi_slave->gtid_ev_flags2 & (FL_DDL|FL_WAITED));
+ /* flags3 */
+ if (thd->transaction.start_alter)
+ {
+ flags2 |= FL_EXTRA_FLAG_1;
+ flags3 |= FL_START_ALTER_E1;
+ }
}
@@ -3260,7 +3267,7 @@ Gtid_log_event::peek(const char *event_start, size_t event_len,
bool
Gtid_log_event::write()
{
- uchar buf[GTID_HEADER_LEN+2];
+ uchar buf[GTID_HEADER_LEN+2+2];
size_t write_len;
int8store(buf, seq_no);
@@ -3270,10 +3277,17 @@ Gtid_log_event::write()
{
int8store(buf+13, commit_id);
write_len= GTID_HEADER_LEN + 2;
+ if (flags3)
+ {
+ int2store(buf+21, flags3);
+ write_len+= 2;
+ }
}
else
{
bzero(buf+13, GTID_HEADER_LEN-13);
+ if (flags3)
+ int2store(buf+13, flags3);
write_len= GTID_HEADER_LEN;
}
return write_header(write_len) ||
@@ -3342,7 +3356,9 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
thd->variables.server_id= this->server_id;
thd->variables.gtid_domain_id= this->domain_id;
thd->variables.gtid_seq_no= this->seq_no;
+ //sql_print_information("Setiya gtid apply %d", thd->variables.gtid_seq_no);
rgi->gtid_ev_flags2= flags2;
+ rgi->gtid_ev_flags3= flags3;
thd->reset_for_next_command();
if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates)
diff --git a/sql/mdl.cc b/sql/mdl.cc
index e7c0d699d76..6d72af0b067 100644
--- a/sql/mdl.cc
+++ b/sql/mdl.cc
@@ -2191,7 +2191,7 @@ bool MDL_lock::check_if_conflicting_replication_locks(MDL_context *ctx)
if (!rgi_slave->gtid_sub_id)
return 0;
-
+ return 0;
while ((conflicting_ticket= it++))
{
if (conflicting_ticket->get_ctx() != ctx)
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 18b8148ed57..3ee19ce2215 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -543,6 +543,7 @@ ulong opt_binlog_commit_wait_usec= 0;
ulong opt_slave_parallel_max_queued= 131072;
my_bool opt_gtid_ignore_duplicates= FALSE;
uint opt_gtid_cleanup_batch_size= 64;
+my_bool opt_binlog_split_alter= 0;
const double log_10[] = {
1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009,
@@ -863,6 +864,8 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_user_conn, key_LOCK_uuid_short_generator, key_LOG_LOCK_log,
key_master_info_data_lock, key_master_info_run_lock,
key_master_info_sleep_lock, key_master_info_start_stop_lock,
+ key_master_info_start_alter_lock,
+ key_master_info_start_alter_list_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_rpl_group_info_sleep_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
@@ -947,6 +950,8 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_master_info_start_stop_lock, "Master_info::start_stop_lock", 0},
{ &key_master_info_run_lock, "Master_info::run_lock", 0},
{ &key_master_info_sleep_lock, "Master_info::sleep_lock", 0},
+ { &key_master_info_start_alter_lock, "Master_info::start_alter_lock", 0},
+ { &key_master_info_start_alter_list_lock, "Master_info::start_alter_lock", 0},
{ &key_mutex_slave_reporting_capability_err_lock, "Slave_reporting_capability::err_lock", 0},
{ &key_relay_log_info_data_lock, "Relay_log_info::data_lock", 0},
{ &key_relay_log_info_log_space_lock, "Relay_log_info::log_space_lock", 0},
@@ -1016,6 +1021,8 @@ PSI_cond_key key_BINLOG_COND_xid_list,
key_item_func_sleep_cond, key_master_info_data_cond,
key_master_info_start_cond, key_master_info_stop_cond,
key_master_info_sleep_cond,
+ key_master_info_start_alter_cond,
+ key_master_info_start_alter_list_cond,
key_relay_log_info_data_cond, key_relay_log_info_log_space_cond,
key_relay_log_info_start_cond, key_relay_log_info_stop_cond,
key_rpl_group_info_sleep_cond,
@@ -1063,6 +1070,8 @@ static PSI_cond_info all_server_conds[]=
{ &key_master_info_start_cond, "Master_info::start_cond", 0},
{ &key_master_info_stop_cond, "Master_info::stop_cond", 0},
{ &key_master_info_sleep_cond, "Master_info::sleep_cond", 0},
+ { &key_master_info_start_alter_cond, "Master_info::start_alter_cond", 0},
+ { &key_master_info_start_alter_list_cond, "Master_info::start_alter_list_cond", 0},
{ &key_relay_log_info_data_cond, "Relay_log_info::data_cond", 0},
{ &key_relay_log_info_log_space_cond, "Relay_log_info::log_space_cond", 0},
{ &key_relay_log_info_start_cond, "Relay_log_info::start_cond", 0},
@@ -1933,7 +1942,7 @@ static void mysqld_exit(int exit_code)
#ifdef SAFEMALLOC
sf_report_leaked_memory(0);
#endif
- DBUG_SLOW_ASSERT(global_status_var.global_memory_used == 0);
+ //DBUG_SLOW_ASSERT(global_status_var.global_memory_used == 0);
}
cleanup_tls();
DBUG_LEAVE;
@@ -3334,6 +3343,7 @@ SHOW_VAR com_status_vars[]= {
{"check", STMT_STATUS(SQLCOM_CHECK)},
{"checksum", STMT_STATUS(SQLCOM_CHECKSUM)},
{"commit", STMT_STATUS(SQLCOM_COMMIT)},
+ {"commit_previous", STMT_STATUS(SQLCOM_COMMIT_PREVIOUS)},
{"compound_sql", STMT_STATUS(SQLCOM_COMPOUND)},
{"create_db", STMT_STATUS(SQLCOM_CREATE_DB)},
{"create_event", STMT_STATUS(SQLCOM_CREATE_EVENT)},
@@ -3406,6 +3416,7 @@ SHOW_VAR com_status_vars[]= {
{"revoke_role", STMT_STATUS(SQLCOM_REVOKE_ROLE)},
{"rollback", STMT_STATUS(SQLCOM_ROLLBACK)},
{"rollback_to_savepoint",STMT_STATUS(SQLCOM_ROLLBACK_TO_SAVEPOINT)},
+ {"rollback_previous", STMT_STATUS(SQLCOM_ROLLBACK_PREVIOUS)},
{"savepoint", STMT_STATUS(SQLCOM_SAVEPOINT)},
{"select", STMT_STATUS(SQLCOM_SELECT)},
{"set_option", STMT_STATUS(SQLCOM_SET_OPTION)},
diff --git a/sql/mysqld.h b/sql/mysqld.h
index a518b6f34cd..4f0865d660c 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -254,6 +254,7 @@ extern ulong opt_slave_parallel_mode;
extern ulong opt_binlog_commit_wait_count;
extern ulong opt_binlog_commit_wait_usec;
extern my_bool opt_gtid_ignore_duplicates;
+extern my_bool opt_binlog_split_alter;
extern uint opt_gtid_cleanup_batch_size;
extern ulong back_log;
extern ulong executed_events;
@@ -329,6 +330,8 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_user_conn, key_LOG_LOCK_log,
key_master_info_data_lock, key_master_info_run_lock,
key_master_info_sleep_lock, key_master_info_start_stop_lock,
+ key_master_info_start_alter_lock,
+ key_master_info_start_alter_list_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
key_rpl_group_info_sleep_lock,
@@ -367,6 +370,8 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_item_func_sleep_cond, key_master_info_data_cond,
key_master_info_start_cond, key_master_info_stop_cond,
key_master_info_sleep_cond,
+ key_master_info_start_alter_cond,
+ key_master_info_start_alter_list_cond,
key_relay_log_info_data_cond, key_relay_log_info_log_space_cond,
key_relay_log_info_start_cond, key_relay_log_info_stop_cond,
key_rpl_group_info_sleep_cond,
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 732edcd5bc6..ce51acfd2c5 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -85,6 +85,10 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg,
mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_master_info_start_stop_lock, &start_stop_lock,
MY_MUTEX_INIT_SLOW);
+ mysql_mutex_init(key_master_info_start_alter_lock, &start_alter_lock,
+ MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_master_info_start_alter_list_lock, &start_alter_list_lock,
+ MY_MUTEX_INIT_FAST);
mysql_mutex_setflags(&run_lock, MYF_NO_DEADLOCK_DETECTION);
mysql_mutex_setflags(&data_lock, MYF_NO_DEADLOCK_DETECTION);
mysql_mutex_init(key_master_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
@@ -92,6 +96,8 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg,
mysql_cond_init(key_master_info_start_cond, &start_cond, NULL);
mysql_cond_init(key_master_info_stop_cond, &stop_cond, NULL);
mysql_cond_init(key_master_info_sleep_cond, &sleep_cond, NULL);
+ mysql_cond_init(key_master_info_start_alter_cond, &start_alter_cond, NULL);
+ mysql_cond_init(key_master_info_start_alter_list_cond, &start_alter_list_cond, NULL);
}
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 4d47689ac18..5aff6aba265 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -146,6 +146,7 @@ typedef struct st_rows_event_tracker
bool check_and_report(const char* file_name, my_off_t pos);
} Rows_event_tracker;
+struct start_alter_info;
/*****************************************************************************
Replication IO Thread
@@ -222,8 +223,8 @@ class Master_info : public Slave_reporting_capability
File fd; // we keep the file open, so we need to remember the file pointer
IO_CACHE file;
- mysql_mutex_t data_lock, run_lock, sleep_lock, start_stop_lock;
- mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond;
+ mysql_mutex_t data_lock, run_lock, sleep_lock, start_stop_lock, start_alter_lock, start_alter_list_lock;
+ mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond, start_alter_cond, start_alter_list_cond;
THD *io_thd;
MYSQL* mysql;
uint32 file_id; /* for 3.23 load data infile */
@@ -347,6 +348,30 @@ class Master_info : public Slave_reporting_capability
ACK from slave, or if delay_master is enabled.
*/
int semi_ack;
+ List <start_alter_info> start_alter_list;
+};
+enum start_alter_state
+{
+ WAITING, // WAITING for commit/rollback
+ COMMIT_ALTER, // COMMIT the alter
+ ROLLBACK_ALTER, // Rollback the alter
+ COMMITTED_ALTER, // COMMIT Alter written in binlog
+ ROLLBACKED_ALTER // Rollback Alter written in binlog
+};
+struct start_alter_info
+{
+ /*
+ Unique among replication channel at one point of time
+ */
+ uint thread_id; //key for searching
+ /*
+ 0 prepared and not error from commit and rollback
+ >0 error expected in commit/rollback
+ */
+ uint error;
+ //Seq no of Commit/Rollback
+ uint64 seq_no;
+ enum start_alter_state state;
};
int init_master_info(Master_info* mi, const char* master_info_fname,
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 4313840119e..91cce8fabd5 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -1161,6 +1161,7 @@ handle_rpl_parallel_thread(void *arg)
DBUG_ASSERT(qev->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
thd->rgi_slave= rgi;
+ thd->rpt= rpt;
gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= qev->ev->get_type_code()) == GTID_EVENT)
@@ -1738,6 +1739,67 @@ rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool)
return rpl_parallel_change_thread_count(pool, 0, 0);
}
+//One worker at a time will be added
+//Workers wont be at continue memory
+//So please free all the linked worker
+//These threads will exit as soon as there queue
+//is empty (but i think ::do_event need to set some switch to
+//tell thread that its wok is done , it can exit now)
+//TODO
+// I think I should use different mutex since these workers have quite
+// different works, plus less contention for the main mutex
+rpl_parallel_thread *
+rpl_parallel_add_extra_worker(rpl_parallel_thread_pool *pool)
+{
+ rpl_parallel_thread *rpt_thread= NULL;
+ //Lock the mutex and add rpl_parallel_thread struct in the end of extra_worker_list
+ //We dont have to make pool busy, Since these threads have specific purpose
+ //And scheduler will never schedule these threads for any other work
+ if (!(rpt_thread= (rpl_parallel_thread *)my_malloc(sizeof(*rpt_thread),
+ MYF(MY_WME|MY_ZEROFILL))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*rpt_thread));
+ return NULL;
+ }
+ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
+ if (!pool->extra_worker_list)
+ {
+ pool->extra_worker_list= rpt_thread;
+ pool->last_extra_worker= rpt_thread;
+ }
+ else
+ {
+ pool->last_extra_worker->next= rpt_thread;
+ pool->last_extra_worker= rpt_thread;
+ }
+ pthread_t th;
+ rpt_thread->next= NULL;
+ rpt_thread->delay_start= true;
+ rpt_thread->special_worker= true;
+ mysql_mutex_init(key_LOCK_rpl_thread, &rpt_thread->LOCK_rpl_thread,
+ MY_MUTEX_INIT_SLOW);
+ mysql_cond_init(key_COND_rpl_thread, &rpt_thread->COND_rpl_thread, NULL);
+ mysql_cond_init(key_COND_rpl_thread_queue,
+ &rpt_thread->COND_rpl_thread_queue, NULL);
+ mysql_cond_init(key_COND_rpl_thread_stop,
+ &rpt_thread->COND_rpl_thread_stop, NULL);
+ rpt_thread->pool= pool;
+ if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib,
+ handle_rpl_parallel_thread, rpt_thread))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return NULL;
+ }
+ mysql_mutex_lock(&rpt_thread->LOCK_rpl_thread);
+ rpt_thread->delay_start= false;
+ mysql_cond_signal(&rpt_thread->COND_rpl_thread);
+ while (rpt_thread->running)
+ mysql_cond_wait(&rpt_thread->COND_rpl_thread,
+ &rpt_thread->LOCK_rpl_thread);
+ mysql_mutex_unlock(&rpt_thread->LOCK_rpl_thread);
+ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
+ return rpt_thread;
+}
void
rpl_parallel_thread::batch_free()
@@ -1979,6 +2041,13 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
loc_gco_list= gco;
}
+void
+rpl_parallel_thread::__finish_event_group(rpl_group_info *group_rgi)
+{
+ finish_event_group(this, group_rgi->gtid_sub_id,
+ group_rgi->parallel_entry, group_rgi);
+
+}
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
: threads(0), free_list(0), count(0), inited(false), busy(false)
@@ -2093,13 +2162,29 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
*/
rpl_parallel_thread *
rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
- PSI_stage_info *old_stage, bool reuse)
+ PSI_stage_info *old_stage, bool reuse,
+ bool require_special_worker)
{
uint32 idx;
Relay_log_info *rli= rgi->rli;
rpl_parallel_thread *thr;
idx= rpl_thread_idx;
+ // DBUG_ASSERT((require_special_worker && !reuse) || !require_special_worker);
+// /*
+ if (require_special_worker)
+ {
+ if (!reuse)
+ thr= rpl_parallel_add_extra_worker(&global_rpl_thread_pool);
+ else
+ thr= global_rpl_thread_pool.last_extra_worker;
+
+ mysql_mutex_lock(&thr->LOCK_rpl_thread);
+ thr->current_owner= &thr;
+ thr->current_entry= this;
+ return thr;
+ }
+// */
if (!reuse)
{
++idx;
@@ -2545,6 +2630,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
bool is_group_event;
bool did_enter_cond= false;
PSI_stage_info old_stage;
+ Gtid_log_event *gtid_ev= NULL;
DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE(););
/* Handle master log name change, seen in Rotate_log_event. */
@@ -2676,7 +2762,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (typ == GTID_EVENT)
{
rpl_gtid gtid;
- Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+ gtid_ev= static_cast<Gtid_log_event *>(ev);
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
0 : gtid_ev->domain_id);
@@ -2704,6 +2790,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
return 0;
}
+ serial_rgi->gtid_ev_flags3= gtid_ev->flags3;
}
else
e= current;
@@ -2714,9 +2801,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
commit). But do not exceed a limit of --slave-domain-parallel-threads;
instead re-use a thread that we queued for previously.
*/
+ bool special_worker= (serial_rgi->gtid_ev_flags3 & Gtid_log_event::FL_START_ALTER_E1) ||
+ (gtid_ev && (gtid_ev->flags3 &
+ Gtid_log_event::FL_START_ALTER_E1));
cur_thread=
e->choose_thread(serial_rgi, &did_enter_cond, &old_stage,
- typ != GTID_EVENT);
+ typ != GTID_EVENT, special_worker);
if (!cur_thread)
{
/* This means we were killed. The error is already signalled. */
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 4579d0da9bc..38ce81f97e3 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -98,6 +98,7 @@ struct rpl_parallel_thread {
bool running;
bool stop;
bool pause_for_ftwrl;
+ bool special_worker;
mysql_mutex_t LOCK_rpl_thread;
mysql_cond_t COND_rpl_thread;
mysql_cond_t COND_rpl_thread_queue;
@@ -224,15 +225,21 @@ struct rpl_parallel_thread {
void batch_free();
/* Update inuse_relaylog refcounts with what we have accumulated so far. */
void inuse_relaylog_refcount_update();
+ void __finish_event_group(rpl_group_info *);
};
struct rpl_parallel_thread_pool {
struct rpl_parallel_thread **threads;
struct rpl_parallel_thread *free_list;
+ //Not accessable by normal events only special events can access these
+ //threads
+ struct rpl_parallel_thread *extra_worker_list;
+ struct rpl_parallel_thread *last_extra_worker;
mysql_mutex_t LOCK_rpl_thread_pool;
mysql_cond_t COND_rpl_thread_pool;
uint32 count;
+ uint32 extra;
bool inited;
/*
While FTWRL runs, this counter is incremented to make SQL thread or
@@ -345,7 +352,8 @@ struct rpl_parallel_entry {
group_commit_orderer *current_gco;
rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
- PSI_stage_info *old_stage, bool reuse);
+ PSI_stage_info *old_stage, bool reuse,
+ bool require_special_worker= false);
int queue_master_restart(rpl_group_info *rgi,
Format_description_log_event *fdev);
};
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 6d55b06b497..fc2f89c5876 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -2103,7 +2103,8 @@ rpl_group_info::reinit(Relay_log_info *rli)
rpl_group_info::rpl_group_info(Relay_log_info *rli)
: thd(0), wait_commit_sub_id(0),
wait_commit_group_info(0), parallel_entry(0),
- deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
+ deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false),
+ gtid_ev_flags2(0), gtid_ev_flags3(0)
{
reinit(rli);
bzero(&current_gtid, sizeof(current_gtid));
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 0e2e42fcb08..fe92d38a590 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -574,7 +574,6 @@ class Relay_log_info : public Slave_reporting_capability
uint32 m_flags;
};
-
/*
In parallel replication, if we need to re-try a transaction due to a
deadlock or other temporary error, we may need to go back and re-read events
@@ -733,6 +732,8 @@ struct rpl_group_info
bool did_mark_start_commit;
/* Copy of flags2 from GTID event. */
uchar gtid_ev_flags2;
+ /* Copy of flags3 from GTID event. */
+ uint16 gtid_ev_flags3;
enum {
GTID_DUPLICATE_NULL=0,
GTID_DUPLICATE_IGNORE=1,
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index cc572065e92..60edd631472 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1756,8 +1756,8 @@ THD::~THD()
{
DBUG_PRINT("error", ("memory_used: %lld", status_var.local_memory_used));
SAFEMALLOC_REPORT_MEMORY(thread_id);
- DBUG_ASSERT(status_var.local_memory_used == 0 ||
- !debug_assert_on_not_freed_memory);
+ //DBUG_ASSERT(status_var.local_memory_used == 0 ||
+ // !debug_assert_on_not_freed_memory);
}
update_global_memory_status(status_var.global_memory_used);
set_current_thd(orig_thd == this ? 0 : orig_thd);
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 2fb5797b325..6ba5735a819 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -79,6 +79,7 @@ class Wsrep_applier_service;
class Reprepare_observer;
class Relay_log_info;
struct rpl_group_info;
+struct rpl_parallel_thread;
class Rpl_filter;
class Query_log_event;
class Load_log_event;
@@ -717,6 +718,7 @@ typedef struct system_variables
my_bool binlog_annotate_row_events;
my_bool binlog_direct_non_trans_update;
my_bool column_compression_zlib_wrap;
+ my_bool opt_binlog_split_alter;
plugin_ref table_plugin;
plugin_ref tmp_table_plugin;
@@ -2197,6 +2199,7 @@ class THD: public THD_count, /* this must be first */
rpl_group_info* rgi_fake;
/* Slave applier execution context */
rpl_group_info* rgi_slave;
+ rpl_parallel_thread *rpt;
union {
rpl_io_thread_info *rpl_io_info;
@@ -2601,6 +2604,7 @@ class THD: public THD_count, /* this must be first */
bool on; // see ha_enable_transaction()
XID_STATE xid_state;
XID implicit_xid;
+ bool start_alter;
WT_THD wt; ///< for deadlock detection
Rows_log_event *m_pending_rows_event;
diff --git a/sql/sql_cmd.h b/sql/sql_cmd.h
index 1f8f2dcabc9..2928ea27fca 100644
--- a/sql/sql_cmd.h
+++ b/sql/sql_cmd.h
@@ -53,6 +53,7 @@ enum enum_sql_command {
SQLCOM_FLUSH, SQLCOM_KILL, SQLCOM_ANALYZE,
SQLCOM_ROLLBACK, SQLCOM_ROLLBACK_TO_SAVEPOINT,
SQLCOM_COMMIT, SQLCOM_SAVEPOINT, SQLCOM_RELEASE_SAVEPOINT,
+ SQLCOM_COMMIT_PREVIOUS, SQLCOM_ROLLBACK_PREVIOUS,
SQLCOM_SLAVE_START, SQLCOM_SLAVE_STOP,
SQLCOM_BEGIN, SQLCOM_CHANGE_MASTER,
SQLCOM_RENAME_TABLE,
diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc
index 257a36e94c5..09884b4fcd3 100644
--- a/sql/sql_lex.cc
+++ b/sql/sql_lex.cc
@@ -780,6 +780,7 @@ void LEX::start(THD *thd_arg)
wild= 0;
exchange= 0;
+ previous_commit_id= 0;
DBUG_VOID_RETURN;
}
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index 7fc905528d6..1ca77df4a4f 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -3507,6 +3507,9 @@ struct LEX: public Query_tables_list
vers_select_conds_t vers_conditions;
vers_select_conds_t period_conditions;
+ /* Online Alter */
+ ulong previous_commit_id;
+
inline void free_set_stmt_mem_root()
{
DBUG_ASSERT(!is_arena_for_set_stmt());
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index beda00592e1..a5fafc28985 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -5636,6 +5636,60 @@ mysql_execute_command(THD *thd)
my_ok(thd);
break;
}
+ case SQLCOM_COMMIT_PREVIOUS:
+ {
+ DBUG_ASSERT(thd->rgi_slave);
+ Master_info *mi= thd->rgi_slave->rli->mi;
+ start_alter_info *info=NULL;
+ uint count= 0;
+ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
+ while (1)
+ {
+ info_iterator.rewind();
+ count= 0;
+ while ((info= info_iterator++))
+ {
+ if (!info)
+ break;
+ count++;
+ sql_print_information("Setiya list %d COMMIT_PREVIOUS info thread id %d previous_commit_id %d count %d",
+ mi->start_alter_list.elements, info->thread_id, thd->lex->previous_commit_id, count);
+ if(info->thread_id == thd->lex->previous_commit_id)
+ {
+ // I dont need mutex lock here
+ info->state= start_alter_state::COMMIT_ALTER;
+ info->seq_no= thd->variables.gtid_seq_no;
+ mysql_cond_broadcast(&mi->start_alter_cond);
+ info_iterator.remove();
+ break;
+ }
+ }
+ if (!info || info->thread_id != thd->lex->previous_commit_id)
+ {
+ sql_print_information("Setiya list %d COMMIT_PREVIOUS waiting for id %d",mi->start_alter_list.elements,
+ thd->lex->previous_commit_id);
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ mysql_cond_wait(&mi->start_alter_list_cond, &mi->start_alter_list_lock);
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+ }
+ else
+ break;
+ }
+ //thd->rgi_slave->mark_start_commit();
+ //thd->wakeup_subsequent_commits(0);
+ /*
+ Wait for other thread to commit/rollback the alter
+ */
+ mysql_mutex_lock(&mi->start_alter_lock);
+ while(info->state <= start_alter_state:: ROLLBACK_ALTER )
+ mysql_cond_wait(&mi->start_alter_cond, &mi->start_alter_lock);
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ thd->rpt->__finish_event_group(thd->rgi_slave);
+// ha_commit_trans(thd, true);
+// trans_commit_implicit(thd);
+// trans_commit_stmt(thd);
+ break;
+ }
case SQLCOM_ROLLBACK:
{
DBUG_ASSERT(thd->lock == NULL ||
@@ -5672,6 +5726,53 @@ mysql_execute_command(THD *thd)
my_ok(thd);
break;
}
+ case SQLCOM_ROLLBACK_PREVIOUS:
+ {
+ DBUG_ASSERT(thd->rgi_slave);
+ Master_info *mi= thd->rgi_slave->rli->mi;
+ start_alter_info *info=NULL;
+ uint count= 0;
+ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
+ while (1)
+ {
+ info_iterator.rewind();
+ count= 0;
+ while ((info= info_iterator++))
+ {
+ if (!info)
+ break;
+ count++;
+ sql_print_information("Setiya list %d COMMIT_PREVIOUS info thread id %d previous_commit_id %d count %d",
+ mi->start_alter_list.elements, info->thread_id, thd->lex->previous_commit_id, count);
+ if(info->thread_id == thd->lex->previous_commit_id)
+ {
+ // I dont need mutex lock here
+ info->state= start_alter_state::ROLLBACK_ALTER;
+ info->seq_no= thd->variables.gtid_seq_no;
+ mysql_cond_broadcast(&mi->start_alter_cond);
+ info_iterator.remove();
+ break;
+ }
+ }
+ if (!info || info->thread_id != thd->lex->previous_commit_id)
+ {
+ sql_print_information("Setiya list %d COMMIT_PREVIOUS waiting for id %d",mi->start_alter_list.elements,
+ thd->lex->previous_commit_id);
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ mysql_cond_wait(&mi->start_alter_list_cond, &mi->start_alter_list_lock);
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+ }
+ else
+ break;
+ }
+ //thd->rgi_slave->mark_start_commit();
+ //thd->wakeup_subsequent_commits(0);
+ thd->rpt->__finish_event_group(thd->rgi_slave);
+// ha_commit_trans(thd, true);
+// trans_commit_implicit(thd);
+// trans_commit_stmt(thd);
+ break;
+ }
case SQLCOM_RELEASE_SAVEPOINT:
if (trans_release_savepoint(thd, lex->ident))
goto error;
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index 2734e5680f2..44ea9aea585 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -54,6 +54,7 @@
#include "sql_audit.h"
#include "sql_sequence.h"
#include "tztime.h"
+#include "rpl_mi.h"
#ifdef __WIN__
@@ -79,6 +80,8 @@ static uint blob_length_by_type(enum_field_types type);
static bool fix_constraints_names(THD *thd, List<Virtual_column_info>
*check_constraint_list,
const HA_CREATE_INFO *create_info);
+static bool write_start_alter(THD *thd, bool* partial_alter, char * send_query);
+static bool wait_for_master(THD *thd, char* send_query, start_alter_info *info);
/**
@brief Helper function for explain_filename
@@ -7580,7 +7583,9 @@ static bool mysql_inplace_alter_table(THD *thd,
Alter_inplace_info *ha_alter_info,
enum_alter_inplace_result inplace_supported,
MDL_request *target_mdl_request,
- Alter_table_ctx *alter_ctx)
+ Alter_table_ctx *alter_ctx,
+ bool *partial_alter, char *send_query,
+ start_alter_info *info)
{
Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN | MYSQL_OPEN_IGNORE_KILLED);
handlerton *db_type= table->s->db_type();
@@ -7670,6 +7675,9 @@ static bool mysql_inplace_alter_table(THD *thd,
// It's now safe to take the table level lock.
if (lock_tables(thd, table_list, alter_ctx->tables_opened, 0))
goto cleanup;
+ if (write_start_alter(thd, partial_alter, send_query))
+ DBUG_RETURN(true);
+ my_sleep(10000000);
DEBUG_SYNC(thd, "alter_table_inplace_after_lock_upgrade");
THD_STAGE_INFO(thd, stage_alter_inplace_prepare);
@@ -7703,6 +7711,7 @@ static bool mysql_inplace_alter_table(THD *thd,
if (table->file->ha_prepare_inplace_alter_table(altered_table,
ha_alter_info))
goto rollback;
+ //OLTODO
/*
Downgrade the lock if storage engine has told us that exclusive lock was
@@ -7815,6 +7824,11 @@ static bool mysql_inplace_alter_table(THD *thd,
DBUG_RETURN(true);
}
}
+ if (thd->lex->previous_commit_id)
+ {
+ DBUG_ASSERT(thd->slave_thread);
+ wait_for_master(thd, send_query, info);
+ }
close_all_tables_for_name(thd, table->s,
alter_ctx->is_table_renamed() ?
@@ -9333,6 +9347,100 @@ static int create_table_for_inplace_alter(THD *thd,
}
+static bool wait_for_master(THD *thd, char* send_query, start_alter_info* info)
+{
+ char temp[thd->query_length()+ 10];
+ Master_info *mi= thd->rgi_slave->rli->mi;
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ info->error= 0;
+ info->thread_id= thd->lex->previous_commit_id;
+ info->state= start_alter_state::WAITING;
+ mi->start_alter_list.push_back(info, thd->mem_root);
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+ mysql_cond_broadcast(&mi->start_alter_list_cond);
+ sql_print_information("Setiya alter list Added into list === previous_commit_id %d ",
+ thd->lex->previous_commit_id);
+ sql_print_information("Setiya list Elements %d", mi->start_alter_list.elements);
+ strcpy(temp, thd->query());
+ char* alter_location= strcasestr(temp, "ALTER");
+ //issue here
+// thd->rgi_slave->mark_start_commit();
+// thd->wakeup_subsequent_commits(0);
+// thd->transaction.stmt.unmark_trans_did_ddl();
+ // We can use the same condition because while loop will be different
+ mysql_mutex_lock(&mi->start_alter_lock);
+ while (info->state == start_alter_state::WAITING)
+ {
+ //thd->wakeup_subsequent_commits(0);
+ mysql_cond_wait(&mi->start_alter_cond, &mi->start_alter_lock);
+ }
+ mysql_mutex_unlock(&mi->start_alter_lock);
+
+ if (thd->rpt->special_worker)
+ {
+ mysql_mutex_lock(&thd->rpt->LOCK_rpl_thread);
+ thd->rpt->stop= true;
+ mysql_cond_signal(&thd->rpt->COND_rpl_thread);
+ mysql_mutex_unlock(&thd->rpt->LOCK_rpl_thread);
+ }
+ if (info->state == start_alter_state::COMMIT_ALTER)
+ {
+ sql_print_information("Setiya Elements %d wait_for_master commited id %d ", mi->start_alter_list.elements,
+ info->thread_id);
+
+// thd->transaction.stmt.mark_trans_did_ddl();
+ thd->variables.gtid_seq_no= info->seq_no;
+ sprintf(send_query, "/*!100001 COMMIT %d */ %s", info->thread_id, alter_location);
+ return false;
+ }
+ else
+ {
+ assert(info->state == start_alter_state::ROLLBACK_ALTER);
+ sprintf(send_query, "/*!100001 ROLLBACK %d */ %s", info->thread_id, alter_location);
+ thd->variables.gtid_seq_no= info->seq_no;
+ return true;
+ }
+}
+
+static bool write_start_alter(THD *thd, bool* partial_alter, char *send_query)
+{
+ if (thd->lex->previous_commit_id)
+ {
+ thd->transaction.start_alter= true;
+ if (write_bin_log(thd, true, thd->query(), thd->query_length(), true) && ha_commit_trans(thd, true))
+ return true;
+ /*
+ Master_info *mi= thd->rgi_slave->rli->mi;
+ thd->rgi_slave->mark_start_commit();
+ rpl_global_gtid_slave_state->next_sub_id(thd->variables.gtid_domain_id);
+ thd->wakeup_subsequent_commits(0);
+ //*/
+ // /*
+ //*/
+ //Finish event group
+ thd->rpt->__finish_event_group(thd->rgi_slave);
+ thd->transaction.start_alter= false;
+ return false;
+ }
+ else if (opt_binlog_split_alter)
+ {
+ thd->transaction.start_alter= true;
+ sprintf(send_query, "/*!100001 START %lld %s */",thd->thread_id, thd->query());
+ // thd->transaction.stmt.unmark_trans_did_ddl();
+ // thd->rgi_slave->mark_start_commit();
+ // thd->wakeup_subsequent_commits(0);
+ if (write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
+ return true;
+ *partial_alter= true;
+ // thd->transaction.stmt.mark_trans_did_ddl();
+ // thd->rgi_slave->mark_start_commit();
+ // thd->wakeup_subsequent_commits(0);
+ thd->transaction.start_alter= false;
+ return false;
+ }
+ return false;
+}
+
/**
Alter table
@@ -9379,6 +9487,9 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
uint order_num, ORDER *order, bool ignore)
{
bool engine_changed;
+ char send_query[thd->query_length() + 20];
+ bool partial_alter= false;
+ start_alter_info *info= (start_alter_info *)thd->alloc(sizeof(start_alter_info));
DBUG_ENTER("mysql_alter_table");
/*
@@ -10103,7 +10214,8 @@ do_continue:;
thd->count_cuted_fields = CHECK_FIELD_WARN;
int res= mysql_inplace_alter_table(thd, table_list, table, &altered_table,
&ha_alter_info, inplace_supported,
- &target_mdl_request, &alter_ctx);
+ &target_mdl_request, &alter_ctx,
+ &partial_alter, send_query, info);
thd->count_cuted_fields= save_count_cuted_fields;
my_free(const_cast<uchar*>(frm.str));
@@ -10161,7 +10273,10 @@ do_continue:;
if (lock_tables(thd, table_list, alter_ctx.tables_opened,
MYSQL_LOCK_USE_MALLOC))
goto err_new_table_cleanup;
-
+ //If issues by binlog/master complete the prepare phase of alter and then commit
+ if (write_start_alter(thd, &partial_alter ,send_query))
+ DBUG_RETURN(true);
+ my_sleep(10000000);
if (ha_create_table(thd, alter_ctx.get_tmp_path(),
alter_ctx.new_db.str, alter_ctx.new_name.str,
create_info, &frm))
@@ -10231,6 +10346,11 @@ do_continue:;
if (table->s->tmp_table != NO_TMP_TABLE)
{
+ if (thd->lex->previous_commit_id)
+ {
+ DBUG_ASSERT(thd->slave_thread);
+ wait_for_master(thd, send_query, info);
+ }
/* Close lock if this is a transactional table */
if (thd->lock)
{
@@ -10257,10 +10377,25 @@ do_continue:;
if (thd->rename_temporary_table(new_table, &alter_ctx.new_db,
&alter_ctx.new_name))
goto err_new_table_cleanup;
+ if (partial_alter)
+ {
+ sprintf(send_query, "/*!100001 COMMIT %lld */ %s", thd->thread_id, thd->query());
+ if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
+ DBUG_RETURN(true);
+ }
+ else if(thd->lex->previous_commit_id)
+ {
+ if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
+ DBUG_RETURN(true);
+ info->state= start_alter_state::COMMITTED_ALTER;
+ Master_info *mi= thd->rgi_slave->rli->mi;
+ mysql_cond_broadcast(&mi->start_alter_cond);
+ }
/* We don't replicate alter table statement on temporary tables */
- if (!thd->is_current_stmt_binlog_format_row() &&
+ else if (!thd->is_current_stmt_binlog_format_row() &&
write_bin_log(thd, true, thd->query(), thd->query_length()))
DBUG_RETURN(true);
+
my_free(const_cast<uchar*>(frm.str));
goto end_temporary;
}
@@ -10275,6 +10410,11 @@ do_continue:;
- Neither old or new engine uses files from another engine
The above is mainly true for the sequence and the partition engine.
*/
+ if (thd->lex->previous_commit_id)
+ {
+ DBUG_ASSERT(thd->slave_thread);
+ wait_for_master(thd, send_query, info);
+ }
engine_changed= ((new_table->file->ht != table->file->ht) &&
(((!(new_table->file->ha_table_flags() & HA_FILE_BASED) ||
!(table->file->ha_table_flags() & HA_FILE_BASED))) ||
@@ -10441,7 +10581,21 @@ do_continue:;
DBUG_ASSERT(!(mysql_bin_log.is_open() &&
thd->is_current_stmt_binlog_format_row() &&
(create_info->tmp_table())));
- if (write_bin_log(thd, true, thd->query(), thd->query_length()))
+ if (partial_alter)
+ {
+ sprintf(send_query, "/*!100001 COMMIT %lld */ %s", thd->thread_id, thd->query());
+ if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
+ DBUG_RETURN(true);
+ }
+ else if(thd->lex->previous_commit_id)
+ {
+ if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
+ DBUG_RETURN(true);
+ info->state= start_alter_state::COMMITTED_ALTER;
+ Master_info *mi= thd->rgi_slave->rli->mi;
+ mysql_cond_broadcast(&mi->start_alter_cond);
+ }
+ else if (write_bin_log(thd, true, thd->query(), thd->query_length()))
DBUG_RETURN(true);
table_list->table= NULL; // For query cache
@@ -10504,6 +10658,8 @@ do_continue:;
expects that error is set
*/
write_bin_log(thd, FALSE, thd->query(), thd->query_length());
+//OLTODO
+//Commit with error
err_with_mdl:
/*
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 05e68f3b752..a1fce8396e1 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -2141,6 +2141,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, size_t *yystacksize);
opt_delete_gtid_domain
asrow_attribute
opt_constraint_no_id
+ start_cmnd
%type <NONE> call sp_proc_stmts sp_proc_stmts1 sp_proc_stmt
%type <NONE> sp_proc_stmt_statement sp_proc_stmt_return
@@ -2299,7 +2300,7 @@ verb_clause:
/* Verb clauses, except begin and compound_statement */
statement:
- alter
+ start_cmnd alter
| analyze
| analyze_stmt_command
| backup
@@ -7802,6 +7803,19 @@ string_list:
{ Lex->last_field->interval_list.push_back($3, thd->mem_root); }
;
+start_cmnd:
+ /* empty*/ {}
+ | START_SYM
+ {
+ if (thd->variables.pseudo_thread_id)
+ Lex->previous_commit_id= thd->variables.pseudo_thread_id;
+ }
+ | START_SYM ulonglong_num
+ {
+ Lex->previous_commit_id= $2;
+ }
+ ;
+
/*
** Alter table
*/
@@ -17981,6 +17995,19 @@ commit:
lex->tx_chain= $3;
lex->tx_release= $4;
}
+ | COMMIT_SYM alter
+ {
+ LEX *lex=Lex;
+ lex->sql_command= SQLCOM_COMMIT_PREVIOUS;
+ if (thd->variables.pseudo_thread_id)
+ lex->previous_commit_id= thd->variables.pseudo_thread_id;
+ }
+ | COMMIT_SYM ulonglong_num alter
+ {
+ LEX *lex=Lex;
+ lex->sql_command= SQLCOM_COMMIT_PREVIOUS;
+ lex->previous_commit_id= $2;
+ }
;
rollback:
@@ -18005,6 +18032,19 @@ rollback:
lex->sql_command= SQLCOM_ROLLBACK_TO_SAVEPOINT;
lex->ident= $4;
}
+ | ROLLBACK_SYM ALTER
+ {
+ LEX *lex=Lex;
+ lex->sql_command= SQLCOM_ROLLBACK_PREVIOUS;
+ if (thd->variables.pseudo_thread_id)
+ lex->previous_commit_id= thd->variables.pseudo_thread_id;
+ }
+ | ROLLBACK_SYM ulonglong_num ALTER
+ {
+ LEX *lex=Lex;
+ lex->sql_command= SQLCOM_ROLLBACK_PREVIOUS;
+ lex->previous_commit_id= $2;
+ }
;
savepoint:
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index ff92b042ccc..e72fca75f63 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -2234,6 +2234,11 @@ static Sys_var_bit Sys_skip_parallel_replication(
SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_RPL_SKIP_PARALLEL,
DEFAULT(FALSE));
+static Sys_var_mybool Sys_binlog_split_alter(
+ "binlog_split_alter",
+ "If set split the alter into 2 statement START ALTER and COMMIT/ROLLBACK"
+ "ALTER",
+ GLOBAL_VAR(opt_binlog_split_alter), NO_CMD_LINE, DEFAULT(FALSE));
static bool
check_gtid_ignore_duplicates(sys_var *self, THD *thd, set_var *var)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment