Commit d145d1b6 authored by Kristian Nielsen

fix bogus stalls in the lock tree for low concurrency applications

Merge into the MariaDB tree the pull request from Rich Prohaska for
PerconaFT. These changes are needed to get parallel replication to
work with TokuDB. Once the pull request is accepted by Percona and the
new upstream version enters MariaDB, this commit can be superseded.

Original commit message from Rich Prohaska:

    1. Fix the release before wait race

    The release before wait race occurs when a lock is released by transaction A after transaction B tried to acquire it but before transaction B has a chance to register its pending lock request.  There are several ways to fix this problem, but we want to optimize for the common situation of minimal lock conflicts, which is what the lock acquisition algorithm currently does.  Our solution to the release before wait race is for transaction B to retry its lock request after its lock request has been added to the pending lock set.
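
    A minimal, self-contained C++ sketch of the fixed path (std::mutex and a
    bare pointer set stand in for toku_mutex_t and the pending lock request
    omt; try_acquire() is a hypothetical stand-in for
    locktree::acquire_[write,read]_lock(); the real code is
    lock_request::start() followed by retry() under the locktree mutex):

        #include <mutex>
        #include <set>

        struct lock_info {
            std::mutex mtx;            // models the locktree's request mutex
            std::set<void *> pending;  // models the pending lock request set
        };

        // try_acquire returns 0 on success, nonzero on conflict.
        int start_request(lock_info &info, void *req,
                          int (*try_acquire)(void *)) {
            int r = try_acquire(req);      // optimistic fast path, no mutex held
            if (r != 0) {
                std::lock_guard<std::mutex> lk(info.mtx);
                info.pending.insert(req);  // releasers can now find this request
                r = try_acquire(req);      // retry: the conflicting lock may have
                                           // been released in the window between
                                           // the first attempt and the insert
            }
            return r;
        }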

    2. Fix the retry race

    The retry race occurs in the current lock retry algorithm, which assumes that if some transaction is running lock retry, then my transaction does not also need to run it.  There is a chance that some pending lock requests will be skipped, but these lock requests will eventually time out.  For applications with small numbers of concurrent transactions, timeouts will frequently occur, and the application throughput will be very small.

    The solution to the retry race is to use a group retry algorithm.  All threads run through the retry logic.  Sequence numbers are used to group retries into batches such that one transaction can run the retry logic on behalf of several transactions.  This amortizes the retry cost.  The sequence numbers also ensure that when a transaction releases its locks, all of the pending lock requests that it is blocking are retried.
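
    Condensed from retry_all_lock_requests() in the diff below into a
    self-contained model (retry_pending() is a hypothetical stand-in for
    walking the pending lock request set), the group retry bookkeeping is:

        #include <atomic>
        #include <mutex>

        struct retry_state {
            std::atomic_ullong retry_want{0};   // bumped by every releaser
            unsigned long long retry_done{0};   // guarded by mtx
            std::atomic_bool pending_is_empty{true};
            std::mutex mtx;
        };

        void retry_all(retry_state &s, void (*retry_pending)(void)) {
            s.retry_want++;                  // announce that a retry pass is owed
            if (s.pending_is_empty)          // benign unlocked read: requests
                return;                      // retry themselves after insertion
            std::lock_guard<std::mutex> lk(s.mtx);
            unsigned long long gen = s.retry_want.load();
            if (gen > s.retry_done) {        // a pass is owed since the last one
                retry_pending();             // one pass serves every announcer,
                s.retry_done = gen;          // amortizing the retry cost
            }
        }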

    3. Implement a mechanism to find and kill a pending lock request

    Tag lock requests with a client id, use the client id as a key into the pending lock request sets to find a lock request, and complete the lock request with a lock timeout error.
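
    In terms of the DB_ENV/DB_TXN plumbing added in the diff below, usage
    from the server layer looks roughly like this (TokuDB passes the THD
    pointer as the client extra):

        // at transaction begin: tag the txn with a client id and opaque handle
        txn->set_client_id(txn, thd_get_thread_id(thd), thd);

        // from a kill path: complete that client's pending lock request with
        // DB_LOCK_NOTGRANTED instead of letting it wait out the lock timeout
        env->kill_waiter(env, thd);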

    Copyright (c) 2016, Rich Prohaska
    All rights reserved.

    Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

    1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
parent 5fda340d
......@@ -425,6 +425,7 @@ static void print_db_env_struct (void) {
"bool (*set_dir_per_db)(DB_ENV *, bool new_val)",
"bool (*get_dir_per_db)(DB_ENV *)",
"const char *(*get_data_dir)(DB_ENV *env)",
"void (*kill_waiter)(DB_ENV *, void *extra)",
NULL};
sort_and_dump_fields("db_env", true, extra);
......@@ -545,8 +546,8 @@ static void print_db_txn_struct (void) {
"int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)",
"int (*xa_prepare) (DB_TXN*, TOKU_XA_XID *, uint32_t flags)",
"uint64_t (*id64) (DB_TXN*)",
"void (*set_client_id)(DB_TXN *, uint64_t client_id)",
"uint64_t (*get_client_id)(DB_TXN *)",
"void (*set_client_id)(DB_TXN *, uint64_t client_id, void *client_extra)",
"void (*get_client_id)(DB_TXN *, uint64_t *client_id, void **client_extra)",
"bool (*is_prepared)(DB_TXN *)",
"DB_TXN *(*get_child)(DB_TXN *)",
"uint64_t (*get_start_time)(DB_TXN *)",
......
......@@ -269,6 +269,7 @@ static txn_child_manager tcm;
.state = TOKUTXN_LIVE,
.num_pin = 0,
.client_id = 0,
.client_extra = nullptr,
.start_time = time(NULL),
};
......@@ -705,12 +706,14 @@ bool toku_txn_has_spilled_rollback(TOKUTXN txn) {
return txn_has_spilled_rollback_logs(txn);
}
uint64_t toku_txn_get_client_id(TOKUTXN txn) {
return txn->client_id;
void toku_txn_get_client_id(TOKUTXN txn, uint64_t *client_id, void **client_extra) {
*client_id = txn->client_id;
*client_extra = txn->client_extra;
}
void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id) {
void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id, void *client_extra) {
txn->client_id = client_id;
txn->client_extra = client_extra;
}
time_t toku_txn_get_start_time(struct tokutxn *txn) {
......
......@@ -193,6 +193,7 @@ struct tokutxn {
uint32_t num_pin; // number of threads (all hot indexes) that want this
// txn to not transition to commit or abort
uint64_t client_id;
void *client_extra;
time_t start_time;
};
typedef struct tokutxn *TOKUTXN;
......@@ -293,8 +294,8 @@ void toku_txn_unpin_live_txn(struct tokutxn *txn);
bool toku_txn_has_spilled_rollback(struct tokutxn *txn);
uint64_t toku_txn_get_client_id(struct tokutxn *txn);
void toku_txn_set_client_id(struct tokutxn *txn, uint64_t client_id);
void toku_txn_get_client_id(struct tokutxn *txn, uint64_t *client_id, void **client_extra);
void toku_txn_set_client_id(struct tokutxn *txn, uint64_t client_id, void *client_extra);
time_t toku_txn_get_start_time(struct tokutxn *txn);
......
......@@ -65,6 +65,7 @@ void lock_request::create(void) {
toku_cond_init(&m_wait_cond, nullptr);
m_start_test_callback = nullptr;
m_start_before_pending_test_callback = nullptr;
m_retry_test_callback = nullptr;
}
......@@ -79,7 +80,7 @@ void lock_request::destroy(void) {
}
// set the lock request parameters. this API allows a lock request to be reused.
void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn) {
void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn, void *extra) {
invariant(m_state != state::PENDING);
m_lt = lt;
m_txnid = txnid;
......@@ -91,6 +92,7 @@ void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT
m_state = state::INITIALIZED;
m_info = lt ? lt->get_lock_request_info() : nullptr;
m_big_txn = big_txn;
m_extra = extra;
}
// get rid of any stored left and right key copies and
......@@ -173,6 +175,7 @@ int lock_request::start(void) {
m_state = state::PENDING;
m_start_time = toku_current_time_microsec() / 1000;
m_conflicting_txnid = conflicts.get(0);
if (m_start_before_pending_test_callback) m_start_before_pending_test_callback();
toku_mutex_lock(&m_info->mutex);
insert_into_lock_requests();
if (deadlock_exists(conflicts)) {
......@@ -203,7 +206,18 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil
toku_mutex_lock(&m_info->mutex);
// check again, this time locking out other retry calls
if (m_state == state::PENDING) {
retry();
}
while (m_state == state::PENDING) {
// check if this thread is killed
if (killed_callback && killed_callback()) {
remove_from_lock_requests();
complete(DB_LOCK_NOTGRANTED);
continue;
}
// compute next wait time
uint64_t t_wait;
......@@ -221,7 +235,7 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil
invariant(r == 0 || r == ETIMEDOUT);
t_now = toku_current_time_microsec();
if (m_state == state::PENDING && (t_now >= t_end || (killed_callback && killed_callback()))) {
if (m_state == state::PENDING && t_now >= t_end) {
m_info->counters.timeout_count += 1;
// if we're still pending and we timed out, then remove our
......@@ -274,13 +288,15 @@ TXNID lock_request::get_conflicting_txnid(void) const {
}
int lock_request::retry(void) {
invariant(m_state == state::PENDING);
int r;
invariant(m_state == state::PENDING);
txnid_set conflicts;
conflicts.create();
if (m_type == type::WRITE) {
r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn);
r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
} else {
r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn);
r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
}
// if the acquisition succeeded then remove ourselves from the
......@@ -290,59 +306,77 @@ int lock_request::retry(void) {
complete(r);
if (m_retry_test_callback) m_retry_test_callback(); // test callback
toku_cond_broadcast(&m_wait_cond);
} else {
m_conflicting_txnid = conflicts.get(0);
}
conflicts.destroy();
return r;
}
void lock_request::retry_all_lock_requests(locktree *lt) {
void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_test_callback)(void)) {
lt_lock_request_info *info = lt->get_lock_request_info();
// if a thread reads this bit to be true, then it should go ahead and
// take the locktree mutex and retry lock requests. we use this bit
// to prevent every single thread from waiting on the locktree mutex
// in order to retry requests, especially when no requests actually exist.
//
// it is important to note that this bit only provides an optimization.
// it is not problematic for it to be true when it should be false,
// but it can be problematic for it to be false when it should be true.
// therefore, the lock request code must ensure that when lock requests
// are added to this locktree, the bit is set.
// see lock_request::insert_into_lock_requests()
if (!info->should_retry_lock_requests) {
info->retry_want++;
// if there are no pending lock requests then there is nothing to do
// the unlocked data race on pending_is_empty is OK since lock requests
// are retried after being added to the pending set.
if (info->pending_is_empty)
return;
}
toku_mutex_lock(&info->mutex);
// let other threads know that they need not retry lock requests at this time.
//
// the motivation here is that if a bunch of threads have already released
// their locks in the rangetree, then it's probably okay for only one thread
// to iterate over the list of requests and retry them. otherwise, at high
// thread counts and a large number of pending lock requests, you could
// end up wasting a lot of cycles.
info->should_retry_lock_requests = false;
size_t i = 0;
while (i < info->pending_lock_requests.size()) {
lock_request *request;
int r = info->pending_lock_requests.fetch(i, &request);
invariant_zero(r);
// retry the lock request. if it didn't succeed,
// move on to the next lock request. otherwise
// the request is gone from the list so we may
// read the i'th entry for the next one.
r = request->retry();
if (r != 0) {
i++;
// here is the group retry algorithm.
// get the latest retry_want count and use it as the generation number of this retry operation.
// if this retry generation is > the last retry generation, then do the lock retries. otherwise,
// no lock retries are needed.
unsigned long long retry_gen = info->retry_want.load();
if (retry_gen > info->retry_done) {
// retry all of the pending lock requests.
for (size_t i = 0; i < info->pending_lock_requests.size(); ) {
lock_request *request;
int r = info->pending_lock_requests.fetch(i, &request);
invariant_zero(r);
// retry this lock request. if it didn't succeed,
// move on to the next lock request. otherwise
// the request is gone from the list so we may
// read the i'th entry for the next one.
r = request->retry();
if (r != 0) {
i++;
}
}
if (after_retry_all_test_callback) after_retry_all_test_callback();
info->retry_done = retry_gen;
}
// future threads should only retry lock requests if some still exist
info->should_retry_lock_requests = info->pending_lock_requests.size() > 0;
toku_mutex_unlock(&info->mutex);
}
void *lock_request::get_extra(void) const {
return m_extra;
}
void lock_request::kill_waiter(void) {
remove_from_lock_requests();
complete(DB_LOCK_NOTGRANTED);
toku_cond_broadcast(&m_wait_cond);
}
void lock_request::kill_waiter(locktree *lt, void *extra) {
lt_lock_request_info *info = lt->get_lock_request_info();
toku_mutex_lock(&info->mutex);
for (size_t i = 0; i < info->pending_lock_requests.size(); i++) {
lock_request *request;
int r = info->pending_lock_requests.fetch(i, &request);
if (r == 0 && request->get_extra() == extra) {
request->kill_waiter();
break;
}
}
toku_mutex_unlock(&info->mutex);
}
......@@ -364,9 +398,7 @@ void lock_request::insert_into_lock_requests(void) {
invariant(r == DB_NOTFOUND);
r = m_info->pending_lock_requests.insert_at(this, idx);
invariant_zero(r);
// ensure that this bit is true, now that at least one lock request is in the set
m_info->should_retry_lock_requests = true;
m_info->pending_is_empty = false;
}
// remove this lock request from the locktree's set. must hold the mutex.
......@@ -378,6 +410,8 @@ void lock_request::remove_from_lock_requests(void) {
invariant(request == this);
r = m_info->pending_lock_requests.delete_at(idx);
invariant_zero(r);
if (m_info->pending_lock_requests.size() == 0)
m_info->pending_is_empty = true;
}
int lock_request::find_by_txnid(lock_request * const &request, const TXNID &txnid) {
......@@ -395,6 +429,10 @@ void lock_request::set_start_test_callback(void (*f)(void)) {
m_start_test_callback = f;
}
void lock_request::set_start_before_pending_test_callback(void (*f)(void)) {
m_start_before_pending_test_callback = f;
}
void lock_request::set_retry_test_callback(void (*f)(void)) {
m_retry_test_callback = f;
}
......
......@@ -78,7 +78,7 @@ class lock_request {
// effect: Resets the lock request parameters, allowing it to be reused.
// requires: Lock request was already created at some point
void set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, type lock_type, bool big_txn);
void set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, type lock_type, bool big_txn, void *extra = nullptr);
// effect: Tries to acquire a lock described by this lock request.
// returns: The return code of locktree::acquire_[write,read]_lock()
......@@ -109,12 +109,18 @@ class lock_request {
// effect: Retries all of the lock requests for the given locktree.
// Any lock requests successfully restarted are completed and woken up.
// The rest remain pending.
static void retry_all_lock_requests(locktree *lt);
static void retry_all_lock_requests(locktree *lt, void (*after_retry_test_callback)(void) = nullptr);
void set_start_test_callback(void (*f)(void));
void set_start_before_pending_test_callback(void (*f)(void));
void set_retry_test_callback(void (*f)(void));
private:
void *get_extra(void) const;
void kill_waiter(void);
static void kill_waiter(locktree *lt, void *extra);
private:
enum state {
UNINITIALIZED,
INITIALIZED,
......@@ -152,6 +158,8 @@ class lock_request {
// locktree that this lock request is for.
struct lt_lock_request_info *m_info;
void *m_extra;
// effect: tries again to acquire the lock described by this lock request
// returns: 0 if retrying the request succeeded and is now complete
int retry(void);
......@@ -187,6 +195,7 @@ class lock_request {
static int find_by_txnid(lock_request * const &request, const TXNID &txnid);
void (*m_start_test_callback)(void);
void (*m_start_before_pending_test_callback)(void);
void (*m_retry_test_callback)(void);
friend class lock_request_unit_test;
......
......@@ -81,20 +81,14 @@ void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const compar
m_sto_end_early_time = 0;
m_lock_request_info.pending_lock_requests.create();
m_lock_request_info.pending_is_empty = true;
ZERO_STRUCT(m_lock_request_info.mutex);
toku_mutex_init(&m_lock_request_info.mutex, nullptr);
m_lock_request_info.should_retry_lock_requests = false;
m_lock_request_info.retry_want = m_lock_request_info.retry_done = 0;
ZERO_STRUCT(m_lock_request_info.counters);
// Threads read the should retry bit without a lock
// for performance. It's ok to read the wrong value.
// - If you think you should but you shouldn't, you waste a little time.
// - If you think you shouldn't but you should, then some other thread
// will come around to do the work of retrying requests instead of you.
TOKU_VALGRIND_HG_DISABLE_CHECKING(
&m_lock_request_info.should_retry_lock_requests,
sizeof(m_lock_request_info.should_retry_lock_requests));
TOKU_DRD_IGNORE_VAR(m_lock_request_info.should_retry_lock_requests);
TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_lock_request_info.pending_is_empty, sizeof(m_lock_request_info.pending_is_empty));
TOKU_DRD_IGNORE_VAR(m_lock_request_info.pending_is_empty);
}
void locktree::destroy(void) {
......
......@@ -38,6 +38,8 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
#pragma once
#include <atomic>
#include <db.h>
#include <toku_time.h>
#include <toku_pthread.h>
......@@ -80,9 +82,11 @@ namespace toku {
// Lock request state for some locktree
struct lt_lock_request_info {
omt<lock_request *> pending_lock_requests;
std::atomic_bool pending_is_empty;
toku_mutex_t mutex;
bool should_retry_lock_requests;
lt_counters counters;
std::atomic_ullong retry_want;
unsigned long long retry_done;
};
// The locktree manager manages a set of locktrees, one for each open dictionary.
......@@ -159,6 +163,8 @@ namespace toku {
// Add time t to the escalator's wait time statistics
void add_escalator_wait_time(uint64_t t);
void kill_waiter(void *extra);
private:
static const uint64_t DEFAULT_MAX_LOCK_MEMORY = 64L * 1024 * 1024;
......
......@@ -483,4 +483,17 @@ void locktree_manager::get_status(LTM_STATUS statp) {
*statp = ltm_status;
}
void locktree_manager::kill_waiter(void *extra) {
mutex_lock();
int r = 0;
size_t num_locktrees = m_locktree_map.size();
for (size_t i = 0; i < num_locktrees; i++) {
locktree *lt;
r = m_locktree_map.fetch(i, &lt);
invariant_zero(r);
lock_request::kill_waiter(lt, extra);
}
mutex_unlock();
}
} /* namespace toku */
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
// test the lock manager kill waiter function
#include "locktree.h"
#include "lock_request.h"
#include "test.h"
#include "locktree_unit_test.h"
#include <thread>
#include <atomic>
namespace toku {
const uint64_t my_lock_wait_time = 1000 * 1000;
const uint64_t my_killed_time = 500 * 1000;
const int n_locks = 4;
static int my_killed_callback(void) {
if (1) fprintf(stderr, "%s:%u %s\n", __FILE__, __LINE__, __FUNCTION__);
return 0;
}
static void locktree_release_lock(locktree *lt, TXNID txn_id, const DBT *left, const DBT *right) {
range_buffer buffer;
buffer.create();
buffer.append(left, right);
lt->release_locks(txn_id, &buffer);
buffer.destroy();
}
static void wait_lock(lock_request *lr, std::atomic_int *done) {
int r = lr->wait(my_lock_wait_time, my_killed_time, my_killed_callback);
assert(r == DB_LOCK_NOTGRANTED);
*done = 1;
}
static void test_kill_waiter(void) {
int r;
locktree_manager mgr;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, dbt_comparator, nullptr);
const DBT *one = get_dbt(1);
lock_request locks[n_locks];
std::thread waiters[n_locks-1];
for (int i = 0; i < n_locks; i++) {
locks[i].create();
locks[i].set(lt, i+1, one, one, lock_request::type::WRITE, false, &waiters[i]);
}
// txn 'n_locks' grabs the lock
r = locks[n_locks-1].start();
assert_zero(r);
for (int i = 0; i < n_locks-1; i++) {
r = locks[i].start();
assert(r == DB_LOCK_NOTGRANTED);
}
std::atomic_int done[n_locks-1];
for (int i = 0; i < n_locks-1; i++) {
done[i] = 0;
waiters[i] = std::thread(wait_lock, &locks[i], &done[i]);
}
for (int i = 0; i < n_locks-1; i++) {
assert(!done[i]);
}
sleep(1);
for (int i = 0; i < n_locks-1; i++) {
mgr.kill_waiter(&waiters[i]);
while (!done[i]) sleep(1);
waiters[i].join();
for (int j = i+1; j < n_locks-1; j++)
assert(!done[j]);
}
locktree_release_lock(lt, n_locks, one, one);
for (int i = 0; i < n_locks; i++) {
locks[i].destroy();
}
mgr.release_lt(lt);
mgr.destroy();
}
} /* namespace toku */
int main(void) {
toku::test_kill_waiter();
return 0;
}
......@@ -51,8 +51,9 @@ static uint64_t t_do_kill;
static int my_killed_callback(void) {
uint64_t t_now = toku_current_time_microsec();
if (t_now == t_last_kill)
return 0;
assert(t_now >= t_last_kill);
assert(t_now - t_last_kill >= my_killed_time * 1000 / 2); // div by 2 for valgrind which is not very accurate
t_last_kill = t_now;
killed_calls++;
if (t_now >= t_do_kill)
......
......@@ -52,7 +52,6 @@ static uint64_t t_last_kill;
static int my_killed_callback(void) {
uint64_t t_now = toku_current_time_microsec();
assert(t_now >= t_last_kill);
assert(t_now - t_last_kill >= my_killed_time * 1000 / 2); // div by 2 for valgrind which is not very accurate
t_last_kill = t_now;
killed_calls++;
return 0;
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
// test the race between start, release, and wait. since start does not put its
// lock request into the pending set, the blocking txn could release its lock before
// the first txn waits. this will block the first txn because its lock request is
// not known when the lock is released. the bug fix is to try again when lock retries
// are locked out.
#include "locktree.h"
#include "lock_request.h"
#include "test.h"
#include "locktree_unit_test.h"
#include <thread>
#include <atomic>
namespace toku {
const uint64_t my_lock_wait_time = 1000 * 1000; // ms
const uint64_t my_killed_time = 1 * 1000; // ms
static uint64_t t_wait;
static int my_killed_callback(void) {
uint64_t t_now = toku_current_time_microsec();
assert(t_now >= t_wait);
if (t_now - t_wait >= my_killed_time*1000)
abort();
return 0;
}
static void locktree_release_lock(locktree *lt, TXNID txn_id, const DBT *left, const DBT *right) {
range_buffer buffer;
buffer.create();
buffer.append(left, right);
lt->release_locks(txn_id, &buffer);
buffer.destroy();
}
static void test_start_release_wait(void) {
int r;
locktree_manager mgr;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, dbt_comparator, nullptr);
const DBT *one = get_dbt(1);
// a locks one
lock_request a;
a.create();
a.set(lt, 1, one, one, lock_request::type::WRITE, false);
r = a.start();
assert(r == 0);
// b tries to lock one, fails
lock_request b;
b.create();
b.set(lt, 2, one, one, lock_request::type::WRITE, false);
r = b.start();
assert(r == DB_LOCK_NOTGRANTED);
// a releases its lock
locktree_release_lock(lt, 1, one, one);
// b waits for one, gets the lock immediately
t_wait = toku_current_time_microsec();
r = b.wait(my_lock_wait_time, my_killed_time, my_killed_callback);
assert(r == 0);
// b releases its lock so we can exit cleanly
locktree_release_lock(lt, 2, one, one);
a.destroy();
b.destroy();
mgr.release_lt(lt);
mgr.destroy();
}
} /* namespace toku */
int main(void) {
toku::test_start_release_wait();
return 0;
}
......@@ -37,6 +37,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
#include <iostream>
#include <thread>
#include "test.h"
#include "locktree.h"
#include "lock_request.h"
......@@ -47,15 +48,6 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
namespace toku {
struct locker_arg {
locktree *_lt;
TXNID _id;
const DBT *_key;
locker_arg(locktree *lt, TXNID id, const DBT *key) : _lt(lt), _id(id), _key(key) {
}
};
static void locker_callback(void) {
usleep(10000);
}
......@@ -97,20 +89,13 @@ static void run_locker(locktree *lt, TXNID txnid, const DBT *key) {
toku_pthread_yield();
if ((i % 10) == 0)
std::cout << toku_pthread_self() << " " << i << std::endl;
std::cout << std::this_thread::get_id() << " " << i << std::endl;
}
}
static void *locker(void *v_arg) {
locker_arg *arg = static_cast<locker_arg *>(v_arg);
run_locker(arg->_lt, arg->_id, arg->_key);
return arg;
}
} /* namespace toku */
int main(void) {
int r;
toku::locktree lt;
DICTIONARY_ID dict_id = { 1 };
......@@ -119,18 +104,12 @@ int main(void) {
const DBT *one = toku::get_dbt(1);
const int n_workers = 2;
toku_pthread_t ids[n_workers];
std::thread worker[n_workers];
for (int i = 0; i < n_workers; i++) {
toku::locker_arg *arg = new toku::locker_arg(&lt, i, one);
r = toku_pthread_create(&ids[i], nullptr, toku::locker, arg);
assert_zero(r);
worker[i] = std::thread(toku::run_locker, &lt, i, one);
}
for (int i = 0; i < n_workers; i++) {
void *ret;
r = toku_pthread_join(ids[i], &ret);
assert_zero(r);
toku::locker_arg *arg = static_cast<toku::locker_arg *>(ret);
delete arg;
worker[i].join();
}
lt.release_reference();
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.
Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2,
as published by the Free Software Foundation.
PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
----------------------------------------
PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License, version 3,
as published by the Free Software Foundation.
PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */
#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
#include <iostream>
#include <thread>
#include <pthread.h>
#include "test.h"
#include "locktree.h"
#include "lock_request.h"
// Suppose that 3 threads are running a lock acquire, release, retry sequence. There is
// a race in the retry algorithm with 2 threads running lock retry simultaneously. The
// first thread to run retry sets a flag that will cause the second thread to skip the
// lock retries. If the first thread progressed past the contended lock, then the second
// thread will HANG until its lock timer pops, even when the contended lock is no longer held.
// This test exposes this problem as a test hang. The group retry algorithm fixes the race
// in the lock request retry algorithm and this test should no longer hang.
namespace toku {
// use 1000 when after_retry_all is implemented, otherwise use 100000
static const int n_tests = 1000; // 100000;
static void after_retry_all(void) {
usleep(10000);
}
static void run_locker(locktree *lt, TXNID txnid, const DBT *key, pthread_barrier_t *b) {
for (int i = 0; i < n_tests; i++) {
int r;
r = pthread_barrier_wait(b); assert(r == 0 || r == PTHREAD_BARRIER_SERIAL_THREAD);
lock_request request;
request.create();
request.set(lt, txnid, key, key, lock_request::type::WRITE, false);
// try to acquire the lock
r = request.start();
if (r == DB_LOCK_NOTGRANTED) {
// wait for the lock to be granted
r = request.wait(1000 * 1000);
}
if (r == 0) {
// release the lock
range_buffer buffer;
buffer.create();
buffer.append(key, key);
lt->release_locks(txnid, &buffer);
buffer.destroy();
// retry pending lock requests
lock_request::retry_all_lock_requests(lt, after_retry_all);
}
request.destroy();
memset(&request, 0xab, sizeof request);
toku_pthread_yield();
if ((i % 10) == 0)
std::cout << std::this_thread::get_id() << " " << i << std::endl;
}
}
} /* namespace toku */
int main(void) {
toku::locktree lt;
DICTIONARY_ID dict_id = { 1 };
lt.create(nullptr, dict_id, toku::dbt_comparator);
const DBT *one = toku::get_dbt(1);
const int n_workers = 3;
std::thread worker[n_workers];
pthread_barrier_t b;
int r = pthread_barrier_init(&b, nullptr, n_workers); assert(r == 0);
for (int i = 0; i < n_workers; i++) {
worker[i] = std::thread(toku::run_locker, &lt, i, one, &b);
}
for (int i = 0; i < n_workers; i++) {
worker[i].join();
}
r = pthread_barrier_destroy(&b); assert(r == 0);
lt.release_reference();
lt.destroy();
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.
Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2,
as published by the Free Software Foundation.
PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
----------------------------------------
PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License, version 3,
as published by the Free Software Foundation.
PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */
#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
#include <iostream>
#include <thread>
#include <pthread.h>
#include "test.h"
#include "locktree.h"
#include "lock_request.h"
// Suppose that 2 threads are running a lock acquire, release, retry sequence. There is a
// race between the acquire and the release with 2 threads. If thread 1 acquires a lock,
// and thread 2 tries to acquire the same lock and fails, thread 1 may release its lock and retry
// pending lock requests BEFORE thread 2 adds itself to the pending lock requests. If this
// happens, then thread 2 will HANG until its lock timer expires even when the lock it is
// waiting for is FREE.
// This test exposes this problem as a test hang. If the race is fixed, then the test runs to
// completion.
namespace toku {
static void start_before_pending(void) {
usleep(10000);
}
static void run_locker(locktree *lt, TXNID txnid, const DBT *key, pthread_barrier_t *b) {
for (int i = 0; i < 100000; i++) {
int r;
r = pthread_barrier_wait(b); assert(r == 0 || r == PTHREAD_BARRIER_SERIAL_THREAD);
lock_request request;
request.create();
request.set(lt, txnid, key, key, lock_request::type::WRITE, false);
// if the callback is included, then the race is easy to reproduce. Otherwise, several
// test runs may be required before the race happens.
if (1) request.set_start_before_pending_test_callback(start_before_pending);
// try to acquire the lock
r = request.start();
if (r == DB_LOCK_NOTGRANTED) {
// wait for the lock to be granted
r = request.wait(1000 * 1000);
}
if (r == 0) {
// release the lock
range_buffer buffer;
buffer.create();
buffer.append(key, key);
lt->release_locks(txnid, &buffer);
buffer.destroy();
// retry pending lock requests
lock_request::retry_all_lock_requests(lt);
}
request.destroy();
memset(&request, 0xab, sizeof request);
toku_pthread_yield();
if ((i % 10) == 0)
std::cout << std::this_thread::get_id() << " " << i << std::endl;
}
}
} /* namespace toku */
int main(void) {
toku::locktree lt;
DICTIONARY_ID dict_id = { 1 };
lt.create(nullptr, dict_id, toku::dbt_comparator);
const DBT *one = toku::get_dbt(1);
const int n_workers = 2;
std::thread worker[n_workers];
pthread_barrier_t b;
int r = pthread_barrier_init(&b, nullptr, n_workers); assert(r == 0);
for (int i = 0; i < n_workers; i++) {
worker[i] = std::thread(toku::run_locker, &lt, i, one, &b);
}
for (int i = 0; i < n_workers; i++) {
worker[i].join();
}
r = pthread_barrier_destroy(&b); assert(r == 0);
lt.release_reference();
lt.destroy();
return 0;
}
......@@ -55,7 +55,8 @@ static int iterate_callback(DB_TXN *txn,
iterate_row_locks_callback iterate_locks,
void *locks_extra, void *extra) {
uint64_t txnid = txn->id64(txn);
uint64_t client_id = txn->get_client_id(txn);
uint64_t client_id; void *client_extra;
txn->get_client_id(txn, &client_id, &client_extra);
iterate_extra *info = reinterpret_cast<iterate_extra *>(extra);
DB *db;
DBT left_key, right_key;
......@@ -93,13 +94,13 @@ int test_main(int UU(argc), char *const UU(argv[])) {
r = env->open(env, TOKU_TEST_FILENAME, env_flags, 0755); CKERR(r);
r = env->txn_begin(env, NULL, &txn1, 0); CKERR(r);
txn1->set_client_id(txn1, 0);
txn1->set_client_id(txn1, 0, NULL);
txnid1 = txn1->id64(txn1);
r = env->txn_begin(env, NULL, &txn2, 0); CKERR(r);
txn2->set_client_id(txn2, 1);
txn2->set_client_id(txn2, 1, NULL);
txnid2 = txn2->id64(txn2);
r = env->txn_begin(env, NULL, &txn3, 0); CKERR(r);
txn3->set_client_id(txn3, 2);
txn3->set_client_id(txn3, 2, NULL);
txnid3 = txn3->id64(txn3);
{
......
......@@ -93,7 +93,8 @@ static int iterate_txns(DB_TXN *txn,
iterate_row_locks_callback iterate_locks,
void *locks_extra, void *extra) {
uint64_t txnid = txn->id64(txn);
uint64_t client_id = txn->get_client_id(txn);
uint64_t client_id; void *client_extra;
txn->get_client_id(txn, &client_id, &client_extra);
invariant_null(extra);
invariant(txnid > 0);
invariant(client_id == 0);
......
......@@ -2620,6 +2620,10 @@ static void env_set_killed_callback(DB_ENV *env, uint64_t default_killed_time_ms
env->i->killed_callback = killed_callback;
}
static void env_kill_waiter(DB_ENV *env, void *extra) {
env->i->ltm.kill_waiter(extra);
}
static void env_do_backtrace(DB_ENV *env) {
if (env->i->errcall) {
db_env_do_backtrace_errfunc((toku_env_err_func) toku_env_err, (const void *) env);
......@@ -2719,6 +2723,7 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
USENV(set_dir_per_db);
USENV(get_dir_per_db);
USENV(get_data_dir);
USENV(kill_waiter);
#undef USENV
// unlocked methods
......
......@@ -323,12 +323,12 @@ int locked_txn_abort(DB_TXN *txn) {
return r;
}
static void locked_txn_set_client_id(DB_TXN *txn, uint64_t client_id) {
toku_txn_set_client_id(db_txn_struct_i(txn)->tokutxn, client_id);
static void locked_txn_set_client_id(DB_TXN *txn, uint64_t client_id, void *client_extra) {
toku_txn_set_client_id(db_txn_struct_i(txn)->tokutxn, client_id, client_extra);
}
static uint64_t locked_txn_get_client_id(DB_TXN *txn) {
return toku_txn_get_client_id(db_txn_struct_i(txn)->tokutxn);
static void locked_txn_get_client_id(DB_TXN *txn, uint64_t *client_id, void **client_extra) {
toku_txn_get_client_id(db_txn_struct_i(txn)->tokutxn, client_id, client_extra);
}
static int toku_txn_discard(DB_TXN *txn, uint32_t flags) {
......
......@@ -75,7 +75,9 @@ int trx_callback(
void *extra) {
uint64_t txn_id = txn->id64(txn);
uint64_t client_id = txn->get_client_id(txn);
uint64_t client_id;
void *client_extra;
txn->get_client_id(txn, &client_id, &client_extra);
uint64_t start_time = txn->get_start_time(txn);
trx_extra_t* e = reinterpret_cast<struct trx_extra_t*>(extra);
THD* thd = e->thd;
......@@ -314,7 +316,9 @@ int locks_callback(
void* extra) {
uint64_t txn_id = txn->id64(txn);
uint64_t client_id = txn->get_client_id(txn);
uint64_t client_id;
void *client_extra;
txn->get_client_id(txn, &client_id, &client_extra);
locks_extra_t* e = reinterpret_cast<struct locks_extra_t*>(extra);
THD* thd = e->thd;
TABLE* table = e->table;
......
......@@ -116,7 +116,7 @@ inline int txn_begin(
int r = env->txn_begin(env, parent, txn, flags);
if (r == 0 && thd) {
DB_TXN* this_txn = *txn;
this_txn->set_client_id(this_txn, thd_get_thread_id(thd));
this_txn->set_client_id(this_txn, thd_get_thread_id(thd), thd);
}
TOKUDB_TRACE_FOR_FLAGS(
TOKUDB_DEBUG_TXN,
......