Commit 39f46745 authored by Denis Protivensky's avatar Denis Protivensky Committed by Jan Lindström

MDEV-24623 Replicate bulk insert as table-level exclusive key

- introduce table key construction function in wsrep service interface
- don't add row keys when replicating bulk insert
- don't start bulk insert on applier or when transaction is not active
- don't start bulk insert on system versioned tables
- implement actual bulk insert table-level key replication
Reviewed-by: default avatarJan Lindström <jan.lindstrom@mariadb.com>
parent a10003bd
......@@ -69,6 +69,9 @@ extern struct wsrep_service_st {
void (*wsrep_thd_self_abort_func)(MYSQL_THD thd);
int (*wsrep_thd_append_key_func)(MYSQL_THD thd, const struct wsrep_key* key,
int n_keys, enum Wsrep_service_key_type);
int (*wsrep_thd_append_table_key_func)(MYSQL_THD thd, const char* db,
const char* table, enum Wsrep_service_key_type);
my_bool (*wsrep_thd_is_local_transaction)(const MYSQL_THD thd);
const char* (*wsrep_thd_client_state_str_func)(const MYSQL_THD thd);
const char* (*wsrep_thd_client_mode_str_func)(const MYSQL_THD thd);
const char* (*wsrep_thd_transaction_state_str_func)(const MYSQL_THD thd);
......@@ -121,6 +124,8 @@ extern struct wsrep_service_st {
#define wsrep_thd_is_local(T) wsrep_service->wsrep_thd_is_local_func(T)
#define wsrep_thd_self_abort(T) wsrep_service->wsrep_thd_self_abort_func(T)
#define wsrep_thd_append_key(T,W,N,K) wsrep_service->wsrep_thd_append_key_func(T,W,N,K)
#define wsrep_thd_append_table_key(T,D,B,K) wsrep_service->wsrep_thd_append_table_key_func(T,D,B,K)
#define wsrep_thd_is_local_transaction(T) wsrep_service->wsrep_thd_is_local_transaction_func(T)
#define wsrep_thd_client_state_str(T) wsrep_service->wsrep_thd_client_state_str_func(T)
#define wsrep_thd_client_mode_str(T) wsrep_service->wsrep_thd_client_mode_str_func(T)
#define wsrep_thd_transaction_state_str(T) wsrep_service->wsrep_thd_transaction_state_str_func(T)
......@@ -226,6 +231,13 @@ extern "C" int wsrep_thd_append_key(MYSQL_THD thd,
int n_keys,
enum Wsrep_service_key_type);
extern "C" int wsrep_thd_append_table_key(MYSQL_THD thd,
const char* db,
const char* table,
enum Wsrep_service_key_type);
extern "C" my_bool wsrep_thd_is_local_transaction(const MYSQL_THD thd);
extern const char* wsrep_sr_table_name_full;
extern "C" const char* wsrep_get_sr_table_name();
......
connection node_2;
connection node_1;
connection node_1;
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
SET foreign_key_checks = 0;
SET unique_checks = 0;
START TRANSACTION;
connection node_2;
SET foreign_key_checks = 1;
SET unique_checks = 1;
INSERT INTO t1 VALUES (1001);
connection node_1;
COMMIT;
ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
DROP TABLE t1;
connection node_1;
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
START TRANSACTION;
connection node_2;
SET foreign_key_checks = 1;
SET unique_checks = 1;
START TRANSACTION;
INSERT INTO t1 VALUES (1001);
connection node_1;
COMMIT;
2
connection node_2;
COMMIT;
ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
DROP TABLE t1;
#
# Test that bulk insert replicates as table-level exclusive key and
# rolls back properly if needed.
#
--source include/galera_cluster.inc
--source include/have_innodb.inc
#
# Make bulk insert BF-abort, but regular insert succeed.
#
--connection node_1
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
# Disable foreign and unique key checks to allow bulk insert.
SET foreign_key_checks = 0;
SET unique_checks = 0;
START TRANSACTION;
--let $count=0
--disable_query_log
while ($count < 1000)
{
--eval INSERT INTO t1 VALUES ($count)
--inc $count
}
--enable_query_log
--connection node_2
# Disable bulk insert.
SET foreign_key_checks = 1;
SET unique_checks = 1;
# Insert a value out of the bulk insert range.
INSERT INTO t1 VALUES (1001);
--connection node_1
--error ER_LOCK_DEADLOCK
COMMIT;
DROP TABLE t1;
#
# Make bulk insert succeed, but regular insert BF-abort.
#
--connection node_1
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
--let $before_bulk_keys = `SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_repl_keys'`
START TRANSACTION;
--let $count=0
--disable_query_log
while ($count < 1000)
{
--eval INSERT INTO t1 VALUES ($count)
--inc $count
}
--enable_query_log
--connection node_2
# Disable bulk insert.
SET foreign_key_checks = 1;
SET unique_checks = 1;
START TRANSACTION;
# Insert a value out of the bulk insert range.
INSERT INTO t1 VALUES (1001);
--connection node_1
COMMIT;
# Expect two keys to be added for bulk insert: DB-level shared key and table-level exclusive key.
--let $bulk_keys_count = `SELECT VARIABLE_VALUE - $before_bulk_keys FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_repl_keys'`
--echo $bulk_keys_count
--connection node_2
--error ER_LOCK_DEADLOCK
COMMIT;
DROP TABLE t1;
......@@ -417,3 +417,23 @@ extern "C" void wsrep_thd_set_PA_unsafe(THD *thd)
WSREP_DEBUG("session does not have active transaction, can not mark as PA unsafe");
}
}
extern "C" int wsrep_thd_append_table_key(MYSQL_THD thd,
const char* db,
const char* table,
enum Wsrep_service_key_type key_type)
{
wsrep_key_arr_t key_arr = {0, 0};
int ret = wsrep_prepare_keys_for_isolation(thd, db, table, NULL, &key_arr);
ret = ret || wsrep_thd_append_key(thd, key_arr.keys,
(int)key_arr.keys_len, key_type);
wsrep_keys_free(&key_arr);
return ret;
}
extern "C" my_bool wsrep_thd_is_local_transaction(const THD *thd)
{
return (wsrep_thd_is_local(thd) &&
thd->wsrep_cs().transaction().active());
}
......@@ -162,6 +162,8 @@ static struct wsrep_service_st wsrep_handler = {
wsrep_thd_is_local,
wsrep_thd_self_abort,
wsrep_thd_append_key,
wsrep_thd_append_table_key,
wsrep_thd_is_local_transaction,
wsrep_thd_client_state_str,
wsrep_thd_client_mode_str,
wsrep_thd_transaction_state_str,
......
......@@ -101,6 +101,12 @@ void wsrep_thd_self_abort(THD *)
int wsrep_thd_append_key(THD *, const struct wsrep_key*, int, enum Wsrep_service_key_type)
{ return 0; }
int wsrep_thd_append_table_key(THD *, const char*, const char*, enum Wsrep_service_key_type)
{ return 0; }
my_bool wsrep_thd_is_local_transaction(const THD*)
{ return 0; }
const char* wsrep_thd_client_state_str(const THD*)
{ return 0; }
......
......@@ -8039,6 +8039,7 @@ ha_innobase::write_row(
#ifdef WITH_WSREP
if (!error_result && trx->is_wsrep()
&& !trx->is_bulk_insert()
&& wsrep_thd_is_local(m_user_thd)
&& !wsrep_thd_ignore_table(m_user_thd)
&& !wsrep_consistency_check(m_user_thd)
......@@ -10142,6 +10143,8 @@ wsrep_append_key(
(shared, exclusive, semi...) */
)
{
ut_ad(!trx->is_bulk_insert());
DBUG_ENTER("wsrep_append_key");
DBUG_PRINT("enter",
("thd: %lu trx: %lld", thd_get_thread_id(thd),
......
......@@ -2573,6 +2573,42 @@ but GCC 4.8.5 does not support pop_options. */
# pragma GCC optimize ("O0")
#endif
#ifdef WITH_WSREP
/** Start bulk insert operation for Galera by appending
table-level exclusive key for bulk insert.
@param trx transaction
@param index index
@retval false on success
@retval true on failure */
ATTRIBUTE_COLD static bool row_ins_wsrep_start_bulk(trx_t *trx, const dict_index_t &index)
{
char db_buf[NAME_LEN + 1];
char tbl_buf[NAME_LEN + 1];
ulint db_buf_len, tbl_buf_len;
if (!index.table->parse_name(db_buf, tbl_buf, &db_buf_len, &tbl_buf_len))
{
WSREP_ERROR("Parse_name for bulk insert failed: %s",
wsrep_thd_query(trx->mysql_thd));
trx->error_state = DB_ROLLBACK;
return true;
}
/* Append table-level exclusive key for bulk insert. */
const int rcode = wsrep_thd_append_table_key(trx->mysql_thd, db_buf,
tbl_buf, WSREP_SERVICE_KEY_EXCLUSIVE);
if (rcode)
{
WSREP_ERROR("Appending table key for bulk insert failed: %s, %d",
wsrep_thd_query(trx->mysql_thd), rcode);
trx->error_state = DB_ROLLBACK;
return true;
}
return false;
}
#endif
/***************************************************************//**
Tries to insert an entry into a clustered index, ignoring foreign key
constraints. If a record with the same unique key is found, the other
......@@ -2702,7 +2738,7 @@ row_ins_clust_index_entry_low(
&& !index->table->skip_alter_undo
&& !index->table->n_rec_locks
&& !index->table->is_active_ddl()
&& !trx->is_wsrep() /* FIXME: MDEV-24623 */
&& !index->table->versioned()
&& !thd_is_slave(trx->mysql_thd) /* FIXME: MDEV-24622 */) {
DEBUG_SYNC_C("empty_root_page_insert");
......@@ -2718,6 +2754,16 @@ row_ins_clust_index_entry_low(
goto skip_bulk_insert;
}
#ifdef WITH_WSREP
if (trx->is_wsrep())
{
if (!wsrep_thd_is_local_transaction(trx->mysql_thd))
goto skip_bulk_insert;
if (row_ins_wsrep_start_bulk(trx, *index))
goto err_exit;
}
#endif /* WITH_WSREP */
#ifdef BTR_CUR_HASH_ADAPT
if (btr_search_enabled) {
btr_search_x_lock_all();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment