Commit 36a97172 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-13167 InnoDB key rotation is not skipping unused pages

In key rotation, we must initialize unallocated but previously
initialized pages, so that if encryption is enabled on a table,
all clear-text data for the page will eventually be overwritten.
But we should not rotate keys on pages that were never allocated
after the data file was created.

According to the latching order rules, after acquiring the
tablespace latch, no page latches of previously allocated user pages
may be acquired. So, key rotation should check the page allocation
status after acquiring the page latch, not before. But, the latching
order rules also prohibit accessing pages that were not allocated first,
and then acquiring the tablespace latch. Such behaviour would indeed
result in a deadlock when running the following tests:
encryption.innodb_encryption-page-compression
encryption.innodb-checksum-algorithm

Because the key rotation is accessing potentially unallocated pages, it
cannot reliably check if these pages were allocated. It can only check
the page header. If the page number is zero, we can assume that the
page is unallocated.

fil_crypt_rotate_pages(): Skip pages that are known to be uninitialized.

fil_crypt_rotate_page(): Detect uninitialized pages by FIL_PAGE_OFFSET.
Page 0 is never encrypted, and on other pages that are initialized,
FIL_PAGE_OFFSET must contain the page number.

fil_crypt_is_page_uninitialized(): Remove. It suffices to check the
page number field in fil_crypt_rotate_page().
parent e52dd13c
...@@ -1596,20 +1596,6 @@ fil_crypt_find_page_to_rotate( ...@@ -1596,20 +1596,6 @@ fil_crypt_find_page_to_rotate(
return found; return found;
} }
/***********************************************************************
Check if a page is uninitialized (doesn't need to be rotated)
@param[in] frame Page to check
@param[in] page_size Page size
@return true if page is uninitialized, false if not. */
static inline
bool
fil_crypt_is_page_uninitialized(
const byte *frame,
const page_size_t& page_size)
{
return (buf_page_is_zeroes(frame, page_size));
}
#define fil_crypt_get_page_throttle(state,offset,mtr,sleeptime_ms) \ #define fil_crypt_get_page_throttle(state,offset,mtr,sleeptime_ms) \
fil_crypt_get_page_throttle_func(state, offset, mtr, \ fil_crypt_get_page_throttle_func(state, offset, mtr, \
sleeptime_ms, __FILE__, __LINE__) sleeptime_ms, __FILE__, __LINE__)
...@@ -1770,9 +1756,9 @@ fil_crypt_rotate_page( ...@@ -1770,9 +1756,9 @@ fil_crypt_rotate_page(
ulint offset = state->offset; ulint offset = state->offset;
ulint sleeptime_ms = 0; ulint sleeptime_ms = 0;
fil_space_crypt_t *crypt_data = space->crypt_data; fil_space_crypt_t *crypt_data = space->crypt_data;
const page_size_t page_size = page_size_t(space->flags);
ut_ad(space->n_pending_ops > 0); ut_ad(space->n_pending_ops > 0);
ut_ad(offset > 0);
/* In fil_crypt_thread where key rotation is done we have /* In fil_crypt_thread where key rotation is done we have
acquired space and checked that this space is not yet acquired space and checked that this space is not yet
...@@ -1787,35 +1773,47 @@ fil_crypt_rotate_page( ...@@ -1787,35 +1773,47 @@ fil_crypt_rotate_page(
return; return;
} }
ut_d(const bool was_free = fseg_page_is_free(space, offset));
mtr_t mtr; mtr_t mtr;
mtr.start(); mtr.start();
if (buf_block_t* block = fil_crypt_get_page_throttle(state, if (buf_block_t* block = fil_crypt_get_page_throttle(state,
offset, &mtr, offset, &mtr,
&sleeptime_ms)) { &sleeptime_ms)) {
mtr.set_named_space(space);
bool modified = false; bool modified = false;
int needs_scrubbing = BTR_SCRUB_SKIP_PAGE; int needs_scrubbing = BTR_SCRUB_SKIP_PAGE;
lsn_t block_lsn = block->page.newest_modification; lsn_t block_lsn = block->page.newest_modification;
byte* frame = buf_block_get_frame(block); byte* frame = buf_block_get_frame(block);
uint kv = mach_read_from_4(frame+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION); uint kv = mach_read_from_4(frame+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
/* check if tablespace is closing after reading page */ if (space->is_stopping()) {
if (!space->is_stopping()) { /* The tablespace is closing (in DROP TABLE or
TRUNCATE TABLE or similar): avoid further access */
if (kv == 0 && } else if (!*reinterpret_cast<uint32_t*>(FIL_PAGE_OFFSET
fil_crypt_is_page_uninitialized(frame, page_size)) { + frame)) {
; /* It looks like this page was never
allocated. Because key rotation is accessing
pages in a pattern that is unlike the normal
B-tree and undo log access pattern, we cannot
invoke fseg_page_is_free() here, because that
could result in a deadlock. If we invoked
fseg_page_is_free() and released the
tablespace latch before acquiring block->lock,
then the fseg_page_is_free() information
could be stale already. */
ut_ad(was_free);
ut_ad(kv == 0);
ut_ad(page_get_space_id(frame) == 0);
} else if (fil_crypt_needs_rotation( } else if (fil_crypt_needs_rotation(
crypt_data->encryption, crypt_data->encryption,
kv, key_state->key_version, kv, key_state->key_version,
key_state->rotate_key_age)) { key_state->rotate_key_age)) {
mtr.set_named_space(space);
modified = true; modified = true;
/* force rotation by dummy updating page */ /* force rotation by dummy updating page */
mlog_write_ulint(frame + mlog_write_ulint(frame + FIL_PAGE_SPACE_ID,
FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
space_id, MLOG_4BYTES, &mtr); space_id, MLOG_4BYTES, &mtr);
/* statistics */ /* statistics */
...@@ -1826,7 +1824,6 @@ fil_crypt_rotate_page( ...@@ -1826,7 +1824,6 @@ fil_crypt_rotate_page(
state->min_key_version_found = kv; state->min_key_version_found = kv;
} }
} }
}
needs_scrubbing = btr_page_needs_scrubbing( needs_scrubbing = btr_page_needs_scrubbing(
&state->scrub_data, block, &state->scrub_data, block,
...@@ -1934,7 +1931,8 @@ fil_crypt_rotate_pages( ...@@ -1934,7 +1931,8 @@ fil_crypt_rotate_pages(
rotate_thread_t* state) rotate_thread_t* state)
{ {
ulint space = state->space->id; ulint space = state->space->id;
ulint end = state->offset + state->batch; ulint end = std::min(state->offset + state->batch,
state->space->free_limit);
ut_ad(state->space->n_pending_ops > 0); ut_ad(state->space->n_pending_ops > 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment