Commit 07c6b9a0 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#4761 hold the lock tree manager mutex and the lock tree mutex when doing lock...

#4761 hold the lock tree manager mutex and the lock tree mutex when doing lock escalation refs[t:4761]

git-svn-id: file:///svn/toku/tokudb@42329 c7de825b-a66e-492c-adef-691d508d4ae1
parent ce59e63b
......@@ -106,6 +106,34 @@ static const DBT __toku_lt_neg_infinity;
const DBT* const toku_lt_infinity = &__toku_lt_infinity;
const DBT* const toku_lt_neg_infinity = &__toku_lt_neg_infinity;
static void
ltm_mutex_lock(toku_ltm *mgr) {
toku_mutex_lock(&mgr->mutex);
assert(!mgr->mutex_locked);
mgr->mutex_locked = true;
}
static void
ltm_mutex_unlock(toku_ltm *mgr) {
assert(mgr->mutex_locked);
mgr->mutex_locked = false;
toku_mutex_unlock(&mgr->mutex);
}
static void
lt_mutex_lock(toku_lock_tree *tree) {
toku_mutex_lock(&tree->mutex);
assert(!tree->mutex_locked);
tree->mutex_locked = true;
}
static void
lt_mutex_unlock(toku_lock_tree *tree) {
assert(tree->mutex_locked);
tree->mutex_locked = false;
toku_mutex_unlock(&tree->mutex);
}
char*
toku_lt_strerror(TOKU_LT_ERROR r) {
if (r >= 0)
......@@ -166,13 +194,13 @@ lt_txn_cmp(const TXNID a, const TXNID b) {
}
static inline void
toku_ltm_remove_lt(toku_ltm* mgr, toku_lock_tree* lt) {
ltm_remove_lt(toku_ltm* mgr, toku_lock_tree* lt) {
assert(mgr && lt);
toku_lth_delete(mgr->lth, lt);
}
static inline int
toku_ltm_add_lt(toku_ltm* mgr, toku_lock_tree* lt) {
ltm_add_lt(toku_ltm* mgr, toku_lock_tree* lt) {
assert(mgr && lt);
return toku_lth_insert(mgr->lth, lt);
}
......@@ -243,6 +271,7 @@ toku_ltm_create(toku_ltm** pmgr,
r = ENOMEM; goto cleanup;
}
toku_mutex_init(&mgr->mutex, NULL);
mgr->mutex_locked = false;
DRD_IGNORE_VAR(mgr->status);
r = 0;
*pmgr = mgr;
......@@ -1354,6 +1383,7 @@ toku_lt_create(toku_lock_tree** ptree,
goto cleanup;
toku_lock_request_tree_init(tmp_tree);
toku_mutex_init(&tmp_tree->mutex, NULL);
tmp_tree->mutex_locked = false;
tmp_tree->ref_count = 1;
*ptree = tmp_tree;
r = 0;
......@@ -1382,12 +1412,12 @@ void
toku_ltm_invalidate_lt(toku_ltm* mgr, DICTIONARY_ID dict_id) {
assert(mgr && dict_id.dictid != DICTIONARY_ID_NONE.dictid);
toku_lt_map* map = NULL;
toku_mutex_lock(&mgr->mutex);
ltm_mutex_lock(mgr);
map = toku_idlth_find(mgr->idlth, dict_id);
if (map) {
toku_idlth_delete(mgr->idlth, dict_id);
}
toku_mutex_unlock(&mgr->mutex);
ltm_mutex_unlock(mgr);
}
static inline void
......@@ -1411,7 +1441,7 @@ toku_ltm_get_lt(toku_ltm* mgr, toku_lock_tree** ptree, DICTIONARY_ID dict_id, DB
bool added_to_idlth = FALSE;
bool added_extant_db = FALSE;
toku_mutex_lock(&mgr->mutex);
ltm_mutex_lock(mgr);
map = toku_idlth_find(mgr->idlth, dict_id);
if (map != NULL) {
/* Load already existing lock tree. */
......@@ -1430,7 +1460,7 @@ toku_ltm_get_lt(toku_ltm* mgr, toku_lock_tree** ptree, DICTIONARY_ID dict_id, DB
goto cleanup;
toku_lt_set_dict_id(tree, dict_id);
/* add tree to ltm */
r = toku_ltm_add_lt(mgr, tree);
r = ltm_add_lt(mgr, tree);
if (r != 0)
goto cleanup;
added_to_ltm = TRUE;
......@@ -1452,7 +1482,7 @@ toku_ltm_get_lt(toku_ltm* mgr, toku_lock_tree** ptree, DICTIONARY_ID dict_id, DB
*ptree = tree;
r = 0;
cleanup:
toku_mutex_unlock(&mgr->mutex);
ltm_mutex_unlock(mgr);
if (r == 0) {
mgr->STATUS_VALUE(LTM_LT_CREATE)++;
mgr->STATUS_VALUE(LTM_LT_NUM)++;
......@@ -1461,15 +1491,15 @@ toku_ltm_get_lt(toku_ltm* mgr, toku_lock_tree** ptree, DICTIONARY_ID dict_id, DB
}
else {
if (tree != NULL) {
toku_mutex_lock(&mgr->mutex);
ltm_mutex_lock(mgr);
if (added_to_ltm)
toku_ltm_remove_lt(mgr, tree);
ltm_remove_lt(mgr, tree);
if (added_to_idlth)
toku_idlth_delete(mgr->idlth, dict_id);
if (added_extant_db)
lt_remove_db(tree, db);
toku_lt_close(tree);
toku_mutex_unlock(&mgr->mutex);
ltm_mutex_unlock(mgr);
}
mgr->STATUS_VALUE(LTM_LT_CREATE_FAIL)++;
}
......@@ -1521,6 +1551,8 @@ toku_lt_close(toku_lock_tree* tree) {
static int
lt_try_acquire_range_read_lock(toku_lock_tree* tree, DB* db, TXNID txn, const DBT* key_left, const DBT* key_right) {
assert(tree->mutex_locked); // locked by this thread
int r;
toku_point left;
toku_point right;
......@@ -1771,6 +1803,7 @@ lt_escalate_write_locks(toku_lock_tree* tree) {
static int
lt_do_escalation(toku_lock_tree* lt) {
assert(lt);
assert(lt->mutex_locked);
int r = ENOSYS;
DB* db; // extract db from lt
OMTVALUE dbv;
......@@ -1811,29 +1844,33 @@ ltm_do_escalation(toku_ltm* mgr) {
int r = ENOSYS;
toku_lock_tree* lt = NULL;
ltm_mutex_lock(mgr);
toku_lth_start_scan(mgr->lth); // initialize iterator in mgr
while ((lt = toku_lth_next(mgr->lth)) != NULL) {
lt_mutex_lock(lt);
r = lt_do_escalation(lt);
lt_mutex_unlock(lt);
if (r != 0)
goto cleanup;
}
r = 0;
cleanup:
ltm_mutex_unlock(mgr);
return r;
}
static int
toku_lt_acquire_range_read_lock_internal(toku_lock_tree* tree, DB* db, TXNID txn, const DBT* key_left, const DBT* key_right) {
lt_acquire_range_read_lock(toku_lock_tree* tree, DB* db, TXNID txn, const DBT* key_left, const DBT* key_right, bool do_escalation) {
int r = ENOSYS;
r = lt_try_acquire_range_read_lock(tree, db, txn,
key_left, key_right);
if (r==TOKUDB_OUT_OF_LOCKS) {
r = lt_try_acquire_range_read_lock(tree, db, txn, key_left, key_right);
if (r==TOKUDB_OUT_OF_LOCKS && do_escalation) {
lt_mutex_unlock(tree);
r = ltm_do_escalation(tree->mgr);
lt_mutex_lock(tree);
if (r == 0) {
r = lt_try_acquire_range_read_lock(tree, db, txn,
key_left, key_right);
r = lt_try_acquire_range_read_lock(tree, db, txn, key_left, key_right);
if (r == 0) {
tree->mgr->STATUS_VALUE(LTM_LOCK_ESCALATION_SUCCESSES)++;
}
......@@ -1862,9 +1899,9 @@ toku_lt_acquire_range_read_lock(toku_lock_tree* tree, DB* db, TXNID txn, const D
if (!tree || !db || !key_left || !key_right)
r = EINVAL;
if (r == 0) {
toku_mutex_lock(&tree->mutex);
r = toku_lt_acquire_range_read_lock_internal(tree, db, txn, key_left, key_right);
toku_mutex_unlock(&tree->mutex);
lt_mutex_lock(tree);
r = lt_acquire_range_read_lock(tree, db, txn, key_left, key_right, true);
lt_mutex_unlock(tree);
}
return r;
}
......@@ -1876,6 +1913,8 @@ toku_lt_acquire_read_lock(toku_lock_tree* tree, DB* db, TXNID txn, const DBT* ke
static int
lt_try_acquire_range_write_lock(toku_lock_tree* tree, DB* db, TXNID txn, const DBT* key_left, const DBT* key_right) {
assert(tree->mutex_locked);
int r;
toku_point left;
toku_point right;
......@@ -1961,12 +2000,14 @@ lt_try_acquire_range_write_lock(toku_lock_tree* tree, DB* db, TXNID txn, const D
}
static int
toku_lt_acquire_range_write_lock_internal(toku_lock_tree* tree, DB* db, TXNID txn, const DBT* key_left, const DBT* key_right) {
lt_acquire_range_write_lock(toku_lock_tree* tree, DB* db, TXNID txn, const DBT* key_left, const DBT* key_right, bool do_escalation) {
int r = ENOSYS;
r = lt_try_acquire_range_write_lock(tree, db, txn, key_left, key_right);
if (r == TOKUDB_OUT_OF_LOCKS) {
if (r == TOKUDB_OUT_OF_LOCKS && do_escalation) {
lt_mutex_unlock(tree);
r = ltm_do_escalation(tree->mgr);
lt_mutex_lock(tree);
if (r == 0) {
r = lt_try_acquire_range_write_lock(tree, db, txn, key_left, key_right);
if (r == 0) {
......@@ -1997,9 +2038,9 @@ toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB* db, TXNID txn, const
if (!tree || !db || !key_left || !key_right)
r = EINVAL;
if (r == 0) {
toku_mutex_lock(&tree->mutex);
r = toku_lt_acquire_range_write_lock_internal(tree, db, txn, key_left, key_right);
toku_mutex_unlock(&tree->mutex);
lt_mutex_lock(tree);
r = lt_acquire_range_write_lock(tree, db, txn, key_left, key_right, true);
lt_mutex_unlock(tree);
}
return r;
}
......@@ -2115,6 +2156,8 @@ lt_border_delete(toku_lock_tree* tree, toku_range_tree* rt) {
static inline int
lt_unlock_txn(toku_lock_tree* tree, TXNID txn) {
assert(tree->mutex_locked);
if (!tree)
return EINVAL;
int r;
......@@ -2169,20 +2212,21 @@ toku_lt_unlock_txn(toku_lock_tree* tree, TXNID txn) {
if (!tree) {
r = EINVAL; goto cleanup;
}
toku_mutex_lock(&tree->mutex);
lt_mutex_lock(tree);
if (toku_omt_size(tree->dbs) > 0) {
lt_unlock_txn(tree, txn);
lt_retry_lock_requests(tree);
} else {
r = toku_rth_insert(tree->txns_to_unlock, txn);
}
toku_mutex_unlock(&tree->mutex);
lt_mutex_unlock(tree);
cleanup:
return r;
}
static void
lt_unlock_deferred_txns(toku_lock_tree *tree) {
lt_mutex_lock(tree);
toku_rth_start_scan(tree->txns_to_unlock);
rt_forest *forest;
while ((forest = toku_rth_next(tree->txns_to_unlock)) != NULL) {
......@@ -2191,6 +2235,7 @@ lt_unlock_deferred_txns(toku_lock_tree *tree) {
}
toku_rth_clear(tree->txns_to_unlock);
lt_retry_lock_requests(tree);
lt_mutex_unlock(tree);
}
void
......@@ -2202,13 +2247,13 @@ toku_lt_add_ref(toku_lock_tree* tree) {
static void
toku_ltm_stop_managing_lt(toku_ltm* mgr, toku_lock_tree* tree) {
toku_mutex_lock(&mgr->mutex);
toku_ltm_remove_lt(mgr, tree);
ltm_mutex_lock(mgr);
ltm_remove_lt(mgr, tree);
toku_lt_map* map = toku_idlth_find(mgr->idlth, tree->dict_id);
if (map && map->tree == tree) {
toku_idlth_delete(mgr->idlth, tree->dict_id);
}
toku_mutex_unlock(&mgr->mutex);
ltm_mutex_unlock(mgr);
}
int
......@@ -2245,7 +2290,7 @@ find_db (OMTVALUE v, void *dbv) {
static void
lt_add_db(toku_lock_tree* tree, DB *db) {
toku_mutex_lock(&tree->mutex);
lt_mutex_lock(tree);
if (db != NULL) {
int r;
OMTVALUE get_dbv = NULL;
......@@ -2255,12 +2300,12 @@ lt_add_db(toku_lock_tree* tree, DB *db) {
r = toku_omt_insert_at(tree->dbs, db, index);
assert_zero(r);
}
toku_mutex_unlock(&tree->mutex);
lt_mutex_unlock(tree);
}
static void
lt_remove_db(toku_lock_tree* tree, DB *db) {
toku_mutex_lock(&tree->mutex);
lt_mutex_lock(tree);
if (db != NULL) {
int r;
OMTVALUE get_dbv = NULL;
......@@ -2271,7 +2316,7 @@ lt_remove_db(toku_lock_tree* tree, DB *db) {
r = toku_omt_delete_at(tree->dbs, index);
assert_zero(r);
}
toku_mutex_unlock(&tree->mutex);
lt_mutex_unlock(tree);
}
void
......@@ -2332,9 +2377,9 @@ void
toku_lock_request_destroy(toku_lock_request *lock_request) {
if (lock_request->state == LOCK_REQUEST_PENDING) {
toku_lock_tree *tree = lock_request->tree;
toku_mutex_lock(&tree->mutex);
lt_mutex_lock(tree);
toku_lock_request_tree_delete(lock_request->tree, lock_request);
toku_mutex_unlock(&tree->mutex);
lt_mutex_unlock(tree);
}
toku_lock_request_destroy_wait(lock_request);
toku_free(lock_request->key_left_copy.data);
......@@ -2364,24 +2409,28 @@ toku_lock_request_wait_internal(toku_lock_request *lock_request, toku_lock_tree
long int d_sec = usec / 1000000;
long int d_usec = usec % 1000000;
struct timespec ts = { sec + d_sec, d_usec * 1000 };
if (!tree_locked) toku_mutex_lock(&tree->mutex);
if (!tree_locked) lt_mutex_lock(tree);
while (lock_request->state == LOCK_REQUEST_PENDING) {
toku_lock_request_init_wait(lock_request);
tree->mutex_locked = false;
r = pthread_cond_timedwait(&lock_request->wait, &tree->mutex, &ts);
tree->mutex_locked = true;
assert(r == 0 || r == ETIMEDOUT);
if (r == ETIMEDOUT && lock_request->state == LOCK_REQUEST_PENDING) {
toku_lock_request_tree_delete(tree, lock_request);
toku_lock_request_complete(lock_request, DB_LOCK_NOTGRANTED);
}
}
if (!tree_locked) toku_mutex_unlock(&tree->mutex);
if (!tree_locked) lt_mutex_unlock(tree);
} else {
if (!tree_locked) toku_mutex_lock(&tree->mutex);
if (!tree_locked) lt_mutex_lock(tree);
while (lock_request->state == LOCK_REQUEST_PENDING) {
toku_lock_request_init_wait(lock_request);
tree->mutex_locked = false;
r = toku_pthread_cond_wait(&lock_request->wait, &tree->mutex); assert_zero(r);
tree->mutex_locked = true;
}
if (!tree_locked) toku_mutex_unlock(&tree->mutex);
if (!tree_locked) lt_mutex_unlock(tree);
}
assert(lock_request->state == LOCK_REQUEST_COMPLETE);
return lock_request->complete_r;
......@@ -2500,15 +2549,20 @@ static void print_key(const char *sp, const DBT *k) {
static void toku_lt_check_deadlock(toku_lock_tree *tree, toku_lock_request *a_lock_request);
static int
toku_lock_request_start_locked(toku_lock_request *lock_request, toku_lock_tree *tree, bool copy_keys_if_not_granted) {
int r;
toku_lock_request_start_locked(toku_lock_request *lock_request, toku_lock_tree *tree, bool copy_keys_if_not_granted, bool do_escalation) {
assert(lock_request->state == LOCK_REQUEST_INIT);
if (lock_request->type == LOCK_REQUEST_READ) {
r = toku_lt_acquire_range_read_lock_internal(tree, lock_request->db, lock_request->txnid, lock_request->key_left, lock_request->key_right);
} else if (lock_request->type == LOCK_REQUEST_WRITE) {
r = toku_lt_acquire_range_write_lock_internal(tree, lock_request->db, lock_request->txnid, lock_request->key_left, lock_request->key_right);
} else
assert(tree->mutex_locked);
int r = 0;
switch (lock_request->type) {
case LOCK_REQUEST_READ:
r = lt_acquire_range_read_lock(tree, lock_request->db, lock_request->txnid, lock_request->key_left, lock_request->key_right, do_escalation);
break;
case LOCK_REQUEST_WRITE:
r = lt_acquire_range_write_lock(tree, lock_request->db, lock_request->txnid, lock_request->key_left, lock_request->key_right, do_escalation);
break;
case LOCK_REQUEST_UNKNOWN:
assert(0);
}
#if TOKU_LT_DEBUG
if (toku_lt_debug) {
printf("%s:%u %lu db=%p %s %u\n", __FUNCTION__, __LINE__, lock_request->txnid, lock_request->db, lock_request->type == LOCK_REQUEST_READ ? "r" : "w", lock_request->state);
......@@ -2540,15 +2594,15 @@ toku_lock_request_start_locked(toku_lock_request *lock_request, toku_lock_tree *
int
toku_lock_request_start(toku_lock_request *lock_request, toku_lock_tree *tree, bool copy_keys_if_not_granted) {
toku_mutex_lock(&tree->mutex);
int r = toku_lock_request_start_locked(lock_request, tree, copy_keys_if_not_granted);
toku_mutex_unlock(&tree->mutex);
lt_mutex_lock(tree);
int r = toku_lock_request_start_locked(lock_request, tree, copy_keys_if_not_granted, true);
lt_mutex_unlock(tree);
return r;
}
static int
toku_lt_acquire_lock_request_with_timeout_locked(toku_lock_tree *tree, toku_lock_request *lock_request, struct timeval *wait_time) {
int r = toku_lock_request_start_locked(lock_request, tree, false);
int r = toku_lock_request_start_locked(lock_request, tree, false, true);
if (r == DB_LOCK_NOTGRANTED)
r = toku_lock_request_wait_internal(lock_request, tree, wait_time, true);
return r;
......@@ -2556,9 +2610,9 @@ toku_lt_acquire_lock_request_with_timeout_locked(toku_lock_tree *tree, toku_lock
int
toku_lt_acquire_lock_request_with_timeout(toku_lock_tree *tree, toku_lock_request *lock_request, struct timeval *wait_time) {
toku_mutex_lock(&tree->mutex);
lt_mutex_lock(tree);
int r = toku_lt_acquire_lock_request_with_timeout_locked(tree, lock_request, wait_time);
toku_mutex_unlock(&tree->mutex);
lt_mutex_unlock(tree);
return r;
}
......@@ -2570,15 +2624,17 @@ toku_lt_acquire_lock_request_with_default_timeout(toku_lock_tree *tree, toku_loc
static void
lt_retry_lock_requests(toku_lock_tree *tree) {
int r;
assert(tree->mutex_locked);
for (uint32_t i = 0; i < toku_omt_size(tree->lock_requests); ) {
int r;
OMTVALUE v;
r = toku_omt_fetch(tree->lock_requests, i, &v); assert_zero(r);
toku_lock_request *lock_request = (toku_lock_request *) v;
assert(lock_request->state == LOCK_REQUEST_PENDING);
lock_request->state = LOCK_REQUEST_INIT;
toku_omt_delete_at(tree->lock_requests, i);
r = toku_lock_request_start_locked(lock_request, tree, false);
r = toku_lock_request_start_locked(lock_request, tree, false, false);
if (lock_request->state == LOCK_REQUEST_COMPLETE) {
toku_lock_request_wakeup(lock_request, tree);
} else {
......@@ -2590,9 +2646,9 @@ lt_retry_lock_requests(toku_lock_tree *tree) {
void
toku_lt_retry_lock_requests(toku_lock_tree *tree) {
toku_mutex_lock(&tree->mutex);
lt_mutex_lock(tree);
lt_retry_lock_requests(tree);
toku_mutex_unlock(&tree->mutex);
lt_mutex_unlock(tree);
}
#include <stdbool.h>
......@@ -2789,11 +2845,11 @@ lt_verify(toku_lock_tree *lt) {
void
toku_lt_verify(toku_lock_tree *lt, DB *db) {
toku_mutex_lock(&lt->mutex);
lt_mutex_lock(lt);
lt_set_comparison_functions(lt, db);
lt_verify(lt);
lt_clear_comparison_functions(lt);
toku_mutex_unlock(&lt->mutex);
lt_mutex_unlock(lt);
}
#undef STATUS_VALUE
......@@ -75,9 +75,11 @@ struct __toku_lock_tree {
DICTIONARY_ID dict_id;
OMT dbs; //The extant dbs using this lock tree.
OMT lock_requests;
toku_pthread_mutex_t mutex;
toku_rth* txns_to_unlock; // set of txn's that could not release their locks because there was no db for the comparison function
toku_pthread_mutex_t mutex;
bool mutex_locked;
/** A temporary area where we store the results of various find on
the range trees that this lock tree owns
Memory ownership:
......@@ -149,6 +151,8 @@ struct __toku_ltm {
int (*panic)(DB*, int);
toku_pthread_mutex_t mutex;
bool mutex_locked;
struct timeval lock_wait_time;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment