Commit c003b362 authored by Sachin's avatar Sachin

MDEV-22777 Assertion `thd->rgi_slave' failed in process_commit_alter / process_rollback_alte

parent 148a4e38
...@@ -7943,3 +7943,5 @@ ER_WARN_HISTORY_ROW_START_TIME ...@@ -7943,3 +7943,5 @@ ER_WARN_HISTORY_ROW_START_TIME
eng "Table `%s.%s` history row start '%s' is later than row end '%s'" eng "Table `%s.%s` history row start '%s' is later than row end '%s'"
ER_PART_STARTS_BEYOND_INTERVAL ER_PART_STARTS_BEYOND_INTERVAL
eng "%`s: STARTS is later than query time, first history partition may exceed INTERVAL value" eng "%`s: STARTS is later than query time, first history partition may exceed INTERVAL value"
ER_MANUAL_SPLIT_ALTER
eng "Manual splitting of ALTER is not allowed"
...@@ -394,24 +394,27 @@ static int process_start_alter(THD *thd, uint64 thread_id) ...@@ -394,24 +394,27 @@ static int process_start_alter(THD *thd, uint64 thread_id)
} }
static int process_commit_alter(THD *thd, uint64 thread_id) static int process_commit_alter(THD *thd, uint64 thread_id)
{ {
DBUG_ASSERT(thd->rgi_slave); start_alter_info *info=NULL;
Master_info *mi= NULL;
// pseudo_slave_mode is ON
if (!thd->rgi_slave)
goto direct_commit;
DBUG_EXECUTE_IF("rpl_slave_stop_CA", { DBUG_EXECUTE_IF("rpl_slave_stop_CA", {
debug_sync_set_action(thd, debug_sync_set_action(thd,
STRING_WITH_LEN("now signal CA_1_processing WAIT_FOR proceed_CA_1")); STRING_WITH_LEN("now signal CA_1_processing WAIT_FOR proceed_CA_1"));
}); });
thd->gtid_flags3|= Gtid_log_event::FL_START_ALTER_E1; thd->gtid_flags3|= Gtid_log_event::FL_START_ALTER_E1;
Master_info *mi= thd->rgi_slave->rli->mi; mi= thd->rgi_slave->rli->mi;
start_alter_info *info=NULL;
uint count=0;
mysql_mutex_lock(&mi->start_alter_list_lock); mysql_mutex_lock(&mi->start_alter_list_lock);
List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
while ((info= info_iterator++))
{ {
count++; List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
if(info->thread_id == thread_id) while ((info= info_iterator++))
{ {
info_iterator.remove(); if(info->thread_id == thread_id)
break; {
info_iterator.remove();
break;
}
} }
} }
mysql_mutex_unlock(&mi->start_alter_list_lock); mysql_mutex_unlock(&mi->start_alter_list_lock);
...@@ -421,10 +424,7 @@ static int process_commit_alter(THD *thd, uint64 thread_id) ...@@ -421,10 +424,7 @@ static int process_commit_alter(THD *thd, uint64 thread_id)
//direct_commit_alter is used so that mysql_alter_table should not do //direct_commit_alter is used so that mysql_alter_table should not do
//unnecessary binlogging or spawn new thread because there is no start //unnecessary binlogging or spawn new thread because there is no start
//alter context //alter context
thd->direct_commit_alter= 1; goto direct_commit;
if (thd->open_temporary_tables(thd->lex->query_tables))
return START_ALTER_ERROR;
return START_ALTER_PARSE;
} }
/* /*
start_alter_state must be ::REGISTERED start_alter_state must be ::REGISTERED
...@@ -446,29 +446,37 @@ static int process_commit_alter(THD *thd, uint64 thread_id) ...@@ -446,29 +446,37 @@ static int process_commit_alter(THD *thd, uint64 thread_id)
if (write_bin_log(thd, true, thd->query(), thd->query_length())) if (write_bin_log(thd, true, thd->query(), thd->query_length()))
return START_ALTER_ERROR; return START_ALTER_ERROR;
return START_ALTER_SKIP; return START_ALTER_SKIP;
direct_commit:
thd->direct_commit_alter= 1;
if (thd->open_temporary_tables(thd->lex->query_tables))
return START_ALTER_ERROR;
return START_ALTER_PARSE;
} }
static int process_rollback_alter(THD *thd, uint64 thread_id) static int process_rollback_alter(THD *thd, uint64 thread_id)
{ {
DBUG_ASSERT(thd->rgi_slave);
Master_info *mi= thd->rgi_slave->rli->mi;
start_alter_info *info=NULL; start_alter_info *info=NULL;
Master_info *mi= NULL;
// pseudo_slave_mode is ON
if (!thd->rgi_slave)
goto write_binlog;
mi= thd->rgi_slave->rli->mi;
mysql_mutex_lock(&mi->start_alter_list_lock); mysql_mutex_lock(&mi->start_alter_list_lock);
List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
while ((info= info_iterator++))
{ {
if(info->thread_id == thread_id) List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
while ((info= info_iterator++))
{ {
info_iterator.remove(); if(info->thread_id == thread_id)
break; {
info_iterator.remove();
break;
}
} }
} }
mysql_mutex_unlock(&mi->start_alter_list_lock); mysql_mutex_unlock(&mi->start_alter_list_lock);
if (!info || info->thread_id != thread_id) if (!info || info->thread_id != thread_id)
{ {
//Just write the binlog because there is nothing to be done //Just write the binlog because there is nothing to be done
if (write_bin_log(thd, true, thd->query(), thd->query_length())) goto write_binlog;
return START_ALTER_ERROR;
return START_ALTER_SKIP;
} }
/* /*
start_alter_state must be ::REGISTERED start_alter_state must be ::REGISTERED
...@@ -487,6 +495,7 @@ static int process_rollback_alter(THD *thd, uint64 thread_id) ...@@ -487,6 +495,7 @@ static int process_rollback_alter(THD *thd, uint64 thread_id)
mysql_mutex_unlock(&mi->start_alter_lock); mysql_mutex_unlock(&mi->start_alter_lock);
mysql_cond_destroy(&info->start_alter_cond); mysql_cond_destroy(&info->start_alter_cond);
my_free(info); my_free(info);
write_binlog:
if (write_bin_log(thd, true, thd->query(), thd->query_length())) if (write_bin_log(thd, true, thd->query(), thd->query_length()))
return START_ALTER_ERROR; return START_ALTER_ERROR;
return START_ALTER_SKIP; return START_ALTER_SKIP;
...@@ -523,7 +532,6 @@ bool Sql_cmd_alter_table::execute(THD *thd) ...@@ -523,7 +532,6 @@ bool Sql_cmd_alter_table::execute(THD *thd)
ulong priv=0; ulong priv=0;
ulong priv_needed= ALTER_ACL; ulong priv_needed= ALTER_ACL;
bool result; bool result;
int alter_res= 0;
DBUG_ENTER("Sql_cmd_alter_table::execute"); DBUG_ENTER("Sql_cmd_alter_table::execute");
...@@ -644,26 +652,35 @@ bool Sql_cmd_alter_table::execute(THD *thd) ...@@ -644,26 +652,35 @@ bool Sql_cmd_alter_table::execute(THD *thd)
We will follow a different executation path if it is START ALTER We will follow a different executation path if it is START ALTER
or commit/rollback alter or commit/rollback alter
*/ */
switch (alter_info.alter_state) { // SA/CA will be processed only if we are slave or in pseduo_slave_mode
case Alter_info::ALTER_TABLE_START: if (alter_info.alter_state > Alter_info::ALTER_TABLE_NORMAL)
alter_res= process_start_alter(thd, alter_info.alter_identifier); {
break; if ( !thd->slave_thread && !thd->variables.pseudo_slave_mode)
case Alter_info::ALTER_TABLE_COMMIT: {
alter_res= process_commit_alter(thd, alter_info.alter_identifier); my_error(ER_MANUAL_SPLIT_ALTER, MYF(0));
break; DBUG_RETURN(TRUE);
case Alter_info::ALTER_TABLE_ROLLBACK: }
alter_res= process_rollback_alter(thd, alter_info.alter_identifier); int alter_res= 0;
break; switch (alter_info.alter_state)
case Alter_info::ALTER_TABLE_NORMAL: {
default: case Alter_info::ALTER_TABLE_START:
alter_res= START_ALTER_PARSE; alter_res= process_start_alter(thd, alter_info.alter_identifier);
break; break;
case Alter_info::ALTER_TABLE_COMMIT:
alter_res= process_commit_alter(thd, alter_info.alter_identifier);
break;
case Alter_info::ALTER_TABLE_ROLLBACK:
alter_res= process_rollback_alter(thd, alter_info.alter_identifier);
break;
default:
break;
}
if (alter_res == START_ALTER_SKIP)
DBUG_RETURN(FALSE);
else if (alter_res == START_ALTER_ERROR)
DBUG_RETURN(TRUE);
} }
if (alter_res == START_ALTER_SKIP) // START_ALTER_PARSE -->
DBUG_RETURN(FALSE);
else if (alter_res == START_ALTER_ERROR)
DBUG_RETURN(TRUE);
result= mysql_alter_table(thd, &select_lex->db, &lex->name, result= mysql_alter_table(thd, &select_lex->db, &lex->name,
&create_info, &create_info,
first_table, first_table,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment