Commit 32086353 authored by unknown's avatar unknown

MDEV-26: Global Transaction ID.

Improvements to record_gtid():

 - Check for correct table definition of mysql.rpl_slave_state
 - Use autocommit, to save one call to ha_commit_trans()
 - Slightly more efficient way to set table->write_set
 - Use ha_index_read_map() to locate rows to support any storage engine.
parent 1baa0a57
include/master-slave.inc
[connection master]
*** Test that we check against incorrect table definition for mysql.rpl_slave_state ***
CREATE TABLE t1(a INT PRIMARY KEY) ENGINE=InnoDB;
include/stop_slave.inc
ALTER TABLE mysql.rpl_slave_state CHANGE seq_no seq_no VARCHAR(20);
START SLAVE;
INSERT INTO t1 VALUES (1);
CALL mtr.add_suppression("Slave: Failed to open mysql.rpl_slave_state");
include/wait_for_slave_sql_error.inc [errno=1942]
include/stop_slave.inc
ALTER TABLE mysql.rpl_slave_state CHANGE seq_no seq_no BIGINT UNSIGNED NOT NULL;
ALTER TABLE mysql.rpl_slave_state DROP PRIMARY KEY;
ALTER TABLE mysql.rpl_slave_state ADD PRIMARY KEY (sub_id, domain_id);
START SLAVE;
include/wait_for_slave_sql_error.inc [errno=1942]
include/stop_slave.inc
ALTER TABLE mysql.rpl_slave_state DROP PRIMARY KEY;
START SLAVE;
include/wait_for_slave_sql_error.inc [errno=1942]
include/stop_slave.inc
ALTER TABLE mysql.rpl_slave_state ADD PRIMARY KEY (sub_id);
START SLAVE;
include/wait_for_slave_sql_error.inc [errno=1942]
include/stop_slave.inc
ALTER TABLE mysql.rpl_slave_state DROP PRIMARY KEY;
ALTER TABLE mysql.rpl_slave_state ADD PRIMARY KEY (domain_id, sub_id);
include/start_slave.inc
SELECT * FROM t1;
a
1
DROP TABLE t1;
include/rpl_end.inc
--source include/have_innodb.inc
--source include/master-slave.inc
--echo *** Test that we check against incorrect table definition for mysql.rpl_slave_state ***
--connection master
CREATE TABLE t1(a INT PRIMARY KEY) ENGINE=InnoDB;
--sync_slave_with_master
--connection slave
--source include/stop_slave.inc
ALTER TABLE mysql.rpl_slave_state CHANGE seq_no seq_no VARCHAR(20);
START SLAVE;
--connection master
INSERT INTO t1 VALUES (1);
--connection slave
CALL mtr.add_suppression("Slave: Failed to open mysql.rpl_slave_state");
--let $slave_sql_errno=1942
--source include/wait_for_slave_sql_error.inc
--source include/stop_slave.inc
ALTER TABLE mysql.rpl_slave_state CHANGE seq_no seq_no BIGINT UNSIGNED NOT NULL;
ALTER TABLE mysql.rpl_slave_state DROP PRIMARY KEY;
ALTER TABLE mysql.rpl_slave_state ADD PRIMARY KEY (sub_id, domain_id);
START SLAVE;
--let $slave_sql_errno=1942
--source include/wait_for_slave_sql_error.inc
--source include/stop_slave.inc
ALTER TABLE mysql.rpl_slave_state DROP PRIMARY KEY;
START SLAVE;
--let $slave_sql_errno=1942
--source include/wait_for_slave_sql_error.inc
--source include/stop_slave.inc
ALTER TABLE mysql.rpl_slave_state ADD PRIMARY KEY (sub_id);
START SLAVE;
--let $slave_sql_errno=1942
--source include/wait_for_slave_sql_error.inc
--source include/stop_slave.inc
ALTER TABLE mysql.rpl_slave_state DROP PRIMARY KEY;
ALTER TABLE mysql.rpl_slave_state ADD PRIMARY KEY (domain_id, sub_id);
--source include/start_slave.inc
--connection master
--sync_slave_with_master
--connection slave
SELECT * FROM t1;
--connection master
DROP TABLE t1;
--source include/rpl_end.inc
...@@ -6280,6 +6280,44 @@ rpl_slave_state::truncate_state_table(THD *thd) ...@@ -6280,6 +6280,44 @@ rpl_slave_state::truncate_state_table(THD *thd)
} }
static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= {
{ { C_STRING_WITH_LEN("domain_id") },
{ C_STRING_WITH_LEN("int(10) unsigned") },
{NULL, 0} },
{ { C_STRING_WITH_LEN("sub_id") },
{ C_STRING_WITH_LEN("bigint(20) unsigned") },
{NULL, 0} },
{ { C_STRING_WITH_LEN("server_id") },
{ C_STRING_WITH_LEN("int(10) unsigned") },
{NULL, 0} },
{ { C_STRING_WITH_LEN("seq_no") },
{ C_STRING_WITH_LEN("bigint(20) unsigned") },
{NULL, 0} },
};
static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1};
static const TABLE_FIELD_DEF mysql_rpl_slave_state_tabledef= {
array_elements(mysql_rpl_slave_state_coltypes),
mysql_rpl_slave_state_coltypes,
array_elements(mysql_rpl_slave_state_pk_parts),
mysql_rpl_slave_state_pk_parts
};
class Gtid_db_intact : public Table_check_intact
{
protected:
void report_error(uint, const char *fmt, ...)
{
va_list args;
va_start(args, fmt);
error_log_print(ERROR_LEVEL, fmt, args);
va_end(args);
}
};
static Gtid_db_intact gtid_table_intact;
/* /*
Write a gtid to the replication slave state table. Write a gtid to the replication slave state table.
...@@ -6304,6 +6342,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, ...@@ -6304,6 +6342,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
TABLE *table; TABLE *table;
list_element *elist= 0, *next; list_element *elist= 0, *next;
element *elem; element *elem;
ulonglong thd_saved_option= thd->variables.option_bits;
mysql_reset_thd_for_next_command(thd, 0); mysql_reset_thd_for_next_command(thd, 0);
...@@ -6315,16 +6354,24 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, ...@@ -6315,16 +6354,24 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
goto end; goto end;
table_opened= true; table_opened= true;
table= tlist.table; table= tlist.table;
if ((err= gtid_table_intact.check(table, &mysql_rpl_slave_state_tabledef)))
{
my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql",
rpl_gtid_slave_state_table_name.str);
goto end;
}
table->no_replicate= 1; table->no_replicate= 1;
if (!in_transaction)
thd->variables.option_bits&=
~(ulonglong)(OPTION_NOT_AUTOCOMMIT|OPTION_BEGIN);
/* /*
ToDo: Check the table definition, error if not as expected. ToDo: Check the table definition, error if not as expected.
We need the correct first 4 columns with correct type, and the primary key. We need the correct first 4 columns with correct type, and the primary key.
*/ */
bitmap_set_bit(table->write_set, table->field[0]->field_index); bitmap_set_all(table->write_set);
bitmap_set_bit(table->write_set, table->field[1]->field_index);
bitmap_set_bit(table->write_set, table->field[2]->field_index);
bitmap_set_bit(table->write_set, table->field[3]->field_index);
table->field[0]->store((ulonglong)gtid->domain_id, true); table->field[0]->store((ulonglong)gtid->domain_id, true);
table->field[1]->store(sub_id, true); table->field[1]->store(sub_id, true);
...@@ -6347,24 +6394,28 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, ...@@ -6347,24 +6394,28 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
goto end; goto end;
/* Now delete any already committed rows. */ /* Now delete any already committed rows. */
DBUG_ASSERT
((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
table->s->primary_key < MAX_KEY /* ToDo support all storage engines */);
bitmap_set_bit(table->read_set, table->field[0]->field_index); bitmap_set_bit(table->read_set, table->field[0]->field_index);
bitmap_set_bit(table->read_set, table->field[1]->field_index); bitmap_set_bit(table->read_set, table->field[1]->field_index);
if ((err= table->file->ha_index_init(0, 0)))
goto end;
while (elist) while (elist)
{ {
uchar key_buffer[4+8];
next= elist->next; next= elist->next;
table->field[1]->store(elist->sub_id, true); table->field[1]->store(elist->sub_id, true);
/* domain_id is already set in table->record[0] from write_row() above. */ /* domain_id is already set in table->record[0] from write_row() above. */
if ((err= table->file->ha_rnd_pos_by_record(table->record[0])) || key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
(err= table->file->ha_delete_row(table->record[0]))) if ((err= table->file->ha_index_read_map(table->record[1], key_buffer,
goto end; HA_WHOLE_KEY, HA_READ_KEY_EXACT)) ||
(err= table->file->ha_delete_row(table->record[1])))
break;
my_free(elist); my_free(elist);
elist= next; elist= next;
} }
table->file->ha_index_end();
end: end:
...@@ -6378,17 +6429,14 @@ end: ...@@ -6378,17 +6429,14 @@ end:
*/ */
ha_rollback_trans(thd, FALSE); ha_rollback_trans(thd, FALSE);
close_thread_tables(thd); close_thread_tables(thd);
if (!in_transaction)
ha_rollback_trans(thd, TRUE);
} }
else else
{ {
ha_commit_trans(thd, FALSE); ha_commit_trans(thd, FALSE);
close_thread_tables(thd); close_thread_tables(thd);
if (!in_transaction)
ha_commit_trans(thd, TRUE);
} }
} }
thd->variables.option_bits= thd_saved_option;
return err; return err;
} }
......
...@@ -6608,3 +6608,6 @@ ER_CANNOT_UPDATE_GTID_STATE ...@@ -6608,3 +6608,6 @@ ER_CANNOT_UPDATE_GTID_STATE
eng "Could not update replication slave gtid state" eng "Could not update replication slave gtid state"
ER_DUPLICATE_GTID_DOMAIN ER_DUPLICATE_GTID_DOMAIN
eng "GTID %u-%u-%llu and %u-%u-%llu conflict (duplicate domain id %u)" eng "GTID %u-%u-%llu and %u-%u-%llu conflict (duplicate domain id %u)"
ER_GTID_OPEN_TABLE_FAILED
eng "Failed to open %s.%s"
ger "Öffnen von %s.%s fehlgeschlagen"
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment