Commit 44314c76 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-15165 InnoDB purge for index on virtual column is trying to access an incomplete record

The algorithm change is based on a MySQL 8.0 fix for
BUG #26818787: ASSERTION: DATA0DATA.IC:430:TUPLE
by Krzysztof Kapuścik
https://github.com/mysql/mysql-server/commit/ee606e62bbddd7ac3579b4a20ef8684fa7cd83fe

If a record had been inserted in place of a delete-marked purgeable
record by modifying that record, and purge was accessing that record
before the off-page columns were written, row_build_index_entry()
would have returned NULL, causing a crash.

row_vers_non_virtual_fields_equal(): Check whether all non-virtual fields
of an index are equal. Replaces row_vers_non_vc_match(). A more complex
version of this function was called row_vers_non_vc_index_entry_match()
in the MySQL 8.0 fix.

row_vers_impl_x_locked_low(): This change is not directly related to
the reported problem, but apparently to the removal of the function
row_vers_non_vc_match(). This function checks if a secondary index
record was modified by a transaction that has not been committed yet.
For comparing the non-virtual columns, construct a secondary index
tuple from the table row.

row_vers_vc_matches_cluster(): Replace row_vers_non_vc_match() with
code that is equivalent to the row_vers_non_vc_index_entry_match()
in the MySQL 8.0 fix. Also, deduplicate some code by using goto.
parent 29240b50
......@@ -145,7 +145,8 @@ connection default;
update t set a = repeat('m', 16000) where a like "aaa%";
connect lock_table, localhost, root;
lock table t write;
disconnect prevent_purge;
connection prevent_purge;
commit;
connection default;
InnoDB 0 transactions not purged
disconnect lock_table;
......@@ -154,5 +155,28 @@ commit;
InnoDB 0 transactions not purged
set global debug_dbug=@old_dbug;
drop table t;
#
# MDEV-15165 InnoDB purge for index on virtual column
# is trying to access an incomplete record
#
CREATE TABLE t1(
u INT PRIMARY KEY, b BLOB, ug INT GENERATED ALWAYS AS (u) VIRTUAL,
INDEX bug(b(100),ug)
) ENGINE=InnoDB;
INSERT INTO t1 (u,b) VALUES(1,REPEAT('a',16384));
connection prevent_purge;
start transaction with consistent snapshot;
connection default;
DELETE FROM t1;
SET DEBUG_SYNC='blob_write_middle SIGNAL halfway WAIT_FOR purged';
INSERT INTO t1 (u,b) VALUES(1,REPEAT('a',16384));
connection prevent_purge;
SET DEBUG_SYNC='now WAIT_FOR halfway';
COMMIT;
InnoDB 0 transactions not purged
SET DEBUG_SYNC='now SIGNAL purged';
disconnect prevent_purge;
connection default;
DROP TABLE t1;
set debug_sync=reset;
SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency;
......@@ -184,7 +184,8 @@ connection default;
update t set a = repeat('m', 16000) where a like "aaa%";
connect(lock_table, localhost, root);
lock table t write;
disconnect prevent_purge;
connection prevent_purge;
commit;
connection default;
--source ../../innodb/include/wait_all_purged.inc
disconnect lock_table;
......@@ -194,6 +195,32 @@ commit;
set global debug_dbug=@old_dbug;
drop table t;
--echo #
--echo # MDEV-15165 InnoDB purge for index on virtual column
--echo # is trying to access an incomplete record
--echo #
CREATE TABLE t1(
u INT PRIMARY KEY, b BLOB, ug INT GENERATED ALWAYS AS (u) VIRTUAL,
INDEX bug(b(100),ug)
) ENGINE=InnoDB;
INSERT INTO t1 (u,b) VALUES(1,REPEAT('a',16384));
connection prevent_purge;
start transaction with consistent snapshot;
connection default;
DELETE FROM t1;
SET DEBUG_SYNC='blob_write_middle SIGNAL halfway WAIT_FOR purged';
send INSERT INTO t1 (u,b) VALUES(1,REPEAT('a',16384));
connection prevent_purge;
SET DEBUG_SYNC='now WAIT_FOR halfway';
COMMIT;
--source ../../innodb/include/wait_all_purged.inc
SET DEBUG_SYNC='now SIGNAL purged';
disconnect prevent_purge;
connection default;
reap;
DROP TABLE t1;
--source include/wait_until_count_sessions.inc
set debug_sync=reset;
SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency;
/*****************************************************************************
Copyright (c) 1997, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
......@@ -45,24 +45,31 @@ Created 2/6/1997 Heikki Tuuri
#include "lock0lock.h"
#include "row0mysql.h"
/** Check whether all non-virtual columns in a virtual index match that of in
the cluster index
@param[in] index the secondary index
@param[in] row the cluster index row in dtuple form
@param[in] ext externally stored column prefix or NULL
@param[in] ientry the secondary index entry
@param[in,out] heap heap used to build virtual dtuple
@param[in,out] n_non_v_col number of non-virtual columns in the index
@return true if all matches, false otherwise */
/** Check whether all non-virtual index fields are equal.
@param[in] index the secondary index
@param[in] a first index entry to compare
@param[in] b second index entry to compare
@return whether all non-virtual fields are equal */
static
bool
row_vers_non_vc_match(
dict_index_t* index,
const dtuple_t* row,
const row_ext_t* ext,
const dtuple_t* ientry,
mem_heap_t* heap,
ulint* n_non_v_col);
row_vers_non_virtual_fields_equal(
const dict_index_t* index,
const dfield_t* a,
const dfield_t* b)
{
const dict_field_t* end = &index->fields[index->n_fields];
for (const dict_field_t* ifield = index->fields; ifield != end;
ifield++) {
if (!dict_col_is_virtual(ifield->col)
&& cmp_dfield_dfield(a++, b++)) {
return false;
}
}
return true;
}
/** Determine if an active transaction has inserted or modified a secondary
index record.
@param[in] clust_rec clustered index record
......@@ -233,15 +240,29 @@ row_vers_impl_x_locked_low(
}
if (!cur_vrow) {
ulint n_non_v_col = 0;
/* Build index entry out of row */
entry = row_build_index_entry(row, ext, index,
heap);
/* entry could only be NULL (the
clustered index record could contain
BLOB pointers that are NULL) if we
were accessing a freshly inserted
record before it was fully inserted.
prev_version cannot possibly be such
an incomplete record, because its
transaction would have to be committed
in order for later versions of the
record to be able to exist. */
ut_ad(entry);
/* If the indexed virtual columns has changed,
there must be log record to generate vrow.
Otherwise, it is not changed, so no need
to compare */
if (row_vers_non_vc_match(
index, row, ext, ientry, heap,
&n_non_v_col) == 0) {
if (!row_vers_non_virtual_fields_equal(
index,
ientry->fields, entry->fields)) {
if (rec_del != vers_del) {
break;
}
......@@ -411,61 +432,6 @@ row_vers_must_preserve_del_marked(
return(!purge_sys->view.changes_visible(trx_id, name));
}
/** Check whether all non-virtual columns in a virtual index match that of in
the cluster index
@param[in] index the secondary index
@param[in] row the cluster index row in dtuple form
@param[in] ext externally stored column prefix or NULL
@param[in] ientry the secondary index entry
@param[in,out] heap heap used to build virtual dtuple
@param[in,out] n_non_v_col number of non-virtual columns in the index
@return true if all matches, false otherwise */
static
bool
row_vers_non_vc_match(
dict_index_t* index,
const dtuple_t* row,
const row_ext_t* ext,
const dtuple_t* ientry,
mem_heap_t* heap,
ulint* n_non_v_col)
{
const dfield_t* field1;
dfield_t* field2;
ulint n_fields = dtuple_get_n_fields(ientry);
ulint ret = true;
*n_non_v_col = 0;
/* Build index entry out of row */
dtuple_t* nentry = row_build_index_entry(row, ext, index, heap);
for (ulint i = 0; i < n_fields; i++) {
const dict_field_t* ind_field = dict_index_get_nth_field(
index, i);
const dict_col_t* col = ind_field->col;
/* Only check non-virtual columns */
if (dict_col_is_virtual(col)) {
continue;
}
if (ret) {
field1 = dtuple_get_nth_field(ientry, i);
field2 = dtuple_get_nth_field(nentry, i);
if (cmp_dfield_dfield(field1, field2) != 0) {
ret = false;
}
}
(*n_non_v_col)++;
}
return(ret);
}
/** build virtual column value from current cluster index record data
@param[in,out] row the cluster index row in dtuple form
@param[in] clust_index clustered index
......@@ -616,8 +582,7 @@ that of current cluster index record, which is recreated from information
stored in undo log
@param[in] in_purge called by purge thread
@param[in] rec record in the clustered index
@param[in] row the cluster index row in dtuple form
@param[in] ext externally stored column prefix or NULL
@param[in] icentry the index entry built from a cluster row
@param[in] clust_index cluster index
@param[in] clust_offsets offsets on the cluster record
@param[in] index the secondary index
......@@ -633,8 +598,7 @@ bool
row_vers_vc_matches_cluster(
bool in_purge,
const rec_t* rec,
const dtuple_t* row,
row_ext_t* ext,
const dtuple_t* icentry,
dict_index_t* clust_index,
ulint* clust_offsets,
dict_index_t* index,
......@@ -659,15 +623,27 @@ row_vers_vc_matches_cluster(
dfield_t* field2;
ulint i;
tuple_heap = mem_heap_create(1024);
/* First compare non-virtual columns (primary keys) */
if (!row_vers_non_vc_match(index, row, ext, ientry, tuple_heap,
&n_non_v_col)) {
mem_heap_free(tuple_heap);
return(false);
ut_ad(index->n_fields == n_fields);
ut_ad(n_fields == dtuple_get_n_fields(icentry));
{
const dfield_t* a = ientry->fields;
const dfield_t* b = icentry->fields;
for (const dict_field_t *ifield = index->fields,
*const end = &index->fields[index->n_fields];
ifield != end; ifield++, a++, b++) {
if (!dict_col_is_virtual(ifield->col)) {
if (cmp_dfield_dfield(a, b)) {
return false;
}
n_non_v_col++;
}
}
}
tuple_heap = mem_heap_create(1024);
ut_ad(n_fields > n_non_v_col);
*vrow = dtuple_create_with_vcol(v_heap ? v_heap : tuple_heap, 0, num_v);
......@@ -936,27 +912,28 @@ row_vers_old_has_index_entry(
entry = row_build_index_entry(
row, ext, index, heap);
if (entry && !dtuple_coll_cmp(ientry, entry)) {
mem_heap_free(heap);
if (v_heap) {
mem_heap_free(v_heap);
}
return(TRUE);
goto safe_to_purge;
}
} else {
if (row_vers_vc_matches_cluster(
also_curr, rec, row, ext, clust_index,
clust_offsets, index, ientry, roll_ptr,
trx_id, NULL, &vrow, mtr)) {
mem_heap_free(heap);
if (v_heap) {
mem_heap_free(v_heap);
}
return(TRUE);
/* Build index entry out of row */
entry = row_build_index_entry(row, ext, index, heap);
/* entry could only be NULL if
the clustered index record is an uncommitted
inserted record whose BLOBs have not been
written yet. The secondary index record
can be safely removed, because it cannot
possibly refer to this incomplete
clustered index record. (Insert would
always first be completed for the
clustered index record, then proceed to
secondary indexes.) */
if (entry && row_vers_vc_matches_cluster(
also_curr, rec, entry,
clust_index, clust_offsets,
index, ientry, roll_ptr,
trx_id, NULL, &vrow, mtr)) {
goto safe_to_purge;
}
}
clust_offsets = rec_get_offsets(rec, clust_index, NULL,
......@@ -989,7 +966,7 @@ row_vers_old_has_index_entry(
a different binary value in a char field, but the
collation identifies the old and new value anyway! */
if (entry && !dtuple_coll_cmp(ientry, entry)) {
safe_to_purge:
mem_heap_free(heap);
if (v_heap) {
......@@ -1086,13 +1063,7 @@ row_vers_old_has_index_entry(
and new value anyway! */
if (entry && !dtuple_coll_cmp(ientry, entry)) {
mem_heap_free(heap);
if (v_heap) {
mem_heap_free(v_heap);
}
return(TRUE);
goto safe_to_purge;
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment