Commit fa11e7b1 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #350

Integrated lock tree into the transactions.
    Allocated on create
    unlocked on abort/commit
    transaction keeps track of which lock trees it owns locks in
Started working on integrating c_get functions.
Modified locktree API to make it easier to use both dupsort and nondup dbs
Fixed tests to work with new locktree API.

git-svn-id: file:///svn/tokudb@2140 c7de825b-a66e-492c-adef-691d508d4ae1
parent f5ae9a00
......@@ -729,12 +729,15 @@ static BOOL __toku_r_backwards(toku_range* range) {
static int __toku_lt_preprocess(toku_lock_tree* tree, DB_TXN* txn,
const DBT* key_left, const DBT* data_left,
const DBT* key_right, const DBT* data_right,
const DBT* key_left, const DBT** pdata_left,
const DBT* key_right, const DBT** pdata_right,
toku_point* left, toku_point* right,
toku_range* query) {
assert(pdata_left && pdata_right);
if (!tree || !txn || !key_left || !key_right) return EINVAL;
if (!tree->duplicates && ( data_left || data_right)) return EINVAL;
if (!tree->duplicates) *pdata_right = *pdata_left = NULL;
const DBT* data_left = *pdata_left;
const DBT* data_right = *pdata_right;
if (tree->duplicates && (!data_left || !data_right)) return EINVAL;
if (tree->duplicates && key_left != data_left &&
__toku_lt_is_infinite(key_left)) return EINVAL;
......@@ -978,8 +981,8 @@ int toku_lt_acquire_range_read_lock(toku_lock_tree* tree, DB_TXN* txn,
toku_range query;
BOOL dominated;
r = __toku_lt_preprocess(tree, txn, key_left, data_left,
key_right, data_right,
r = __toku_lt_preprocess(tree, txn, key_left, &data_left,
key_right, &data_right,
&left, &right,
&query);
if (r!=0) return r;
......@@ -1025,8 +1028,8 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn,
BOOL dominated;
toku_range_tree* mainread;
r = __toku_lt_preprocess(tree, txn, key, data,
key, data,
r = __toku_lt_preprocess(tree, txn, key, &data,
key, &data,
&endpoint, &endpoint,
&query);
if (r!=0) return r;
......@@ -1081,8 +1084,8 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn,
toku_point right;
toku_range query;
r = __toku_lt_preprocess(tree, txn, key_left, data_left,
key_right, data_right,
r = __toku_lt_preprocess(tree, txn, key_left, &data_left,
key_right, &data_right,
&left, &right,
&query);
if (r!=0) return r;
......@@ -1220,7 +1223,7 @@ int toku_lt_set_dups(toku_lock_tree* tree, BOOL duplicates) {
return 0;
}
int toku_lt_set_txn_callback(toku_lock_tree* tree,
int toku_lt_set_txn_add_lt_callback(toku_lock_tree* tree,
int (*callback)(DB_TXN*, toku_lock_tree*)) {
if (!tree || !callback) return EINVAL;
if (!tree->dups_final) return EDOM;
......
......@@ -206,7 +206,6 @@ int toku_lt_close(toku_lock_tree* tree);
The following is asserted:
(tree == NULL || txn == NULL || key == NULL) or
(tree->db is dupsort && data == NULL) or
(tree->db is nodup && data != NULL) or
(tree->db is dupsort && key != data &&
(key == toku_lt_infinity ||
(toku_lock_tree* tree, DB_TXN* txn, const DBT* key, const DBT* data);
......@@ -247,8 +246,6 @@ int toku_lt_acquire_read_lock(toku_lock_tree* tree, DB_TXN* txn,
key_left == NULL || key_right == NULL) or
(tree->db is dupsort &&
(data_left == NULL || data_right == NULL)) or
(tree->db is nodup &&
(data_left != NULL || data_right != NULL)) or
(tree->db is dupsort && key_left != data_left &&
(key_left == toku_lt_infinity ||
key_left == toku_lt_neg_infinity)) or
......@@ -284,7 +281,6 @@ int toku_lt_acquire_range_read_lock(toku_lock_tree* tree, DB_TXN* txn,
* a write (or read) lock that overlaps this point.
* EINVAL: If (tree == NULL || txn == NULL || key == NULL) or
* (tree->db is dupsort && data == NULL) or
* (tree->db is nodup && data != NULL)
* (tree->db is dupsort && key != data &&
* (key == toku_lt_infinity ||
* key == toku_lt_neg_infinity))
......@@ -327,8 +323,6 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn,
* key_left == NULL || key_right == NULL) or
* (tree->db is dupsort &&
* (data_left == NULL || data_right == NULL)) or
* (tree->db is nodup &&
* (data_left != NULL || data_right != NULL))
or
* (tree->db is dupsort && key_left != data_left &&
* (key_left == toku_lt_infinity ||
......@@ -368,8 +362,7 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn,
* txn: The transaction to release all locks for.
* Returns:
* 0: Success.
* EINVAL: If (tree == NULL || txn == NULL) or
* if toku_lt_unlock has already been called on this txn.
* EINVAL: If (tree == NULL || txn == NULL).
* *** Note that txn == NULL is not supported at this time.
*/
int toku_lt_unlock(toku_lock_tree* tree, DB_TXN* txn);
......@@ -383,7 +376,7 @@ int toku_lt_unlock(toku_lock_tree* tree, DB_TXN* txn);
Return: EINVAL if tree is NULL
Return: EDOM if it is too late to change.
*/
int toku_lt_set_txn_callback(toku_lock_tree* tree,
int toku_lt_set_txn_add_lt_callback(toku_lock_tree* tree,
int (*callback)(DB_TXN*, toku_lock_tree*));
#endif
......@@ -62,14 +62,16 @@ static void do_range_test(int (*acquire)(toku_lock_tree*, DB_TXN*,
r = acquire(lt, txn, NULL, data_l,
key_r, data_r);
CKERR2(r, EINVAL);
if (duplicates) {
r = acquire(lt, txn, key_l, reverse_data_l,
key_r, data_r);
CKERR2(r, EINVAL);
r = acquire(lt, txn, key_l, data_l,
NULL, data_r);
key_r, reverse_data_r);
CKERR2(r, EINVAL);
}
r = acquire(lt, txn, key_l, data_l,
key_r, reverse_data_r);
NULL, data_r);
CKERR2(r, EINVAL);
/* Infinite tests. */
......@@ -164,8 +166,10 @@ static void do_point_test(int (*acquire)(toku_lock_tree*, DB_TXN*,
r = acquire(lt, txn, NULL, data);
CKERR2(r, EINVAL);
if (duplicates) {
r = acquire(lt, txn, key, reverse_data);
CKERR2(r, EINVAL);
}
/* Infinite tests. */
if (duplicates) {
......
......@@ -198,12 +198,12 @@ void runtest(BOOL dups) {
/* ********************* */
setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'b', 1, 1, 1, 1);
lt_insert_read (dups, DB_LOCK_DEADLOCK, 'b', 1, 1, 1, 1);
close_tree();
/* ********************* */
setup_tree(dups);
lt_insert_read (dups, 0, 'b', 1, 1, 1, 1);
lt_insert_write(dups, DB_LOCK_NOTGRANTED, 'a', 1, 1);
lt_insert_write(dups, DB_LOCK_DEADLOCK, 'a', 1, 1);
close_tree();
/* ********************* */
setup_tree(dups);
......@@ -212,7 +212,7 @@ void runtest(BOOL dups) {
lt_insert_write(dups, 0, 'a', 3, 1);
lt_insert_write(dups, 0, 'a', 4, 1);
lt_insert_write(dups, 0, 'a', 5, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'b', 2, 1, 4, 1);
lt_insert_read (dups, DB_LOCK_DEADLOCK, 'b', 2, 1, 4, 1);
close_tree();
/* ********************* */
setup_tree(dups);
......@@ -221,7 +221,7 @@ void runtest(BOOL dups) {
lt_insert_write(dups, 0, 'a', 3, 1);
lt_insert_write(dups, 0, 'a', 4, 1);
lt_insert_write(dups, 0, 'a', 5, 1);
lt_insert_write (dups, DB_LOCK_NOTGRANTED, 'b', 2, 1);
lt_insert_write (dups, DB_LOCK_DEADLOCK, 'b', 2, 1);
close_tree();
/* ********************* */
setup_tree(dups);
......@@ -250,7 +250,7 @@ void runtest(BOOL dups) {
lt_insert_write(dups, 0, 'a', 7, 1);
lt_insert_write(dups, 0, 'a', 8, 1);
lt_insert_write(dups, 0, 'a', 9, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 7, 1);
lt_insert_read (dups, DB_LOCK_DEADLOCK, 'a', 3, 1, 7, 1);
close_tree();
/* ********************* */
setup_tree(dups);
......@@ -263,7 +263,7 @@ void runtest(BOOL dups) {
lt_insert_write(dups, 0, 'b', 7, 1);
lt_insert_write(dups, 0, 'b', 8, 1);
lt_insert_write(dups, 0, 'b', 9, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 7, 1);
lt_insert_read (dups, DB_LOCK_DEADLOCK, 'a', 3, 1, 7, 1);
close_tree();
/* ********************* */
setup_tree(dups);
......@@ -279,7 +279,7 @@ void runtest(BOOL dups) {
lt_insert_write(dups, 0, 'b', 2, 1);
lt_insert_write(dups, 0, 'b', 3, 1);
lt_insert_write(dups, 0, 'b', 4, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 7, 1);
lt_insert_read (dups, DB_LOCK_DEADLOCK, 'a', 3, 1, 7, 1);
close_tree();
/* ********************* */
setup_tree(dups);
......@@ -303,10 +303,10 @@ void runtest(BOOL dups) {
lt_insert_write(dups, 0, 'a', 2, 1);
lt_insert_write(dups, 0, 'a', 3, 1);
lt_insert_write(dups, 0, 'a', 4, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'b', 3, 1, 3, 1);
lt_insert_read (dups, DB_LOCK_DEADLOCK, 'b', 3, 1, 3, 1);
lt_unlock('a');
lt_insert_write(dups, 0, 'b', 3, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 3, 1);
lt_insert_read (dups, DB_LOCK_DEADLOCK, 'a', 3, 1, 3, 1);
lt_unlock('b');
lt_insert_read (dups, 0, 'a', 3, 1, 3, 1);
close_tree();
......
......@@ -7,6 +7,7 @@
#include "../newbrt/brttypes.h"
#include "../newbrt/brt.h"
#include "../newbrt/list.h"
#include "./lock_tree/lth.h"
struct db_header {
int n_databases; // Or there can be >=1 named databases. This is the count.
......@@ -67,6 +68,7 @@ struct __toku_db_env_internal {
struct __toku_db_txn_internal {
//TXNID txnid64; /* A sixty-four bit txn id. */
TOKUTXN tokutxn;
toku_lth* lth;
DB_TXN *parent;
};
......
......@@ -28,10 +28,6 @@ const char *toku_copyright_string = "Copyright (c) 2007, 2008 Tokutek Inc. All
#include "log.h"
#include "memory.h"
#define TOKU_LT_LINEAR
#include <locktree.h>
#include <lth.h>
const u_int32_t __toku_env_default_max_locks = 1000;
/* the ydb big lock serializes access to the tokudb
......@@ -759,6 +755,24 @@ int db_env_create(DB_ENV ** envp, u_int32_t flags) {
ydb_lock(); int r = toku_env_create(envp, flags); ydb_unlock(); return r;
}
static void toku_txn_release_locks(DB_TXN* txn) {
assert(txn);
toku_lth* lth = txn->i->lth;
assert(lth);
int r;
toku_lth_start_scan(lth);
toku_lock_tree* next = toku_lth_next(lth);
while (next) {
r = toku_lt_unlock(next, txn);
/* Only NULL parameters can give a non 0 return value. */
assert(r==0);
next = toku_lth_next(lth);
}
toku_lth_close(lth);
txn->i->lth = NULL;
}
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
HANDLE_PANICKED_ENV(txn->mgrp);
//notef("flags=%d\n", flags);
......@@ -774,6 +788,7 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
toku_free(txn->i->tokutxn);
}
// Cleanup */
toku_txn_release_locks(txn);
if (txn->i)
toku_free(txn->i);
toku_free(txn);
......@@ -792,6 +807,8 @@ static TXNID next_txn = 0;
static int toku_txn_abort(DB_TXN * txn) {
HANDLE_PANICKED_ENV(txn->mgrp);
int r = toku_logger_abort(txn->i->tokutxn);
toku_txn_release_locks(txn);
toku_free(txn->i);
toku_free(txn);
return r;
......@@ -833,9 +850,20 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f
result->commit = locked_txn_commit;
result->id = locked_txn_id;
MALLOC(result->i);
assert(result->i);
if (!result->i) {
toku_free(result);
return ENOMEM;
}
result->i->parent = stxn;
int r = toku_logger_txn_begin(stxn ? stxn->i->tokutxn : 0, &result->i->tokutxn, next_txn++, env->i->logger);
int r = toku_lth_create(&result->i->lth, toku_malloc, toku_free, toku_realloc);
if (r!=0) {
toku_free(result->i);
toku_free(result);
return r;
}
r = toku_logger_txn_begin(stxn ? stxn->i->tokutxn : 0, &result->i->tokutxn, next_txn++, env->i->logger);
if (r != 0)
return r;
*txn = result;
......@@ -1106,14 +1134,128 @@ delete_silently_and_retry:
static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
DB *db = c->dbp;
// DB_TXN *txn = c->i->txn;
HANDLE_PANICKED_DB(db);
u_int32_t get_flag = get_main_cursor_flag(flag);
int r;
if (db->i->primary==0) r = toku_c_get_noassociate(c, key, data, flag);
if (db->i->primary==0) {
r = toku_c_get_noassociate(c, key, data, flag);
int r2 = 0;
switch (get_flag) {
/*
These should be done but were not tested prior to commit.
case (DB_CURRENT): {
// No locking necessary. You already own a lock by virtue
// of having a cursor pointing to this.
break;
}
case (DB_FIRST): {
int r2;
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
toku_lt_neg_infinity, toku_lt_neg_infinity,
toku_lt_infinity, toku_lt_infinity);
}
else if (r == 0) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
toku_lt_neg_infinity, toku_lt_neg_infinity,
key, data);
}
else return r;
break;
}
case (DB_LAST): {
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
toku_lt_neg_infinity, toku_lt_neg_infinity,
toku_lt_infinity, toku_lt_infinity);
}
else if (r == 0) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key, data,
toku_lt_infinity, toku_lt_infinity);
}
else return r;
break;
}
case (DB_SET): {
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key, toku_lt_neg_infinity,
key, toku_lt_infinity);
}
else if (r == 0) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key, toku_lt_neg_infinity,
key, data);
}
else return r;
break;
}
case (DB_GET_BOTH): {
if (r != DB_NOTFOUND && r != 0) return r;
r2 = toku_lt_acquire_read_lock(db->i->lt, txn, key, data);
break;
}
*/
/*
These are not ready and are just notes.
case (DB_GET_BOTH_RANGE): {
barf();
//Not ready yet.
break;
}
case (DB_NEXT): {
post_get_DB_NEXT:
//TODO: Need 'am I initialized' function, and if not, goto post_get_DB_FIRST
//TODO: Need get old data function. MUST BE CALLED BEFORE CGET
break;
}
case (DB_PREV): {
//TODO: Need 'am I initialized' function
//TODO: Need get old data function. MUST BE CALLED BEFORE CGET
break;
}
case (DB_SET_RANGE): {
//TODO: Need to save key_in
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key_in, toku_lt_neg_infinity,
toku_lt_infinity, toku_lt_infinity);
}
else if (r == 0) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key_in, toku_lt_neg_infinity,
key, data);
}
else return r;
break;
}
case (DB_NEXT_NODUP): {
goto post_get_DB_NEXT;
}
case (DB_PREV_NODUP): {
goto post_get_DB_PREV;
}
case (DB_NEXT_DUP): {
//Not ready yet./not needed for MySQL.
barf();
break;
}
default: {
barf();
assert(FALSE);
}
*/
}
if (r2!=0) return r2;
}
else {
// It's a c_get on a secondary.
DBT primary_key;
u_int32_t get_flag = get_main_cursor_flag(flag);
/* It is an error to use the DB_GET_BOTH or DB_GET_BOTH_RANGE flag on a
* cursor that has been opened on a secondary index handle.
......@@ -1659,6 +1801,20 @@ static int toku_db_lt_panic(DB* db, int r) {
return EINVAL;
}
static int toku_txn_add_lt(DB_TXN* txn, toku_lock_tree* lt) {
assert(txn && lt);
toku_lth* lth = txn->i->lth;
assert(lth);
toku_lock_tree* find = toku_lth_find(lth, lt);
if (find) {
assert(find == lt);
return 0;
}
int r = toku_lth_insert(lth, lt);
return r;
}
static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
HANDLE_PANICKED_DB(db);
// Warning. Should check arguments. Should check return codes on malloc and open and so forth.
......@@ -1729,6 +1885,8 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
db->i->brt->compare_fun, db->i->brt->dup_compare,
toku_malloc, toku_free, toku_realloc);
if (r!=0) goto error_cleanup;
r = toku_lt_set_txn_add_lt_callback(db->i->lt, toku_txn_add_lt);
assert(r==0);
r = toku_brt_open(db->i->brt, db->i->full_fname, fname, dbname,
is_db_create, is_db_excl, is_db_unknown,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment