Commit 5943bb41 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel


Fixes #5771: merge the single txnid optimization to main. Single threaded write performance is up 20-50% in MySQL and multithreaded performance is largely unchanged.


git-svn-id: file:///svn/toku/tokudb@51108 c7de825b-a66e-492c-adef-691d508d4ae1
parent f50474f9
...@@ -29,6 +29,7 @@ void concurrent_tree::locked_keyrange::prepare(concurrent_tree *tree) { ...@@ -29,6 +29,7 @@ void concurrent_tree::locked_keyrange::prepare(concurrent_tree *tree) {
treenode *const root = &tree->m_root; treenode *const root = &tree->m_root;
m_tree = tree; m_tree = tree;
m_subtree = root; m_subtree = root;
m_range = keyrange::get_infinite_range();
root->mutex_lock(); root->mutex_lock();
} }
...@@ -83,12 +84,6 @@ void concurrent_tree::locked_keyrange::remove(const keyrange &range) { ...@@ -83,12 +84,6 @@ void concurrent_tree::locked_keyrange::remove(const keyrange &range) {
} }
} }
uint64_t concurrent_tree::locked_keyrange::remove_all(uint64_t *mem_released) { void concurrent_tree::locked_keyrange::remove_all(void) {
// in practice, remove_all is only okay on the root node, because you m_subtree->recursive_remove();
// want to remove all of the elements in the tree, not some subtree.
//
// so we lazily enforce that you are removing from a non-empty root.
invariant(m_subtree->is_root());
invariant(!m_subtree->is_empty());
return m_subtree->recursive_remove(mem_released);
} }
...@@ -36,11 +36,14 @@ class concurrent_tree { ...@@ -36,11 +36,14 @@ class concurrent_tree {
// effect: prepare to acquire a locked keyrange over the given // effect: prepare to acquire a locked keyrange over the given
// concurrent_tree, preventing other threads from preparing // concurrent_tree, preventing other threads from preparing
// until this thread either does acquire() or release(). // until this thread either does acquire() or release().
// rationale: this provides the user with a serialization point // note: operations performed on a prepared keyrange are equivalent
// for descending / modifying the the tree. // to ones performed on an acquired keyrange over -inf, +inf.
// rationale: this provides the user with a serialization point for descending
// or modifying the tree. it also provides a convenient way of
// doing serializable operations on the tree.
// There are two valid sequences of calls: // There are two valid sequences of calls:
// - prepare, acquire, release // - prepare, acquire, [operations], release
// - prepare, release // - prepare, [operations], release
void prepare(concurrent_tree *tree); void prepare(concurrent_tree *tree);
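As a hedged illustration of the two valid call sequences above, here is a toy sketch. The toy_tree and toy_lkr types and the single root mutex are simplified stand-ins (assumptions, not the real concurrent_tree API); the point is only that prepare() is a serialization point that holds the root lock until release().

    #include <cassert>
    #include <mutex>

    // Toy stand-ins for concurrent_tree / locked_keyrange (not the real types).
    struct toy_tree {
        std::mutex root_mutex;   // models the root node lock taken by prepare()
    };

    struct toy_lkr {
        toy_tree *tree = nullptr;

        // prepare(): the serialization point -- grab the root lock and hold it
        void prepare(toy_tree *t) {
            t->root_mutex.lock();
            tree = t;
        }
        // acquire(): the real code descends to the subtree covering the range;
        // operations on a merely prepared lkr act as if acquired over -inf, +inf
        void acquire(void) { assert(tree != nullptr); }
        // release(): drop the root lock, ending the serialized section
        void release(void) {
            tree->root_mutex.unlock();
            tree = nullptr;
        }
    };

    int main(void) {
        toy_tree t;
        toy_lkr lkr;
        // valid sequence 1: prepare, acquire, [operations], release
        lkr.prepare(&t); lkr.acquire(); /* operations */ lkr.release();
        // valid sequence 2: prepare, [operations], release
        lkr.prepare(&t); /* operations over the implicit -inf, +inf range */ lkr.release();
        return 0;
    }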
// requires: the locked keyrange was prepare()'d // requires: the locked keyrange was prepare()'d
...@@ -68,12 +71,9 @@ class concurrent_tree { ...@@ -68,12 +71,9 @@ class concurrent_tree {
// rationale: caller is responsible for only removing existing ranges // rationale: caller is responsible for only removing existing ranges
void remove(const keyrange &range); void remove(const keyrange &range);
// effect: removes everything within this locked keyrange // effect: removes all of the keys represented by this locked keyrange
// requires: the locked keyrange is over -inf, +inf // rationale: we'd like a fast way to empty out a tree
// returns: the number of nodes removed void remove_all(void);
// returns: *mem_released updated to the cumulative size of all keyranges destroyed
// rationale: we'd like a fast O(n) way of removing everything from the tree
uint64_t remove_all(uint64_t *mem_released);
private: private:
// the concurrent tree this locked keyrange is for // the concurrent tree this locked keyrange is for
......
...@@ -11,12 +11,14 @@ ...@@ -11,12 +11,14 @@
#include <portability/toku_time.h> #include <portability/toku_time.h>
#include "locktree.h" #include "locktree.h"
#include "range_buffer.h"
// including the concurrent_tree here expands the templates // including the concurrent_tree here expands the templates
// and "defines" the implementation, so we do it here in // and "defines" the implementation, so we do it here in
// the locktree source file instead of the header. // the locktree source file instead of the header.
#include "concurrent_tree.h" #include "concurrent_tree.h"
namespace toku { namespace toku {
// A locktree represents the set of row locks owned by all transactions // A locktree represents the set of row locks owned by all transactions
...@@ -41,7 +43,10 @@ void locktree::create(manager::memory_tracker *mem_tracker, DICTIONARY_ID dict_i ...@@ -41,7 +43,10 @@ void locktree::create(manager::memory_tracker *mem_tracker, DICTIONARY_ID dict_i
m_userdata = nullptr; m_userdata = nullptr;
XCALLOC(m_rangetree); XCALLOC(m_rangetree);
m_rangetree->create(m_cmp); m_rangetree->create(m_cmp);
reset_single_txnid_optimization(TXNID_NONE);
m_sto_txnid = TXNID_NONE;
m_sto_buffer.create();
m_sto_score = STO_SCORE_THRESHOLD;
m_lock_request_info.pending_lock_requests.create(); m_lock_request_info.pending_lock_requests.create();
m_lock_request_info.mutex = TOKU_MUTEX_INITIALIZER; m_lock_request_info.mutex = TOKU_MUTEX_INITIALIZER;
...@@ -63,6 +68,7 @@ void locktree::destroy(void) { ...@@ -63,6 +68,7 @@ void locktree::destroy(void) {
m_rangetree->destroy(); m_rangetree->destroy();
toku_free(m_cmp); toku_free(m_cmp);
toku_free(m_rangetree); toku_free(m_rangetree);
m_sto_buffer.destroy();
m_lock_request_info.pending_lock_requests.destroy(); m_lock_request_info.pending_lock_requests.destroy();
} }
...@@ -78,7 +84,7 @@ struct row_lock { ...@@ -78,7 +84,7 @@ struct row_lock {
// caller does not own the range inside the returned row locks, // caller does not own the range inside the returned row locks,
// so remove from the tree with care using them as keys. // so remove from the tree with care using them as keys.
static void iterate_and_get_overlapping_row_locks( static void iterate_and_get_overlapping_row_locks(
const concurrent_tree::locked_keyrange &lkr, const concurrent_tree::locked_keyrange *lkr,
GrowableArray<row_lock> *row_locks) { GrowableArray<row_lock> *row_locks) {
struct copy_fn_obj { struct copy_fn_obj {
GrowableArray<row_lock> *row_locks; GrowableArray<row_lock> *row_locks;
...@@ -89,7 +95,7 @@ static void iterate_and_get_overlapping_row_locks( ...@@ -89,7 +95,7 @@ static void iterate_and_get_overlapping_row_locks(
} }
} copy_fn; } copy_fn;
copy_fn.row_locks = row_locks; copy_fn.row_locks = row_locks;
lkr.iterate(&copy_fn); lkr->iterate(&copy_fn);
} }
// given a txnid and a set of overlapping row locks, determine // given a txnid and a set of overlapping row locks, determine
...@@ -112,75 +118,140 @@ static bool determine_conflicting_txnids(const GrowableArray<row_lock> &row_lock ...@@ -112,75 +118,140 @@ static bool determine_conflicting_txnids(const GrowableArray<row_lock> &row_lock
return conflicts_exist; return conflicts_exist;
} }
// given a row lock, what is the effective memory size? // how much memory does a row lock take up in a concurrent tree?
// that is, how much memory does it take when stored in a tree? static uint64_t row_lock_size_in_tree(const row_lock &lock) {
static uint64_t effective_row_lock_memory_size(const row_lock &lock) {
const uint64_t overhead = concurrent_tree::get_insertion_memory_overhead(); const uint64_t overhead = concurrent_tree::get_insertion_memory_overhead();
return lock.range.get_memory_size() + overhead; return lock.range.get_memory_size() + overhead;
} }
// remove all row locks from the given lkr, then notify the memory tracker.
static void remove_all_row_locks(concurrent_tree::locked_keyrange *lkr,
locktree::manager::memory_tracker *mem_tracker) {
uint64_t mem_released = 0;
uint64_t num_removed = lkr->remove_all(&mem_released);
mem_released += num_removed * concurrent_tree::get_insertion_memory_overhead();
mem_tracker->note_mem_released(mem_released);
}
// remove and destroy the given row lock from the locked keyrange, // remove and destroy the given row lock from the locked keyrange,
// then notify the memory tracker of the newly freed lock. // then notify the memory tracker of the newly freed lock.
static void remove_row_lock(concurrent_tree::locked_keyrange *lkr, static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr,
const row_lock &lock, locktree::manager::memory_tracker *mem_tracker) { const row_lock &lock, locktree::manager::memory_tracker *mem_tracker) {
const uint64_t mem_released = effective_row_lock_memory_size(lock); const uint64_t mem_released = row_lock_size_in_tree(lock);
lkr->remove(lock.range); lkr->remove(lock.range);
mem_tracker->note_mem_released(mem_released); mem_tracker->note_mem_released(mem_released);
} }
// insert a row lock into the locked keyrange, then notify // insert a row lock into the locked keyrange, then notify
// the memory tracker of this newly acquired lock. // the memory tracker of this newly acquired lock.
static void insert_row_lock(concurrent_tree::locked_keyrange *lkr, static void insert_row_lock_into_tree(concurrent_tree::locked_keyrange *lkr,
const row_lock &lock, locktree::manager::memory_tracker *mem_tracker) { const row_lock &lock, locktree::manager::memory_tracker *mem_tracker) {
uint64_t mem_used = effective_row_lock_memory_size(lock); uint64_t mem_used = row_lock_size_in_tree(lock);
lkr->insert(lock.range, lock.txnid); lkr->insert(lock.range, lock.txnid);
mem_tracker->note_mem_used(mem_used); mem_tracker->note_mem_used(mem_used);
} }
void locktree::update_single_txnid_optimization(TXNID txnid) { void locktree::sto_begin(TXNID txnid) {
if (m_rangetree->is_empty()) { invariant(m_sto_txnid == TXNID_NONE);
// the optimization becomes possible for this txnid if the invariant(m_sto_buffer.is_empty());
// tree was previously empy before we touched it. the idea m_sto_txnid = txnid;
// here is that if we are still the only one to have touched }
// it by the time we commit, the optimization holds.
reset_single_txnid_optimization(txnid); void locktree::sto_append(const DBT *left_key, const DBT *right_key) {
} else { uint64_t buffer_mem, delta;
// the tree is not empty, so some txnid must have touched it. keyrange range;
invariant(m_single_txnid != TXNID_NONE); range.create(left_key, right_key);
// the optimization is not possible if the txnid has changed
if (m_single_txnid_optimization_possible && m_single_txnid != txnid) { buffer_mem = m_sto_buffer.get_num_bytes();
m_single_txnid_optimization_possible = false; m_sto_buffer.append(left_key, right_key);
delta = m_sto_buffer.get_num_bytes() - buffer_mem;
m_mem_tracker->note_mem_used(delta);
}
void locktree::sto_end(void) {
uint64_t num_bytes = m_sto_buffer.get_num_bytes();
m_mem_tracker->note_mem_released(num_bytes);
m_sto_buffer.destroy();
m_sto_buffer.create();
m_sto_txnid = TXNID_NONE;
}
void locktree::sto_end_early(void *prepared_lkr) {
sto_migrate_buffer_ranges_to_tree(prepared_lkr);
sto_end();
m_sto_score = 0;
}
void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
// There should be something to migrate, and nothing in the rangetree.
invariant(!m_sto_buffer.is_empty());
invariant(m_rangetree->is_empty());
concurrent_tree sto_rangetree;
concurrent_tree::locked_keyrange sto_lkr;
sto_rangetree.create(m_cmp);
// insert all of the ranges from the single txnid buffer into a new rangetree
range_buffer::iterator iter;
range_buffer::iterator::record rec;
iter.create(&m_sto_buffer);
while (iter.current(&rec)) {
sto_lkr.prepare(&sto_rangetree);
int r = acquire_lock_consolidated(&sto_lkr,
m_sto_txnid, rec.get_left_key(), rec.get_right_key(), nullptr);
invariant_zero(r);
sto_lkr.release();
iter.next();
}
// Iterate the newly created rangetree and insert each range into the
// locktree's rangetree, on behalf of the old single txnid.
struct migrate_fn_obj {
concurrent_tree::locked_keyrange *dst_lkr;
bool fn(const keyrange &range, TXNID txnid) {
dst_lkr->insert(range, txnid);
return true;
}
} migrate_fn;
migrate_fn.dst_lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
sto_lkr.prepare(&sto_rangetree);
sto_lkr.iterate(&migrate_fn);
sto_lkr.remove_all();
sto_lkr.release();
sto_rangetree.destroy();
invariant(!m_rangetree->is_empty());
}
bool locktree::sto_try_acquire(void *prepared_lkr, TXNID txnid,
const DBT *left_key, const DBT *right_key) {
if (m_rangetree->is_empty() && m_sto_buffer.is_empty() && m_sto_score >= STO_SCORE_THRESHOLD) {
// We can do the optimization because the rangetree is empty, and
// we know it's worth trying because the sto score is big enough.
sto_begin(txnid);
} else if (m_sto_txnid != TXNID_NONE) {
// We are currently doing the optimization. Check if we need to cancel
// it because a new txnid appeared, or if the current single txnid has
// taken too many locks already.
if (m_sto_txnid != txnid || m_sto_buffer.get_num_ranges() > STO_BUFFER_MAX_SIZE) {
sto_end_early(prepared_lkr);
} }
} }
// At this point the sto txnid is properly set. If it is valid, then
// this txnid can append its lock to the sto buffer successfully.
if (m_sto_txnid != TXNID_NONE) {
invariant(m_sto_txnid == txnid);
sto_append(left_key, right_key);
return true;
} else {
invariant(m_sto_buffer.is_empty());
return false;
}
} }
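A minimal standalone sketch of the decision flow in sto_try_acquire() above, assuming the sto buffer and the rangetree can be modeled as plain counters. Only the txnid checks, the score threshold, and the buffer cap mirror this patch; sto_state, buffered_ranges, and rangetree_locks are invented names for illustration.

    #include <cassert>
    #include <cstdint>

    typedef uint64_t TXNID;
    static const TXNID TXNID_NONE = 0;          // assumption: 0 never names a real txn here
    static const int STO_SCORE_THRESHOLD = 100; // values taken from this patch
    static const int STO_BUFFER_MAX_SIZE = 1 * 1024 * 1024;

    // Toy model of the locktree state relevant to the optimization.
    struct sto_state {
        TXNID sto_txnid = TXNID_NONE;
        int sto_score = STO_SCORE_THRESHOLD;
        int buffered_ranges = 0;   // stands in for m_sto_buffer
        int rangetree_locks = 0;   // stands in for m_rangetree

        bool rangetree_empty(void) const { return rangetree_locks == 0; }
        bool buffer_empty(void) const { return buffered_ranges == 0; }

        // sto_end_early(): migrate buffered locks into the rangetree,
        // then call off the optimization and reset the score.
        void sto_end_early(void) {
            rangetree_locks += buffered_ranges;
            buffered_ranges = 0;
            sto_txnid = TXNID_NONE;
            sto_score = 0;
        }

        // returns true if the lock request was absorbed by the single txnid buffer
        bool sto_try_acquire(TXNID txnid) {
            if (rangetree_empty() && buffer_empty() && sto_score >= STO_SCORE_THRESHOLD) {
                sto_txnid = txnid;                      // sto_begin()
            } else if (sto_txnid != TXNID_NONE &&
                       (sto_txnid != txnid || buffered_ranges > STO_BUFFER_MAX_SIZE)) {
                sto_end_early();                        // second txnid, or buffer too big
            }
            if (sto_txnid != TXNID_NONE) {
                assert(sto_txnid == txnid);
                buffered_ranges++;                      // sto_append()
                return true;
            }
            return false;
        }
    };

    int main(void) {
        sto_state lt;
        assert(lt.sto_try_acquire(1));    // first txnid buffers its lock
        assert(lt.sto_try_acquire(1));    // same txnid keeps buffering
        assert(!lt.sto_try_acquire(2));   // second txnid forces migration, falls back
        return 0;
    }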
// acquire a lock in the given key range, inclusive. if successful, // try to acquire a lock and consolidate it with existing locks if possible
// return 0. otherwise, populate the conflicts txnid_set with the set of // param: lkr, a prepared locked keyrange
// transactions that conflict with this request. // return: 0 on success, DB_LOCK_NOTGRANTED if conflicting locks exist.
int locktree::acquire_lock(bool is_write_request, TXNID txnid, int locktree::acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
const DBT *left_key, const DBT *right_key, txnid_set *conflicts) { const DBT *left_key, const DBT *right_key, txnid_set *conflicts) {
int r = 0;
concurrent_tree::locked_keyrange *lkr;
keyrange requested_range; keyrange requested_range;
requested_range.create(left_key, right_key); requested_range.create(left_key, right_key);
lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
// we are only supporting write locks for simplicity lkr->acquire(requested_range);
invariant(is_write_request);
// acquire and prepare a locked keyrange over the requested range.
// prepare is a serialzation point, so we take the opportunity to
// update the single txnid optimization bits.
concurrent_tree::locked_keyrange lkr;
lkr.prepare(m_rangetree);
update_single_txnid_optimization(txnid);
lkr.acquire(requested_range);
// copy out the set of overlapping row locks. // copy out the set of overlapping row locks.
GrowableArray<row_lock> overlapping_row_locks; GrowableArray<row_lock> overlapping_row_locks;
...@@ -189,7 +260,8 @@ int locktree::acquire_lock(bool is_write_request, TXNID txnid, ...@@ -189,7 +260,8 @@ int locktree::acquire_lock(bool is_write_request, TXNID txnid,
size_t num_overlapping_row_locks = overlapping_row_locks.get_size(); size_t num_overlapping_row_locks = overlapping_row_locks.get_size();
// if any overlapping row locks conflict with this request, bail out. // if any overlapping row locks conflict with this request, bail out.
bool conflicts_exist = determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts); bool conflicts_exist = determine_conflicting_txnids(
overlapping_row_locks, txnid, conflicts);
if (!conflicts_exist) { if (!conflicts_exist) {
// there are no conflicts, so all of the overlaps are for the requesting txnid. // there are no conflicts, so all of the overlaps are for the requesting txnid.
// so, we must consolidate all existing overlapping ranges and the requested // so, we must consolidate all existing overlapping ranges and the requested
...@@ -198,23 +270,43 @@ int locktree::acquire_lock(bool is_write_request, TXNID txnid, ...@@ -198,23 +270,43 @@ int locktree::acquire_lock(bool is_write_request, TXNID txnid,
row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i); row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i);
invariant(overlapping_lock.txnid == txnid); invariant(overlapping_lock.txnid == txnid);
requested_range.extend(m_cmp, overlapping_lock.range); requested_range.extend(m_cmp, overlapping_lock.range);
remove_row_lock(&lkr, overlapping_lock, m_mem_tracker); remove_row_lock_from_tree(lkr, overlapping_lock, m_mem_tracker);
} }
row_lock new_lock = { .range = requested_range, .txnid = txnid }; row_lock new_lock = { .range = requested_range, .txnid = txnid };
insert_row_lock(&lkr, new_lock, m_mem_tracker); insert_row_lock_into_tree(lkr, new_lock, m_mem_tracker);
} else {
r = DB_LOCK_NOTGRANTED;
} }
lkr.release();
overlapping_row_locks.deinit();
requested_range.destroy(); requested_range.destroy();
overlapping_row_locks.deinit();
return r;
}
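A rough sketch of the consolidation step in acquire_lock_consolidated(), assuming a hypothetical integer interval in place of DBT keys and a flat vector in place of the rangetree. Only the extend / remove / insert loop mirrors the code; everything else is simplified.

    #include <algorithm>
    #include <cassert>
    #include <cstdint>
    #include <vector>

    typedef uint64_t TXNID;

    // Hypothetical integer keyrange standing in for a (left_key, right_key) DBT pair.
    struct interval { int left; int right; };

    struct toy_row_lock { interval range; TXNID txnid; };

    static bool overlaps(const interval &a, const interval &b) {
        return a.left <= b.right && b.left <= a.right;
    }

    // Merge every lock of `txnid` that overlaps `requested` into one lock, mirroring
    // the extend / remove_row_lock_from_tree / insert_row_lock_into_tree loop above.
    static void consolidate(std::vector<toy_row_lock> &locks, TXNID txnid, interval requested) {
        for (size_t i = 0; i < locks.size(); ) {
            if (locks[i].txnid == txnid && overlaps(locks[i].range, requested)) {
                requested.left = std::min(requested.left, locks[i].range.left);    // keyrange::extend()
                requested.right = std::max(requested.right, locks[i].range.right);
                locks.erase(locks.begin() + i);                                    // remove the overlap
            } else {
                i++;
            }
        }
        toy_row_lock merged = { requested, txnid };
        locks.push_back(merged);                                                   // insert the consolidated lock
    }

    int main(void) {
        std::vector<toy_row_lock> locks;
        toy_row_lock a = { { 1, 1 }, 10 };
        toy_row_lock b = { { 2, 2 }, 10 };
        locks.push_back(a);
        locks.push_back(b);
        interval req = { 1, 2 };
        consolidate(locks, 10, req);
        assert(locks.size() == 1 && locks[0].range.left == 1 && locks[0].range.right == 2);
        return 0;
    }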
// if there were conflicts, the lock is not granted. // acquire a lock in the given key range, inclusive. if successful,
if (conflicts_exist) { // return 0. otherwise, populate the conflicts txnid_set with the set of
return DB_LOCK_NOTGRANTED; // transactions that conflict with this request.
} else { int locktree::acquire_lock(bool is_write_request, TXNID txnid,
return 0; const DBT *left_key, const DBT *right_key, txnid_set *conflicts) {
int r = 0;
// we are only supporting write locks for simplicity
invariant(is_write_request);
// acquire and prepare a locked keyrange over the requested range.
// prepare is a serialization point, so we take the opportunity to
// try the single txnid optimization first.
concurrent_tree::locked_keyrange lkr;
lkr.prepare(m_rangetree);
bool acquired = sto_try_acquire(&lkr, txnid, left_key, right_key);
if (!acquired) {
r = acquire_lock_consolidated(&lkr, txnid, left_key, right_key, conflicts);
} }
lkr.release();
return r;
} }
int locktree::try_acquire_lock(bool is_write_request, TXNID txnid, int locktree::try_acquire_lock(bool is_write_request, TXNID txnid,
...@@ -252,7 +344,7 @@ void locktree::get_conflicts(bool is_write_request, TXNID txnid, ...@@ -252,7 +344,7 @@ void locktree::get_conflicts(bool is_write_request, TXNID txnid,
// copy out the set of overlapping row locks and determine the conflicts // copy out the set of overlapping row locks and determine the conflicts
GrowableArray<row_lock> overlapping_row_locks; GrowableArray<row_lock> overlapping_row_locks;
overlapping_row_locks.init(); overlapping_row_locks.init();
iterate_and_get_overlapping_row_locks(lkr, &overlapping_row_locks); iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);
// we don't care if conflicts exist. we just want the conflicts set populated. // we don't care if conflicts exist. we just want the conflicts set populated.
(void) determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts); (void) determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts);
...@@ -300,7 +392,7 @@ void locktree::remove_overlapping_locks_for_txnid(TXNID txnid, ...@@ -300,7 +392,7 @@ void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
// copy out the set of overlapping row locks. // copy out the set of overlapping row locks.
GrowableArray<row_lock> overlapping_row_locks; GrowableArray<row_lock> overlapping_row_locks;
overlapping_row_locks.init(); overlapping_row_locks.init();
iterate_and_get_overlapping_row_locks(lkr, &overlapping_row_locks); iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);
size_t num_overlapping_row_locks = overlapping_row_locks.get_size(); size_t num_overlapping_row_locks = overlapping_row_locks.get_size();
for (size_t i = 0; i < num_overlapping_row_locks; i++) { for (size_t i = 0; i < num_overlapping_row_locks; i++) {
...@@ -308,7 +400,7 @@ void locktree::remove_overlapping_locks_for_txnid(TXNID txnid, ...@@ -308,7 +400,7 @@ void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
// If this isn't our lock, that's ok, just don't remove it. // If this isn't our lock, that's ok, just don't remove it.
// See rationale above. // See rationale above.
if (lock.txnid == txnid) { if (lock.txnid == txnid) {
remove_row_lock(&lkr, lock, m_mem_tracker); remove_row_lock_from_tree(&lkr, lock, m_mem_tracker);
} }
} }
...@@ -317,41 +409,27 @@ void locktree::remove_overlapping_locks_for_txnid(TXNID txnid, ...@@ -317,41 +409,27 @@ void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
release_range.destroy(); release_range.destroy();
} }
// reset the optimization bit to possible for the given txnid inline bool locktree::sto_txnid_is_valid_unsafe(void) const {
void locktree::reset_single_txnid_optimization(TXNID txnid) { return m_sto_txnid != TXNID_NONE;
m_single_txnid = txnid;
m_single_txnid_optimization_possible = true;
} }
inline bool locktree::unsafe_read_single_txnid_optimization_possible(void) const { inline int locktree::sto_get_score_unsafe(void) const {
return m_single_txnid_optimization_possible; return m_sto_score;
} }
bool locktree::try_single_txnid_release_optimization(TXNID txnid) { bool locktree::sto_try_release(TXNID txnid) {
bool released = false; bool released = false;
if (unsafe_read_single_txnid_optimization_possible()) { if (sto_txnid_is_valid_unsafe()) {
// check the bit again with a prepared locked keyrange, // check the bit again with a prepared locked keyrange,
// which protects the optimization bits and rangetree data // which protects the optimization bits and rangetree data
concurrent_tree::locked_keyrange lkr; concurrent_tree::locked_keyrange lkr;
lkr.prepare(m_rangetree); lkr.prepare(m_rangetree);
if (m_single_txnid_optimization_possible) { if (m_sto_txnid != TXNID_NONE) {
// this txnid better be the single txnid on this locktree, // this txnid better be the single txnid on this locktree,
// or else we are in big trouble (meaning the logic is broken) // or else we are in big trouble (meaning the logic is broken)
invariant(m_single_txnid == txnid); invariant(m_sto_txnid == txnid);
invariant(m_rangetree->is_empty());
// acquire a locked range on -inf, +inf. this is just for sto_end();
// readability's sake, since the prepared lkr already has
// the root locked, but the API says to do this so we do.
keyrange infinite_range = keyrange::get_infinite_range();
lkr.acquire(infinite_range);
// knowing that only our row locks exist in the locktree
// and that we have the entire thing locked, remove everything.
remove_all_row_locks(&lkr, m_mem_tracker);
// reset the optimization back to possible, with no txnid
// we set txnid to TXNID_NONE for invariant purposes.
reset_single_txnid_optimization(TXNID_NONE);
released = true; released = true;
} }
lkr.release(); lkr.release();
...@@ -364,7 +442,7 @@ bool locktree::try_single_txnid_release_optimization(TXNID txnid) { ...@@ -364,7 +442,7 @@ bool locktree::try_single_txnid_release_optimization(TXNID txnid) {
void locktree::release_locks(TXNID txnid, const range_buffer *ranges) { void locktree::release_locks(TXNID txnid, const range_buffer *ranges) {
// try the single txn optimization. if it worked, then all of the // try the single txn optimization. if it worked, then all of the
// locks are already released, otherwise we need to do it here. // locks are already released, otherwise we need to do it here.
bool released = try_single_txnid_release_optimization(txnid); bool released = sto_try_release(txnid);
if (!released) { if (!released) {
range_buffer::iterator iter; range_buffer::iterator iter;
range_buffer::iterator::record rec; range_buffer::iterator::record rec;
...@@ -375,6 +453,13 @@ void locktree::release_locks(TXNID txnid, const range_buffer *ranges) { ...@@ -375,6 +453,13 @@ void locktree::release_locks(TXNID txnid, const range_buffer *ranges) {
remove_overlapping_locks_for_txnid(txnid, left_key, right_key); remove_overlapping_locks_for_txnid(txnid, left_key, right_key);
iter.next(); iter.next();
} }
// Increase the sto score slightly. Eventually it will hit
// the threshold and we'll try the optimization again. This
// is how a previously multithreaded system transitions into
// a single threaded system that benefits from the optimization.
if (sto_get_score_unsafe() < STO_SCORE_THRESHOLD) {
toku_sync_fetch_and_add(&m_sto_score, 1);
}
} }
} }
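A hedged sketch of this release path, with a toy locktree in place of the real members: a plain int stands in for the atomically incremented score, and buffered_ranges is an invented counter standing in for the sto buffer.

    #include <cassert>
    #include <cstdint>

    typedef uint64_t TXNID;
    static const TXNID TXNID_NONE = 0;           // assumption for this sketch
    static const int STO_SCORE_THRESHOLD = 100;  // value from this patch

    struct toy_locktree {
        TXNID sto_txnid = TXNID_NONE;
        int sto_score = STO_SCORE_THRESHOLD;
        int buffered_ranges = 0;

        // sto_try_release(): if the optimization is on, it must be on for this
        // txnid, and dropping the buffer releases every lock it holds.
        bool sto_try_release(TXNID txnid) {
            if (sto_txnid == TXNID_NONE) {
                return false;
            }
            assert(sto_txnid == txnid);
            buffered_ranges = 0;          // sto_end(): throw away the buffer
            sto_txnid = TXNID_NONE;
            return true;
        }

        // release_locks(): try the O(1) path first, otherwise release from the
        // rangetree and nudge the score toward re-enabling the optimization.
        void release_locks(TXNID txnid) {
            if (!sto_try_release(txnid)) {
                // ... remove_overlapping_locks_for_txnid() for each released range ...
                if (sto_score < STO_SCORE_THRESHOLD) {
                    sto_score++;          // real code uses toku_sync_fetch_and_add
                }
            }
        }
    };

    int main(void) {
        toy_locktree lt;
        lt.sto_txnid = 7;
        lt.buffered_ranges = 3;
        lt.release_locks(7);              // fast path: buffer dropped in O(1)
        assert(lt.sto_txnid == TXNID_NONE && lt.buffered_ranges == 0);
        lt.sto_score = 0;
        lt.release_locks(7);              // slow path: score creeps back up
        assert(lt.sto_score == 1);
        return 0;
    }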
...@@ -409,12 +494,12 @@ static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr, ...@@ -409,12 +494,12 @@ static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
// now that the ranges have been copied out, complete // now that the ranges have been copied out, complete
// the extraction by removing the ranges from the tree. // the extraction by removing the ranges from the tree.
// use remove_row_lock() so we properly track the // use remove_row_lock_from_tree() so we properly track the
// amount of memory and number of locks freed. // amount of memory and number of locks freed.
int num_extracted = extract_fn.num_extracted; int num_extracted = extract_fn.num_extracted;
invariant(num_extracted <= num_to_extract); invariant(num_extracted <= num_to_extract);
for (int i = 0; i < num_extracted; i++) { for (int i = 0; i < num_extracted; i++) {
remove_row_lock(lkr, row_locks[i], mem_tracker); remove_row_lock_from_tree(lkr, row_locks[i], mem_tracker);
} }
return num_extracted; return num_extracted;
...@@ -437,6 +522,13 @@ void locktree::escalate(void) { ...@@ -437,6 +522,13 @@ void locktree::escalate(void) {
lkr.prepare(m_rangetree); lkr.prepare(m_rangetree);
lkr.acquire(infinite_range); lkr.acquire(infinite_range);
// if we're in the single txnid optimization, simply call it off.
// if you have to run escalation, you probably don't care about
// the optimization anyway, and this makes things easier.
if (m_sto_txnid != TXNID_NONE) {
sto_end_early(&lkr);
}
// extract and remove batches of row locks from the locktree // extract and remove batches of row locks from the locktree
int num_extracted; int num_extracted;
static const int num_row_locks_per_batch = 128; static const int num_row_locks_per_batch = 128;
...@@ -487,7 +579,7 @@ void locktree::escalate(void) { ...@@ -487,7 +579,7 @@ void locktree::escalate(void) {
size_t new_num_locks = escalated_locks.get_size(); size_t new_num_locks = escalated_locks.get_size();
for (size_t i = 0; i < new_num_locks; i++) { for (size_t i = 0; i < new_num_locks; i++) {
row_lock lock = escalated_locks.fetch_unchecked(i); row_lock lock = escalated_locks.fetch_unchecked(i);
insert_row_lock(&lkr, lock, m_mem_tracker); insert_row_lock_into_tree(&lkr, lock, m_mem_tracker);
lock.range.destroy(); lock.range.destroy();
} }
......
...@@ -168,6 +168,12 @@ class locktree { ...@@ -168,6 +168,12 @@ class locktree {
}; };
ENSURE_POD(memory_tracker); ENSURE_POD(memory_tracker);
// effect: calls the private function run_escalation(), only ok to
// do for tests.
// rationale: to get better stress test coverage, we want a way to
// deterministically trigger lock escalation.
void run_escalation_for_test(void);
private: private:
static const uint64_t DEFAULT_MAX_LOCK_MEMORY = 64L * 1024 * 1024; static const uint64_t DEFAULT_MAX_LOCK_MEMORY = 64L * 1024 * 1024;
static const uint64_t DEFAULT_LOCK_WAIT_TIME = 0; static const uint64_t DEFAULT_LOCK_WAIT_TIME = 0;
...@@ -240,54 +246,148 @@ class locktree { ...@@ -240,54 +246,148 @@ class locktree {
struct lt_lock_request_info m_lock_request_info; struct lt_lock_request_info m_lock_request_info;
// the following is an optimization for locktrees that contain // The following fields and members prefixed with "sto_" are for
// locks for only a single txnid. in this case, we can just // the single txnid optimization, intended to speed up the case
// delete everything from the locktree when that txnid unlocks. // when only one transaction is using the locktree. If we know
// the locktree has only one transaction, then acquiring locks
// takes O(1) work and releasing all locks takes O(1) work.
//
// How do we know that the locktree only has a single txnid?
// What do we do if it does?
//
// When a txn with txnid T requests a lock:
// - If the tree is empty, the optimization is possible. Set the single
// txnid to T, and insert the lock range into the buffer.
// - If the tree is not empty, check if the single txnid is T. If so,
// append the lock range to the buffer. Otherwise, migrate all of
// the locks in the buffer into the rangetree on behalf of the old
// single txnid, and invalidate the single txnid.
//
// When a txn with txnid T releases its locks:
// - If the single txnid is valid, it must be for T. Destroy the buffer.
// - If it's not valid, release locks the normal way in the rangetree.
//
// To carry out the optimization we need to record a single txnid
// and a range buffer for each locktree, each protected by the root
// lock of the locktree's rangetree. The root lock for a rangetree
// is grabbed by preparing a locked keyrange on the rangetree.
TXNID m_sto_txnid;
range_buffer m_sto_buffer;
// The single txnid optimization speeds up the case when only one
// transaction is using the locktree. But it has the potential to
// hurt the case when more than one txnid exists.
// //
// how do we know that this locktree only has a single txnid? // There are two things we need to do to make the optimization only
// optimize the case we care about, and not hurt the general case.
// //
// when a txn requests a lock: // Bound the worst-case latency for lock migration when the
// - if the tree is empty, set the single txnid to that txn, set // optimization stops working:
// the optimization to true. // - Idea: Stop the optimization and migrate immediately if we notice
// - if the tree is not empty, then some txnid has inserted into // the single txnid has taken many locks in the range buffer.
// the tree before and its txnid is m_single_txnid. set the bit // - Implementation: Enforce a max size on the single txnid range buffer.
// to false if that txnid is different than the one about to insert. // - Analysis: Choosing the perfect max value, M, is difficult to do
// - if the txnid never changes (ie: only one txnid inserts into // without some feedback from the field. Intuition tells us that M should
// a locktree) then the bit stays true and the optimization happens. // not be so small that the optimization is worthless, and it should not
// be so big that it's unreasonable to have to wait behind a thread doing
// the work of converting M buffer locks into rangetree locks.
// //
// when a txn releases its locks // Prevent concurrent-transaction workloads from trying the optimization
// - check the optimization bit. if it is set, take the locktree's mutex // in vain:
// and then check it agian. if it is still set, then perform the optimizaiton. // - Idea: Don't even bother trying the optimization if we think the
// - if the bit was not set, then carry out release locks as usual. // system is in a concurrent-transaction state.
// - Implementation: Do something even simpler than detecting whether the
// system is in a concurrent-transaction state. Just keep a "score" value
// and some threshold. If at any time the locktree is eligible for the
// optimization, only do it if the score is at this threshold. When you
// actually do the optimization but someone has to migrate locks in the buffer
// (expensive), then reset the score back to zero. Each time a txn
// releases locks, the score is incremented by 1.
// - Analysis: If you let the threshold be "C", then at most 1 / C txns will
// do the optimization in a concurrent-transaction system. Similarly, it
// takes at most C txns to start using the single txnid optimization, which
// is good when the system transitions from multithreaded to single threaded.
// //
// the single txnid and the optimizable possible bit are both protected // STO_BUFFER_MAX_SIZE:
// by the root lock on the concurrent tree. the way this is implemented //
// is by a locked keyrange function called prepare(), which grabs // We choose the max value to be 1 million since most transactions are smaller
// the root lock and returns. once acquire/release() is called, the root // than 1 million and we can create a rangetree of 1 million elements in
// lock is unlocked if necessary. so prepare() acts as a serialization // less than a second. So we can be pretty confident that this threshold
// point where we can safely read and modify these bits. // enables the optimization almost always, and prevents super pathological
TXNID m_single_txnid; // latency issues for the first lock taken by a second thread.
bool m_single_txnid_optimization_possible; //
// STO_SCORE_THRESHOLD:
// effect: If the single txnid is possible, assert that it //
// is for the given txnid and then release all of // A simple first guess at a good value for the score threshold is 100.
// the locks in the locktree. // By our analysis, we'd end up doing the optimization in vain for
// returns: True if locks were released, false otherwise // around 1% of all transactions, which seems reasonable. Further,
bool try_single_txnid_release_optimization(TXNID txnid); // if the system goes single threaded, it ought to be pretty quick
// for 100 transactions to go by, so we won't have to wait long before
// effect: Checks if the single txnid bit is set and, if so, // we start doing the single txnid optimization again.
// sets it to false iff the given txnid differs static const int STO_BUFFER_MAX_SIZE = 1 * 1024 * 1024;
// from the current known single txnid. static const int STO_SCORE_THRESHOLD = 100;
void update_single_txnid_optimization(TXNID txnid); int m_sto_score;
// effect: Sets the single txnid bit to be true for the given txnid // effect: begins the single txnid optimization, setting m_sto_txnid
void reset_single_txnid_optimization(TXNID txnid); // to the given txnid.
// requires: m_sto_txnid is invalid
void sto_begin(TXNID txnid);
// effect: append a range to the sto buffer
// requires: m_sto_txnid is valid
void sto_append(const DBT *left_key, const DBT *right_key);
// effect: ends the single txnid optimization, releasing any memory
// stored in the sto buffer, notifying the tracker, and
// invalidating m_sto_txnid.
// requires: m_sto_txnid is valid
void sto_end(void);
// params: prepared_lkr is a void * to a prepared locked keyrange. see below.
// effect: ends the single txnid optimization early, migrating buffer locks
// into the rangetree, calling sto_end(), and then setting the
// sto_score back to zero.
// requires: m_sto_txnid is valid
void sto_end_early(void *prepared_lkr);
// params: prepared_lkr is a void * to a prepared locked keyrange. we can't use
// the real type because the compiler won't allow us to forward declare
// concurrent_tree::locked_keyrange without including concurrent_tree.h,
// which we cannot do here because it is a template implementation.
// requires: the prepared locked keyrange is for the locktree's rangetree
// requires: m_sto_txnid is valid
// effect: migrates each lock in the single txnid buffer into the locktree's
// rangetree, notifying the memory tracker as necessary.
void sto_migrate_buffer_ranges_to_tree(void *prepared_lkr);
// effect: If m_sto_txnid is valid, then release the txnid's locks
// by ending the optimization.
// requires: If m_sto_txnid is valid, it is equal to the given txnid
// returns: True if locks were released for this txnid
bool sto_try_release(TXNID txnid);
// params: prepared_lkr is a void * to a prepared locked keyrange. see above.
// requires: the prepared locked keyrange is for the locktree's rangetree
// effect: If m_sto_txnid is valid and equal to the given txnid, then
// append a range onto the buffer. Otherwise, if m_sto_txnid is valid
// but not equal to this txnid, then migrate the buffer's locks
// into the rangetree and end the optimization, setting the score
// back to zero.
// returns: true if the lock was acquired for this txnid
bool sto_try_acquire(void *prepared_lkr, TXNID txnid,
const DBT *left_key, const DBT *right_key);
// Effect: // Effect:
// Provides a hook for a helgrind suppression. // Provides a hook for a helgrind suppression.
// Returns: // Returns:
// m_single_txnid_optimization_possible // true if m_sto_txnid is not TXNID_NONE
bool unsafe_read_single_txnid_optimization_possible(void) const; bool sto_txnid_is_valid_unsafe(void) const;
// Effect:
// Provides a hook for a helgrind suppression.
// Returns:
// m_sto_score
int sto_get_score_unsafe(void) const;
// effect: Creates a locktree that uses the given memory tracker // effect: Creates a locktree that uses the given memory tracker
// to report memory usage and honor memory constraints. // to report memory usage and honor memory constraints.
...@@ -299,12 +399,15 @@ class locktree { ...@@ -299,12 +399,15 @@ class locktree {
void remove_overlapping_locks_for_txnid(TXNID txnid, void remove_overlapping_locks_for_txnid(TXNID txnid,
const DBT *left_key, const DBT *right_key); const DBT *left_key, const DBT *right_key);
int try_acquire_lock(bool is_write_request, TXNID txnid, int acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
const DBT *left_key, const DBT *right_key, txnid_set *conflicts); const DBT *left_key, const DBT *right_key, txnid_set *conflicts);
int acquire_lock(bool is_write_request, TXNID txnid, int acquire_lock(bool is_write_request, TXNID txnid,
const DBT *left_key, const DBT *right_key, txnid_set *conflicts); const DBT *left_key, const DBT *right_key, txnid_set *conflicts);
int try_acquire_lock(bool is_write_request, TXNID txnid,
const DBT *left_key, const DBT *right_key, txnid_set *conflicts);
void escalate(); void escalate();
friend class locktree_unit_test; friend class locktree_unit_test;
......
...@@ -196,6 +196,11 @@ void locktree::manager::release_lt(locktree *lt) { ...@@ -196,6 +196,11 @@ void locktree::manager::release_lt(locktree *lt) {
} }
} }
// test-only version of lock escalation
void locktree::manager::run_escalation_for_test(void) {
run_escalation();
}
// effect: escalates the locks in each locktree // effect: escalates the locks in each locktree
// requires: manager's mutex is held // requires: manager's mutex is held
void locktree::manager::run_escalation(void) { void locktree::manager::run_escalation(void) {
......
...@@ -116,6 +116,7 @@ void range_buffer::create(void) { ...@@ -116,6 +116,7 @@ void range_buffer::create(void) {
m_buf = nullptr; m_buf = nullptr;
m_buf_size = 0; m_buf_size = 0;
m_buf_current = 0; m_buf_current = 0;
m_num_ranges = 0;
} }
void range_buffer::append(const DBT *left_key, const DBT *right_key) { void range_buffer::append(const DBT *left_key, const DBT *right_key) {
...@@ -125,6 +126,19 @@ void range_buffer::append(const DBT *left_key, const DBT *right_key) { ...@@ -125,6 +126,19 @@ void range_buffer::append(const DBT *left_key, const DBT *right_key) {
} else { } else {
append_range(left_key, right_key); append_range(left_key, right_key);
} }
m_num_ranges++;
}
bool range_buffer::is_empty(void) const {
return m_buf == nullptr;
}
uint64_t range_buffer::get_num_bytes(void) const {
return m_buf_current;
}
int range_buffer::get_num_ranges(void) const {
return m_num_ranges;
} }
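As a sketch of what these accessors track, here is a toy range buffer over integer endpoints. The record layout and byte accounting are assumptions for illustration; the real buffer packs DBT keys into a byte array and reports its current write offset as the byte count.

    #include <cassert>
    #include <cstdint>
    #include <vector>

    // Toy stand-in for range_buffer: appends ranges and tracks bytes and count.
    struct toy_range_buffer {
        struct record { int left; int right; };
        std::vector<record> m_records;
        uint64_t m_num_bytes = 0;
        int m_num_ranges = 0;

        void append(int left_key, int right_key) {
            record r = { left_key, right_key };
            m_records.push_back(r);
            m_num_bytes += sizeof(record);   // real code appends a packed header plus key bytes
            m_num_ranges++;
        }
        bool is_empty(void) const { return m_records.empty(); }
        uint64_t get_num_bytes(void) const { return m_num_bytes; }
        int get_num_ranges(void) const { return m_num_ranges; }
    };

    int main(void) {
        toy_range_buffer buf;
        assert(buf.is_empty());
        uint64_t before = buf.get_num_bytes();
        buf.append(1, 2);
        // sto_append() charges the memory tracker with exactly this delta
        uint64_t delta = buf.get_num_bytes() - before;
        assert(delta == sizeof(toy_range_buffer::record));
        assert(buf.get_num_ranges() == 1 && !buf.is_empty());
        return 0;
    }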
void range_buffer::destroy(void) { void range_buffer::destroy(void) {
......
...@@ -98,12 +98,22 @@ class range_buffer { ...@@ -98,12 +98,22 @@ class range_buffer {
// if the keys are equal, then only one copy is stored. // if the keys are equal, then only one copy is stored.
void append(const DBT *left_key, const DBT *right_key); void append(const DBT *left_key, const DBT *right_key);
// is this range buffer empty?
bool is_empty(void) const;
// how many bytes are stored in this range buffer?
uint64_t get_num_bytes(void) const;
// how many ranges are stored in this range buffer?
int get_num_ranges(void) const;
void destroy(void); void destroy(void);
private: private:
char *m_buf; char *m_buf;
size_t m_buf_size; size_t m_buf_size;
size_t m_buf_current; size_t m_buf_current;
int m_num_ranges;
void append_range(const DBT *left_key, const DBT *right_key); void append_range(const DBT *left_key, const DBT *right_key);
......
...@@ -18,13 +18,6 @@ void concurrent_tree_unit_test::test_lkr_remove_all(void) { ...@@ -18,13 +18,6 @@ void concurrent_tree_unit_test::test_lkr_remove_all(void) {
const uint64_t min = 0; const uint64_t min = 0;
const uint64_t max = 20; const uint64_t max = 20;
// determine how much memory should be released
keyrange example_keyrange;
example_keyrange.create(get_dbt(0), get_dbt(0));
const uint64_t num_elements = max + 1;
const uint64_t expected_mem_released =
example_keyrange.get_memory_size() * num_elements;
// remove_all should work regardless of how the // remove_all should work regardless of how the
// data was inserted into the tree, so we test it // data was inserted into the tree, so we test it
// on a tree whose elements were populated starting // on a tree whose elements were populated starting
...@@ -44,10 +37,7 @@ void concurrent_tree_unit_test::test_lkr_remove_all(void) { ...@@ -44,10 +37,7 @@ void concurrent_tree_unit_test::test_lkr_remove_all(void) {
// remove_all() from the locked keyrange and assert that // remove_all() from the locked keyrange and assert that
// the number of elements and memory removed is correct. // the number of elements and memory removed is correct.
uint64_t mem_released = 0; lkr.remove_all();
uint64_t num_removed = lkr.remove_all(&mem_released);
invariant(num_removed == num_elements);
invariant(mem_released == expected_mem_released);
invariant(lkr.m_subtree->is_empty()); invariant(lkr.m_subtree->is_empty());
invariant(tree.is_empty()); invariant(tree.is_empty());
......
...@@ -51,7 +51,7 @@ void lock_request_unit_test::test_start_pending(void) { ...@@ -51,7 +51,7 @@ void lock_request_unit_test::test_start_pending(void) {
invariant(compare_dbts(nullptr, &request.m_right_key_copy, one) == 0); invariant(compare_dbts(nullptr, &request.m_right_key_copy, one) == 0);
// release the range lock for txnid b // release the range lock for txnid b
lt->remove_overlapping_locks_for_txnid(txnid_b, zero, two); locktree_unit_test::locktree_test_release_lock(lt, txnid_b, zero, two);
// now retry the lock requests. // now retry the lock requests.
// it should transition the request to successfully complete. // it should transition the request to successfully complete.
...@@ -60,7 +60,7 @@ void lock_request_unit_test::test_start_pending(void) { ...@@ -60,7 +60,7 @@ void lock_request_unit_test::test_start_pending(void) {
invariant(request.m_state == lock_request::state::COMPLETE); invariant(request.m_state == lock_request::state::COMPLETE);
invariant(request.m_complete_r == 0); invariant(request.m_complete_r == 0);
lt->remove_overlapping_locks_for_txnid(txnid_a, one, one); locktree_unit_test::locktree_test_release_lock(lt, txnid_a, one, one);
request.destroy(); request.destroy();
mgr.release_lt(lt); mgr.release_lt(lt);
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#define TOKU_LOCK_REQUEST_UNIT_TEST_H #define TOKU_LOCK_REQUEST_UNIT_TEST_H
#include "test.h" #include "test.h"
#include "locktree_unit_test.h"
#include "lock_request.h" #include "lock_request.h"
...@@ -37,7 +38,7 @@ class lock_request_unit_test { ...@@ -37,7 +38,7 @@ class lock_request_unit_test {
// lt->release_locks(), not individually using lt->remove_overlapping_locks_for_txnid). // lt->release_locks(), not individually using lt->remove_overlapping_locks_for_txnid).
void release_lock_and_retry_requests(locktree *lt, void release_lock_and_retry_requests(locktree *lt,
TXNID txnid, const DBT *left_key, const DBT * right_key) { TXNID txnid, const DBT *left_key, const DBT * right_key) {
lt->remove_overlapping_locks_for_txnid(txnid, left_key, right_key); locktree_unit_test::locktree_test_release_lock(lt, txnid, left_key, right_key);
lock_request::retry_all_lock_requests(lt); lock_request::retry_all_lock_requests(lt);
} }
}; };
......
...@@ -77,10 +77,9 @@ void locktree_unit_test::test_conflicts(void) { ...@@ -77,10 +77,9 @@ void locktree_unit_test::test_conflicts(void) {
#undef ACQUIRE_LOCK #undef ACQUIRE_LOCK
} }
invariant(num_row_locks(lt) == 2);
lt->remove_overlapping_locks_for_txnid(txnid_a, one, one); lt->remove_overlapping_locks_for_txnid(txnid_a, one, one);
lt->remove_overlapping_locks_for_txnid(txnid_a, three, four); lt->remove_overlapping_locks_for_txnid(txnid_a, three, four);
invariant(num_row_locks(lt) == 0); invariant(no_row_locks(lt));
} }
mgr.release_lt(lt); mgr.release_lt(lt);
......
...@@ -29,6 +29,16 @@ void locktree_unit_test::test_overlapping_relock(void) { ...@@ -29,6 +29,16 @@ void locktree_unit_test::test_overlapping_relock(void) {
int r; int r;
TXNID txnid_a = 1001; TXNID txnid_a = 1001;
// because of the single txnid optimization, there is no consolidation
// of read or write ranges until there are at least two txnids in
// the locktree. so here we add some arbitrary txnid to get a point
// lock [100, 100] so that the test below can expect to actually
// do something. at the end of the test, we release 100, 100.
const TXNID the_other_txnid = 9999;
const DBT *hundred = get_dbt(100);
r = lt->acquire_write_lock(the_other_txnid, hundred, hundred, nullptr);
invariant(r == 0);
for (int test_run = 0; test_run < 2; test_run++) { for (int test_run = 0; test_run < 2; test_run++) {
// test_run == 0 means test with read lock // test_run == 0 means test with read lock
// test_run == 1 means test with write lock // test_run == 1 means test with write lock
...@@ -40,19 +50,22 @@ void locktree_unit_test::test_overlapping_relock(void) { ...@@ -40,19 +50,22 @@ void locktree_unit_test::test_overlapping_relock(void) {
// ensure only [1,2] exists in the tree // ensure only [1,2] exists in the tree
r = ACQUIRE_LOCK(txnid_a, one, one, nullptr); r = ACQUIRE_LOCK(txnid_a, one, one, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 1);
r = ACQUIRE_LOCK(txnid_a, two, two, nullptr); r = ACQUIRE_LOCK(txnid_a, two, two, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 2);
r = ACQUIRE_LOCK(txnid_a, one, two, nullptr); r = ACQUIRE_LOCK(txnid_a, one, two, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 1);
struct verify_fn_obj { struct verify_fn_obj {
bool saw_the_other;
TXNID expected_txnid; TXNID expected_txnid;
keyrange *expected_range; keyrange *expected_range;
comparator *cmp; comparator *cmp;
bool fn(const keyrange &range, TXNID txnid) { bool fn(const keyrange &range, TXNID txnid) {
if (txnid == the_other_txnid) {
invariant(!saw_the_other);
saw_the_other = true;
return true;
}
invariant(txnid == expected_txnid); invariant(txnid == expected_txnid);
keyrange::comparison c = range.compare(cmp, *expected_range); keyrange::comparison c = range.compare(cmp, *expected_range);
invariant(c == keyrange::comparison::EQUALS); invariant(c == keyrange::comparison::EQUALS);
...@@ -61,55 +74,52 @@ void locktree_unit_test::test_overlapping_relock(void) { ...@@ -61,55 +74,52 @@ void locktree_unit_test::test_overlapping_relock(void) {
} verify_fn; } verify_fn;
verify_fn.cmp = lt->m_cmp; verify_fn.cmp = lt->m_cmp;
#define do_verify() \
do { verify_fn.saw_the_other = false; locktree_iterate<verify_fn_obj>(lt, &verify_fn); } while (0)
keyrange range; keyrange range;
range.create(one, two); range.create(one, two);
verify_fn.expected_txnid = txnid_a; verify_fn.expected_txnid = txnid_a;
verify_fn.expected_range = &range; verify_fn.expected_range = &range;
locktree_iterate<verify_fn_obj>(lt, &verify_fn); do_verify();
// unlocking [1,1] should remove the only range, // unlocking [1,1] should remove the only range,
// the other unlocks should do nothing. // the other unlocks should do nothing.
invariant(num_row_locks(lt) == 1);
lt->remove_overlapping_locks_for_txnid(txnid_a, one, one); lt->remove_overlapping_locks_for_txnid(txnid_a, one, one);
invariant(num_row_locks(lt) == 0);
lt->remove_overlapping_locks_for_txnid(txnid_a, two, two); lt->remove_overlapping_locks_for_txnid(txnid_a, two, two);
invariant(num_row_locks(lt) == 0);
lt->remove_overlapping_locks_for_txnid(txnid_a, one, two); lt->remove_overlapping_locks_for_txnid(txnid_a, one, two);
invariant(num_row_locks(lt) == 0);
// try overlapping from the right // try overlapping from the right
r = ACQUIRE_LOCK(txnid_a, one, three, nullptr); r = ACQUIRE_LOCK(txnid_a, one, three, nullptr);
invariant(num_row_locks(lt) == 1);
r = ACQUIRE_LOCK(txnid_a, two, five, nullptr); r = ACQUIRE_LOCK(txnid_a, two, five, nullptr);
invariant(num_row_locks(lt) == 1);
verify_fn.expected_txnid = txnid_a; verify_fn.expected_txnid = txnid_a;
range.create(one, five); range.create(one, five);
verify_fn.expected_range = &range; verify_fn.expected_range = &range;
locktree_iterate<verify_fn_obj>(lt, &verify_fn); do_verify();
// now overlap from the left // now overlap from the left
r = ACQUIRE_LOCK(txnid_a, zero, four, nullptr); r = ACQUIRE_LOCK(txnid_a, zero, four, nullptr);
invariant(num_row_locks(lt) == 1);
verify_fn.expected_txnid = txnid_a; verify_fn.expected_txnid = txnid_a;
range.create(zero, five); range.create(zero, five);
verify_fn.expected_range = &range; verify_fn.expected_range = &range;
locktree_iterate<verify_fn_obj>(lt, &verify_fn); do_verify();
// now relock in a range that is already dominated // now relock in a range that is already dominated
r = ACQUIRE_LOCK(txnid_a, five, five, nullptr); r = ACQUIRE_LOCK(txnid_a, five, five, nullptr);
invariant(num_row_locks(lt) == 1);
verify_fn.expected_txnid = txnid_a; verify_fn.expected_txnid = txnid_a;
range.create(zero, five); range.create(zero, five);
verify_fn.expected_range = &range; verify_fn.expected_range = &range;
locktree_iterate<verify_fn_obj>(lt, &verify_fn); do_verify();
// release one of the locks we acquired. this should clean up the whole range. // release one of the locks we acquired. this should clean up the whole range.
lt->remove_overlapping_locks_for_txnid(txnid_a, zero, four); lt->remove_overlapping_locks_for_txnid(txnid_a, zero, four);
invariant(num_row_locks(lt) == 0);
#undef ACQUIRE_LOCK #undef ACQUIRE_LOCK
} }
// remove the other txnid's lock now
lt->remove_overlapping_locks_for_txnid(the_other_txnid, hundred, hundred);
mgr.release_lt(lt); mgr.release_lt(lt);
mgr.destroy(); mgr.destroy();
} }
......
...@@ -36,60 +36,35 @@ void locktree_unit_test::test_simple_lock(void) { ...@@ -36,60 +36,35 @@ void locktree_unit_test::test_simple_lock(void) {
// four txns, four points // four txns, four points
r = ACQUIRE_LOCK(txnid_a, one, one, nullptr); r = ACQUIRE_LOCK(txnid_a, one, one, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 1);
r = ACQUIRE_LOCK(txnid_b, two, two, nullptr); r = ACQUIRE_LOCK(txnid_b, two, two, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 2);
r = ACQUIRE_LOCK(txnid_c, three, three, nullptr); r = ACQUIRE_LOCK(txnid_c, three, three, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 3);
r = ACQUIRE_LOCK(txnid_d, four, four, nullptr); r = ACQUIRE_LOCK(txnid_d, four, four, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 4); locktree_test_release_lock(lt, txnid_a, one, one);
lt->remove_overlapping_locks_for_txnid(txnid_a, one, one); locktree_test_release_lock(lt, txnid_b, two, two);
invariant(num_row_locks(lt) == 3); locktree_test_release_lock(lt, txnid_c, three, three);
lt->remove_overlapping_locks_for_txnid(txnid_b, two, two); locktree_test_release_lock(lt, txnid_d, four, four);
invariant(num_row_locks(lt) == 2); invariant(no_row_locks(lt));
lt->remove_overlapping_locks_for_txnid(txnid_c, three, three);
invariant(num_row_locks(lt) == 1);
lt->remove_overlapping_locks_for_txnid(txnid_d, four, four);
invariant(num_row_locks(lt) == 0);
// two txns, two ranges // two txns, two ranges
r = ACQUIRE_LOCK(txnid_c, one, two, nullptr); r = ACQUIRE_LOCK(txnid_c, one, two, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 1);
r = ACQUIRE_LOCK(txnid_b, three, four, nullptr); r = ACQUIRE_LOCK(txnid_b, three, four, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 2); locktree_test_release_lock(lt, txnid_c, one, two);
lt->remove_overlapping_locks_for_txnid(txnid_c, one, two); locktree_test_release_lock(lt, txnid_b, three, four);
invariant(num_row_locks(lt) == 1); invariant(no_row_locks(lt));
lt->remove_overlapping_locks_for_txnid(txnid_b, three, four);
invariant(num_row_locks(lt) == 0);
// one txn, one range, one point
r = ACQUIRE_LOCK(txnid_a, two, three, nullptr);
invariant(r == 0);
invariant(num_row_locks(lt) == 1);
r = ACQUIRE_LOCK(txnid_a, four, four, nullptr);
invariant(r == 0);
invariant(num_row_locks(lt) == 2);
lt->remove_overlapping_locks_for_txnid(txnid_a, two, three);
invariant(num_row_locks(lt) == 1);
lt->remove_overlapping_locks_for_txnid(txnid_a, four, four);
invariant(num_row_locks(lt) == 0);
// two txns, one range, one point // two txns, one range, one point
r = ACQUIRE_LOCK(txnid_c, three, four, nullptr); r = ACQUIRE_LOCK(txnid_c, three, four, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 1);
r = ACQUIRE_LOCK(txnid_d, one, one, nullptr); r = ACQUIRE_LOCK(txnid_d, one, one, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 2); locktree_test_release_lock(lt, txnid_c, three, four);
lt->remove_overlapping_locks_for_txnid(txnid_c, three, four); locktree_test_release_lock(lt, txnid_d, one, one);
invariant(num_row_locks(lt) == 1); invariant(no_row_locks(lt));
lt->remove_overlapping_locks_for_txnid(txnid_d, one, one);
invariant(num_row_locks(lt) == 0);
#undef ACQUIRE_LOCK #undef ACQUIRE_LOCK
} }
...@@ -124,7 +99,7 @@ void locktree_unit_test::test_simple_lock(void) { ...@@ -124,7 +99,7 @@ void locktree_unit_test::test_simple_lock(void) {
for (int64_t i = 0; i < num_locks; i++) { for (int64_t i = 0; i < num_locks; i++) {
k.data = (void *) &keys[i]; k.data = (void *) &keys[i];
lt->remove_overlapping_locks_for_txnid(txnid_a, &k, &k); locktree_test_release_lock(lt, txnid_a, &k, &k);
} }
toku_free(keys); toku_free(keys);
......
...@@ -58,20 +58,10 @@ void locktree_unit_test::test_single_txnid_optimization(void) { ...@@ -58,20 +58,10 @@ void locktree_unit_test::test_single_txnid_optimization(void) {
lock_and_append_point_for_txnid_a(zero); lock_and_append_point_for_txnid_a(zero);
maybe_point_locks_for_txnid_b(2); maybe_point_locks_for_txnid_b(2);
// txnid b does not take a lock on iteration 3
if (where != 3) {
invariant(num_row_locks(lt) == 4);
} else {
invariant(num_row_locks(lt) == 3);
}
lt->release_locks(txnid_a, &buffer); lt->release_locks(txnid_a, &buffer);
// txnid b does not take a lock on iteration 3 // txnid b does not take a lock on iteration 3
if (where != 3) { if (where != 3) {
invariant(num_row_locks(lt) == 1);
struct verify_fn_obj { struct verify_fn_obj {
TXNID expected_txnid; TXNID expected_txnid;
keyrange *expected_range; keyrange *expected_range;
......
...@@ -56,20 +56,19 @@ class locktree_unit_test { ...@@ -56,20 +56,19 @@ class locktree_unit_test {
ltr.release(); ltr.release();
} }
static size_t num_row_locks(const locktree *lt) { static bool no_row_locks(const locktree *lt) {
struct count_fn_obj { return lt->m_rangetree->is_empty() && lt->m_sto_buffer.is_empty();
size_t count;
bool fn(const keyrange &range, TXNID txnid) {
(void) range;
(void) txnid;
count++;
return true;
}
} count_fn;
count_fn.count = 0;
locktree_iterate<count_fn_obj>(lt, &count_fn);
return count_fn.count;
} }
static void locktree_test_release_lock(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key) {
range_buffer buffer;
buffer.create();
buffer.append(left_key, right_key);
lt->release_locks(txnid, &buffer);
buffer.destroy();
}
friend class lock_request_unit_test;
}; };
} /* namespace toku */ } /* namespace toku */
......
...@@ -266,31 +266,23 @@ treenode *treenode::remove_root_of_subtree() { ...@@ -266,31 +266,23 @@ treenode *treenode::remove_root_of_subtree() {
return this; return this;
} }
uint64_t treenode::recursive_remove(uint64_t *mem_released) { void treenode::recursive_remove(void) {
// remove left and right subtrees
uint64_t nodes_removed = 0;
treenode *left = m_left_child.ptr; treenode *left = m_left_child.ptr;
if (left) { if (left) {
nodes_removed += left->recursive_remove(mem_released); left->recursive_remove();
} }
m_left_child.set(nullptr); m_left_child.set(nullptr);
treenode *right = m_right_child.ptr; treenode *right = m_right_child.ptr;
if (right) { if (right) {
nodes_removed += right->recursive_remove(mem_released); right->recursive_remove();
} }
m_right_child.set(nullptr); m_right_child.set(nullptr);
// note the amount of memory to-be released by this node
if (mem_released) {
*mem_released += m_range.get_memory_size();
}
// we do not take locks on the way down, so we know non-root nodes // we do not take locks on the way down, so we know non-root nodes
// are unlocked here and the caller is required to pass a locked // are unlocked here and the caller is required to pass a locked
// root, so this free is correct. // root, so this free is correct.
treenode::free(this); treenode::free(this);
return nodes_removed + 1;
} }
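A minimal sketch of the post-order teardown that recursive_remove() now performs, using a bare toy node with raw pointers as a stand-in (the real treenode clears its child pointers through its child-pointer wrappers and frees itself with treenode::free()).

    #include <cstddef>

    // Toy treenode: just left/right children, enough to show the traversal order.
    struct toy_treenode {
        toy_treenode *left;
        toy_treenode *right;
        toy_treenode(void) : left(NULL), right(NULL) {}
    };

    // Free children first, then this node -- mirrors treenode::recursive_remove(),
    // which no longer counts nodes or sums released memory.
    static void recursive_remove(toy_treenode *node) {
        if (node == NULL) {
            return;
        }
        recursive_remove(node->left);
        node->left = NULL;
        recursive_remove(node->right);
        node->right = NULL;
        delete node;   // real code calls treenode::free(this)
    }

    int main(void) {
        toy_treenode *root = new toy_treenode();
        root->left = new toy_treenode();
        root->right = new toy_treenode();
        root->right->right = new toy_treenode();
        recursive_remove(root);   // frees all four nodes, children before parents
        return 0;
    }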
treenode *treenode::remove(const keyrange &range) { treenode *treenode::remove(const keyrange &range) {
......
...@@ -86,10 +86,7 @@ class treenode { ...@@ -86,10 +86,7 @@ class treenode {
// effect: removes this node and all of its children, recursively // effect: removes this node and all of its children, recursively
// requires: every node at and below this node is unlocked // requires: every node at and below this node is unlocked
// returns: the number of nodes removed void recursive_remove(void);
// returns: *mem_released is the total amount of keyrange memory released.
// mem_released does not account for treenode insertion overhead.
uint64_t recursive_remove(uint64_t *mem_released);
private: private:
......
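A minimal calling sketch for the new signature, assuming a treenode pointer node that already satisfies the requires clause above (the name and the surrounding setup are hypothetical, not part of the patch):

// post-order teardown of node and all of its children; nothing is returned
// now that the uint64_t count and *mem_released accounting have been dropped
node->recursive_remove();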
...@@ -343,7 +343,9 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) ...@@ -343,7 +343,9 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
# libtokudb.so. # libtokudb.so.
# We link the test with util directly so that the test code itself can use # We link the test with util directly so that the test code itself can use
# some of those things (i.e. kibbutz in the threaded tests). # some of those things (i.e. kibbutz in the threaded tests).
target_link_libraries(${base}.tdb util ${LIBTOKUDB} ${LIBTOKUPORTABILITY}) # We link the locktree so that threaded stress tests can call some
# functions (i.e. lock escalation) directly. # functions (i.e. lock escalation) directly.
target_link_libraries(${base}.tdb util locktree ${LIBTOKUDB} ${LIBTOKUPORTABILITY})
set_property(TARGET ${base}.tdb APPEND PROPERTY set_property(TARGET ${base}.tdb APPEND PROPERTY
COMPILE_DEFINITIONS "ENVDIR=\"dir.${bin}\";USE_TDB;IS_TDB=1;TOKUDB=1") COMPILE_DEFINITIONS "ENVDIR=\"dir.${bin}\";USE_TDB;IS_TDB=1;TOKUDB=1")
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${bin}") set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${bin}")
...@@ -378,7 +380,7 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) ...@@ -378,7 +380,7 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
function(add_custom_executable prefix binary source) function(add_custom_executable prefix binary source)
add_executable(${prefix}_${binary} ${source}) add_executable(${prefix}_${binary} ${source})
target_link_libraries(${prefix}_${binary} util ${LIBTOKUDB} ${LIBTOKUPORTABILITY}) target_link_libraries(${prefix}_${binary} util locktree ${LIBTOKUDB} ${LIBTOKUPORTABILITY})
set_target_properties(${prefix}_${binary} PROPERTIES set_target_properties(${prefix}_${binary} PROPERTIES
COMPILE_DEFINITIONS "ENVDIR=\"dir.${prefix}_${source}.tdb\";USE_TDB;IS_TDB=1;TOKUDB=1") COMPILE_DEFINITIONS "ENVDIR=\"dir.${prefix}_${source}.tdb\";USE_TDB;IS_TDB=1;TOKUDB=1")
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${prefix}_${source}.tdb") set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${prefix}_${source}.tdb")
......
...@@ -40,9 +40,18 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -40,9 +40,18 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[0].operation_extra = &soe[0]; myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op; myargs[0].operation = scan_op;
// make the lock escalation thread.
// it should sleep somewhere between 10 and 20
// seconds between each escalation.
struct lock_escalation_op_extra eoe;
eoe.min_sleep_time_micros = 10UL * (1000 * 1000);
eoe.max_sleep_time_micros = 20UL * (1000 * 1000);
myargs[1].operation_extra = &eoe;
myargs[1].operation = lock_escalation_op;
// make the threads that update the db // make the threads that update the db
struct update_op_args uoe = get_update_op_args(cli_args, NULL); struct update_op_args uoe = get_update_op_args(cli_args, NULL);
for (int i = 1; i < 1 + cli_args->num_update_threads; ++i) { for (int i = 2; i < 2 + cli_args->num_update_threads; ++i) {
myargs[i].operation_extra = &uoe; myargs[i].operation_extra = &uoe;
myargs[i].operation = update_op; myargs[i].operation = update_op;
myargs[i].do_prepare = false; myargs[i].do_prepare = false;
...@@ -50,7 +59,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -50,7 +59,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
// doing sequential updates. the rest of the threads // doing sequential updates. the rest of the threads
// will take point write locks on update as usual. // will take point write locks on update as usual.
// this ensures both ranges and points are stressed. // this ensures both ranges and points are stressed.
myargs[i].prelock_updates = i < 4 ? true : false; myargs[i].prelock_updates = i < 5 ? true : false;
} }
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args); run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
......
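The index changes in this hunk follow from the new escalation thread taking slot 1: update threads now start at slot 2 instead of 1, and the prelock cutoff moves from i < 4 to i < 5 so the same first three update threads keep prelocking ranges while the rest take point write locks as before. A small standalone sketch of the resulting layout (hypothetical thread count, not part of the patch):

#include <cstdio>

int main(void) {
    const int num_update_threads = 6;   // hypothetical value for illustration
    printf("slot 0: scan_op\n");
    printf("slot 1: lock_escalation_op\n");
    for (int i = 2; i < 2 + num_update_threads; i++) {
        // slots 2..4 prelock ranges, matching the old slots 1..3 under i < 4
        bool prelock = i < 5;
        printf("slot %d: update_op, prelock_updates=%s\n", i, prelock ? "true" : "false");
    }
    return 0;
}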
...@@ -34,6 +34,8 @@ ...@@ -34,6 +34,8 @@
#include <util/rwlock.h> #include <util/rwlock.h>
#include <util/kibbutz.h> #include <util/kibbutz.h>
#include <src/ydb-internal.h>
#include <ft/ybt.h> #include <ft/ybt.h>
using namespace toku; using namespace toku;
...@@ -888,6 +890,27 @@ static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra ...@@ -888,6 +890,27 @@ static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra
return r; return r;
} }
struct lock_escalation_op_extra {
// sleep somewhere between these times before running escalation.
// this will add some chaos into the mix.
uint64_t min_sleep_time_micros;
uint64_t max_sleep_time_micros;
};
static int UU() lock_escalation_op(DB_TXN *UU(txn), ARG arg, void* operation_extra, void *UU(stats_extra)) {
struct lock_escalation_op_extra *CAST_FROM_VOIDP(extra, operation_extra);
if (extra->max_sleep_time_micros > 0) {
invariant(extra->max_sleep_time_micros >= extra->min_sleep_time_micros);
uint64_t extra_sleep_time = (extra->max_sleep_time_micros - extra->min_sleep_time_micros) + 1;
uint64_t sleep_time = extra->min_sleep_time_micros + (myrandom_r(arg->random_data) % extra_sleep_time);
usleep(sleep_time);
}
if (!arg->cli->nolocktree) {
toku_env_run_lock_escalation_for_test(arg->env);
}
return 0;
}
static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra, void *UU(stats_extra)) { static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra, void *UU(stats_extra)) {
struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra); struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
for (int i = 0; run_test && i < arg->cli->num_DBs; i++) { for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
......
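A standalone sketch of the sleep-window arithmetic in lock_escalation_op above (hypothetical program, not part of the patch): with the 10- and 20-second bounds configured in stress_table, there are (max - min) + 1 possible offsets, so the sleep is uniform over the inclusive range [min, max].

#include <cstdint>
#include <cinttypes>
#include <cstdio>
#include <random>

int main(void) {
    const uint64_t min_sleep_time_micros = 10UL * (1000 * 1000);
    const uint64_t max_sleep_time_micros = 20UL * (1000 * 1000);
    // (max - min) + 1 offsets, so min + offset covers [min, max] inclusive
    const uint64_t window = (max_sleep_time_micros - min_sleep_time_micros) + 1;
    std::mt19937_64 rng(0);  // stand-in for myrandom_r(arg->random_data)
    const uint64_t sleep_time = min_sleep_time_micros + (rng() % window);
    printf("would sleep %" PRIu64 " micros\n", sleep_time);
    return 0;
}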
...@@ -95,6 +95,12 @@ struct __toku_db_env_internal { ...@@ -95,6 +95,12 @@ struct __toku_db_env_internal {
int tmpdir_lockfd; int tmpdir_lockfd;
}; };
// test-only environment function for running lock escalation
static inline void toku_env_run_lock_escalation_for_test(DB_ENV *env) {
toku::locktree::manager *mgr = &env->i->ltm;
mgr->run_escalation_for_test();
}
// Common error handling macros and panic detection // Common error handling macros and panic detection
#define MAYBE_RETURN_ERROR(cond, status) if (cond) return status; #define MAYBE_RETURN_ERROR(cond, status) if (cond) return status;
#define HANDLE_PANICKED_ENV(env) if (toku_env_is_panicked(env)) { sleep(1); return EINVAL; } #define HANDLE_PANICKED_ENV(env) if (toku_env_is_panicked(env)) { sleep(1); return EINVAL; }
......