Commit d8b8258a authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

MDEV-26789 log_write_up_to needs mechanism to prevent stalls of async. waiters

Retry write and/or flush, if group_commit_lock::release() indicates that
there are async waiters left, and there is no new group commit lead
parent 41c66ef6
...@@ -803,6 +803,9 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key, ...@@ -803,6 +803,9 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key,
return; return;
} }
repeat:
lsn_t ret_lsn1= 0, ret_lsn2= 0;
if (flush_to_disk && if (flush_to_disk &&
flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED) flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED)
return; return;
...@@ -817,20 +820,32 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key, ...@@ -817,20 +820,32 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key,
log_write(rotate_key); log_write(rotate_key);
ut_a(log_sys.write_lsn == write_lsn); ut_a(log_sys.write_lsn == write_lsn);
write_lock.release(write_lsn); ret_lsn1= write_lock.release(write_lsn);
} }
if (!flush_to_disk) if (flush_to_disk)
return; {
/* Flush the highest written lsn.*/
/* Flush the highest written lsn.*/ auto flush_lsn = write_lock.value();
auto flush_lsn = write_lock.value(); flush_lock.set_pending(flush_lsn);
flush_lock.set_pending(flush_lsn); log_write_flush_to_disk_low(flush_lsn);
log_write_flush_to_disk_low(flush_lsn); ret_lsn2= flush_lock.release(flush_lsn);
flush_lock.release(flush_lsn);
log_flush_notify(flush_lsn);
DBUG_EXECUTE_IF("crash_after_log_write_upto", DBUG_SUICIDE(););
}
log_flush_notify(flush_lsn); if (ret_lsn1 || ret_lsn2)
DBUG_EXECUTE_IF("crash_after_log_write_upto", DBUG_SUICIDE();); {
/*
There is no new group commit lead, some async waiters could stall.
Rerun log_write_up_to(), to prevent that.
*/
lsn= std::max(ret_lsn1, ret_lsn2);
static const completion_callback dummy{[](void *) {},nullptr};
callback= &dummy;
goto repeat;
}
} }
/** Write to the log file up to the last log entry. /** Write to the log file up to the last log entry.
......
...@@ -274,11 +274,11 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, c ...@@ -274,11 +274,11 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, c
return lock_return_code::EXPIRED; return lock_return_code::EXPIRED;
} }
void group_commit_lock::release(value_type num) group_commit_lock::value_type group_commit_lock::release(value_type num)
{ {
completion_callback callbacks[1000]; completion_callback callbacks[1000];
size_t callback_count = 0; size_t callback_count = 0;
value_type ret = 0;
std::unique_lock<std::mutex> lk(m_mtx); std::unique_lock<std::mutex> lk(m_mtx);
m_lock = false; m_lock = false;
...@@ -359,6 +359,12 @@ void group_commit_lock::release(value_type num) ...@@ -359,6 +359,12 @@ void group_commit_lock::release(value_type num)
{ {
wakeup_list->m_group_commit_leader=true; wakeup_list->m_group_commit_leader=true;
} }
else
{
/* Tell the caller that some pending callbacks left, and he should
do something to prevent stalls. This should be a rare situation.*/
ret= m_pending_callbacks[0].first;
}
} }
lk.unlock(); lk.unlock();
...@@ -371,6 +377,7 @@ void group_commit_lock::release(value_type num) ...@@ -371,6 +377,7 @@ void group_commit_lock::release(value_type num)
next= cur->m_next; next= cur->m_next;
cur->m_sema.wake(); cur->m_sema.wake();
} }
return ret;
} }
#ifndef DBUG_OFF #ifndef DBUG_OFF
......
...@@ -39,16 +39,23 @@ It has a state consisting of ...@@ -39,16 +39,23 @@ It has a state consisting of
Operations supported on this semaphore Operations supported on this semaphore
1.acquire(num): 1.acquire(num, callback):
- waits until current value exceeds num, or until lock is granted. - waits until current value exceeds num, or until lock is granted.
if running synchronously (callback is nullptr)
- returns EXPIRED if current_value >= num, - returns EXPIRED if current_value >= num,
or ACQUIRED, if current_value < num and lock is granted. or ACQUIRED, if current_value < num and lock is granted,
or CALLBACK_QUEUED, if callback was not nullptr, and function
would otherwise have to wait
2.release(num) 2.release(num)
- releases lock - releases lock
- sets new current value to max(num,current_value) - sets new current value to max(num,current_value)
- releases some threads waiting in acquire() - releases some threads waiting in acquire()
- executes some callbacks
- might return some lsn, meaning there are some pending
callbacks left, and there is no new group commit lead
(i.e caller must do something to flush those pending callbacks)
3. value() 3. value()
- read current value - read current value
...@@ -82,7 +89,7 @@ class group_commit_lock ...@@ -82,7 +89,7 @@ class group_commit_lock
CALLBACK_QUEUED CALLBACK_QUEUED
}; };
lock_return_code acquire(value_type num, const completion_callback *cb); lock_return_code acquire(value_type num, const completion_callback *cb);
void release(value_type num); value_type release(value_type num);
value_type value() const; value_type value() const;
value_type pending() const; value_type pending() const;
void set_pending(value_type num); void set_pending(value_type num);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment