Commit 5a867d84 authored by Nikita Malyavin's avatar Nikita Malyavin Committed by Sergei Golubchik

Online alter: savepoints

parent b2be2e39
......@@ -459,10 +459,6 @@ alter table t1 add b int NULL, algorithm= copy, lock= none;
connection con2;
update t1 set a= 123 where a = 1;
savepoint whoopsie;
ERROR 42000: Cannot set up a savepoint while online ALTER TABLE is in progress
show warnings;
Level Code Message
Error 1235 Cannot set up a savepoint while online ALTER TABLE is in progress
rollback to savepoint savie;
commit;
set debug_sync= 'now SIGNAL end';
......@@ -475,7 +471,58 @@ select * from t2;
a
1
222
create or replace table t1 (a int);
insert t1 values (1), (2);
create or replace table t2 (a int);
insert t2 values (1), (2);
create or replace table t3 (a int) engine=myisam;
insert t3 values (1);
connection con2;
begin;
update t2 set a= 222 where a = 2;
savepoint savie;
update t2 set a= 111 where a = 1;
set debug_sync= 'now WAIT_FOR ended';
connection default;
set debug_sync= 'alter_table_copy_end SIGNAL ended WAIT_FOR end';
alter table t1 add b int NULL, algorithm= copy, lock= none;
connection con2;
update t1 set a= 222 where a = 2;
savepoint whoopsie;
update t1 set a= 123 where a = 1;
insert t3 values (2);
select * from t1;
a
123
222
rollback to savepoint whoopsie;
Warnings:
Warning 1196 Some non-transactional changed tables couldn't be rolled back
select * from t1;
a
1
222
select * from t3;
a
1
2
commit;
set debug_sync= 'now SIGNAL end';
connection default;
select * from t1;
a b
1 NULL
222 NULL
select * from t2;
a
111
222
select * from t3;
a
1
2
# Cleanup
set debug_sync= 'reset';
drop table t1;
drop table t2;
drop table t3;
......@@ -601,9 +601,7 @@ alter table t1 add b int NULL, algorithm= copy, lock= none;
--reap
update t1 set a= 123 where a = 1;
--error ER_NOT_SUPPORTED_YET
savepoint whoopsie;
show warnings;
rollback to savepoint savie;
commit;
......@@ -616,7 +614,55 @@ set debug_sync= 'now SIGNAL end';
select * from t1;
select * from t2;
create or replace table t1 (a int);
insert t1 values (1), (2);
create or replace table t2 (a int);
insert t2 values (1), (2);
create or replace table t3 (a int) engine=myisam;
insert t3 values (1);
--connection con2
begin;
update t2 set a= 222 where a = 2;
savepoint savie;
update t2 set a= 111 where a = 1;
--send
set debug_sync= 'now WAIT_FOR ended';
--connection default
set debug_sync= 'alter_table_copy_end SIGNAL ended WAIT_FOR end';
--send
alter table t1 add b int NULL, algorithm= copy, lock= none;
--connection con2
--reap
update t1 set a= 222 where a = 2;
savepoint whoopsie;
update t1 set a= 123 where a = 1;
insert t3 values (2);
select * from t1;
rollback to savepoint whoopsie;
select * from t1;
select * from t3;
commit;
set debug_sync= 'now SIGNAL end';
--connection default
--reap
select * from t1;
select * from t2;
select * from t3;
--echo # Cleanup
set debug_sync= 'reset';
drop table t1;
drop table t2;
drop table t3;
......@@ -2994,13 +2994,6 @@ int ha_savepoint(THD *thd, SAVEPOINT *sv)
Ha_trx_info *ha_info= trans->ha_list;
DBUG_ENTER("ha_savepoint");
if (!thd->online_alter_cache_list.empty())
{
my_printf_error(ER_NOT_SUPPORTED_YET, "Cannot set up a savepoint while "
"online ALTER TABLE is in progress", MYF(0));
DBUG_RETURN(1);
}
for (; ha_info; ha_info= ha_info->next())
{
int err;
......
......@@ -279,7 +279,7 @@ void make_default_log_name(char **out, const char* log_ext, bool once)
class binlog_cache_data: public ilist_node<>
{
public:
binlog_cache_data(): share(0), m_pending(0), status(0),
binlog_cache_data(): share(0), sv_list(0), m_pending(0), status(0),
before_stmt_pos(MY_OFF_T_UNDEF),
incident(FALSE),
saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0),
......@@ -420,6 +420,7 @@ class binlog_cache_data: public ilist_node<>
IO_CACHE cache_log;
TABLE_SHARE *share; // for online alter table
SAVEPOINT *sv_list;
private:
/*
Pending binrows event. This event is the event where the rows are currently
......@@ -2607,6 +2608,9 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
int error= 1;
DBUG_ENTER("binlog_savepoint_set");
if (!mysql_bin_log.is_open() && !thd->online_alter_cache_list.empty())
DBUG_RETURN(0);
char buf[1024];
String log_query(buf, sizeof(buf), &my_charset_bin);
......@@ -2639,8 +2643,8 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
{
DBUG_ENTER("binlog_savepoint_rollback");
if (!thd->online_alter_cache_list.empty())
binlog_online_alter_rollback(thd, !thd->in_sub_stmt);
if (!mysql_bin_log.is_open() && !thd->online_alter_cache_list.empty())
DBUG_RETURN(0);
/*
Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some
......@@ -7740,6 +7744,61 @@ static void binlog_online_alter_rollback(THD *thd, bool all)
#endif // HAVE_REPLICATION
}
SAVEPOINT** find_savepoint_in_list(THD *thd, LEX_CSTRING name,
SAVEPOINT ** const list);
SAVEPOINT* savepoint_add(THD *thd, LEX_CSTRING name, SAVEPOINT **list,
int (*release_old)(THD*, SAVEPOINT*));
int online_alter_savepoint_set(THD *thd, LEX_CSTRING name)
{
DBUG_ENTER("binlog_online_alter_savepoint");
#ifdef HAVE_REPLICATION
if (thd->online_alter_cache_list.empty())
DBUG_RETURN(0);
if (savepoint_alloc_size < sizeof (SAVEPOINT) + sizeof(my_off_t))
savepoint_alloc_size= sizeof (SAVEPOINT) + sizeof(my_off_t);
for (auto &cache: thd->online_alter_cache_list)
{
if (cache.share->db_type()->savepoint_set == NULL)
continue;
SAVEPOINT *sv= savepoint_add(thd, name, &cache.sv_list, NULL);
if(unlikely(sv == NULL))
DBUG_RETURN(1);
my_off_t *pos= (my_off_t*)(sv+1);
*pos= cache.get_byte_position();
sv->prev= cache.sv_list;
cache.sv_list= sv;
}
#endif
DBUG_RETURN(0);
}
int online_alter_savepoint_rollback(THD *thd, LEX_CSTRING name)
{
DBUG_ENTER("online_alter_savepoint_rollback");
#ifdef HAVE_REPLICATION
for (auto &cache: thd->online_alter_cache_list)
{
if (cache.share->db_type()->savepoint_set == NULL)
continue;
SAVEPOINT **sv= find_savepoint_in_list(thd, name, &cache.sv_list);
// sv is null if savepoint was set up before online table was modified
my_off_t pos= *sv ? *(my_off_t*)(*sv+1) : 0;
cache.restore_savepoint(pos);
}
#endif
DBUG_RETURN(0);
}
/*
Write the contents of a cache to the binary log.
......
......@@ -1415,4 +1415,7 @@ int binlog_rollback_by_xid(handlerton *hton, XID *xid);
bool write_bin_log_start_alter(THD *thd, bool& partial_alter,
uint64 start_alter_id, bool log_if_exists);
int online_alter_savepoint_set(THD *thd, LEX_CSTRING name);
int online_alter_savepoint_rollback(THD *thd, LEX_CSTRING name);
#endif /* LOG_H */
......@@ -550,17 +550,17 @@ bool trans_rollback_stmt(THD *thd)
DBUG_RETURN(FALSE);
}
/* Find a named savepoint in the current transaction. */
static SAVEPOINT **
find_savepoint(THD *thd, LEX_CSTRING name)
/** Find a savepoint by name in a savepoint list */
SAVEPOINT** find_savepoint_in_list(THD *thd, LEX_CSTRING name,
SAVEPOINT ** const list)
{
SAVEPOINT **sv= &thd->transaction->savepoints;
SAVEPOINT **sv= list;
while (*sv)
{
if (system_charset_info->strnncoll(
(uchar *) name.str, name.length,
(uchar *) (*sv)->name, (*sv)->length) == 0)
(uchar *) name.str, name.length,
(uchar *) (*sv)->name, (*sv)->length) == 0)
break;
sv= &(*sv)->prev;
}
......@@ -568,6 +568,43 @@ find_savepoint(THD *thd, LEX_CSTRING name)
return sv;
}
/* Find a named savepoint in the current transaction. */
static SAVEPOINT **
find_savepoint(THD *thd, LEX_CSTRING name)
{
return find_savepoint_in_list(thd, name, &thd->transaction->savepoints);
}
SAVEPOINT* savepoint_add(THD *thd, LEX_CSTRING name, SAVEPOINT **list,
int (*release_old)(THD*, SAVEPOINT*))
{
DBUG_ENTER("savepoint_add");
SAVEPOINT **sv= find_savepoint_in_list(thd, name, list);
SAVEPOINT *newsv;
if (*sv) /* old savepoint of the same name exists */
{
newsv= *sv;
if (release_old){
int error= release_old(thd, *sv);
if (error)
DBUG_RETURN(NULL);
}
*sv= (*sv)->prev;
}
else if ((newsv= (SAVEPOINT *) alloc_root(&thd->transaction->mem_root,
savepoint_alloc_size)) == NULL)
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
DBUG_RETURN(NULL);
}
newsv->name= strmake_root(&thd->transaction->mem_root, name.str, name.length);
newsv->length= (uint)name.length;
DBUG_RETURN(newsv);
}
/**
Set a named transaction savepoint.
......@@ -581,7 +618,6 @@ find_savepoint(THD *thd, LEX_CSTRING name)
bool trans_savepoint(THD *thd, LEX_CSTRING name)
{
SAVEPOINT **sv, *newsv;
DBUG_ENTER("trans_savepoint");
if (!(thd->in_multi_stmt_transaction_mode() || thd->in_sub_stmt) ||
......@@ -591,23 +627,11 @@ bool trans_savepoint(THD *thd, LEX_CSTRING name)
if (thd->transaction->xid_state.check_has_uncommitted_xa())
DBUG_RETURN(TRUE);
sv= find_savepoint(thd, name);
SAVEPOINT *newsv= savepoint_add(thd, name, &thd->transaction->savepoints,
ha_release_savepoint);
if (*sv) /* old savepoint of the same name exists */
{
newsv= *sv;
ha_release_savepoint(thd, *sv);
*sv= (*sv)->prev;
}
else if ((newsv= (SAVEPOINT *) alloc_root(&thd->transaction->mem_root,
savepoint_alloc_size)) == NULL)
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
if (newsv == NULL)
DBUG_RETURN(TRUE);
}
newsv->name= strmake_root(&thd->transaction->mem_root, name.str, name.length);
newsv->length= (uint)name.length;
/*
if we'll get an error here, don't add new savepoint to the list.
......@@ -617,6 +641,10 @@ bool trans_savepoint(THD *thd, LEX_CSTRING name)
if (unlikely(ha_savepoint(thd, newsv)))
DBUG_RETURN(TRUE);
int error= online_alter_savepoint_set(thd, name);
if (unlikely(error))
DBUG_RETURN(error);
newsv->prev= thd->transaction->savepoints;
thd->transaction->savepoints= newsv;
......@@ -676,6 +704,8 @@ bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name)
ER_WARNING_NOT_COMPLETE_ROLLBACK,
ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK));
res= res || online_alter_savepoint_rollback(thd, name);
thd->transaction->savepoints= sv;
if (res)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment