Commit fe5829a1 authored by Marko Mäkelä

MDEV-34446 SIGSEGV on SET GLOBAL innodb_log_file_size with memory-mapped log file

log_t::resize_write(): Advance log_sys.resize_lsn and reset
the resize_log offset to START_OFFSET whenever the memory-mapped buffer
would wrap around.

Previously, if the initial target offset was beyond the requested
innodb_log_file_size, we adjusted only the offset but not the LSN.
The incorrect LSN would cause log_sys.buf_free to be out of bounds
when the log resizing completes.
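
To see why the offset and the LSN must move together, here is a
simplified standalone model (an illustration, not the actual InnoDB
code; START_OFFSET, resize_lsn and resize_capacity mirror the names
in the patch):

#include <cassert>
#include <cstddef>
#include <cstdint>

typedef uint64_t lsn_t;
static const size_t START_OFFSET= 4096;

struct ring
{
  lsn_t resize_lsn;        /* the LSN that maps to START_OFFSET */
  size_t resize_capacity;  /* usable bytes: resize_target - START_OFFSET */
};

/* Map a record of len bytes at lsn to an offset in the resize buffer. */
static size_t map_offset(ring &r, lsn_t lsn, size_t len)
{
  if (lsn - r.resize_lsn + len >= r.resize_capacity)
  {
    /* The record would wrap around: restart at START_OFFSET *and*
    advance resize_lsn. Resetting only the offset (the old behaviour)
    leaves lsn - resize_lsn too large, so a buf_free derived from it
    would point outside the buffer. */
    r.resize_lsn= lsn;
    return START_OFFSET;
  }
  return START_OFFSET + size_t(lsn - r.resize_lsn);
}

int main()
{
  ring r{100, 1024};
  assert(map_offset(r, 200, 64) == START_OFFSET + 100); /* no wrap */
  assert(map_offset(r, 1100, 64) == START_OFFSET);      /* wrapped */
  assert(r.resize_lsn == 1100); /* the LSN advanced with the reset */
}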

The log_sys.lsn_lock will cover the entire duration of replicating the
memory-mapped log for resizing. We just need a mutex that is compatible
with the caller holding log_sys.latch. While the choice of mtr_t::finisher
(for normal log writes) depends on mtr_t::spin_wait_delay,
replicating the log during resizing is a rare operation where we can
afford the possible additional context-switching overhead.
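
As a rough sketch of that pattern, with standard C++ primitives
standing in for InnoDB's log_sys.latch (a rw-lock) and
log_sys.lsn_lock (these are illustrative stand-ins, not the server's
actual types):

#include <mutex>
#include <shared_mutex>

static std::shared_mutex latch; /* stand-in for log_sys.latch */
static std::mutex lsn_lock;     /* stand-in for log_sys.lsn_lock */

static void replicate_for_resize()
{
  /* Many mini-transactions may hold the latch in shared mode. */
  std::shared_lock<std::shared_mutex> l{latch};
  /* A plain mutex can be acquired while holding the shared latch;
  it serializes the rare copy into the resize buffer, where the
  possible context-switching overhead is acceptable. */
  std::lock_guard<std::mutex> g{lsn_lock};
  /* ... replicate the memory-mapped log into the resize buffer ... */
}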
parent b3cc9529
@@ -1217,19 +1217,43 @@ inline void log_t::resize_write(lsn_t lsn, const byte *end, size_t len,
     if (!resize_flush_buf)
     {
       ut_ad(is_pmem());
+      lsn_lock.wr_lock();
       const size_t resize_capacity{resize_target - START_OFFSET};
-      const lsn_t resizing{resize_in_progress()};
-      if (UNIV_UNLIKELY(lsn < resizing))
       {
-        size_t l= resizing - lsn;
-        if (l >= len)
-          return;
-        end+= l - len;
-        len-= l;
-        lsn+= l;
+        const lsn_t resizing{resize_in_progress()};
+        /* For memory-mapped log, log_t::resize_start() would never
+        set log_sys.resize_lsn to less than log_sys.lsn. It cannot
+        execute concurrently with this thread, because we are holding
+        log_sys.latch and it would hold an exclusive log_sys.latch. */
+        if (UNIV_UNLIKELY(lsn < resizing))
+        {
+          /* This function may execute in multiple concurrent threads
+          that hold a shared log_sys.latch. Before we got lsn_lock,
+          another thread could have executed resize_lsn.store(lsn) below
+          with a larger lsn than ours.
+          append_prepare() guarantees that the concurrent writes
+          cannot overlap, that is, our entire log must be discarded.
+          Besides, incomplete mini-transactions cannot be parsed anyway. */
+          ut_ad(resizing >= lsn + len);
+          goto pmem_done;
+        }
+        s= START_OFFSET;
+        if (UNIV_UNLIKELY(lsn - resizing + len >= resize_capacity))
+        {
+          resize_lsn.store(lsn, std::memory_order_relaxed);
+          lsn= 0;
+        }
+        else
+        {
+          lsn-= resizing;
+          s+= lsn;
+        }
       }
-      lsn-= resizing;
-      s= START_OFFSET + lsn % resize_capacity;
+      ut_ad(s + len <= resize_target);
       if (UNIV_UNLIKELY(end < &buf[START_OFFSET]))
       {
@@ -1239,59 +1263,22 @@ inline void log_t::resize_write(lsn_t lsn, const byte *end, size_t len,
         ut_ad(end + capacity() + len >= &buf[file_size]);
         size_t l= size_t(buf - (end - START_OFFSET));
-        if (UNIV_LIKELY(s + len <= resize_target))
-        {
-          /* The destination buffer (log_sys.resize_buf) did not wrap around */
-          memcpy(resize_buf + s, end + capacity(), l);
-          memcpy(resize_buf + s + l, &buf[START_OFFSET], len - l);
-          goto pmem_nowrap;
-        }
-        else
-        {
-          /* Both log_sys.buf and log_sys.resize_buf wrapped around */
-          const size_t rl= resize_target - s;
-          if (l <= rl)
-          {
-            /* log_sys.buf wraps around first */
-            memcpy(resize_buf + s, end + capacity(), l);
-            memcpy(resize_buf + s + l, &buf[START_OFFSET], rl - l);
-            memcpy(resize_buf + START_OFFSET, &buf[START_OFFSET + rl - l],
-                   len - l);
-          }
-          else
-          {
-            /* log_sys.resize_buf wraps around first */
-            memcpy(resize_buf + s, end + capacity(), rl);
-            memcpy(resize_buf + START_OFFSET, end + capacity() + rl, l - rl);
-            memcpy(resize_buf + START_OFFSET + (l - rl),
-                   &buf[START_OFFSET], len - l);
-          }
-          goto pmem_wrap;
-        }
+        memcpy(resize_buf + s, end + capacity(), l);
+        memcpy(resize_buf + s + l, &buf[START_OFFSET], len - l);
       }
       else
       {
         ut_ad(end + len <= &buf[file_size]);
-        if (UNIV_LIKELY(s + len <= resize_target))
-        {
-          memcpy(resize_buf + s, end, len);
-        pmem_nowrap:
-          s+= len - seq;
-        }
-        else
-        {
-          /* The log_sys.resize_buf wrapped around */
-          memcpy(resize_buf + s, end, resize_target - s);
-          memcpy(resize_buf + START_OFFSET, end + (resize_target - s),
-                 len - (resize_target - s));
-        pmem_wrap:
-          s+= len - seq;
-          if (s >= resize_target)
-            s-= resize_capacity;
-          resize_lsn.fetch_add(resize_capacity); /* Move the target ahead. */
-        }
+        memcpy(resize_buf + s, end, len);
       }
+      s+= len - seq;
+      /* Always set the sequence bit. If the resized log were to wrap around,
+      we will advance resize_lsn. */
+      ut_ad(resize_buf[s] <= 1);
+      resize_buf[s]= 1;
+    pmem_done:
+      lsn_lock.wr_unlock();
     }
     else
 #endif
@@ -1301,12 +1288,11 @@ inline void log_t::resize_write(lsn_t lsn, const byte *end, size_t len,
       ut_ad(s + len <= buf_size);
       memcpy(resize_buf + s, end, len);
       s+= len - seq;
+      /* Always set the sequence bit. If the resized log were to wrap around,
+      we will advance resize_lsn. */
+      ut_ad(resize_buf[s] <= 1);
+      resize_buf[s]= 1;
     }
-    /* Always set the sequence bit. If the resized log were to wrap around,
-    we will advance resize_lsn. */
-    ut_ad(resize_buf[s] <= 1);
-    resize_buf[s]= 1;
   }
 }
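
For reference, the two-memcpy pattern that the patch keeps for a
wrapped source buffer generalizes as follows (a generic sketch of
reading out of a circular buffer, not InnoDB code):

#include <cstddef>
#include <cstring>

static void copy_from_ring(unsigned char *dst, const unsigned char *ring,
                           size_t ring_size, size_t start, size_t len)
{
  if (start + len <= ring_size)
    memcpy(dst, ring + start, len);         /* contiguous: one copy */
  else
  {
    const size_t first= ring_size - start;  /* bytes up to the wrap point */
    memcpy(dst, ring + start, first);
    memcpy(dst + first, ring, len - first); /* remainder from the start */
  }
}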