Commit 6ade8b29 authored by Sachin's avatar Sachin

Some things are working

parent 3198c4fe
--source include/have_binlog_format_row.inc
--source include/have_innodb.inc
create table t1( a int primary key, b int);
insert into t1 values(1,1);
insert into t1 values(2,2);
alter table t1 add column c int;
show binlog events;
drop table t1;
!include my.cnf
[mysqld.1]
log-bin
log-slave-updates
[mysqld.2]
log-bin
log-slave-updates
[mysqld.3]
log-bin
log-slave-updates
[mysqld.4]
server-id=4
log-bin=server4-bin
log-slave-updates
[ENV]
SERVER_MYPORT_4= @mysqld.4.port
SERVER_MYSOCK_4= @mysqld.4.socket
--source include/not_embedded.inc
--source include/have_innodb.inc
--source include/have_debug.inc
--connect (server_1,127.0.0.1,root,,,$SERVER_MYPORT_1)
--connect (server_2,127.0.0.1,root,,,$SERVER_MYPORT_2)
--connect (server_3,127.0.0.1,root,,,$SERVER_MYPORT_3)
--connect (server_4,127.0.0.1,root,,,$SERVER_MYPORT_4)
--connection server_1
create database a;
use a;
create table t1(a int)engine=innodb;
insert into t1 values(1);
insert into t1 values(1);
insert into t1 values(1);
insert into t1 values(1);
insert into t1 values(1);
--save_master_pos
--connection server_2
create database b;
use b;
create table t1(a int)engine=innodb;
insert into t1 values(1);
insert into t1 values(1);
insert into t1 values(1);
insert into t1 values(1);
insert into t1 values(1);
--save_master_pos
--connection server_3
create database c;
use c;
create table t1(a int)engine=innodb;
insert into t1 values(1);
insert into t1 values(1);
insert into t1 values(1);
insert into t1 values(1);
insert into t1 values(1);
insert into t1 values(1);
--save_master_pos
--connection server_4
--disable_warnings
--replace_result $SERVER_MYPORT_1 MYPORT_1
eval change master 'm1' to master_port=$SERVER_MYPORT_1 , master_host='127.0.0.1', master_user='root';
--replace_result $SERVER_MYPORT_2 MYPORT_2
eval change master 'm2' to master_port=$SERVER_MYPORT_2 , master_host='127.0.0.1', master_user='root';
--replace_result $SERVER_MYPORT_3 MYPORT_3
eval change master to master_port=$SERVER_MYPORT_3 , master_host='127.0.0.1', master_user='root';
SET GLOBAL slave_parallel_threads=10;
set global slave_parallel_mode=optimistic;
start all slaves;
set default_master_connection = 'm1';
--source include/wait_for_slave_to_start.inc
set default_master_connection = 'm2';
--source include/wait_for_slave_to_start.inc
set default_master_connection = '';
--source include/wait_for_slave_to_start.inc
--echo #CleanUp
--connection server_1
drop database a;
--save_master_pos
--connection server_2
drop database b;
--save_master_pos
--connection server_3
drop database c;
--save_master_pos
--connection server_4
--sync_with_master 0,'m1'
--sync_with_master 0,'m2'
--sync_with_master 0,''
--disable_warnings
stop all slaves;
--enable_warnings
SET default_master_connection = "m1";
--source include/wait_for_slave_to_stop.inc
SET default_master_connection = "m2";
--source include/wait_for_slave_to_stop.inc
SET default_master_connection = "";
--source include/wait_for_slave_to_stop.inc
SET GLOBAL slave_parallel_threads=0;
set global slave_parallel_mode=conservative;
include/master-slave.inc
[connection master]
connection slave;
stop slave;
SET GLOBAL slave_parallel_threads=10;
set global slave_parallel_mode=optimistic;
start slave;
connection master;
set global binlog_split_alter=true;
create table t1( a int primary key, b int) engine=myisam;
create table t2( a int primary key, b int) engine=myisam;
connect con1,localhost,root,,;
connect con2,localhost,root,,;
connection con1;
alter table t1 add column c int;;
connection con2;
alter table t2 add column c int;;
connection con1;
connection con2;
create table t3( a int primary key, b int) engine=innodb;
connection slave;
show create table t1;
Table Create Table
t1 CREATE TABLE `t1` (
`a` int(11) NOT NULL,
`b` int(11) DEFAULT NULL,
`c` int(11) DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1
connection master;
drop table t1,t2,t3;
set global binlog_split_alter= false;
connection slave;
stop slave;
SET GLOBAL slave_parallel_threads=0;
set global slave_parallel_mode=conservative;
start slave;
include/rpl_end.inc
--source include/have_binlog_format_row.inc
--source include/have_innodb.inc
--source include/master-slave.inc
--connection slave
stop slave;
SET GLOBAL slave_parallel_threads=10;
set global slave_parallel_mode=optimistic;
start slave;
--connection master
create table t1( a int primary key, b int) engine=innodb;
#insert into t1 values(1,1),(2,2);
#create table t2( a int primary key, b int) engine=innodb;
#insert into t2 values(1,1),(2,2);
#create table t3( a int primary key, b int) engine=innodb;
#insert into t3 values(1,1),(2,2);
#create table t4( a int primary key, b int) engine=innodb;
#insert into t4 values(1,1),(2,2);
#create table t5( a int primary key, b int) engine=innodb;
#insert into t5 values(1,1),(2,2);
alter table t1 add column c int, force, algorithm=inplace;
#alter table t2 add column c int, force, algorithm=inplace;
#alter table t3 add column c int, force, algorithm=inplace;
#alter table t4 add column c int, force, algorithm=inplace;
#alter table t5 add column c int, force, algorithm=inplace;
show binlog events;
#show create table t1;
--connection slave
--sleep 10
show binlog events;
--sleep 10000
--connection master
drop table t1,t2,t3,t4,t5;
--connection slave
stop slave;
SET GLOBAL slave_parallel_threads=0;
set global slave_parallel_mode=conservative;
start slave;
--source include/rpl_end.inc
--source include/have_binlog_format_row.inc
--source include/have_innodb.inc
--source include/master-slave.inc
--connection slave
stop slave;
SET GLOBAL slave_parallel_threads=10;
set global slave_parallel_mode=optimistic;
start slave;
--connection master
create table t1( a int primary key, b int) engine=innodb;
insert into t1 values(1,1),(2,2);
alter table t1 add column c int, force, algorithm=inplace;
show binlog events;
show create table t1;
--connection slave
--sleep 10000
--connection master
drop table t1,t2,t3,t4,t5;
--connection slave
stop slave;
SET GLOBAL slave_parallel_threads=0;
set global slave_parallel_mode=conservative;
start slave;
--source include/rpl_end.inc
--source include/have_binlog_format_row.inc
--source include/have_innodb.inc
--source include/master-slave.inc
--connection slave
stop slave;
SET GLOBAL slave_parallel_threads=4;
set global slave_parallel_mode=optimistic;
start slave;
--connection master
create table t1( a int, b int) engine=innodb;
insert into t1 values(1,1);
insert into t1 values(1,1);
insert into t1 values(1,1);
insert into t1 values(1,1);
insert into t1 values(1,1);
insert into t1 values(1,1);
insert into t1 values(1,1);
insert into t1 values(1,1);
insert into t1 values(1,1);
insert into t1 values(1,1);
insert into t1 values(1,1);
insert into t1 values(1,1);
insert into t1 values(1,1);
insert into t1 values(1,1);
insert into t1 values(1,1);
show binlog events;
#show create table t1;
--connection slave
--sleep 10000
--connection master
drop table t1,t2,t3,t4,t5;
--connection slave
stop slave;
SET GLOBAL slave_parallel_threads=0;
set global slave_parallel_mode=conservative;
start slave;
--source include/rpl_end.inc
......@@ -10,33 +10,35 @@ start slave;
--connection master
set global binlog_split_alter=true;
create table t1( a int primary key, b int) engine=innodb;
create table t2( a int primary key, b int) engine=innodb;
connect (con1,localhost,root,,);
connect (con2,localhost,root,,);
--connection con1
--send alter table t1 add column c int, force, algorithm=inplace;
#--connection con2
#--send alter table t2 add column c int, force, algorithm=inplace;
--connection con2
--send alter table t2 add column c int, force, algorithm=inplace;
#--connection con1
#--reap
#--connection con2
#--reap
#create table t3( a int primary key, b int) engine=innodb;
#--source include/kill_mysqld.inc
#--let $rpl_server_number= 1
#--source include/rpl_stop_server.inc
--connection con1
--reap
--connection con2
--reap
create table t3( a int primary key, b int) engine=innodb;
#show binlog events;
show binlog events;
--sleep 5
--sleep 10
--connection slave
stop slave;
show binlog events;
show create table t1;
#--sleep 30
#show binlog events;
--sleep 60000
--connection master
drop table t1,t2;
drop table t1,t2,t3;
set global binlog_split_alter= false;
--connection slave
stop slave;
......
--source include/have_binlog_format_row.inc
--source include/have_innodb.inc
--source include/master-slave.inc
--connection slave
stop slave;
SET GLOBAL slave_parallel_threads=10;
set global slave_parallel_mode=optimistic;
start slave;
--connection master
set global binlog_split_alter=true;
create table t1( a int primary key, b int) engine=myisam;
create table t2( a int primary key, b int) engine=myisam;
connect (con1,localhost,root,,);
connect (con2,localhost,root,,);
--connection con1
--send alter table t1 add column c int;
--connection con2
--send alter table t2 add column c int;
#--source include/kill_mysqld.inc
#--let $rpl_server_number= 1
#--source include/rpl_stop_server.inc
--connection con1
--reap
--connection con2
--reap
create table t3( a int primary key, b int) engine=innodb;
#show binlog events;
--sleep 5
--connection slave
show create table t1;
#show binlog events;
--connection master
drop table t1,t2,t3;
set global binlog_split_alter= false;
--connection slave
stop slave;
SET GLOBAL slave_parallel_threads=0;
set global slave_parallel_mode=conservative;
start slave;
--source include/rpl_end.inc
--source include/have_binlog_format_row.inc
--source include/have_innodb.inc
--source include/master-slave.inc
--source include/have_debug.inc
--connection slave
SET @old_debug_slave= @@global.debug;
set global debug_dbug="+d,start_alter_delay_slave";
stop slave;
SET GLOBAL slave_parallel_threads=10;
set global slave_parallel_mode=optimistic;
start slave;
#
# SLAVE Shutdown
#
#
--connection master
SET @old_debug_master= @@global.debug;
set global debug_dbug="+d,start_alter_delay_master";
set global binlog_split_alter=true;
create table t1( a int primary key, b int) engine=myisam;
create table t2( a int primary key, b int) engine=myisam;
connect (con1,localhost,root,,);
connect (con2,localhost,root,,);
--connection con1
--send alter table t1 add column c int;
--connection con2
--send alter table t2 add column c int;
#Kill Slave
--connection slave
--sleep 1
#select @@gtid_slave_pos;
#select master_gtid_wait('0-1-3');
#select @@gtid_binlog_state;
show binlog events;
#--let $rpl_server_number= 2
#--source include/rpl_stop_server.inc
--connection con1
--reap
--connection con2
--reap
create table t3( a int primary key, b int) engine=innodb;
--sleep 100000
show binlog events;
--sync_slave_with_master
show create table t1;
show binlog events;
--connection master
SET GLOBAL debug_dbug= @old_debug_master;
drop table t1,t2,t3;
set global binlog_split_alter= false;
--sync_slave_with_master
SET GLOBAL debug_dbug= @old_debug_slave;
stop slave;
SET GLOBAL slave_parallel_threads=0;
set global slave_parallel_mode=conservative;
start slave;
--source include/rpl_end.inc
--source include/have_binlog_format_row.inc
--source include/have_innodb.inc
--source include/master-slave.inc
--source include/have_debug.inc
--connection slave
SET @old_debug_slave= @@global.debug;
set global debug_dbug="+d,start_alter_delay_slave";
stop slave;
SET GLOBAL slave_parallel_threads=10;
set global slave_parallel_mode=optimistic;
start slave;
#
# SLAVE Shutdown
#
#
--connection master
SET @old_debug_master= @@global.debug;
set global debug_dbug="+d,start_alter_delay_master";
set global binlog_split_alter=true;
create table t1( a int primary key, b int) engine=myisam;
create table t2( a int primary key, b int) engine=myisam;
connect (con1,localhost,root,,);
connect (con2,localhost,root,,);
--connection con1
--send alter table t1 add column c int;
--connection con2
--send alter table t2 add column c int;
#Kill Slave
--connection slave
--sleep 3
#select @@gtid_slave_pos;
#select master_gtid_wait('0-1-3');
#select @@gtid_binlog_state;
show binlog events;
--let $rpl_server_number= 2
--source include/rpl_restart_server.inc
--connection con1
--reap
--connection con2
--reap
create table t3( a int primary key, b int) engine=innodb;
--sleep 100000
show binlog events;
--sync_slave_with_master
show create table t1;
show binlog events;
--connection master
SET GLOBAL debug_dbug= @old_debug_master;
drop table t1,t2,t3;
set global binlog_split_alter= false;
--sync_slave_with_master
SET GLOBAL debug_dbug= @old_debug_slave;
stop slave;
SET GLOBAL slave_parallel_threads=0;
set global slave_parallel_mode=conservative;
start slave;
--source include/rpl_end.inc
Just a prototype maybe be stupid too
but I have to try it na
Read view same time on master and slave gtid
create prepare point
So it will be enclosed like this
Start
Alter
End
Prepare { Gtid will be there}
Commit {Gtid will be there}
so Alter will be in gtid commit
So to proceed with prototype
look how xa prepare is sandwiched between gtid and commit
in short look xa prepare/coomit are recorded in binlog
then look when we create read view and then send the prepare event encapsulated
in gtid.
GTID_BLOCK
Gtid
XA START
QUERY_BLOCK/ROW_BLOCK
Query/Write/Dml
QUERY_LOG_EVENT BLOCK
XA END
XID_PREPARE_EVENT
XA Prepare
GTID_BLOCK
gtid
/TODO
XA COMMIT
Prototype
Wait for read view
create the prepapre event and send it
MASTER
W1
W2
A_1
W3
W4
W5
6A_2C
W7
W8
Slave
Prepare
W1
A_1
W2
W3
W4
W5
WA_2C
W7
W8
Slave Commit
W1
W2
A_1
W3
W4
W5
6A_2C
W7
W8
For now should be executed in TOI mode, no other exclusive stuff
XA START ALTER_THREAD_ID
ALTER
XA END
Prepare
Commit
New Design
Start 'Gtid_seq_no_constant' ' part_no_1' Command_1
Start 'Gtid_seq_no_constant' ' part_no_2' Command_2
.
.
.
.
Start 'Gtid_seq_no_constant' ' part_no_N' Command_N
Commit 'Gtid_seq_no_constant' 'part_no_N+1'
Gtid_seq_no_constant= Gtid_seq no of Command1
What about domain id ?
We can have something like this 'x-x-x' Whole gtid instead of seq_no
For the just stick with normal alter
START 'thread_id' ALTER
COMMIT 'thred_id'
>>>>Replace thread_id with starting gtid and partno
Next stage artifical commit
Slave t.1
start ALTER
Complete prepare phase
SEND PREPARE N
WAIT FOR COMMIt/ROLLBACK N Signal
COMMIT/ROLLBACK
Slave t.2
Commit N
Wait for PREPARE N
Send COMMIT N
ROLLABACK
Wait for PREPARE N
SEND ROLLBACK N Signal
No nothing like that , Start alter should commit, But we will preserve thread context
Lets assume simple case
M S
| |
S.A(t.1) |
| S.A(thread_1)
C(t.1) Waiting
| C(thread_2)
So Something Like this
On Slave
Parent thread
|
S.A
Prepare Alter
Fork Slave Thread
| |
Do fake Commit Wait_of_commit signal N
| Do Commit N
Exit
On MASTER
START ALTER......
COMMIT ALTER....
ROLLBACK ALTER ....
On Slave
START ALTER
COMMIT ALTER...
COMMIT query_id ALTER (Will be issued by DBA)
ROLLBACK ALTER...
ROLLBACK query_id ALTER (Will be issued by DBA)
W.SA W.C/R
SA |
Global Str flg SA(channel_name + id) |
SIGNAL |
WAIT |
| check global struc (channel_name + id)
| Signal
COMMIT
GLOBAL ARRAY of size worker num (wrt to channel)
[0][0][0][0][1][1][1][0][0] -1, 0, error
W..W..W........W........
START ALTER STRUCTURE
thread_id(key)
status WAITING, ROLLBACK, COMMIT
error 0, UINT32_MAX
void THD::lock_temporary_table(TABLE *table)
do_gco_wait
mark_start_commit_no_lock
change the start alter to commit/rollback , As given by master.
______________________________________________________________________
drop table t1;
create table t1 (id int primary key auto_increment, a int, b int) engine=myisam;
insert into t1 values(null, 1,1);
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
insert into t1(a,b) select a,b from t1;
shutdown;
alter table t1 add column d int default (a+b);shutdown;
alter table t1 add column d int default 1 , force, algorithm=inplace;shutdown
alter table t1 add column f1 int unique default(id+10);
shutdown;
create table t1( id int unique) engine =innodb;
T1 = insert into t1 values(1);
T2 = insert into t1 values(2);
T3 = alter table t1 drop index id;
T4 = insert into t1 values(2);
T1 T1
DML DML
DML DML
DML DML
DML ALTER
DML DML
DML DML
DML DML
ALTER DML
DML DML
DML DML
Make it FL_TRANSACTIONAL(SA)
Done , now we have crash
SLAVE side
(notify means calling mark_commit_done and wait_subsequent_commits)
time ----->
t1 SA(notify) ---Work--- Wait_for_master
t2 SA(notify) ---Work--- Wait_for_master
t3 SA(notify) ---Work--- Wait_for_master
t4 SA(notify) ---Work--- Wait_for_master
C(t1) (WAIT to COMMIT)
C(t2) (WAIT to COMMIT)
C(t3) (WAIT to COMMIT)
C(t4) (WAIT to COMMIT)
Do one thing , debug this
Worker 4
inserts 20 , insert with sleep of 500
State
waiting
commit/rollback
commited/rollbacked
rpl_parallel_add_extra_worker()
thd::transaction add START_ALTER
Next sub id
(gdb) bt
#0 event_group_new_gtid (rgi=0x7fffb0031eb0, gev=0x7fffb003bbb0) at /home/sachin/10.5/server/sql/rpl_rli.cc:2135
#1 0x0000555555fed12f in rpl_parallel_thread::get_rgi (this=0x7fffb00295b8, rli=0x5555586f3898, gtid_ev=0x7fffb003bbb0, e=0x7fffb00314e0, event_size=42) at /home/sachin/10.5/server/sql/rpl_parallel.cc:1907
#2 0x0000555555fef1f7 in rpl_parallel::do_event (this=0x5555586f6ab0, serial_rgi=0x7fffb00008d0, ev=0x7fffb003bbb0, event_size=42) at /home/sachin/10.5/server/sql/rpl_parallel.cc:2745
#3 0x0000555555cc249c in exec_relay_log_event (thd=0x7fffb0001690, rli=0x5555586f3898, serial_rgi=0x7fffb00008d0) at /home/sachin/10.5/server/sql/slave.cc:4381
#4 0x0000555555cc60aa in handle_slave_sql (arg=0x5555586f1bd0) at /home/sachin/10.5/server/sql/slave.cc:5633
#5 0x00005555564bf70c in pfs_spawn_thread (arg=0x7fffa401b360) at /home/sachin/10.5/server/storage/perfschema/pfs.cc:1862
#6 0x00007ffff6e0e6db in start_thread (arg=0x7ffff0191700) at pthread_create.c:463
#7 0x00007ffff5ff488f in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
(gdb)
rpl_parallel.cc 1119
Now shutdown/stop slave
Start alter Waiting for signal --- Do nothing
COMMIT/ROLLBACK signal recieved --do the work and shutdown
TEST CASE
MASTER KILLED
_WORKING_
SA CA .... SA _SDM_ CA
//ISSUE
SA SA C/R Alter
SLAVE KILLED
SA CA .... SA _SDS_ CA
RESTART
DIFFERENT type of alter algoirthm
(INNODB)
(MYISAm)
and other storage engine
inc_group_relay_log_pos
......@@ -1618,6 +1618,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
}
set_thd_db(thd, rpl_filter, db, db_len);
thd->master_log_pos= this->log_pos;
/*
Setting the character set and collation of the current database thd->db.
......
......@@ -352,6 +352,7 @@ class Master_info : public Slave_reporting_capability
};
enum start_alter_state
{
REGISTERED, // Start Alter exist
WAITING, // WAITING for commit/rollback
COMMIT_ALTER, // COMMIT the alter
ROLLBACK_ALTER, // Rollback the alter
......
......@@ -5826,6 +5826,30 @@ pthread_handler_t handle_slave_sql(void *arg)
return 0; // Avoid compiler warnings
}
/*
Handle start Alter
*/
pthread_handler_t handle_start_slave(void *arg)
{
THD *thd; /* needs to be first for thread_stack */
Master_info *mi= ((Master_info*)arg);
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
DBUG_ENTER("handle_start_slave");
serial_rgi= new rpl_group_info(rli);
thd = new THD(next_thread_id()); // note that contructor of THD uses DBUG_ !
thd->thread_stack = (char*)&thd; // remember where our stack is
delete thd;
DBUG_LEAVE; // Must match DBUG_ENTER()
my_thread_end();
ERR_remove_state(0);
pthread_exit(0);
return 0; // Avoid compiler warnings
}
/*
process_io_create_file()
......
......@@ -631,6 +631,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
:Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION,
/* statement id */ 0),
rli_fake(0), rgi_fake(0), rgi_slave(NULL),
rpt(NULL), master_log_pos(0),
protocol_text(this), protocol_binary(this),
m_current_stage_key(0),
in_sub_stmt(0), log_all_errors(0),
......
......@@ -2200,6 +2200,7 @@ class THD: public THD_count, /* this must be first */
/* Slave applier execution context */
rpl_group_info* rgi_slave;
rpl_parallel_thread *rpt;
ulong master_log_pos;
union {
rpl_io_thread_info *rpl_io_info;
......
......@@ -5642,42 +5642,42 @@ mysql_execute_command(THD *thd)
Master_info *mi= thd->rgi_slave->rli->mi;
start_alter_info *info=NULL;
uint count= 0;
mysql_mutex_lock(&mi->start_alter_list_lock);
List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
while (1)
while ((info= info_iterator++))
{
info_iterator.rewind();
count= 0;
while ((info= info_iterator++))
{
if (!info)
break;
count++;
if(info->thread_id == thd->lex->previous_commit_id)
{
// I dont need mutex lock here
info->state= start_alter_state::COMMIT_ALTER;
info->seq_no= thd->variables.gtid_seq_no;
mysql_cond_broadcast(&mi->start_alter_cond);
info_iterator.remove();
break;
}
}
if (!info || info->thread_id != thd->lex->previous_commit_id)
count++;
if(info->thread_id == thd->lex->previous_commit_id)
{
mysql_mutex_lock(&mi->start_alter_list_lock);
mysql_cond_wait(&mi->start_alter_list_cond, &mi->start_alter_list_lock);
mysql_mutex_unlock(&mi->start_alter_list_lock);
}
else
info_iterator.remove();
break;
}
}
mysql_mutex_unlock(&mi->start_alter_list_lock);
if (!info || info->thread_id != thd->lex->previous_commit_id)
{
//error handeling
DBUG_ASSERT(lex->m_sql_cmd != NULL);
res= lex->m_sql_cmd->execute(thd);
DBUG_PRINT("result", ("res: %d killed: %d is_error: %d",
res, thd->killed, thd->is_error()));
break;
}
//thd->rgi_slave->mark_start_commit();
//thd->wakeup_subsequent_commits(0);
/*
Wait for other thread to commit/rollback the alter
*/
start_alter_state can be either ::REGISTERED or ::WAITING
*/
mysql_mutex_lock(&mi->start_alter_lock);
while(info->state <= start_alter_state:: ROLLBACK_ALTER )
while(info->state == start_alter_state::REGISTERED )
mysql_cond_wait(&mi->start_alter_cond, &mi->start_alter_lock);
mysql_mutex_unlock(&mi->start_alter_lock);
mysql_mutex_lock(&mi->start_alter_lock);
info->state= start_alter_state::COMMIT_ALTER;
info->seq_no= thd->variables.gtid_seq_no;
mysql_mutex_unlock(&mi->start_alter_lock);
mysql_cond_broadcast(&mi->start_alter_cond);
// Wait for commit by worker thread
mysql_mutex_lock(&mi->start_alter_lock);
while(info->state <= start_alter_state::ROLLBACK_ALTER )
mysql_cond_wait(&mi->start_alter_cond, &mi->start_alter_lock);
mysql_mutex_unlock(&mi->start_alter_lock);
thd->rpt->__finish_event_group(thd->rgi_slave);
......
......@@ -80,7 +80,8 @@ static uint blob_length_by_type(enum_field_types type);
static bool fix_constraints_names(THD *thd, List<Virtual_column_info>
*check_constraint_list,
const HA_CREATE_INFO *create_info);
static bool write_start_alter(THD *thd, bool* partial_alter, char * send_query);
static bool write_start_alter(THD *thd, bool* partial_alter, char * send_query,
start_alter_info *info);
static bool wait_for_master(THD *thd, char* send_query, start_alter_info *info);
/**
......@@ -7675,10 +7676,10 @@ static bool mysql_inplace_alter_table(THD *thd,
// It's now safe to take the table level lock.
if (lock_tables(thd, table_list, alter_ctx->tables_opened, 0))
goto cleanup;
if (write_start_alter(thd, partial_alter, send_query))
if (write_start_alter(thd, partial_alter, send_query, info))
DBUG_RETURN(true);
if (thd->slave_thread && !strcmp("t1", table->alias.c_ptr()))
my_sleep(1000000000);
//if (thd->slave_thread && !strcmp("t1", table->alias.c_ptr()))
//my_sleep(1000000000);
DEBUG_SYNC(thd, "alter_table_inplace_after_lock_upgrade");
THD_STAGE_INFO(thd, stage_alter_inplace_prepare);
......@@ -9352,13 +9353,10 @@ static bool wait_for_master(THD *thd, char* send_query, start_alter_info* info)
{
char temp[thd->query_length()+ 10];
Master_info *mi= thd->rgi_slave->rli->mi;
mysql_mutex_lock(&mi->start_alter_list_lock);
info->error= 0;
info->thread_id= thd->lex->previous_commit_id;
mysql_mutex_lock(&mi->start_alter_lock);
info->state= start_alter_state::WAITING;
mi->start_alter_list.push_back(info, thd->mem_root);
mysql_mutex_unlock(&mi->start_alter_list_lock);
mysql_cond_broadcast(&mi->start_alter_list_cond);
mysql_mutex_unlock(&mi->start_alter_lock);
mysql_cond_broadcast(&mi->start_alter_cond);
strcpy(temp, thd->query());
char* alter_location= strcasestr(temp, "ALTER");
//issue here
......@@ -9383,9 +9381,6 @@ static bool wait_for_master(THD *thd, char* send_query, start_alter_info* info)
}
if (info->state == start_alter_state::COMMIT_ALTER)
{
sql_print_information("Setiya Elements %d wait_for_master commited id %d ", mi->start_alter_list.elements,
info->thread_id);
// thd->transaction.stmt.mark_trans_did_ddl();
thd->variables.gtid_seq_no= info->seq_no;
sprintf(send_query, "/*!100001 COMMIT %d */ %s", info->thread_id, alter_location);
......@@ -9400,7 +9395,8 @@ static bool wait_for_master(THD *thd, char* send_query, start_alter_info* info)
}
}
static bool write_start_alter(THD *thd, bool* partial_alter, char *send_query)
static bool write_start_alter(THD *thd, bool* partial_alter, char *send_query,
start_alter_info *info)
{
if (thd->lex->previous_commit_id)
{
......@@ -9416,8 +9412,22 @@ static bool write_start_alter(THD *thd, bool* partial_alter, char *send_query)
// /*
//*/
//Finish event group
Master_info *mi= thd->rgi_slave->rli->mi;
info->error= 0;
info->thread_id= thd->lex->previous_commit_id;
info->state= start_alter_state::REGISTERED;
mysql_mutex_lock(&mi->start_alter_list_lock);
mi->start_alter_list.push_back(info, thd->mem_root);
mysql_mutex_unlock(&mi->start_alter_list_lock);
//I think we dont need this
mysql_cond_broadcast(&mi->start_alter_list_cond);
thd->rgi_slave->rli->stmt_done(thd->master_log_pos, thd,
thd->rgi_slave);
thd->rpt->__finish_event_group(thd->rgi_slave);
thd->transaction.start_alter= false;
DBUG_EXECUTE_IF("start_alter_delay_slave", {
my_sleep(10000000);
});
return false;
}
else if (opt_binlog_split_alter)
......@@ -9434,6 +9444,9 @@ static bool write_start_alter(THD *thd, bool* partial_alter, char *send_query)
// thd->rgi_slave->mark_start_commit();
// thd->wakeup_subsequent_commits(0);
thd->transaction.start_alter= false;
DBUG_EXECUTE_IF("start_alter_delay_master", {
my_sleep(10000000);
});
return false;
}
return false;
......@@ -9488,6 +9501,9 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
char send_query[thd->query_length() + 20];
bool partial_alter= false;
start_alter_info *info= (start_alter_info *)thd->alloc(sizeof(start_alter_info));
Master_info *mi= NULL;
if (thd->slave_thread)
mi= thd->rgi_slave->rli->mi;
DBUG_ENTER("mysql_alter_table");
/*
......@@ -10272,10 +10288,10 @@ do_continue:;
MYSQL_LOCK_USE_MALLOC))
goto err_new_table_cleanup;
//If issues by binlog/master complete the prepare phase of alter and then commit
if (write_start_alter(thd, &partial_alter ,send_query))
if (write_start_alter(thd, &partial_alter, send_query, info))
DBUG_RETURN(true);
if (thd->slave_thread && !strcmp("t1", table->alias.c_ptr()))
my_sleep(1000000000);
// if (thd->slave_thread && !strcmp("t1", table->alias.c_ptr()))
// my_sleep(1000000000);
if (ha_create_table(thd, alter_ctx.get_tmp_path(),
alter_ctx.new_db.str, alter_ctx.new_name.str,
create_info, &frm))
......@@ -10386,8 +10402,9 @@ do_continue:;
{
if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
DBUG_RETURN(true);
mysql_mutex_lock(&mi->start_alter_lock);
info->state= start_alter_state::COMMITTED_ALTER;
Master_info *mi= thd->rgi_slave->rli->mi;
mysql_mutex_unlock(&mi->start_alter_lock);
mysql_cond_broadcast(&mi->start_alter_cond);
}
/* We don't replicate alter table statement on temporary tables */
......@@ -10590,8 +10607,9 @@ do_continue:;
{
if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
DBUG_RETURN(true);
mysql_mutex_lock(&mi->start_alter_lock);
info->state= start_alter_state::COMMITTED_ALTER;
Master_info *mi= thd->rgi_slave->rli->mi;
mysql_mutex_unlock(&mi->start_alter_lock);
mysql_cond_broadcast(&mi->start_alter_cond);
}
else if (write_bin_log(thd, true, thd->query(), thd->query_length()))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment