Commit 1562b2c2 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-29666 InnoDB fails to purge secondary index records when indexed virtual columns exist

row_purge_get_partial(): Replaces trx_undo_rec_get_partial_row().
Also copy the purge_node_t::ref to the purge_node_t::row.
In this way, the clustered index key fields will always be
available, even if thanks to
commit d384ead0 (MDEV-14799)
they would no longer be repeated in the remaining part of the
undo log record.
parent 19f0b96d
......@@ -189,10 +189,7 @@ lock table t write;
connection prevent_purge;
commit;
connection default;
--source ../../innodb/include/wait_all_purged.inc
disconnect lock_table;
start transaction with consistent snapshot;
commit;
--source ../../innodb/include/wait_all_purged.inc
set global debug_dbug=@old_dbug;
drop table t;
......
......@@ -166,4 +166,19 @@ buffer_LRU_batch_evict_total_pages buffer
# FLUSH TABLES t1 FOR EXPORT;
# UNLOCK TABLES;
DROP TABLE t1;
#
# MDEV-29666 InnoDB fails to purge secondary index records
# when indexed virtual columns exist
#
CREATE TABLE t1 (a INT, b INT, a1 INT AS(a) VIRTUAL,
INDEX(a1),INDEX(b)) ENGINE=InnoDB;
INSERT INTO t1 SET a=1, b=1;
UPDATE t1 SET a=2, b=3;
InnoDB 0 transactions not purged
FLUSH TABLE t1 FOR EXPORT;
page 4: N_RECS=0x0001
page 5: N_RECS=0x0001
UNLOCK TABLES;
DROP TABLE t1;
# End of 10.3 tests
SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency;
......@@ -149,4 +149,36 @@ WHERE NAME="buffer_LRU_batch_evict_total_pages" AND COUNT > 0;
DROP TABLE t1;
--echo #
--echo # MDEV-29666 InnoDB fails to purge secondary index records
--echo # when indexed virtual columns exist
--echo #
CREATE TABLE t1 (a INT, b INT, a1 INT AS(a) VIRTUAL,
INDEX(a1),INDEX(b)) ENGINE=InnoDB;
INSERT INTO t1 SET a=1, b=1;
UPDATE t1 SET a=2, b=3;
let DATADIR=`select @@datadir`;
let PAGE_SIZE=`select @@innodb_page_size`;
source include/wait_all_purged.inc;
FLUSH TABLE t1 FOR EXPORT;
perl;
my $ps = $ENV{PAGE_SIZE};
my $file = "$ENV{DATADIR}/test/t1.ibd";
open(FILE, "<", $file) or die "Unable to open $file\n";
die "Unable to read $file\n" unless
sysread(FILE, $_, 6*$ps) == 6*$ps;
close(FILE);
print "page 4: N_RECS=0x", unpack("H*", substr($_, 4 * $ps + 54, 2)), "\n";
print "page 5: N_RECS=0x", unpack("H*", substr($_, 5 * $ps + 54, 2)), "\n";
EOF
UNLOCK TABLES;
DROP TABLE t1;
--echo # End of 10.3 tests
SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency;
/*****************************************************************************
Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2019, MariaDB Corporation.
Copyright (c) 2017, 2022, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -136,30 +136,6 @@ trx_undo_update_rec_get_update(
mem_heap_t* heap, /*!< in: memory heap from which the memory
needed is allocated */
upd_t** upd); /*!< out, own: update vector */
/*******************************************************************//**
Builds a partial row from an update undo log record, for purge.
It contains the columns which occur as ordering in any index of the table.
Any missing columns are indicated by col->mtype == DATA_MISSING.
@return pointer to remaining part of undo record */
byte*
trx_undo_rec_get_partial_row(
/*=========================*/
const byte* ptr, /*!< in: remaining part in update undo log
record of a suitable type, at the start of
the stored index columns;
NOTE that this copy of the undo log record must
be preserved as long as the partial row is
used, as we do NOT copy the data in the
record! */
dict_index_t* index, /*!< in: clustered index */
const upd_t* update, /*!< in: updated columns */
dtuple_t** row, /*!< out, own: partial row */
ibool ignore_prefix, /*!< in: flag to indicate if we
expect blob prefixes in undo. Used
only in the assertion. */
mem_heap_t* heap) /*!< in: memory heap from which the memory
needed is allocated */
MY_ATTRIBUTE((nonnull, warn_unused_result));
/** Report a RENAME TABLE operation.
@param[in,out] trx transaction
@param[in] table table that is being renamed
......
/*****************************************************************************
Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2021, MariaDB Corporation.
Copyright (c) 2017, 2022, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
......@@ -1004,6 +1004,134 @@ row_purge_upd_exist_or_extern_func(
row_purge_upd_exist_or_extern_func(node,undo_rec)
#endif /* UNIV_DEBUG */
/** Build a partial row from an update undo log record for purge.
Any columns which occur as ordering in any index of the table are present.
Any missing columns are indicated by col->mtype == DATA_MISSING.
@param ptr remaining part of the undo log record
@param index clustered index
@param node purge node
@return pointer to remaining part of undo record */
static byte *row_purge_get_partial(const byte *ptr, const dict_index_t &index,
purge_node_t *node)
{
bool first_v_col= true;
bool is_undo_log= true;
ut_ad(index.is_primary());
ut_ad(index.n_uniq == node->ref->n_fields);
node->row= dtuple_create_with_vcol(node->heap, index.table->n_cols,
index.table->n_v_cols);
/* Mark all columns in the row uninitialized, so that
we can distinguish missing fields from fields that are SQL NULL. */
for (ulint i= 0; i < index.table->n_cols; i++)
node->row->fields[i].type.mtype= DATA_MISSING;
dtuple_init_v_fld(node->row);
for (const upd_field_t *uf= node->update->fields, *const ue=
node->update->fields + node->update->n_fields; uf != ue; uf++)
if (!uf->old_v_val)
node->row->fields[dict_index_get_nth_col(&index, uf->field_no)->ind]=
uf->new_val;
const byte *end_ptr= ptr + mach_read_from_2(ptr);
ptr+= 2;
while (ptr != end_ptr)
{
dfield_t *dfield;
const byte *field;
const dict_col_t *col;
ulint len;
ulint orig_len;
ulint field_no= mach_read_next_compressed(&ptr);
if (field_no >= REC_MAX_N_FIELDS)
{
ptr= trx_undo_read_v_idx(index.table, ptr, first_v_col, &is_undo_log,
&field_no);
first_v_col= false;
ptr= trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
if (field_no == ULINT_UNDEFINED)
continue; /* there no longer is an index on the virtual column */
dict_v_col_t *vcol= dict_table_get_nth_v_col(index.table, field_no);
col =&vcol->m_col;
dfield= dtuple_get_nth_v_field(node->row, vcol->v_pos);
dict_col_copy_type(&vcol->m_col, &dfield->type);
}
else
{
ptr= trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
col= dict_index_get_nth_col(&index, field_no);
dfield= dtuple_get_nth_field(node->row, col->ind);
ut_ad(dfield->type.mtype == DATA_MISSING ||
dict_col_type_assert_equal(col, &dfield->type));
ut_ad(dfield->type.mtype == DATA_MISSING ||
dfield->len == len ||
(len != UNIV_SQL_NULL && len >= UNIV_EXTERN_STORAGE_FIELD));
dict_col_copy_type(dict_table_get_nth_col(index.table, col->ind),
&dfield->type);
}
dfield_set_data(dfield, field, len);
if (len == UNIV_SQL_NULL || len < UNIV_EXTERN_STORAGE_FIELD)
continue;
spatial_status_t spatial_status= static_cast<spatial_status_t>
((len & SPATIAL_STATUS_MASK) >> SPATIAL_STATUS_SHIFT);
len&= ~SPATIAL_STATUS_MASK;
/* Keep compatible with 5.7.9 format. */
if (spatial_status == SPATIAL_UNKNOWN)
spatial_status= dict_col_get_spatial_status(col);
switch (UNIV_EXPECT(spatial_status, SPATIAL_NONE)) {
case SPATIAL_ONLY:
ut_ad(len - UNIV_EXTERN_STORAGE_FIELD == DATA_MBR_LEN);
dfield_set_len(dfield, len - UNIV_EXTERN_STORAGE_FIELD);
break;
case SPATIAL_MIXED:
dfield_set_len(dfield, len - UNIV_EXTERN_STORAGE_FIELD - DATA_MBR_LEN);
break;
default:
dfield_set_len(dfield, len - UNIV_EXTERN_STORAGE_FIELD);
break;
}
dfield_set_ext(dfield);
dfield_set_spatial_status(dfield, spatial_status);
if (!col->ord_part || spatial_status == SPATIAL_ONLY ||
node->rec_type == TRX_UNDO_UPD_DEL_REC)
continue;
/* If the prefix of this BLOB column is indexed, ensure that enough
prefix is stored in the undo log record. */
ut_a(dfield_get_len(dfield) >= BTR_EXTERN_FIELD_REF_SIZE);
ut_a(dict_table_has_atomic_blobs(index.table) ||
dfield_get_len(dfield) >=
REC_ANTELOPE_MAX_INDEX_COL_LEN + BTR_EXTERN_FIELD_REF_SIZE);
}
for (ulint i= 0; i < index.n_uniq; i++)
{
dfield_t &field= node->row->fields[index.fields[i].col->ind];
if (field.type.mtype == DATA_MISSING)
field= node->ref->fields[i];
}
return const_cast<byte*>(ptr);
}
/***********************************************************//**
Parses the row reference and other info in a modify undo log record.
@return true if purge operation required */
......@@ -1153,10 +1281,7 @@ row_purge_parse_undo_rec(
if (!(node->cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
ut_ad(!(node->update->info_bits & REC_INFO_MIN_REC_FLAG));
ptr = trx_undo_rec_get_partial_row(
ptr, clust_index, node->update, &node->row,
type == TRX_UNDO_UPD_DEL_REC,
node->heap);
ptr = row_purge_get_partial(ptr, *clust_index, node);
} else if (node->update->info_bits & REC_INFO_MIN_REC_FLAG) {
node->ref = &trx_undo_metadata;
}
......
......@@ -1655,182 +1655,6 @@ trx_undo_update_rec_get_update(
return(const_cast<byte*>(ptr));
}
/*******************************************************************//**
Builds a partial row from an update undo log record, for purge.
It contains the columns which occur as ordering in any index of the table.
Any missing columns are indicated by col->mtype == DATA_MISSING.
@return pointer to remaining part of undo record */
byte*
trx_undo_rec_get_partial_row(
/*=========================*/
const byte* ptr, /*!< in: remaining part in update undo log
record of a suitable type, at the start of
the stored index columns;
NOTE that this copy of the undo log record must
be preserved as long as the partial row is
used, as we do NOT copy the data in the
record! */
dict_index_t* index, /*!< in: clustered index */
const upd_t* update, /*!< in: updated columns */
dtuple_t** row, /*!< out, own: partial row */
ibool ignore_prefix, /*!< in: flag to indicate if we
expect blob prefixes in undo. Used
only in the assertion. */
mem_heap_t* heap) /*!< in: memory heap from which the memory
needed is allocated */
{
const byte* end_ptr;
bool first_v_col = true;
bool is_undo_log = true;
ut_ad(index->is_primary());
*row = dtuple_create_with_vcol(
heap, dict_table_get_n_cols(index->table),
dict_table_get_n_v_cols(index->table));
/* Mark all columns in the row uninitialized, so that
we can distinguish missing fields from fields that are SQL NULL. */
for (ulint i = 0; i < dict_table_get_n_cols(index->table); i++) {
dfield_get_type(dtuple_get_nth_field(*row, i))
->mtype = DATA_MISSING;
}
dtuple_init_v_fld(*row);
for (const upd_field_t* uf = update->fields, * const ue
= update->fields + update->n_fields;
uf != ue; uf++) {
if (uf->old_v_val) {
continue;
}
ulint c = dict_index_get_nth_col(index, uf->field_no)->ind;
*dtuple_get_nth_field(*row, c) = uf->new_val;
}
end_ptr = ptr + mach_read_from_2(ptr);
ptr += 2;
while (ptr != end_ptr) {
dfield_t* dfield;
const byte* field;
ulint field_no;
const dict_col_t* col;
ulint col_no;
ulint len;
ulint orig_len;
bool is_virtual;
field_no = mach_read_next_compressed(&ptr);
is_virtual = (field_no >= REC_MAX_N_FIELDS);
if (is_virtual) {
ptr = trx_undo_read_v_idx(
index->table, ptr, first_v_col, &is_undo_log,
&field_no);
first_v_col = false;
}
ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
/* This column could be dropped or no longer indexed */
if (field_no == ULINT_UNDEFINED) {
ut_ad(is_virtual);
continue;
}
if (is_virtual) {
dict_v_col_t* vcol = dict_table_get_nth_v_col(
index->table, field_no);
col = &vcol->m_col;
col_no = dict_col_get_no(col);
dfield = dtuple_get_nth_v_field(*row, vcol->v_pos);
dict_col_copy_type(
&vcol->m_col,
dfield_get_type(dfield));
} else {
col = dict_index_get_nth_col(index, field_no);
col_no = dict_col_get_no(col);
dfield = dtuple_get_nth_field(*row, col_no);
ut_ad(dfield->type.mtype == DATA_MISSING
|| dict_col_type_assert_equal(col,
&dfield->type));
ut_ad(dfield->type.mtype == DATA_MISSING
|| dfield->len == len
|| (len != UNIV_SQL_NULL
&& len >= UNIV_EXTERN_STORAGE_FIELD));
dict_col_copy_type(
dict_table_get_nth_col(index->table, col_no),
dfield_get_type(dfield));
}
dfield_set_data(dfield, field, len);
if (len != UNIV_SQL_NULL
&& len >= UNIV_EXTERN_STORAGE_FIELD) {
spatial_status_t spatial_status;
/* Decode spatial status. */
spatial_status = static_cast<spatial_status_t>(
(len & SPATIAL_STATUS_MASK)
>> SPATIAL_STATUS_SHIFT);
len &= ~SPATIAL_STATUS_MASK;
/* Keep compatible with 5.7.9 format. */
if (spatial_status == SPATIAL_UNKNOWN) {
spatial_status =
dict_col_get_spatial_status(col);
}
switch (spatial_status) {
case SPATIAL_ONLY:
ut_ad(len - UNIV_EXTERN_STORAGE_FIELD
== DATA_MBR_LEN);
dfield_set_len(
dfield,
len - UNIV_EXTERN_STORAGE_FIELD);
break;
case SPATIAL_MIXED:
dfield_set_len(
dfield,
len - UNIV_EXTERN_STORAGE_FIELD
- DATA_MBR_LEN);
break;
case SPATIAL_NONE:
dfield_set_len(
dfield,
len - UNIV_EXTERN_STORAGE_FIELD);
break;
case SPATIAL_UNKNOWN:
ut_ad(0);
break;
}
dfield_set_ext(dfield);
dfield_set_spatial_status(dfield, spatial_status);
/* If the prefix of this column is indexed,
ensure that enough prefix is stored in the
undo log record. */
if (!ignore_prefix && col->ord_part
&& spatial_status != SPATIAL_ONLY) {
ut_a(dfield_get_len(dfield)
>= BTR_EXTERN_FIELD_REF_SIZE);
ut_a(dict_table_has_atomic_blobs(index->table)
|| dfield_get_len(dfield)
>= REC_ANTELOPE_MAX_INDEX_COL_LEN
+ BTR_EXTERN_FIELD_REF_SIZE);
}
}
}
return(const_cast<byte*>(ptr));
}
/** Erase the unused undo log page end.
@param[in,out] undo_page undo log page
@return whether the page contained something */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment