Commit ce5b3e64 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #350

Integrated lock tree into the transactions.
    Allocated on create
    unlocked on abort/commit
    transaction keeps track of which lock trees it owns locks in
Started working on integrating c_get functions.
Modified locktree API to make it easier to use both dupsort and nondup dbs
Fixed tests to work with new locktree API.

git-svn-id: file:///svn/tokudb@2140 c7de825b-a66e-492c-adef-691d508d4ae1
parent f48cbb7f
...@@ -729,12 +729,15 @@ static BOOL __toku_r_backwards(toku_range* range) { ...@@ -729,12 +729,15 @@ static BOOL __toku_r_backwards(toku_range* range) {
static int __toku_lt_preprocess(toku_lock_tree* tree, DB_TXN* txn, static int __toku_lt_preprocess(toku_lock_tree* tree, DB_TXN* txn,
const DBT* key_left, const DBT* data_left, const DBT* key_left, const DBT** pdata_left,
const DBT* key_right, const DBT* data_right, const DBT* key_right, const DBT** pdata_right,
toku_point* left, toku_point* right, toku_point* left, toku_point* right,
toku_range* query) { toku_range* query) {
assert(pdata_left && pdata_right);
if (!tree || !txn || !key_left || !key_right) return EINVAL; if (!tree || !txn || !key_left || !key_right) return EINVAL;
if (!tree->duplicates && ( data_left || data_right)) return EINVAL; if (!tree->duplicates) *pdata_right = *pdata_left = NULL;
const DBT* data_left = *pdata_left;
const DBT* data_right = *pdata_right;
if (tree->duplicates && (!data_left || !data_right)) return EINVAL; if (tree->duplicates && (!data_left || !data_right)) return EINVAL;
if (tree->duplicates && key_left != data_left && if (tree->duplicates && key_left != data_left &&
__toku_lt_is_infinite(key_left)) return EINVAL; __toku_lt_is_infinite(key_left)) return EINVAL;
...@@ -978,9 +981,9 @@ int toku_lt_acquire_range_read_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -978,9 +981,9 @@ int toku_lt_acquire_range_read_lock(toku_lock_tree* tree, DB_TXN* txn,
toku_range query; toku_range query;
BOOL dominated; BOOL dominated;
r = __toku_lt_preprocess(tree, txn, key_left, data_left, r = __toku_lt_preprocess(tree, txn, key_left, &data_left,
key_right, data_right, key_right, &data_right,
&left, &right, &left, &right,
&query); &query);
if (r!=0) return r; if (r!=0) return r;
...@@ -1025,8 +1028,8 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -1025,8 +1028,8 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn,
BOOL dominated; BOOL dominated;
toku_range_tree* mainread; toku_range_tree* mainread;
r = __toku_lt_preprocess(tree, txn, key, data, r = __toku_lt_preprocess(tree, txn, key, &data,
key, data, key, &data,
&endpoint, &endpoint, &endpoint, &endpoint,
&query); &query);
if (r!=0) return r; if (r!=0) return r;
...@@ -1081,9 +1084,9 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -1081,9 +1084,9 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn,
toku_point right; toku_point right;
toku_range query; toku_range query;
r = __toku_lt_preprocess(tree, txn, key_left, data_left, r = __toku_lt_preprocess(tree, txn, key_left, &data_left,
key_right, data_right, key_right, &data_right,
&left, &right, &left, &right,
&query); &query);
if (r!=0) return r; if (r!=0) return r;
...@@ -1220,8 +1223,8 @@ int toku_lt_set_dups(toku_lock_tree* tree, BOOL duplicates) { ...@@ -1220,8 +1223,8 @@ int toku_lt_set_dups(toku_lock_tree* tree, BOOL duplicates) {
return 0; return 0;
} }
int toku_lt_set_txn_callback(toku_lock_tree* tree, int toku_lt_set_txn_add_lt_callback(toku_lock_tree* tree,
int (*callback)(DB_TXN*, toku_lock_tree*)) { int (*callback)(DB_TXN*, toku_lock_tree*)) {
if (!tree || !callback) return EINVAL; if (!tree || !callback) return EINVAL;
if (!tree->dups_final) return EDOM; if (!tree->dups_final) return EDOM;
tree->lock_callback = callback; tree->lock_callback = callback;
......
...@@ -206,7 +206,6 @@ int toku_lt_close(toku_lock_tree* tree); ...@@ -206,7 +206,6 @@ int toku_lt_close(toku_lock_tree* tree);
The following is asserted: The following is asserted:
(tree == NULL || txn == NULL || key == NULL) or (tree == NULL || txn == NULL || key == NULL) or
(tree->db is dupsort && data == NULL) or (tree->db is dupsort && data == NULL) or
(tree->db is nodup && data != NULL) or
(tree->db is dupsort && key != data && (tree->db is dupsort && key != data &&
(key == toku_lt_infinity || (key == toku_lt_infinity ||
(toku_lock_tree* tree, DB_TXN* txn, const DBT* key, const DBT* data); (toku_lock_tree* tree, DB_TXN* txn, const DBT* key, const DBT* data);
...@@ -247,8 +246,6 @@ int toku_lt_acquire_read_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -247,8 +246,6 @@ int toku_lt_acquire_read_lock(toku_lock_tree* tree, DB_TXN* txn,
key_left == NULL || key_right == NULL) or key_left == NULL || key_right == NULL) or
(tree->db is dupsort && (tree->db is dupsort &&
(data_left == NULL || data_right == NULL)) or (data_left == NULL || data_right == NULL)) or
(tree->db is nodup &&
(data_left != NULL || data_right != NULL)) or
(tree->db is dupsort && key_left != data_left && (tree->db is dupsort && key_left != data_left &&
(key_left == toku_lt_infinity || (key_left == toku_lt_infinity ||
key_left == toku_lt_neg_infinity)) or key_left == toku_lt_neg_infinity)) or
...@@ -284,7 +281,6 @@ int toku_lt_acquire_range_read_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -284,7 +281,6 @@ int toku_lt_acquire_range_read_lock(toku_lock_tree* tree, DB_TXN* txn,
* a write (or read) lock that overlaps this point. * a write (or read) lock that overlaps this point.
* EINVAL: If (tree == NULL || txn == NULL || key == NULL) or * EINVAL: If (tree == NULL || txn == NULL || key == NULL) or
* (tree->db is dupsort && data == NULL) or * (tree->db is dupsort && data == NULL) or
* (tree->db is nodup && data != NULL)
* (tree->db is dupsort && key != data && * (tree->db is dupsort && key != data &&
* (key == toku_lt_infinity || * (key == toku_lt_infinity ||
* key == toku_lt_neg_infinity)) * key == toku_lt_neg_infinity))
...@@ -327,8 +323,6 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -327,8 +323,6 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn,
* key_left == NULL || key_right == NULL) or * key_left == NULL || key_right == NULL) or
* (tree->db is dupsort && * (tree->db is dupsort &&
* (data_left == NULL || data_right == NULL)) or * (data_left == NULL || data_right == NULL)) or
* (tree->db is nodup &&
* (data_left != NULL || data_right != NULL))
or or
* (tree->db is dupsort && key_left != data_left && * (tree->db is dupsort && key_left != data_left &&
* (key_left == toku_lt_infinity || * (key_left == toku_lt_infinity ||
...@@ -368,8 +362,7 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -368,8 +362,7 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn,
* txn: The transaction to release all locks for. * txn: The transaction to release all locks for.
* Returns: * Returns:
* 0: Success. * 0: Success.
* EINVAL: If (tree == NULL || txn == NULL) or * EINVAL: If (tree == NULL || txn == NULL).
* if toku_lt_unlock has already been called on this txn.
* *** Note that txn == NULL is not supported at this time. * *** Note that txn == NULL is not supported at this time.
*/ */
int toku_lt_unlock(toku_lock_tree* tree, DB_TXN* txn); int toku_lt_unlock(toku_lock_tree* tree, DB_TXN* txn);
...@@ -383,7 +376,7 @@ int toku_lt_unlock(toku_lock_tree* tree, DB_TXN* txn); ...@@ -383,7 +376,7 @@ int toku_lt_unlock(toku_lock_tree* tree, DB_TXN* txn);
Return: EINVAL if tree is NULL Return: EINVAL if tree is NULL
Return: EDOM if it is too late to change. Return: EDOM if it is too late to change.
*/ */
int toku_lt_set_txn_callback(toku_lock_tree* tree, int toku_lt_set_txn_add_lt_callback(toku_lock_tree* tree,
int (*callback)(DB_TXN*, toku_lock_tree*)); int (*callback)(DB_TXN*, toku_lock_tree*));
#endif #endif
...@@ -62,15 +62,17 @@ static void do_range_test(int (*acquire)(toku_lock_tree*, DB_TXN*, ...@@ -62,15 +62,17 @@ static void do_range_test(int (*acquire)(toku_lock_tree*, DB_TXN*,
r = acquire(lt, txn, NULL, data_l, r = acquire(lt, txn, NULL, data_l,
key_r, data_r); key_r, data_r);
CKERR2(r, EINVAL); CKERR2(r, EINVAL);
r = acquire(lt, txn, key_l, reverse_data_l, if (duplicates) {
key_r, data_r); r = acquire(lt, txn, key_l, reverse_data_l,
CKERR2(r, EINVAL); key_r, data_r);
CKERR2(r, EINVAL);
r = acquire(lt, txn, key_l, data_l,
key_r, reverse_data_r);
CKERR2(r, EINVAL);
}
r = acquire(lt, txn, key_l, data_l, r = acquire(lt, txn, key_l, data_l,
NULL, data_r); NULL, data_r);
CKERR2(r, EINVAL); CKERR2(r, EINVAL);
r = acquire(lt, txn, key_l, data_l,
key_r, reverse_data_r);
CKERR2(r, EINVAL);
/* Infinite tests. */ /* Infinite tests. */
if (duplicates) { if (duplicates) {
...@@ -164,8 +166,10 @@ static void do_point_test(int (*acquire)(toku_lock_tree*, DB_TXN*, ...@@ -164,8 +166,10 @@ static void do_point_test(int (*acquire)(toku_lock_tree*, DB_TXN*,
r = acquire(lt, txn, NULL, data); r = acquire(lt, txn, NULL, data);
CKERR2(r, EINVAL); CKERR2(r, EINVAL);
r = acquire(lt, txn, key, reverse_data); if (duplicates) {
CKERR2(r, EINVAL); r = acquire(lt, txn, key, reverse_data);
CKERR2(r, EINVAL);
}
/* Infinite tests. */ /* Infinite tests. */
if (duplicates) { if (duplicates) {
......
...@@ -198,12 +198,12 @@ void runtest(BOOL dups) { ...@@ -198,12 +198,12 @@ void runtest(BOOL dups) {
/* ********************* */ /* ********************* */
setup_tree(dups); setup_tree(dups);
lt_insert_write(dups, 0, 'a', 1, 1); lt_insert_write(dups, 0, 'a', 1, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'b', 1, 1, 1, 1); lt_insert_read (dups, DB_LOCK_DEADLOCK, 'b', 1, 1, 1, 1);
close_tree(); close_tree();
/* ********************* */ /* ********************* */
setup_tree(dups); setup_tree(dups);
lt_insert_read (dups, 0, 'b', 1, 1, 1, 1); lt_insert_read (dups, 0, 'b', 1, 1, 1, 1);
lt_insert_write(dups, DB_LOCK_NOTGRANTED, 'a', 1, 1); lt_insert_write(dups, DB_LOCK_DEADLOCK, 'a', 1, 1);
close_tree(); close_tree();
/* ********************* */ /* ********************* */
setup_tree(dups); setup_tree(dups);
...@@ -212,7 +212,7 @@ void runtest(BOOL dups) { ...@@ -212,7 +212,7 @@ void runtest(BOOL dups) {
lt_insert_write(dups, 0, 'a', 3, 1); lt_insert_write(dups, 0, 'a', 3, 1);
lt_insert_write(dups, 0, 'a', 4, 1); lt_insert_write(dups, 0, 'a', 4, 1);
lt_insert_write(dups, 0, 'a', 5, 1); lt_insert_write(dups, 0, 'a', 5, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'b', 2, 1, 4, 1); lt_insert_read (dups, DB_LOCK_DEADLOCK, 'b', 2, 1, 4, 1);
close_tree(); close_tree();
/* ********************* */ /* ********************* */
setup_tree(dups); setup_tree(dups);
...@@ -221,7 +221,7 @@ void runtest(BOOL dups) { ...@@ -221,7 +221,7 @@ void runtest(BOOL dups) {
lt_insert_write(dups, 0, 'a', 3, 1); lt_insert_write(dups, 0, 'a', 3, 1);
lt_insert_write(dups, 0, 'a', 4, 1); lt_insert_write(dups, 0, 'a', 4, 1);
lt_insert_write(dups, 0, 'a', 5, 1); lt_insert_write(dups, 0, 'a', 5, 1);
lt_insert_write (dups, DB_LOCK_NOTGRANTED, 'b', 2, 1); lt_insert_write (dups, DB_LOCK_DEADLOCK, 'b', 2, 1);
close_tree(); close_tree();
/* ********************* */ /* ********************* */
setup_tree(dups); setup_tree(dups);
...@@ -250,7 +250,7 @@ void runtest(BOOL dups) { ...@@ -250,7 +250,7 @@ void runtest(BOOL dups) {
lt_insert_write(dups, 0, 'a', 7, 1); lt_insert_write(dups, 0, 'a', 7, 1);
lt_insert_write(dups, 0, 'a', 8, 1); lt_insert_write(dups, 0, 'a', 8, 1);
lt_insert_write(dups, 0, 'a', 9, 1); lt_insert_write(dups, 0, 'a', 9, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 7, 1); lt_insert_read (dups, DB_LOCK_DEADLOCK, 'a', 3, 1, 7, 1);
close_tree(); close_tree();
/* ********************* */ /* ********************* */
setup_tree(dups); setup_tree(dups);
...@@ -263,7 +263,7 @@ void runtest(BOOL dups) { ...@@ -263,7 +263,7 @@ void runtest(BOOL dups) {
lt_insert_write(dups, 0, 'b', 7, 1); lt_insert_write(dups, 0, 'b', 7, 1);
lt_insert_write(dups, 0, 'b', 8, 1); lt_insert_write(dups, 0, 'b', 8, 1);
lt_insert_write(dups, 0, 'b', 9, 1); lt_insert_write(dups, 0, 'b', 9, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 7, 1); lt_insert_read (dups, DB_LOCK_DEADLOCK, 'a', 3, 1, 7, 1);
close_tree(); close_tree();
/* ********************* */ /* ********************* */
setup_tree(dups); setup_tree(dups);
...@@ -279,7 +279,7 @@ void runtest(BOOL dups) { ...@@ -279,7 +279,7 @@ void runtest(BOOL dups) {
lt_insert_write(dups, 0, 'b', 2, 1); lt_insert_write(dups, 0, 'b', 2, 1);
lt_insert_write(dups, 0, 'b', 3, 1); lt_insert_write(dups, 0, 'b', 3, 1);
lt_insert_write(dups, 0, 'b', 4, 1); lt_insert_write(dups, 0, 'b', 4, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 7, 1); lt_insert_read (dups, DB_LOCK_DEADLOCK, 'a', 3, 1, 7, 1);
close_tree(); close_tree();
/* ********************* */ /* ********************* */
setup_tree(dups); setup_tree(dups);
...@@ -303,10 +303,10 @@ void runtest(BOOL dups) { ...@@ -303,10 +303,10 @@ void runtest(BOOL dups) {
lt_insert_write(dups, 0, 'a', 2, 1); lt_insert_write(dups, 0, 'a', 2, 1);
lt_insert_write(dups, 0, 'a', 3, 1); lt_insert_write(dups, 0, 'a', 3, 1);
lt_insert_write(dups, 0, 'a', 4, 1); lt_insert_write(dups, 0, 'a', 4, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'b', 3, 1, 3, 1); lt_insert_read (dups, DB_LOCK_DEADLOCK, 'b', 3, 1, 3, 1);
lt_unlock('a'); lt_unlock('a');
lt_insert_write(dups, 0, 'b', 3, 1); lt_insert_write(dups, 0, 'b', 3, 1);
lt_insert_read (dups, DB_LOCK_NOTGRANTED, 'a', 3, 1, 3, 1); lt_insert_read (dups, DB_LOCK_DEADLOCK, 'a', 3, 1, 3, 1);
lt_unlock('b'); lt_unlock('b');
lt_insert_read (dups, 0, 'a', 3, 1, 3, 1); lt_insert_read (dups, 0, 'a', 3, 1, 3, 1);
close_tree(); close_tree();
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include "../newbrt/brttypes.h" #include "../newbrt/brttypes.h"
#include "../newbrt/brt.h" #include "../newbrt/brt.h"
#include "../newbrt/list.h" #include "../newbrt/list.h"
#include "./lock_tree/lth.h"
struct db_header { struct db_header {
int n_databases; // Or there can be >=1 named databases. This is the count. int n_databases; // Or there can be >=1 named databases. This is the count.
...@@ -67,6 +68,7 @@ struct __toku_db_env_internal { ...@@ -67,6 +68,7 @@ struct __toku_db_env_internal {
struct __toku_db_txn_internal { struct __toku_db_txn_internal {
//TXNID txnid64; /* A sixty-four bit txn id. */ //TXNID txnid64; /* A sixty-four bit txn id. */
TOKUTXN tokutxn; TOKUTXN tokutxn;
toku_lth* lth;
DB_TXN *parent; DB_TXN *parent;
}; };
......
...@@ -28,10 +28,6 @@ const char *toku_copyright_string = "Copyright (c) 2007, 2008 Tokutek Inc. All ...@@ -28,10 +28,6 @@ const char *toku_copyright_string = "Copyright (c) 2007, 2008 Tokutek Inc. All
#include "log.h" #include "log.h"
#include "memory.h" #include "memory.h"
#define TOKU_LT_LINEAR
#include <locktree.h>
#include <lth.h>
const u_int32_t __toku_env_default_max_locks = 1000; const u_int32_t __toku_env_default_max_locks = 1000;
/* the ydb big lock serializes access to the tokudb /* the ydb big lock serializes access to the tokudb
...@@ -759,6 +755,24 @@ int db_env_create(DB_ENV ** envp, u_int32_t flags) { ...@@ -759,6 +755,24 @@ int db_env_create(DB_ENV ** envp, u_int32_t flags) {
ydb_lock(); int r = toku_env_create(envp, flags); ydb_unlock(); return r; ydb_lock(); int r = toku_env_create(envp, flags); ydb_unlock(); return r;
} }
static void toku_txn_release_locks(DB_TXN* txn) {
assert(txn);
toku_lth* lth = txn->i->lth;
assert(lth);
int r;
toku_lth_start_scan(lth);
toku_lock_tree* next = toku_lth_next(lth);
while (next) {
r = toku_lt_unlock(next, txn);
/* Only NULL parameters can give a non 0 return value. */
assert(r==0);
next = toku_lth_next(lth);
}
toku_lth_close(lth);
txn->i->lth = NULL;
}
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) { static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
//notef("flags=%d\n", flags); //notef("flags=%d\n", flags);
...@@ -774,6 +788,7 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) { ...@@ -774,6 +788,7 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
toku_free(txn->i->tokutxn); toku_free(txn->i->tokutxn);
} }
// Cleanup */ // Cleanup */
toku_txn_release_locks(txn);
if (txn->i) if (txn->i)
toku_free(txn->i); toku_free(txn->i);
toku_free(txn); toku_free(txn);
...@@ -792,6 +807,8 @@ static TXNID next_txn = 0; ...@@ -792,6 +807,8 @@ static TXNID next_txn = 0;
static int toku_txn_abort(DB_TXN * txn) { static int toku_txn_abort(DB_TXN * txn) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
int r = toku_logger_abort(txn->i->tokutxn); int r = toku_logger_abort(txn->i->tokutxn);
toku_txn_release_locks(txn);
toku_free(txn->i); toku_free(txn->i);
toku_free(txn); toku_free(txn);
return r; return r;
...@@ -833,9 +850,20 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f ...@@ -833,9 +850,20 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f
result->commit = locked_txn_commit; result->commit = locked_txn_commit;
result->id = locked_txn_id; result->id = locked_txn_id;
MALLOC(result->i); MALLOC(result->i);
assert(result->i); if (!result->i) {
toku_free(result);
return ENOMEM;
}
result->i->parent = stxn; result->i->parent = stxn;
int r = toku_logger_txn_begin(stxn ? stxn->i->tokutxn : 0, &result->i->tokutxn, next_txn++, env->i->logger);
int r = toku_lth_create(&result->i->lth, toku_malloc, toku_free, toku_realloc);
if (r!=0) {
toku_free(result->i);
toku_free(result);
return r;
}
r = toku_logger_txn_begin(stxn ? stxn->i->tokutxn : 0, &result->i->tokutxn, next_txn++, env->i->logger);
if (r != 0) if (r != 0)
return r; return r;
*txn = result; *txn = result;
...@@ -1106,14 +1134,128 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) ...@@ -1106,14 +1134,128 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag)
static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) { static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
DB *db = c->dbp; DB *db = c->dbp;
// DB_TXN *txn = c->i->txn;
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
u_int32_t get_flag = get_main_cursor_flag(flag);
int r; int r;
if (db->i->primary==0) r = toku_c_get_noassociate(c, key, data, flag); if (db->i->primary==0) {
r = toku_c_get_noassociate(c, key, data, flag);
int r2 = 0;
switch (get_flag) {
/*
These should be done but were not tested prior to commit.
case (DB_CURRENT): {
// No locking necessary. You already own a lock by virtue
// of having a cursor pointing to this.
break;
}
case (DB_FIRST): {
int r2;
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
toku_lt_neg_infinity, toku_lt_neg_infinity,
toku_lt_infinity, toku_lt_infinity);
}
else if (r == 0) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
toku_lt_neg_infinity, toku_lt_neg_infinity,
key, data);
}
else return r;
break;
}
case (DB_LAST): {
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
toku_lt_neg_infinity, toku_lt_neg_infinity,
toku_lt_infinity, toku_lt_infinity);
}
else if (r == 0) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key, data,
toku_lt_infinity, toku_lt_infinity);
}
else return r;
break;
}
case (DB_SET): {
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key, toku_lt_neg_infinity,
key, toku_lt_infinity);
}
else if (r == 0) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key, toku_lt_neg_infinity,
key, data);
}
else return r;
break;
}
case (DB_GET_BOTH): {
if (r != DB_NOTFOUND && r != 0) return r;
r2 = toku_lt_acquire_read_lock(db->i->lt, txn, key, data);
break;
}
*/
/*
These are not ready and are just notes.
case (DB_GET_BOTH_RANGE): {
barf();
//Not ready yet.
break;
}
case (DB_NEXT): {
post_get_DB_NEXT:
//TODO: Need 'am I initialized' function, and if not, goto post_get_DB_FIRST
//TODO: Need get old data function. MUST BE CALLED BEFORE CGET
break;
}
case (DB_PREV): {
//TODO: Need 'am I initialized' function
//TODO: Need get old data function. MUST BE CALLED BEFORE CGET
break;
}
case (DB_SET_RANGE): {
//TODO: Need to save key_in
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key_in, toku_lt_neg_infinity,
toku_lt_infinity, toku_lt_infinity);
}
else if (r == 0) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key_in, toku_lt_neg_infinity,
key, data);
}
else return r;
break;
}
case (DB_NEXT_NODUP): {
goto post_get_DB_NEXT;
}
case (DB_PREV_NODUP): {
goto post_get_DB_PREV;
}
case (DB_NEXT_DUP): {
//Not ready yet./not needed for MySQL.
barf();
break;
}
default: {
barf();
assert(FALSE);
}
*/
}
if (r2!=0) return r2;
}
else { else {
// It's a c_get on a secondary. // It's a c_get on a secondary.
DBT primary_key; DBT primary_key;
u_int32_t get_flag = get_main_cursor_flag(flag);
/* It is an error to use the DB_GET_BOTH or DB_GET_BOTH_RANGE flag on a /* It is an error to use the DB_GET_BOTH or DB_GET_BOTH_RANGE flag on a
* cursor that has been opened on a secondary index handle. * cursor that has been opened on a secondary index handle.
...@@ -1659,6 +1801,20 @@ static int toku_db_lt_panic(DB* db, int r) { ...@@ -1659,6 +1801,20 @@ static int toku_db_lt_panic(DB* db, int r) {
return EINVAL; return EINVAL;
} }
static int toku_txn_add_lt(DB_TXN* txn, toku_lock_tree* lt) {
assert(txn && lt);
toku_lth* lth = txn->i->lth;
assert(lth);
toku_lock_tree* find = toku_lth_find(lth, lt);
if (find) {
assert(find == lt);
return 0;
}
int r = toku_lth_insert(lth, lt);
return r;
}
static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) { static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
// Warning. Should check arguments. Should check return codes on malloc and open and so forth. // Warning. Should check arguments. Should check return codes on malloc and open and so forth.
...@@ -1729,6 +1885,8 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db ...@@ -1729,6 +1885,8 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
db->i->brt->compare_fun, db->i->brt->dup_compare, db->i->brt->compare_fun, db->i->brt->dup_compare,
toku_malloc, toku_free, toku_realloc); toku_malloc, toku_free, toku_realloc);
if (r!=0) goto error_cleanup; if (r!=0) goto error_cleanup;
r = toku_lt_set_txn_add_lt_callback(db->i->lt, toku_txn_add_lt);
assert(r==0);
r = toku_brt_open(db->i->brt, db->i->full_fname, fname, dbname, r = toku_brt_open(db->i->brt, db->i->full_fname, fname, dbname,
is_db_create, is_db_excl, is_db_unknown, is_db_create, is_db_excl, is_db_unknown,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment