Commit 6f8b889a authored by Guilhem Bichot's avatar Guilhem Bichot

Fix for BUG#38018 INSERT SELECT ON DUPLICATE KEY UPDATE and LOAD DATA CONCURRENT REPLACE

These statements used TL_WRITE_CONCURRENT_INSERT even though they may update or delete a row. This could cause
concurrent SELECTs to see the table changing while the SELECT runs, or, if the query was made of a group of SELECTs,
some SELECTs would see different versions of the table. In any case, versioning in Maria was so far coded
to support only insertions. REPLACE SELECT, INSERT VALUES ON DUPLICATE KEY UPDATE, and LOAD DATA REPLACE
were already ok.

mysql-test/r/maria2.result:
  result. Without the code fix, the assertion added to ha_maria::update_row() would fire twice.
mysql-test/t/maria2.test:
  test when INSERT ON DUPLICATE KEY UPDATE and LOAD DATA CONCURRENT REPLACE do update rows
storage/maria/ha_maria.cc:
  Assert that update_row and delete_row never see TL_WRITE_CONCURRENT_INSERT.
  INSERT SELECT ON DUPLICATE KEY UPDATE and LOAD DATA CONCURRENT REPLACE
  must upgrade TL_WRITE_CONCURRENT_INSERT to TL_WRITE because they may update/delete a row.
parent dddc6fd2
...@@ -16,3 +16,29 @@ check table t1 extended;
Table Op Msg_type Msg_text Table Op Msg_type Msg_text
test.t1 check status OK test.t1 check status OK
drop table t1; drop table t1;
create table t1(id int, s char(1), unique(s)) engine=maria;
insert into t1 values(1,"a") on duplicate key update t1.id=t1.id+1;
insert into t1 values(1,"a") on duplicate key update t1.id=t1.id+1;
insert into t1 select 1,"a" on duplicate key update t1.id=t1.id+1;
select * from t1;
id s
3 a
replace into t1 select 1,"a";
select * from t1;
id s
1 a
drop table t1;
create table t1 (pk int primary key, apk int unique, data int) engine=maria;
insert into t1 values (1, 1, 1), (4, 4, 4), (6, 6, 6);
load data concurrent infile '../std_data_ln/loaddata5.dat' replace into table t1 fields terminated by '' enclosed by '' ignore 1 lines (pk, apk);
select * from t1 order by pk;
pk apk data
1 1 1
3 4 NULL
5 6 NULL
load data infile '../std_data_ln/loaddata5.dat' replace into table t1 fields terminated by '' enclosed by '' ignore 1 lines (pk, apk);
select * from t1 order by pk;
pk apk data
1 1 1
3 4 NULL
5 6 NULL
...@@ -64,3 +64,23 @@ select count(*) from t1;
select name from t1; select name from t1;
check table t1 extended; check table t1 extended;
drop table t1; drop table t1;
# Regression tests for BUG#38018: statements that may update/delete rows must
# not keep TL_WRITE_CONCURRENT_INSERT (Maria versioning supports only inserts).
# Without the server fix, the DBUG_ASSERT added to ha_maria::update_row() fires.
# test INSERT ON DUPLICATE KEY UPDATE
create table t1(id int, s char(1), unique(s)) engine=maria;
insert into t1 values(1,"a") on duplicate key update t1.id=t1.id+1;
# duplicate of "a": triggers the ON DUPLICATE KEY UPDATE path
insert into t1 values(1,"a") on duplicate key update t1.id=t1.id+1;
# INSERT SELECT variant: used TL_WRITE_CONCURRENT_INSERT before the fix
insert into t1 select 1,"a" on duplicate key update t1.id=t1.id+1;
select * from t1;
# test REPLACE SELECT
replace into t1 select 1,"a";
select * from t1;
drop table t1;
# test LOAD DATA INFILE REPLACE
create table t1 (pk int primary key, apk int unique, data int) engine=maria;
insert into t1 values (1, 1, 1), (4, 4, 4), (6, 6, 6);
# CONCURRENT REPLACE: used TL_WRITE_CONCURRENT_INSERT before the fix
load data concurrent infile '../std_data_ln/loaddata5.dat' replace into table t1 fields terminated by '' enclosed by '' ignore 1 lines (pk, apk);
select * from t1 order by pk;
# non-concurrent REPLACE for comparison (was already ok)
load data infile '../std_data_ln/loaddata5.dat' replace into table t1 fields terminated by '' enclosed by '' ignore 1 lines (pk, apk);
select * from t1 order by pk;
...@@ -1961,6 +1961,7 @@ bool ha_maria::is_crashed() const
int ha_maria::update_row(const uchar * old_data, uchar * new_data) int ha_maria::update_row(const uchar * old_data, uchar * new_data)
{ {
DBUG_ASSERT(file->lock.type != TL_WRITE_CONCURRENT_INSERT);
ha_statistic_increment(&SSV::ha_update_count); ha_statistic_increment(&SSV::ha_update_count);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
table->timestamp_field->set_time(); table->timestamp_field->set_time();
...@@ -1970,6 +1971,7 @@ int ha_maria::update_row(const uchar * old_data, uchar * new_data)
int ha_maria::delete_row(const uchar * buf) int ha_maria::delete_row(const uchar * buf)
{ {
DBUG_ASSERT(file->lock.type != TL_WRITE_CONCURRENT_INSERT);
ha_statistic_increment(&SSV::ha_delete_count); ha_statistic_increment(&SSV::ha_delete_count);
return maria_delete(file, buf); return maria_delete(file, buf);
} }
...@@ -2506,6 +2508,7 @@ THR_LOCK_DATA **ha_maria::store_lock(THD *thd,
(lock_type == TL_IGNORE || file->lock.type == TL_UNLOCK)); (lock_type == TL_IGNORE || file->lock.type == TL_UNLOCK));
if (lock_type != TL_IGNORE && file->lock.type == TL_UNLOCK) if (lock_type != TL_IGNORE && file->lock.type == TL_UNLOCK)
{ {
const enum enum_sql_command sql_command= thd->lex->sql_command;
/* /*
We have to disable concurrent inserts for INSERT ... SELECT or We have to disable concurrent inserts for INSERT ... SELECT or
INSERT/UPDATE/DELETE with sub queries if we are using statement based INSERT/UPDATE/DELETE with sub queries if we are using statement based
...@@ -2514,24 +2517,33 @@ THR_LOCK_DATA **ha_maria::store_lock(THD *thd,
*/ */
if (lock_type <= TL_READ_HIGH_PRIORITY && if (lock_type <= TL_READ_HIGH_PRIORITY &&
!thd->current_stmt_binlog_row_based && !thd->current_stmt_binlog_row_based &&
(thd->lex->sql_command != SQLCOM_SELECT && (sql_command != SQLCOM_SELECT &&
thd->lex->sql_command != SQLCOM_LOCK_TABLES) && sql_command != SQLCOM_LOCK_TABLES) &&
(thd->options & OPTION_BIN_LOG) && (thd->options & OPTION_BIN_LOG) &&
mysql_bin_log.is_open()) mysql_bin_log.is_open())
lock_type= TL_READ_NO_INSERT; lock_type= TL_READ_NO_INSERT;
else if (lock_type == TL_WRITE_CONCURRENT_INSERT && else if (lock_type == TL_WRITE_CONCURRENT_INSERT)
(file->state->records == 0))
{ {
const enum enum_duplicates duplicates= thd->lex->duplicates;
/* /*
Bulk insert may use repair, which will cause problems if other Explanation for the 3 conditions below, in order:
- Bulk insert may use repair, which will cause problems if other
threads try to read/insert to the table: disable versioning. threads try to read/insert to the table: disable versioning.
Note that our read of file->state->records is incorrect, as such Note that our read of file->state->records is incorrect, as such
variable may have changed when we come to start_bulk_insert() (worse variable may have changed when we come to start_bulk_insert() (worse
case: we see != 0 so allow versioning, start_bulk_insert() sees 0 and case: we see != 0 so allow versioning, start_bulk_insert() sees 0 and
uses repair). This is prevented because start_bulk_insert() will not uses repair). This is prevented because start_bulk_insert() will not
try repair if we enabled versioning. try repair if we enabled versioning.
- INSERT SELECT ON DUPLICATE KEY UPDATE comes here with
TL_WRITE_CONCURRENT_INSERT but shouldn't because it can do
update/delete of a row and versioning doesn't support that
- same for LOAD DATA CONCURRENT REPLACE.
*/ */
lock_type= TL_WRITE; if ((file->state->records == 0) ||
(sql_command == SQLCOM_INSERT_SELECT && duplicates == DUP_UPDATE) ||
(sql_command == SQLCOM_LOAD && duplicates == DUP_REPLACE))
lock_type= TL_WRITE;
} }
file->lock.type= lock_type; file->lock.type= lock_type;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment