Commit 33a99aff authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

fixes #5771 merge the single txnid optimization to main. single threaded write...

fixes #5771 merge the single txnid optimization to main. single threaded write performance is up 20-50% in mysql and multithreaded performance is largely unchanged.


git-svn-id: file:///svn/toku/tokudb@51108 c7de825b-a66e-492c-adef-691d508d4ae1
parent b43de53f
...@@ -29,6 +29,7 @@ void concurrent_tree::locked_keyrange::prepare(concurrent_tree *tree) { ...@@ -29,6 +29,7 @@ void concurrent_tree::locked_keyrange::prepare(concurrent_tree *tree) {
treenode *const root = &tree->m_root; treenode *const root = &tree->m_root;
m_tree = tree; m_tree = tree;
m_subtree = root; m_subtree = root;
m_range = keyrange::get_infinite_range();
root->mutex_lock(); root->mutex_lock();
} }
...@@ -83,12 +84,6 @@ void concurrent_tree::locked_keyrange::remove(const keyrange &range) { ...@@ -83,12 +84,6 @@ void concurrent_tree::locked_keyrange::remove(const keyrange &range) {
} }
} }
uint64_t concurrent_tree::locked_keyrange::remove_all(uint64_t *mem_released) { void concurrent_tree::locked_keyrange::remove_all(void) {
// in practice, remove_all is only okay on the root node, because you m_subtree->recursive_remove();
// want to remove all of the elements in the tree, not some subtree.
//
// so we lazily enforce that you are removing from a non-empty root.
invariant(m_subtree->is_root());
invariant(!m_subtree->is_empty());
return m_subtree->recursive_remove(mem_released);
} }
...@@ -36,11 +36,14 @@ class concurrent_tree { ...@@ -36,11 +36,14 @@ class concurrent_tree {
// effect: prepare to acquire a locked keyrange over the given // effect: prepare to acquire a locked keyrange over the given
// concurrent_tree, preventing other threads from preparing // concurrent_tree, preventing other threads from preparing
// until this thread either does acquire() or release(). // until this thread either does acquire() or release().
// rationale: this provides the user with a serialization point // note: operations performed on a prepared keyrange are equivalent
// for descending / modifying the the tree. // to ones performed on an acquired keyrange over -inf, +inf.
// rationale: this provides the user with a serialization point for descending
// or modifying the the tree. it also proives a convenient way of
// doing serializable operations on the tree.
// There are two valid sequences of calls: // There are two valid sequences of calls:
// - prepare, acquire, release // - prepare, acquire, [operations], release
// - prepare, release // - prepare, [operations],release
void prepare(concurrent_tree *tree); void prepare(concurrent_tree *tree);
// requires: the locked keyrange was prepare()'d // requires: the locked keyrange was prepare()'d
...@@ -68,12 +71,9 @@ class concurrent_tree { ...@@ -68,12 +71,9 @@ class concurrent_tree {
// rationale: caller is responsible for only removing existing ranges // rationale: caller is responsible for only removing existing ranges
void remove(const keyrange &range); void remove(const keyrange &range);
// effect: removes everything within this locked keyrange // effect: removes all of the keys represented by this locked keyrange
// requires: the locked keyrange is over -inf, +inf // rationale: we'd like a fast way to empty out a tree
// returns: the number of nodes removed void remove_all(void);
// returns: *mem_released updated to the cumulative size of all keyranges destroyed
// rationale: we'd like a fast O(n) way of removing everything from the tree
uint64_t remove_all(uint64_t *mem_released);
private: private:
// the concurrent tree this locked keyrange is for // the concurrent tree this locked keyrange is for
......
This diff is collapsed.
This diff is collapsed.
...@@ -196,6 +196,11 @@ void locktree::manager::release_lt(locktree *lt) { ...@@ -196,6 +196,11 @@ void locktree::manager::release_lt(locktree *lt) {
} }
} }
// test-only version of lock escalation
void locktree::manager::run_escalation_for_test(void) {
run_escalation();
}
// effect: escalate's the locks in each locktree // effect: escalate's the locks in each locktree
// requires: manager's mutex is held // requires: manager's mutex is held
void locktree::manager::run_escalation(void) { void locktree::manager::run_escalation(void) {
......
...@@ -116,6 +116,7 @@ void range_buffer::create(void) { ...@@ -116,6 +116,7 @@ void range_buffer::create(void) {
m_buf = nullptr; m_buf = nullptr;
m_buf_size = 0; m_buf_size = 0;
m_buf_current = 0; m_buf_current = 0;
m_num_ranges = 0;
} }
void range_buffer::append(const DBT *left_key, const DBT *right_key) { void range_buffer::append(const DBT *left_key, const DBT *right_key) {
...@@ -125,6 +126,19 @@ void range_buffer::append(const DBT *left_key, const DBT *right_key) { ...@@ -125,6 +126,19 @@ void range_buffer::append(const DBT *left_key, const DBT *right_key) {
} else { } else {
append_range(left_key, right_key); append_range(left_key, right_key);
} }
m_num_ranges++;
}
bool range_buffer::is_empty(void) const {
return m_buf == nullptr;
}
uint64_t range_buffer::get_num_bytes(void) const {
return m_buf_current;
}
int range_buffer::get_num_ranges(void) const {
return m_num_ranges;
} }
void range_buffer::destroy(void) { void range_buffer::destroy(void) {
......
...@@ -98,12 +98,22 @@ class range_buffer { ...@@ -98,12 +98,22 @@ class range_buffer {
// if the keys are equal, then only one copy is stored. // if the keys are equal, then only one copy is stored.
void append(const DBT *left_key, const DBT *right_key); void append(const DBT *left_key, const DBT *right_key);
// is this range buffer empty?
bool is_empty(void) const;
// how many bytes are stored in this range buffer?
uint64_t get_num_bytes(void) const;
// how many ranges are stored in this range buffer?
int get_num_ranges(void) const;
void destroy(void); void destroy(void);
private: private:
char *m_buf; char *m_buf;
size_t m_buf_size; size_t m_buf_size;
size_t m_buf_current; size_t m_buf_current;
int m_num_ranges;
void append_range(const DBT *left_key, const DBT *right_key); void append_range(const DBT *left_key, const DBT *right_key);
......
...@@ -18,13 +18,6 @@ void concurrent_tree_unit_test::test_lkr_remove_all(void) { ...@@ -18,13 +18,6 @@ void concurrent_tree_unit_test::test_lkr_remove_all(void) {
const uint64_t min = 0; const uint64_t min = 0;
const uint64_t max = 20; const uint64_t max = 20;
// determine how much memory should be released
keyrange example_keyrange;
example_keyrange.create(get_dbt(0), get_dbt(0));
const uint64_t num_elements = max + 1;
const uint64_t expected_mem_released =
example_keyrange.get_memory_size() * num_elements;
// remove_all should work regardless of how the // remove_all should work regardless of how the
// data was inserted into the tree, so we test it // data was inserted into the tree, so we test it
// on a tree whose elements were populated starting // on a tree whose elements were populated starting
...@@ -44,10 +37,7 @@ void concurrent_tree_unit_test::test_lkr_remove_all(void) { ...@@ -44,10 +37,7 @@ void concurrent_tree_unit_test::test_lkr_remove_all(void) {
// remove_all() from the locked keyrange and assert that // remove_all() from the locked keyrange and assert that
// the number of elements and memory removed is correct. // the number of elements and memory removed is correct.
uint64_t mem_released = 0; lkr.remove_all();
uint64_t num_removed = lkr.remove_all(&mem_released);
invariant(num_removed == num_elements);
invariant(mem_released == expected_mem_released);
invariant(lkr.m_subtree->is_empty()); invariant(lkr.m_subtree->is_empty());
invariant(tree.is_empty()); invariant(tree.is_empty());
......
...@@ -51,7 +51,7 @@ void lock_request_unit_test::test_start_pending(void) { ...@@ -51,7 +51,7 @@ void lock_request_unit_test::test_start_pending(void) {
invariant(compare_dbts(nullptr, &request.m_right_key_copy, one) == 0); invariant(compare_dbts(nullptr, &request.m_right_key_copy, one) == 0);
// release the range lock for txnid b // release the range lock for txnid b
lt->remove_overlapping_locks_for_txnid(txnid_b, zero, two); locktree_unit_test::locktree_test_release_lock(lt, txnid_b, zero, two);
// now retry the lock requests. // now retry the lock requests.
// it should transition the request to successfully complete. // it should transition the request to successfully complete.
...@@ -60,7 +60,7 @@ void lock_request_unit_test::test_start_pending(void) { ...@@ -60,7 +60,7 @@ void lock_request_unit_test::test_start_pending(void) {
invariant(request.m_state == lock_request::state::COMPLETE); invariant(request.m_state == lock_request::state::COMPLETE);
invariant(request.m_complete_r == 0); invariant(request.m_complete_r == 0);
lt->remove_overlapping_locks_for_txnid(txnid_a, one, one); locktree_unit_test::locktree_test_release_lock(lt, txnid_a, one, one);
request.destroy(); request.destroy();
mgr.release_lt(lt); mgr.release_lt(lt);
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#define TOKU_LOCK_REQUEST_UNIT_TEST_H #define TOKU_LOCK_REQUEST_UNIT_TEST_H
#include "test.h" #include "test.h"
#include "locktree_unit_test.h"
#include "lock_request.h" #include "lock_request.h"
...@@ -37,7 +38,7 @@ class lock_request_unit_test { ...@@ -37,7 +38,7 @@ class lock_request_unit_test {
// lt->release_locks(), not individually using lt->remove_overlapping_locks_for_txnid). // lt->release_locks(), not individually using lt->remove_overlapping_locks_for_txnid).
void release_lock_and_retry_requests(locktree *lt, void release_lock_and_retry_requests(locktree *lt,
TXNID txnid, const DBT *left_key, const DBT * right_key) { TXNID txnid, const DBT *left_key, const DBT * right_key) {
lt->remove_overlapping_locks_for_txnid(txnid, left_key, right_key); locktree_unit_test::locktree_test_release_lock(lt, txnid, left_key, right_key);
lock_request::retry_all_lock_requests(lt); lock_request::retry_all_lock_requests(lt);
} }
}; };
......
...@@ -77,10 +77,9 @@ void locktree_unit_test::test_conflicts(void) { ...@@ -77,10 +77,9 @@ void locktree_unit_test::test_conflicts(void) {
#undef ACQUIRE_LOCK #undef ACQUIRE_LOCK
} }
invariant(num_row_locks(lt) == 2);
lt->remove_overlapping_locks_for_txnid(txnid_a, one, one); lt->remove_overlapping_locks_for_txnid(txnid_a, one, one);
lt->remove_overlapping_locks_for_txnid(txnid_a, three, four); lt->remove_overlapping_locks_for_txnid(txnid_a, three, four);
invariant(num_row_locks(lt) == 0); invariant(no_row_locks(lt));
} }
mgr.release_lt(lt); mgr.release_lt(lt);
......
...@@ -29,6 +29,16 @@ void locktree_unit_test::test_overlapping_relock(void) { ...@@ -29,6 +29,16 @@ void locktree_unit_test::test_overlapping_relock(void) {
int r; int r;
TXNID txnid_a = 1001; TXNID txnid_a = 1001;
// because of the single txnid optimization, there is no consolidation
// of read or write ranges until there is at least two txnids in
// the locktree. so here we add some arbitrary txnid to get a point
// lock [100, 100] so that the test below can expect to actually
// do something. at the end of the test, we release 100, 100.
const TXNID the_other_txnid = 9999;
const DBT *hundred = get_dbt(100);
r = lt->acquire_write_lock(the_other_txnid, hundred, hundred, nullptr);
invariant(r == 0);
for (int test_run = 0; test_run < 2; test_run++) { for (int test_run = 0; test_run < 2; test_run++) {
// test_run == 0 means test with read lock // test_run == 0 means test with read lock
// test_run == 1 means test with write lock // test_run == 1 means test with write lock
...@@ -40,19 +50,22 @@ void locktree_unit_test::test_overlapping_relock(void) { ...@@ -40,19 +50,22 @@ void locktree_unit_test::test_overlapping_relock(void) {
// ensure only [1,2] exists in the tree // ensure only [1,2] exists in the tree
r = ACQUIRE_LOCK(txnid_a, one, one, nullptr); r = ACQUIRE_LOCK(txnid_a, one, one, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 1);
r = ACQUIRE_LOCK(txnid_a, two, two, nullptr); r = ACQUIRE_LOCK(txnid_a, two, two, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 2);
r = ACQUIRE_LOCK(txnid_a, one, two, nullptr); r = ACQUIRE_LOCK(txnid_a, one, two, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 1);
struct verify_fn_obj { struct verify_fn_obj {
bool saw_the_other;
TXNID expected_txnid; TXNID expected_txnid;
keyrange *expected_range; keyrange *expected_range;
comparator *cmp; comparator *cmp;
bool fn(const keyrange &range, TXNID txnid) { bool fn(const keyrange &range, TXNID txnid) {
if (txnid == the_other_txnid) {
invariant(!saw_the_other);
saw_the_other = true;
return true;
}
invariant(txnid == expected_txnid); invariant(txnid == expected_txnid);
keyrange::comparison c = range.compare(cmp, *expected_range); keyrange::comparison c = range.compare(cmp, *expected_range);
invariant(c == keyrange::comparison::EQUALS); invariant(c == keyrange::comparison::EQUALS);
...@@ -61,55 +74,52 @@ void locktree_unit_test::test_overlapping_relock(void) { ...@@ -61,55 +74,52 @@ void locktree_unit_test::test_overlapping_relock(void) {
} verify_fn; } verify_fn;
verify_fn.cmp = lt->m_cmp; verify_fn.cmp = lt->m_cmp;
#define do_verify() \
do { verify_fn.saw_the_other = false; locktree_iterate<verify_fn_obj>(lt, &verify_fn); } while (0)
keyrange range; keyrange range;
range.create(one, two); range.create(one, two);
verify_fn.expected_txnid = txnid_a; verify_fn.expected_txnid = txnid_a;
verify_fn.expected_range = &range; verify_fn.expected_range = &range;
locktree_iterate<verify_fn_obj>(lt, &verify_fn); do_verify();
// unlocking [1,1] should remove the only range, // unlocking [1,1] should remove the only range,
// the other unlocks shoudl do nothing. // the other unlocks shoudl do nothing.
invariant(num_row_locks(lt) == 1);
lt->remove_overlapping_locks_for_txnid(txnid_a, one, one); lt->remove_overlapping_locks_for_txnid(txnid_a, one, one);
invariant(num_row_locks(lt) == 0);
lt->remove_overlapping_locks_for_txnid(txnid_a, two, two); lt->remove_overlapping_locks_for_txnid(txnid_a, two, two);
invariant(num_row_locks(lt) == 0);
lt->remove_overlapping_locks_for_txnid(txnid_a, one, two); lt->remove_overlapping_locks_for_txnid(txnid_a, one, two);
invariant(num_row_locks(lt) == 0);
// try overlapping from the right // try overlapping from the right
r = ACQUIRE_LOCK(txnid_a, one, three, nullptr); r = ACQUIRE_LOCK(txnid_a, one, three, nullptr);
invariant(num_row_locks(lt) == 1);
r = ACQUIRE_LOCK(txnid_a, two, five, nullptr); r = ACQUIRE_LOCK(txnid_a, two, five, nullptr);
invariant(num_row_locks(lt) == 1);
verify_fn.expected_txnid = txnid_a; verify_fn.expected_txnid = txnid_a;
range.create(one, five); range.create(one, five);
verify_fn.expected_range = &range; verify_fn.expected_range = &range;
locktree_iterate<verify_fn_obj>(lt, &verify_fn); do_verify();
// now overlap from the left // now overlap from the left
r = ACQUIRE_LOCK(txnid_a, zero, four, nullptr); r = ACQUIRE_LOCK(txnid_a, zero, four, nullptr);
invariant(num_row_locks(lt) == 1);
verify_fn.expected_txnid = txnid_a; verify_fn.expected_txnid = txnid_a;
range.create(zero, five); range.create(zero, five);
verify_fn.expected_range = &range; verify_fn.expected_range = &range;
locktree_iterate<verify_fn_obj>(lt, &verify_fn); do_verify();
// now relock in a range that is already dominated // now relock in a range that is already dominated
r = ACQUIRE_LOCK(txnid_a, five, five, nullptr); r = ACQUIRE_LOCK(txnid_a, five, five, nullptr);
invariant(num_row_locks(lt) == 1);
verify_fn.expected_txnid = txnid_a; verify_fn.expected_txnid = txnid_a;
range.create(zero, five); range.create(zero, five);
verify_fn.expected_range = &range; verify_fn.expected_range = &range;
locktree_iterate<verify_fn_obj>(lt, &verify_fn); do_verify();
// release one of the locks we acquired. this should clean up the whole range. // release one of the locks we acquired. this should clean up the whole range.
lt->remove_overlapping_locks_for_txnid(txnid_a, zero, four); lt->remove_overlapping_locks_for_txnid(txnid_a, zero, four);
invariant(num_row_locks(lt) == 0);
#undef ACQUIRE_LOCK #undef ACQUIRE_LOCK
} }
// remove the other txnid's lock now
lt->remove_overlapping_locks_for_txnid(the_other_txnid, hundred, hundred);
mgr.release_lt(lt); mgr.release_lt(lt);
mgr.destroy(); mgr.destroy();
} }
......
...@@ -36,60 +36,35 @@ void locktree_unit_test::test_simple_lock(void) { ...@@ -36,60 +36,35 @@ void locktree_unit_test::test_simple_lock(void) {
// four txns, four points // four txns, four points
r = ACQUIRE_LOCK(txnid_a, one, one, nullptr); r = ACQUIRE_LOCK(txnid_a, one, one, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 1);
r = ACQUIRE_LOCK(txnid_b, two, two, nullptr); r = ACQUIRE_LOCK(txnid_b, two, two, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 2);
r = ACQUIRE_LOCK(txnid_c, three, three, nullptr); r = ACQUIRE_LOCK(txnid_c, three, three, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 3);
r = ACQUIRE_LOCK(txnid_d, four, four, nullptr); r = ACQUIRE_LOCK(txnid_d, four, four, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 4); locktree_test_release_lock(lt, txnid_a, one, one);
lt->remove_overlapping_locks_for_txnid(txnid_a, one, one); locktree_test_release_lock(lt, txnid_b, two, two);
invariant(num_row_locks(lt) == 3); locktree_test_release_lock(lt, txnid_c, three, three);
lt->remove_overlapping_locks_for_txnid(txnid_b, two, two); locktree_test_release_lock(lt, txnid_d, four, four);
invariant(num_row_locks(lt) == 2); invariant(no_row_locks(lt));
lt->remove_overlapping_locks_for_txnid(txnid_c, three, three);
invariant(num_row_locks(lt) == 1);
lt->remove_overlapping_locks_for_txnid(txnid_d, four, four);
invariant(num_row_locks(lt) == 0);
// two txns, two ranges // two txns, two ranges
r = ACQUIRE_LOCK(txnid_c, one, two, nullptr); r = ACQUIRE_LOCK(txnid_c, one, two, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 1);
r = ACQUIRE_LOCK(txnid_b, three, four, nullptr); r = ACQUIRE_LOCK(txnid_b, three, four, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 2); locktree_test_release_lock(lt, txnid_c, one, two);
lt->remove_overlapping_locks_for_txnid(txnid_c, one, two); locktree_test_release_lock(lt, txnid_b, three, four);
invariant(num_row_locks(lt) == 1); invariant(no_row_locks(lt));
lt->remove_overlapping_locks_for_txnid(txnid_b, three, four);
invariant(num_row_locks(lt) == 0);
// one txn, one range, one point
r = ACQUIRE_LOCK(txnid_a, two, three, nullptr);
invariant(r == 0);
invariant(num_row_locks(lt) == 1);
r = ACQUIRE_LOCK(txnid_a, four, four, nullptr);
invariant(r == 0);
invariant(num_row_locks(lt) == 2);
lt->remove_overlapping_locks_for_txnid(txnid_a, two, three);
invariant(num_row_locks(lt) == 1);
lt->remove_overlapping_locks_for_txnid(txnid_a, four, four);
invariant(num_row_locks(lt) == 0);
// two txns, one range, one point // two txns, one range, one point
r = ACQUIRE_LOCK(txnid_c, three, four, nullptr); r = ACQUIRE_LOCK(txnid_c, three, four, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 1);
r = ACQUIRE_LOCK(txnid_d, one, one, nullptr); r = ACQUIRE_LOCK(txnid_d, one, one, nullptr);
invariant(r == 0); invariant(r == 0);
invariant(num_row_locks(lt) == 2); locktree_test_release_lock(lt, txnid_c, three, four);
lt->remove_overlapping_locks_for_txnid(txnid_c, three, four); locktree_test_release_lock(lt, txnid_d, one, one);
invariant(num_row_locks(lt) == 1); invariant(no_row_locks(lt));
lt->remove_overlapping_locks_for_txnid(txnid_d, one, one);
invariant(num_row_locks(lt) == 0);
#undef ACQUIRE_LOCK #undef ACQUIRE_LOCK
} }
...@@ -124,7 +99,7 @@ void locktree_unit_test::test_simple_lock(void) { ...@@ -124,7 +99,7 @@ void locktree_unit_test::test_simple_lock(void) {
for (int64_t i = 0; i < num_locks; i++) { for (int64_t i = 0; i < num_locks; i++) {
k.data = (void *) &keys[i]; k.data = (void *) &keys[i];
lt->remove_overlapping_locks_for_txnid(txnid_a, &k, &k); locktree_test_release_lock(lt, txnid_a, &k, &k);
} }
toku_free(keys); toku_free(keys);
......
...@@ -58,20 +58,10 @@ void locktree_unit_test::test_single_txnid_optimization(void) { ...@@ -58,20 +58,10 @@ void locktree_unit_test::test_single_txnid_optimization(void) {
lock_and_append_point_for_txnid_a(zero); lock_and_append_point_for_txnid_a(zero);
maybe_point_locks_for_txnid_b(2); maybe_point_locks_for_txnid_b(2);
// txnid b does not take a lock on iteration 3
if (where != 3) {
invariant(num_row_locks(lt) == 4);
} else {
invariant(num_row_locks(lt) == 3);
}
lt->release_locks(txnid_a, &buffer); lt->release_locks(txnid_a, &buffer);
// txnid b does not take a lock on iteration 3 // txnid b does not take a lock on iteration 3
if (where != 3) { if (where != 3) {
invariant(num_row_locks(lt) == 1);
struct verify_fn_obj { struct verify_fn_obj {
TXNID expected_txnid; TXNID expected_txnid;
keyrange *expected_range; keyrange *expected_range;
......
...@@ -56,20 +56,19 @@ class locktree_unit_test { ...@@ -56,20 +56,19 @@ class locktree_unit_test {
ltr.release(); ltr.release();
} }
static size_t num_row_locks(const locktree *lt) { static bool no_row_locks(const locktree *lt) {
struct count_fn_obj { return lt->m_rangetree->is_empty() && lt->m_sto_buffer.is_empty();
size_t count;
bool fn(const keyrange &range, TXNID txnid) {
(void) range;
(void) txnid;
count++;
return true;
}
} count_fn;
count_fn.count = 0;
locktree_iterate<count_fn_obj>(lt, &count_fn);
return count_fn.count;
} }
static void locktree_test_release_lock(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key) {
range_buffer buffer;
buffer.create();
buffer.append(left_key, right_key);
lt->release_locks(txnid, &buffer);
buffer.destroy();
}
friend class lock_request_unit_test;
}; };
} /* namespace toku */ } /* namespace toku */
......
...@@ -266,31 +266,23 @@ treenode *treenode::remove_root_of_subtree() { ...@@ -266,31 +266,23 @@ treenode *treenode::remove_root_of_subtree() {
return this; return this;
} }
uint64_t treenode::recursive_remove(uint64_t *mem_released) { void treenode::recursive_remove(void) {
// remove left and right subtrees
uint64_t nodes_removed = 0;
treenode *left = m_left_child.ptr; treenode *left = m_left_child.ptr;
if (left) { if (left) {
nodes_removed += left->recursive_remove(mem_released); left->recursive_remove();
} }
m_left_child.set(nullptr); m_left_child.set(nullptr);
treenode *right = m_right_child.ptr; treenode *right = m_right_child.ptr;
if (right) { if (right) {
nodes_removed += right->recursive_remove(mem_released); right->recursive_remove();
} }
m_right_child.set(nullptr); m_right_child.set(nullptr);
// note the amount of memory to-be released by this node
if (mem_released) {
*mem_released += m_range.get_memory_size();
}
// we do not take locks on the way down, so we know non-root nodes // we do not take locks on the way down, so we know non-root nodes
// are unlocked here and the caller is required to pass a locked // are unlocked here and the caller is required to pass a locked
// root, so this free is correct. // root, so this free is correct.
treenode::free(this); treenode::free(this);
return nodes_removed + 1;
} }
treenode *treenode::remove(const keyrange &range) { treenode *treenode::remove(const keyrange &range) {
......
...@@ -86,10 +86,7 @@ class treenode { ...@@ -86,10 +86,7 @@ class treenode {
// effect: removes this node and all of its children, recursively // effect: removes this node and all of its children, recursively
// requires: every node at and below this node is unlocked // requires: every node at and below this node is unlocked
// returns: the number of nodes removed void recursive_remove(void);
// returns: *mem_released is the total amount of keyrange memory released.
// mem_released does not account for treenode insertion overhead.
uint64_t recursive_remove(uint64_t *mem_released);
private: private:
......
...@@ -343,7 +343,9 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) ...@@ -343,7 +343,9 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
# libtokudb.so. # libtokudb.so.
# We link the test with util directly so that the test code itself can use # We link the test with util directly so that the test code itself can use
# some of those things (i.e. kibbutz in the threaded tests). # some of those things (i.e. kibbutz in the threaded tests).
target_link_libraries(${base}.tdb util ${LIBTOKUDB} ${LIBTOKUPORTABILITY}) # We link the locktree so that threaded stress tests can call some
# functions (ie: lock escalation) directly.
target_link_libraries(${base}.tdb util locktree ${LIBTOKUDB} ${LIBTOKUPORTABILITY})
set_property(TARGET ${base}.tdb APPEND PROPERTY set_property(TARGET ${base}.tdb APPEND PROPERTY
COMPILE_DEFINITIONS "ENVDIR=\"dir.${bin}\";USE_TDB;IS_TDB=1;TOKUDB=1") COMPILE_DEFINITIONS "ENVDIR=\"dir.${bin}\";USE_TDB;IS_TDB=1;TOKUDB=1")
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${bin}") set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${bin}")
...@@ -378,7 +380,7 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) ...@@ -378,7 +380,7 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
function(add_custom_executable prefix binary source) function(add_custom_executable prefix binary source)
add_executable(${prefix}_${binary} ${source}) add_executable(${prefix}_${binary} ${source})
target_link_libraries(${prefix}_${binary} util ${LIBTOKUDB} ${LIBTOKUPORTABILITY}) target_link_libraries(${prefix}_${binary} util locktree ${LIBTOKUDB} ${LIBTOKUPORTABILITY})
set_target_properties(${prefix}_${binary} PROPERTIES set_target_properties(${prefix}_${binary} PROPERTIES
COMPILE_DEFINITIONS "ENVDIR=\"dir.${prefix}_${source}.tdb\";USE_TDB;IS_TDB=1;TOKUDB=1") COMPILE_DEFINITIONS "ENVDIR=\"dir.${prefix}_${source}.tdb\";USE_TDB;IS_TDB=1;TOKUDB=1")
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${prefix}_${source}.tdb") set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${prefix}_${source}.tdb")
......
...@@ -40,9 +40,18 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -40,9 +40,18 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[0].operation_extra = &soe[0]; myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op; myargs[0].operation = scan_op;
// make the lock escalation thread.
// it should sleep somewhere between 10 and 20
// seconds between each escalation.
struct lock_escalation_op_extra eoe;
eoe.min_sleep_time_micros = 10UL * (1000 * 1000);
eoe.max_sleep_time_micros = 20UL * (1000 * 1000);
myargs[1].operation_extra = &eoe;
myargs[1].operation = lock_escalation_op;
// make the threads that update the db // make the threads that update the db
struct update_op_args uoe = get_update_op_args(cli_args, NULL); struct update_op_args uoe = get_update_op_args(cli_args, NULL);
for (int i = 1; i < 1 + cli_args->num_update_threads; ++i) { for (int i = 2; i < 2 + cli_args->num_update_threads; ++i) {
myargs[i].operation_extra = &uoe; myargs[i].operation_extra = &uoe;
myargs[i].operation = update_op; myargs[i].operation = update_op;
myargs[i].do_prepare = false; myargs[i].do_prepare = false;
...@@ -50,7 +59,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -50,7 +59,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
// doing sequential updates. the rest of the threads // doing sequential updates. the rest of the threads
// will take point write locks on update as usual. // will take point write locks on update as usual.
// this ensures both ranges and points are stressed. // this ensures both ranges and points are stressed.
myargs[i].prelock_updates = i < 4 ? true : false; myargs[i].prelock_updates = i < 5 ? true : false;
} }
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args); run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
......
...@@ -34,6 +34,8 @@ ...@@ -34,6 +34,8 @@
#include <util/rwlock.h> #include <util/rwlock.h>
#include <util/kibbutz.h> #include <util/kibbutz.h>
#include <src/ydb-internal.h>
#include <ft/ybt.h> #include <ft/ybt.h>
using namespace toku; using namespace toku;
...@@ -888,6 +890,27 @@ static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra ...@@ -888,6 +890,27 @@ static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra
return r; return r;
} }
struct lock_escalation_op_extra {
// sleep somewhere between these times before running escalation.
// this will add some chaos into the mix.
uint64_t min_sleep_time_micros;
uint64_t max_sleep_time_micros;
};
static int UU() lock_escalation_op(DB_TXN *UU(txn), ARG arg, void* operation_extra, void *UU(stats_extra)) {
struct lock_escalation_op_extra *CAST_FROM_VOIDP(extra, operation_extra);
if (extra->max_sleep_time_micros > 0) {
invariant(extra->max_sleep_time_micros >= extra->min_sleep_time_micros);
uint64_t extra_sleep_time = (extra->max_sleep_time_micros - extra->min_sleep_time_micros) + 1;
uint64_t sleep_time = extra->min_sleep_time_micros + (myrandom_r(arg->random_data) % extra_sleep_time);
usleep(sleep_time);
}
if (!arg->cli->nolocktree) {
toku_env_run_lock_escalation_for_test(arg->env);
}
return 0;
}
static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra, void *UU(stats_extra)) { static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra, void *UU(stats_extra)) {
struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra); struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
for (int i = 0; run_test && i < arg->cli->num_DBs; i++) { for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
......
...@@ -95,6 +95,12 @@ struct __toku_db_env_internal { ...@@ -95,6 +95,12 @@ struct __toku_db_env_internal {
int tmpdir_lockfd; int tmpdir_lockfd;
}; };
// test-only environment function for running lock escalation
static inline void toku_env_run_lock_escalation_for_test(DB_ENV *env) {
toku::locktree::manager *mgr = &env->i->ltm;
mgr->run_escalation_for_test();
}
// Common error handling macros and panic detection // Common error handling macros and panic detection
#define MAYBE_RETURN_ERROR(cond, status) if (cond) return status; #define MAYBE_RETURN_ERROR(cond, status) if (cond) return status;
#define HANDLE_PANICKED_ENV(env) if (toku_env_is_panicked(env)) { sleep(1); return EINVAL; } #define HANDLE_PANICKED_ENV(env) if (toku_env_is_panicked(env)) { sleep(1); return EINVAL; }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment