Commit 82a1bd8c authored by Yoni Fogel's avatar Yoni Fogel

addresses #523

implemented lock escalation MINUS flagging of messy transactions
implemented tests for lock escalation

git-svn-id: file:///svn/tokudb@2902 c7de825b-a66e-492c-adef-691d508d4ae1
parent 64ba924c
......@@ -122,10 +122,11 @@ int toku__lt_point_cmp(const toku_point* x, const toku_point* y) {
toku__recreate_DBT(&point_2, y->data_payload, y->data_len));
}
static inline BOOL toku__lt_fraction_ranges_free(toku_lock_tree* tree, u_int32_t denominator) {
assert(tree && tree->num_ranges && denominator);
return *tree->num_ranges <=
tree->max_ranges - (tree->max_ranges / denominator);
static inline BOOL toku__lt_percent_ranges_free(toku_lock_tree* tree,
u_int32_t percent) {
assert(tree && tree->num_ranges && (percent <= 100));
u_int64_t max_ranges64= tree->max_ranges;
return *tree->num_ranges <= max_ranges64 * (100 - percent) / 100;
}
/* Functions to update the range count and compare it with the
......@@ -283,6 +284,12 @@ static inline int toku__lt_selfwrite(toku_lock_tree* tree, DB_TXN* txn,
}
static inline BOOL toku__dominated(toku_range* query, toku_range* by) {
assert(query && by);
return (toku__lt_point_cmp(query->left, by->left) >= 0 &&
toku__lt_point_cmp(query->right, by->right) <= 0);
}
/*
This function only supports non-overlapping trees.
Uses the standard definition of dominated from the design document.
......@@ -316,8 +323,7 @@ static inline int toku__lt_rt_dominates(toku_lock_tree* tree, toku_range* query,
return 0;
}
assert(numfound == 1);
*dominated = (toku__lt_point_cmp(query->left, buf[0].left) >= 0 &&
toku__lt_point_cmp(query->right, buf[0].right) <= 0);
*dominated = toku__dominated(query, &buf[0]);
return 0;
}
......@@ -392,7 +398,8 @@ static inline int toku__lt_meets(toku_lock_tree* tree, toku_range* query,
/*
Determines whether 'query' meets 'rt' at txn2 not equal to txn.
This function supports all range trees, but queries must be a single point.
This function supports all range trees, but queries must either be a single point,
or the range tree is homogenous.
Uses the standard definition of 'query' meets 'tree' at 'data' from the
design document.
*/
......@@ -400,7 +407,7 @@ static inline int toku__lt_meets_peer(toku_lock_tree* tree, toku_range* query,
toku_range_tree* rt, BOOL is_homogenous,
DB_TXN* self, BOOL* met) {
assert(tree && query && rt && self && met);
assert(query->left == query->right);
assert(query->left == query->right || is_homogenous);
const u_int32_t query_size = is_homogenous ? 1 : 2;
toku_range buffer[2];
......@@ -1071,7 +1078,7 @@ static inline int toku__lt_write_range_conflicts_reads(toku_lock_tree* tree,
while ((forest = toku_rth_next(tree->rth)) != NULL) {
if (forest->self_read != NULL && forest->hash_key != txn) {
r = toku__lt_meets_peer(tree, query, forest->self_read, TRUE, txn,
r = toku__lt_meets_peer(tree, query, forest->self_read, TRUE, txn,///
&met);
if (r!=0) { goto cleanup; }
if (met) { r = DB_LOCK_NOTGRANTED; goto cleanup; }
......@@ -1082,7 +1089,13 @@ cleanup:
return r;
}
static inline int toku__border_escalation_trivial(toku_lock_tree* tree, toku_range* border_range, BOOL* trivial) {
/*
Tests whether a range from BorderWrite is trivially escalatable.
i.e. No read locks from other transactions overlap the range.
*/
static inline int toku__border_escalation_trivial(toku_lock_tree* tree,
toku_range* border_range,
BOOL* trivial) {
assert(tree && border_range && trivial);
int r = ENOSYS;
......@@ -1099,23 +1112,111 @@ cleanup:
return r;
}
static inline int toku__escalate_reads_from_border_range(toku_lock_tree* tree, toku_range* border_range) {
assert(tree && border_range);
return 0;
/* */
static inline int toku__escalate_writes_from_border_range(toku_lock_tree* tree,
toku_range* border_range) {
int r = ENOSYS;
if (!tree || !border_range) { r = EINVAL; goto cleanup; }
DB_TXN* txn = border_range->data;
toku_range_tree* self_write = toku__lt_ifexist_selfwrite(tree, txn);
assert(self_write);
toku_range query = *border_range;
u_int32_t numfound = 0;
query.data = NULL;
/*
* Delete all overlapping ranges
*/
r = toku_rt_find(self_write, &query, 0, &tree->buf, &tree->buflen, &numfound);
if (r != 0) { goto cleanup; }
u_int32_t i;
for (i = 0; i < numfound; i++) {
r = toku_rt_delete(self_write, &tree->buf[i]);
if (r != 0) { r = toku__lt_panic(tree, r); goto cleanup; }
/*
* Clean up memory that is not referenced by border_range.
*/
if (tree->buf[i].left != tree->buf[i].right &&
toku__lt_p_independent(tree->buf[i].left, border_range)) {
/* Do not double free if left and right are same point. */
toku__p_free(tree, tree->buf[i].left);
}
if (toku__lt_p_independent(tree->buf[i].right, border_range)) {
toku__p_free(tree, tree->buf[i].right);
}
}
/*
* Insert border_range into self_write table
*/
r = toku_rt_insert(self_write, border_range);
if (r != 0) { r = toku__lt_panic(tree, r); goto cleanup; }
toku__lt_range_incr(tree, numfound);
r = 0;
cleanup:
return r;
}
static inline int toku__escalate_writes_from_border_range(toku_lock_tree* tree, toku_range* border_range) {
assert(tree && border_range);
return 0;
static inline int toku__escalate_reads_from_border_range(toku_lock_tree* tree,
toku_range* border_range) {
int r = ENOSYS;
if (!tree || !border_range) { r = EINVAL; goto cleanup; }
DB_TXN* txn = border_range->data;
toku_range_tree* self_read = toku__lt_ifexist_selfread(tree, txn);
if (self_read == NULL) { r = 0; goto cleanup; }
toku_range query = *border_range;
u_int32_t numfound = 0;
query.data = NULL;
/*
* Delete all overlapping ranges
*/
r = toku_rt_find(self_read, &query, 0, &tree->buf, &tree->buflen, &numfound);
if (r != 0) { goto cleanup; }
u_int32_t i;
u_int32_t removed = 0;
for (i = 0; i < numfound; i++) {
if (!toku__dominated(&tree->buf[0], border_range)) { continue; }
r = toku_rt_delete(self_read, &tree->buf[i]);
if (r != 0) { r = toku__lt_panic(tree, r); goto cleanup; }
#if !defined(TOKU_RT_NOOVERLAPS)
r = toku_rt_delete(tree->mainread, &tree->buf[i]);
if (r != 0) { r = toku__lt_panic(tree, r); goto cleanup; }
#endif /* TOKU_RT_NOOVERLAPS */
removed++;
/*
* Clean up memory that is not referenced by border_range.
*/
if (tree->buf[i].left != tree->buf[i].right &&
toku__lt_p_independent(tree->buf[i].left, border_range)) {
/* Do not double free if left and right are same point. */
toku__p_free(tree, tree->buf[i].left);
}
if (toku__lt_p_independent(tree->buf[i].right, border_range)) {
toku__p_free(tree, tree->buf[i].right);
}
}
toku__lt_range_decr(tree, removed);
r = 0;
cleanup:
return r;
}
/*
* TODO: implement function
* For each range in BorderWrite:
* Check to see if range conflicts any read lock held by other transactions
* Replaces all writes that overlap with range
* Deletes all reads dominated by range
*/
static int toku__do_escalation(toku_lock_tree* tree, BOOL* locks_available) {
int r = ENOSYS;
if (!tree || !locks_available) { r = EINVAL; goto cleanup; }
if (!tree->lock_escalation_allowed) { r = EDOM; goto cleanup; }
toku_range_tree* border = tree->borderwrite;
assert(border);
toku_range border_range;
BOOL found = FALSE;
BOOL trivial = FALSE;
......@@ -1125,6 +1226,10 @@ static int toku__do_escalation(toku_lock_tree* tree, BOOL* locks_available) {
r = toku__border_escalation_trivial(tree, &border_range, &trivial);
if (r!=0) { goto cleanup; }
if (!trivial) { continue; }
/*
* At this point, we determine that escalation is simple,
* Attempt escalation
*/
r = toku__escalate_writes_from_border_range(tree, &border_range);
if (r!=0) { r = toku__lt_panic(tree, r); goto cleanup; }
r = toku__escalate_reads_from_border_range(tree, &border_range);
......@@ -1133,7 +1238,8 @@ static int toku__do_escalation(toku_lock_tree* tree, BOOL* locks_available) {
r = 0;
*locks_available = toku__lt_range_test_incr(tree, 0);
/* Escalation is allowed if 1/10th of the locks (or more) are free. */
tree->lock_escalation_allowed = toku__lt_fraction_ranges_free(tree, 10);
tree->lock_escalation_allowed = toku__lt_percent_ranges_free(tree,
TOKU_DISABLE_ESCALATION_THRESHOLD);
cleanup:
if (r!=0) {
if (tree && locks_available) {
......@@ -1470,6 +1576,10 @@ int toku_lt_unlock(toku_lock_tree* tree, DB_TXN* txn) {
toku__lt_range_decr(tree, ranges);
if (toku__lt_percent_ranges_free(tree, TOKU_ENABLE_ESCALATION_THRESHOLD)) {
tree->lock_escalation_allowed = TRUE;
}
return 0;
}
......
......@@ -30,6 +30,9 @@ typedef enum {
state */
} TOKU_LT_ERROR;
#define TOKU_DISABLE_ESCALATION_THRESHOLD 10
#define TOKU_ENABLE_ESCALATION_THRESHOLD 20
/** Convert error codes into a human-readable error message */
char* toku_lt_strerror(TOKU_LT_ERROR r /**< Error code */)
__attribute__((const,pure));
......
......@@ -34,6 +34,19 @@ int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2le
return key1len-key2len;
}
int intcmp(DB *db __attribute__((__unused__)), const DBT* a, const DBT* b) {
int x = *(int*)a->data;
int y = *(int*)b->data;
return x - y;
}
int charcmp(DB *db __attribute__((__unused__)), const DBT* a, const DBT* b) {
int x = *(char*)a->data;
int y = *(char*)b->data;
return x - y;
}
int dbcmp (DB *db __attribute__((__unused__)), const DBT *a, const DBT*b) {
return toku_keycompare(a->data, a->size, b->data, b->size);
......
/* We are going to test whether create and close properly check their input. */
#include "test.h"
toku_range_tree* toku__lt_ifexist_selfwrite(toku_lock_tree* tree, DB_TXN* txn);
toku_range_tree* toku__lt_ifexist_selfread(toku_lock_tree* tree, DB_TXN* txn);
int r;
toku_lock_tree* lt = NULL;
DB* db = (DB*)1;
u_int32_t max_locks = 10;
u_int32_t num_locks = 0;
BOOL duplicates = FALSE;
int nums[10000];
DBT _key_left[2];
DBT _key_right[2];
DBT _data_left[2];
DBT _data_right[2];
DBT* key_left[2] ;
DBT* key_right[2] ;
DBT* data_left [2] ;
DBT* data_right[2] ;
toku_point qleft, qright;
toku_range query;
toku_range* buf;
unsigned buflen;
unsigned numfound;
void init_query(BOOL dups) {
init_point(&qleft, lt);
init_point(&qright, lt);
qleft.key_payload = (void *) toku_lt_neg_infinity;
qright.key_payload = (void *) toku_lt_infinity;
if (dups) {
qleft.data_payload = qleft.key_payload;
qright.data_payload = qright.key_payload;
}
memset(&query,0,sizeof(query));
query.left = &qleft;
query.right = &qright;
}
void setup_tree(BOOL dups) {
num_locks = 0;
r = toku_lt_create(&lt, db, dups, dbpanic, max_locks, &num_locks, intcmp, charcmp,
toku_malloc, toku_free, toku_realloc);
CKERR(r);
assert(lt);
init_query(dups);
}
void close_tree(void) {
assert(lt);
r = toku_lt_close(lt);
CKERR(r);
lt = NULL;
}
typedef enum { null = -1, infinite = -2, neg_infinite = -3 } lt_infty;
DBT* set_to_infty(DBT *dbt, lt_infty value) {
if (value == infinite) return (DBT*)toku_lt_infinity;
if (value == neg_infinite) return (DBT*)toku_lt_neg_infinity;
if (value == null) return dbt_init(dbt, NULL, 0);
assert(value >= 0);
return dbt_init(dbt, &nums[value], sizeof(nums[0]));
}
void lt_insert(BOOL dups, int r_expect, char txn, int key_l, int data_l,
int key_r, int data_r, BOOL read_flag) {
DBT _key_left;
DBT _key_right;
DBT _data_left;
DBT _data_right;
DBT* key_left = &_key_left;
DBT* key_right = &_key_right;
DBT* data_left = dups ? &_data_left : NULL;
DBT* data_right = dups ? &_data_right: NULL;
key_left = set_to_infty(key_left, key_l);
key_right = set_to_infty(key_right, key_r);
if (dups) {
if (key_left != &_key_left) data_left = key_left;
else data_left = set_to_infty(data_left, data_l);
if (key_right != &_key_right) data_right = key_right;
else data_right = set_to_infty(data_right, data_r);
assert(key_left && data_left);
assert(!read_flag || (key_right && data_right));
} else {
data_left = data_right = NULL;
assert(key_left && !data_left);
assert(!read_flag || (key_right && !data_right));
}
DB_TXN* local_txn = (DB_TXN*) (size_t) txn;
if (read_flag)
r = toku_lt_acquire_range_read_lock(lt, local_txn, key_left, data_left,
key_right, data_right);
else
r = toku_lt_acquire_write_lock(lt, local_txn, key_left, data_left);
CKERR2(r, r_expect);
}
void lt_insert_read(BOOL dups, int r_expect, char txn, int key_l, int data_l,
int key_r, int data_r) {
lt_insert(dups, r_expect, txn, key_l, data_l, key_r, data_r, TRUE);
}
void lt_insert_write(BOOL dups, int r_expect, char txn, int key_l, int data_l) {
lt_insert(dups, r_expect, txn, key_l, data_l, 0, 0, FALSE);
}
void setup_payload_len(void** payload, u_int32_t* len, int val) {
assert(payload && len);
DBT temp;
*payload = set_to_infty(&temp, val);
if (val < 0) {
*len = 0;
}
else {
*len = sizeof(nums[0]);
*payload = temp.data;
}
}
void lt_find(BOOL dups, toku_range_tree* rt,
unsigned k, int key_l, int data_l,
int key_r, int data_r,
char char_txn) {
r = toku_rt_find(rt, &query, 0, &buf, &buflen, &numfound);
CKERR(r);
assert(numfound==k);
DB_TXN* find_txn = (DB_TXN *) (size_t) char_txn;
toku_point left, right;
init_point(&left, lt);
setup_payload_len(&left.key_payload, &left.key_len, key_l);
if (dups) {
if (key_l < null) left.data_payload = left.key_payload;
else setup_payload_len(&left.data_payload, &left.data_len, data_l);
}
init_point(&right, lt);
setup_payload_len(&right.key_payload, &right.key_len, key_r);
if (dups) {
if (key_r < null) right.data_payload = right.key_payload;
else setup_payload_len(&right.data_payload, &right.data_len, data_r);
}
unsigned i;
for (i = 0; i < numfound; i++) {
if (toku__lt_point_cmp(buf[i].left, &left ) == 0 &&
toku__lt_point_cmp(buf[i].right, &right) == 0 &&
buf[i].data == find_txn) return;
}
assert(FALSE); //Crash since we didn't find it.
}
void lt_unlock(char ctxn) {
int r;
r = toku_lt_unlock(lt, (DB_TXN *) (size_t) ctxn);
CKERR(r);
}
void runtest(BOOL dups) {
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 2, 1);
lt_insert_write(dups, 0, 'a', 1, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_write(dups, 0, 'a', 2, 1);
lt_insert_write(dups, 0, 'a', 1, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_read (dups, 0, 'a', 1, 1, 1, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'b', 1, 1, 1, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_read (dups, 0, 'b', 1, 1, 1, 1);
lt_insert_write(dups, DB_LOCK_NOTGRANTED, 'a', 1, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_write(dups, 0, 'a', 2, 1);
lt_insert_write(dups, 0, 'a', 3, 1);
lt_insert_write(dups, 0, 'a', 4, 1);
lt_insert_write(dups, 0, 'a', 5, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'b', 2, 1, 4, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_write(dups, 0, 'a', 2, 1);
lt_insert_write(dups, 0, 'a', 3, 1);
lt_insert_write(dups, 0, 'a', 4, 1);
lt_insert_write(dups, 0, 'a', 5, 1);
lt_insert_write (dups, DB_LOCK_NOTGRANTED, 'b', 2, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_write(dups, 0, 'a', 2, 1);
lt_insert_write(dups, 0, 'a', 4, 1);
lt_insert_write(dups, 0, 'a', 5, 1);
lt_insert_read (dups, 0, 'b', 3, 1, 3, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_write(dups, 0, 'a', 2, 1);
lt_insert_write(dups, 0, 'a', 4, 1);
lt_insert_write(dups, 0, 'a', 5, 1);
lt_insert_read (dups, 0, 'b', 3, 1, 3, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'b', 1, 1);
lt_insert_write(dups, 0, 'b', 2, 1);
lt_insert_write(dups, 0, 'b', 3, 1);
lt_insert_write(dups, 0, 'b', 4, 1);
lt_insert_write(dups, 0, 'a', 5, 1);
lt_insert_write(dups, 0, 'a', 6, 1);
lt_insert_write(dups, 0, 'a', 7, 1);
lt_insert_write(dups, 0, 'a', 8, 1);
lt_insert_write(dups, 0, 'a', 9, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 7, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'b', 1, 1);
lt_insert_write(dups, 0, 'b', 2, 1);
lt_insert_write(dups, 0, 'b', 3, 1);
lt_insert_write(dups, 0, 'b', 4, 1);
lt_insert_write(dups, 0, 'b', 5, 1);
lt_insert_write(dups, 0, 'b', 6, 1);
lt_insert_write(dups, 0, 'b', 7, 1);
lt_insert_write(dups, 0, 'b', 8, 1);
lt_insert_write(dups, 0, 'b', 9, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 7, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_write(dups, 0, 'a', 2, 1);
lt_insert_write(dups, 0, 'a', 3, 1);
lt_insert_write(dups, 0, 'a', 4, 1);
lt_insert_read (dups, 0, 'a', 3, 1, 7, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'b', 1, 1);
lt_insert_write(dups, 0, 'b', 2, 1);
lt_insert_write(dups, 0, 'b', 3, 1);
lt_insert_write(dups, 0, 'b', 4, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 7, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_write(dups, 0, 'a', 2, 1);
lt_insert_write(dups, 0, 'a', 4, 1);
lt_insert_write(dups, 0, 'a', 5, 1);
lt_insert_write(dups, 0, 'a', 3, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_write(dups, 0, 'a', 2, 1);
lt_insert_write(dups, 0, 'b', 4, 1);
lt_insert_write(dups, 0, 'b', 5, 1);
lt_insert_write(dups, 0, 'a', 3, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_write(dups, 0, 'a', 2, 1);
lt_insert_write(dups, 0, 'a', 3, 1);
lt_insert_write(dups, 0, 'a', 4, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'b', 3, 1, 3, 1);
lt_unlock('a');
lt_insert_write(dups, 0, 'b', 3, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 3, 1);
lt_unlock('b');
lt_insert_read (dups, 0, 'a', 3, 1, 3, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_write(dups, 0, 'a', 3, 1);
lt_insert_write(dups, 0, 'b', 2, 1);
lt_unlock('b');
close_tree();
/* ********************* */
}
void run_escalation_test(BOOL dups) {
int i = 0;
/* ******************** */
/* 1 transaction request 1000 write locks, make sure it succeeds*/
setup_tree(dups);
assert(lt->lock_escalation_allowed);
for (i = 0; i < 1000; i++) {
lt_insert_write(dups, 0, 'a', i, i);
assert(lt->lock_escalation_allowed);
}
close_tree();
/* ******************** */
/* interleaving transactions,
TXN A grabs 1 3 5 7 9
TXN B grabs 2 4 6 8 10
make sure lock escalation fails, and that we run out of locks */
setup_tree(dups);
// this should grab ten locks successfully
for (i = 1; i < 10; i+=2) {
lt_insert_write(dups, 0, 'a', i, i);
lt_insert_write(dups, 0, 'b', i+1, i+1);
}
lt_insert_write(dups, ENOMEM, 'a', 100, 100);
lt_insert_write(dups, ENOMEM, 'b', 100, 100);
lt_insert_write(dups, ENOMEM, 'c', 100, 100);
close_tree();
/* ******************** */
/*
test that escalation allowed flag goes from FALSE->TRUE->FALSE
TXN A grabs 1 3 5 7 9
TXN B grabs 2 4 6 8 10
try to grab another lock, fail, lock escalation should be disabled
txn B gets freed
lock escalation should be reenabled
txn C grabs 60,70,80,90,100
lock escalation should work
*/
setup_tree(dups);
assert(lt->lock_escalation_allowed);
// this should grab ten locks successfully
for (i = 1; i < 10; i+=2) {
lt_insert_write(dups, 0, 'a', i, i);
lt_insert_write(dups, 0, 'b', i+1, i+1);
}
assert(lt->lock_escalation_allowed);
lt_insert_write(dups, ENOMEM, 'a', 100, 100);
assert(!lt->lock_escalation_allowed);
lt_insert_write(dups, ENOMEM, 'b', 100, 100);
assert(!lt->lock_escalation_allowed);
lt_insert_write(dups, ENOMEM, 'c', 100, 100);
assert(!lt->lock_escalation_allowed);
lt_insert_read(dups, ENOMEM, 'a', 100, 100, 100, 100);
lt_insert_read(dups, ENOMEM, 'b', 100, 100, 100, 100);
lt_insert_read(dups, ENOMEM, 'c', 100, 100, 100, 100);
lt_unlock('b');
assert(lt->lock_escalation_allowed);
for (i = 50; i < 1000; i++) {
lt_insert_write(dups, 0, 'c', i, i);
assert(lt->lock_escalation_allowed);
}
close_tree();
/* ******************** */
/*
txn A grabs 0,1,2,...,8 (9 locks)
txn B grabs read lock [5,7]
txn C attempts to grab lock, escalation, and lock grab, should fail
lock
*/
setup_tree(dups);
assert(lt->lock_escalation_allowed);
// this should grab ten locks successfully
for (i = 0; i < 10; i ++) {
if (i == 2 || i == 5) { continue; }
lt_insert_write(dups, 0, 'a', i, i);
}
lt_insert_read (dups, 0, 'b', 5, 5, 5, 5);
lt_insert_read (dups, 0, 'b', 2, 2, 2, 2);
lt_insert_write(dups, ENOMEM, 'a', 100, 100);
lt_insert_write(dups, ENOMEM, 'b', 100, 100);
lt_insert_write(dups, ENOMEM, 'c', 100, 100);
lt_insert_read(dups, ENOMEM, 'a', 100, 100, 100, 100);
lt_insert_read(dups, ENOMEM, 'b', 100, 100, 100, 100);
lt_insert_read(dups, ENOMEM, 'c', 100, 100, 100, 100);
lt_unlock('b');
assert(lt->lock_escalation_allowed);
for (i = 50; i < 1000; i++) {
lt_insert_write(dups, 0, 'c', i, i);
assert(lt->lock_escalation_allowed);
}
close_tree();
/* ******************** */
#if 0 //Only use when messy transactions are enabled.
/*
txn A grabs 0,1,2,...,8 (9 locks)
txn B grabs read lock [5,7]
txn C attempts to grab lock, escalation, and lock grab, should fail
lock
*/
setup_tree(dups);
assert(lt->lock_escalation_allowed);
// this should grab ten locks successfully
for (i = 0; i < 7; i++) {
lt_insert_write(dups, 0, 'a', i, i);
}
lt_insert_read (dups, 0, 'b', 5, 5, 6, 6);
lt_insert_read (dups, 0, 'b', 2, 2, 3, 3);
lt_insert_write(dups, ENOMEM, 'a', 100, 100);
lt_insert_write(dups, ENOMEM, 'b', 100, 100);
lt_insert_write(dups, ENOMEM, 'c', 100, 100);
lt_insert_read(dups, ENOMEM, 'a', 100, 100, 100, 100);
lt_insert_read(dups, ENOMEM, 'b', 100, 100, 100, 100);
lt_insert_read(dups, ENOMEM, 'c', 100, 100, 100, 100);
lt_unlock('b');
assert(lt->lock_escalation_allowed);
for (i = 50; i < 1000; i++) {
lt_insert_write(dups, 0, 'c', i, i);
assert(lt->lock_escalation_allowed);
}
close_tree();
#endif
/* ******************** */
/* escalate on read lock, */
setup_tree(dups);
for (i = 0; i < 10; i++) {
lt_insert_write(dups, 0, 'a', i, i);
}
lt_insert_read(dups, 0, 'a', 10, 10, 10, 10);
close_tree();
/* ******************** */
/* escalate on read lock of different transaction. */
setup_tree(dups);
for (i = 0; i < 10; i++) {
lt_insert_write(dups, 0, 'a', i, i);
}
lt_insert_read(dups, 0, 'b', 10, 10, 10, 10);
close_tree();
/* ******************** */
/* txn A grabs write lock 0,9
txn A grabs read lock 1,2,3,4,5,6,7,8
txn B grabs write lock 11, 12, should succeed */
setup_tree(dups);
for (i = 1; i < 9; i++) {
lt_insert_read(dups, 0, 'a', i, i, i, i);
}
lt_insert_write(dups, 0, 'a', 0, 0);
lt_insert_write(dups, 0, 'a', 9, 9);
for (i = 50; i < 1000; i++) {
lt_insert_write(dups, 0, 'b', i, i);
assert(lt->lock_escalation_allowed);
}
close_tree();
/* ******************** */
/* [1-A-5] [10-B-15] [20-A-25] BORDER WRITE
[2B] [6C] [12A] [22A] READ LOCKS
check that only last borderwrite range is escalated */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_write(dups, 0, 'a', 5, 5);
lt_insert_write(dups, 0, 'b', 10, 10);
lt_insert_write(dups, 0, 'b', 15, 15);
lt_insert_write(dups, 0, 'a', 20, 20);
lt_insert_write(dups, 0, 'a', 23, 23);
lt_insert_write(dups, 0, 'a', 25, 25);
lt_insert_read(dups, 0, 'b', 2, 2, 2, 2);
lt_insert_read(dups, 0, 'a', 12, 12, 12, 12);
lt_insert_read(dups, 0, 'a', 22, 22, 22, 22);
lt_insert_read(dups, 0, 'a', 100, 100, 100, 100);
lt_insert_write(dups, DB_LOCK_NOTGRANTED, 'b', 24, 24);
lt_insert_write(dups, 0, 'a', 14, 14);
lt_insert_write(dups, 0, 'b', 4, 4);
close_tree();
/* ******************** */
}
void init_test(void) {
unsigned i;
for (i = 0; i < sizeof(nums)/sizeof(nums[0]); i++) nums[i] = i;
buflen = 64;
buf = (toku_range*) toku_malloc(buflen*sizeof(toku_range));
}
int main(int argc, const char *argv[]) {
parse_args(argc, argv);
init_test();
run_escalation_test(FALSE);
run_escalation_test(TRUE);
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment