From 44894c394a847e8071abd706d5ee7aeb91cdf372 Mon Sep 17 00:00:00 2001 From: Yoni Fogel <yoni@tokutek.com> Date: Mon, 11 Feb 2008 20:00:19 +0000 Subject: [PATCH] Addresses #368 Deals with NULL transactions in an INIT_TXN environment. Does not deal with NULL txn cursors yet. Also adds DB_AUTO_COMMIT git-svn-id: file:///svn/tokudb@2252 c7de825b-a66e-492c-adef-691d508d4ae1 --- newbrt/brt.c | 6 +- newbrt/brt.h | 1 + src/lock_tree/locktree.c | 110 ++++++++++++----------- src/ydb.c | 185 +++++++++++++++++++++++++++++++-------- 4 files changed, 215 insertions(+), 87 deletions(-) diff --git a/newbrt/brt.c b/newbrt/brt.c index c44620d9298..66a5c547ac0 100644 --- a/newbrt/brt.c +++ b/newbrt/brt.c @@ -1999,10 +1999,14 @@ static inline void brt_cursor_cleanup(BRT_CURSOR cursor) { dbt_cleanup(&cursor->val); } -inline int brt_cursor_not_set(BRT_CURSOR cursor) { +static inline int brt_cursor_not_set(BRT_CURSOR cursor) { return cursor->key.data == 0 || cursor->val.data == 0; } +BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c) { + return brt_cursor_not_set(c); +} + static inline void brt_cursor_set_key_val(BRT_CURSOR cursor, DBT *newkey, DBT *newval) { brt_cursor_cleanup(cursor); cursor->key = *newkey; memset(newkey, 0, sizeof *newkey); diff --git a/newbrt/brt.h b/newbrt/brt.h index 6b4d7f3ac59..34492f8be1d 100644 --- a/newbrt/brt.h +++ b/newbrt/brt.h @@ -51,6 +51,7 @@ int toku_brt_cursor (BRT, BRT_CURSOR*); int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int brtc_flags, TOKUTXN); int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN); int toku_brt_cursor_close (BRT_CURSOR curs); +BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c); typedef struct brtenv *BRTENV; int brtenv_checkpoint (BRTENV env); diff --git a/src/lock_tree/locktree.c b/src/lock_tree/locktree.c index 3f669dc22c9..7a58844620d 100644 --- a/src/lock_tree/locktree.c +++ b/src/lock_tree/locktree.c @@ -44,7 +44,7 @@ char* toku_lt_strerror(TOKU_LT_ERROR r) { return "Unknown error in locking data structures.\n"; } /* Compare two payloads assuming that at least one of them is infinite */ -static int __toku_infinite_compare(void* a, void* b) { +inline static int __toku_infinite_compare(void* a, void* b) { if (a == b) return 0; if (a == toku_lt_infinity) return 1; if (b == toku_lt_infinity) return -1; @@ -52,7 +52,7 @@ static int __toku_infinite_compare(void* a, void* b) { assert(b == toku_lt_neg_infinity); return 1; } -static BOOL __toku_lt_is_infinite(const void* p) { +inline static BOOL __toku_lt_is_infinite(const void* p) { if (p == toku_lt_infinity || p == toku_lt_neg_infinity) { DBT* dbt = (DBT*)p; assert(!dbt->data && !dbt->size); @@ -63,19 +63,19 @@ static BOOL __toku_lt_is_infinite(const void* p) { /* Verifies that NULL data and size are consistent. i.e. The size is 0 if and only if the data is NULL. */ -static void __toku_lt_verify_null_key(const DBT* key) { - assert(!key || __toku_lt_is_infinite(key) || - (key->size > 0) == (key->data != NULL)); +inline static int __toku_lt_verify_null_key(const DBT* key) { + if (key && key->size && !key->data) return EINVAL; + return 0; } -static DBT* __toku_recreate_DBT(DBT* dbt, void* payload, u_int32_t length) { +inline static DBT* __toku_recreate_DBT(DBT* dbt, void* payload, u_int32_t length) { memset(dbt, 0, sizeof(DBT)); dbt->data = payload; dbt->size = length; return dbt; } -static int __toku_lt_txn_cmp(void* a, void* b) { +inline static int __toku_lt_txn_cmp(void* a, void* b) { return a < b ? -1 : (a != b); } @@ -118,27 +118,27 @@ int __toku_lt_point_cmp(void* a, void* b) { /* Functions to update the range count and compare it with the maximum number of ranges */ -static BOOL __toku_lt_range_test_incr(toku_lock_tree* tree, u_int32_t replace) { +inline static BOOL __toku_lt_range_test_incr(toku_lock_tree* tree, u_int32_t replace) { assert(tree); assert(tree->num_ranges); assert(replace <= *tree->num_ranges); return *tree->num_ranges - replace < tree->max_ranges; } -static void __toku_lt_range_incr(toku_lock_tree* tree, u_int32_t replace) { +inline static void __toku_lt_range_incr(toku_lock_tree* tree, u_int32_t replace) { assert(__toku_lt_range_test_incr(tree, replace)); *tree->num_ranges -= replace; *tree->num_ranges += 1; } -static void __toku_lt_range_decr(toku_lock_tree* tree, u_int32_t ranges) { +inline static void __toku_lt_range_decr(toku_lock_tree* tree, u_int32_t ranges) { assert(tree); assert(tree->num_ranges); assert(*tree->num_ranges >= ranges); *tree->num_ranges -= ranges; } -static void __toku_p_free(toku_lock_tree* tree, toku_point* point) { +inline static void __toku_p_free(toku_lock_tree* tree, toku_point* point) { assert(point); if (!__toku_lt_is_infinite(point->key_payload)) { tree->free(point->key_payload); @@ -152,7 +152,7 @@ static void __toku_p_free(toku_lock_tree* tree, toku_point* point) { /* Allocate and copy the payload. */ -static int __toku_payload_copy(toku_lock_tree* tree, +inline static int __toku_payload_copy(toku_lock_tree* tree, void** payload_out, u_int32_t* len_out, void* payload_in, u_int32_t len_in) { assert(payload_out && len_out); @@ -171,7 +171,7 @@ static int __toku_payload_copy(toku_lock_tree* tree, return 0; } -static int __toku_p_makecopy(toku_lock_tree* tree, void** ppoint) { +inline static int __toku_p_makecopy(toku_lock_tree* tree, void** ppoint) { assert(ppoint); toku_point* point = *(toku_point**)ppoint; toku_point* temp_point = NULL; @@ -219,7 +219,7 @@ toku_range_tree* __toku_lt_ifexist_selfwrite(toku_lock_tree* tree, /* Provides access to a selfread tree for a particular transaction. Creates it if it does not exist. */ -static int __toku_lt_selfread(toku_lock_tree* tree, DB_TXN* txn, +inline static int __toku_lt_selfread(toku_lock_tree* tree, DB_TXN* txn, toku_range_tree** pselfread) { int r; assert(tree && txn && pselfread); @@ -246,7 +246,7 @@ static int __toku_lt_selfread(toku_lock_tree* tree, DB_TXN* txn, /* Provides access to a selfwrite tree for a particular transaction. Creates it if it does not exist. */ -static int __toku_lt_selfwrite(toku_lock_tree* tree, DB_TXN* txn, +inline static int __toku_lt_selfwrite(toku_lock_tree* tree, DB_TXN* txn, toku_range_tree** pselfwrite) { int r; assert(tree && txn && pselfwrite); @@ -276,7 +276,7 @@ static int __toku_lt_selfwrite(toku_lock_tree* tree, DB_TXN* txn, Uses the standard definition of dominated from the design document. Determines whether 'query' is dominated by 'rt'. */ -static int __toku_lt_rt_dominates(toku_lock_tree* tree, toku_range* query, +inline static int __toku_lt_rt_dominates(toku_lock_tree* tree, toku_range* query, toku_range_tree* rt, BOOL* dominated) { assert(tree && query && dominated); if (!rt) { @@ -321,7 +321,7 @@ typedef enum If exactly one range overlaps and its data != self, there might be a conflict. We need to check the 'peer'write table to verify. */ -static int __toku_lt_borderwrite_conflict(toku_lock_tree* tree, DB_TXN* self, +inline static int __toku_lt_borderwrite_conflict(toku_lock_tree* tree, DB_TXN* self, toku_range* query, toku_conflict* conflict, DB_TXN** peer) { assert(tree && self && query && conflict && peer); @@ -355,7 +355,7 @@ static int __toku_lt_borderwrite_conflict(toku_lock_tree* tree, DB_TXN* self, Uses the standard definition of 'query' meets 'tree' at 'data' from the design document. */ -static int __toku_lt_meets(toku_lock_tree* tree, toku_range* query, +inline static int __toku_lt_meets(toku_lock_tree* tree, toku_range* query, toku_range_tree* rt, BOOL* met) { assert(tree && query && rt && met); const u_int32_t query_size = 1; @@ -385,7 +385,7 @@ static int __toku_lt_meets(toku_lock_tree* tree, toku_range* query, Uses the standard definition of 'query' meets 'tree' at 'data' from the design document. */ -static int __toku_lt_meets_peer(toku_lock_tree* tree, toku_range* query, +inline static int __toku_lt_meets_peer(toku_lock_tree* tree, toku_range* query, toku_range_tree* rt, DB_TXN* self, BOOL* met) { assert(tree && query && rt && self && met); assert(query->left == query->right); @@ -408,7 +408,7 @@ static int __toku_lt_meets_peer(toku_lock_tree* tree, toku_range* query, Utility function to implement: (from design document) if K meets E at v'!=t and K meets W_v' then return failure. */ -static int __toku_lt_check_borderwrite_conflict(toku_lock_tree* tree, +inline static int __toku_lt_check_borderwrite_conflict(toku_lock_tree* tree, DB_TXN* txn, toku_range* query) { assert(tree && txn && query); toku_conflict conflict; @@ -433,18 +433,21 @@ static int __toku_lt_check_borderwrite_conflict(toku_lock_tree* tree, return 0; } -static void __toku_payload_from_dbt(void** payload, u_int32_t* len, +inline static void __toku_payload_from_dbt(void** payload, u_int32_t* len, const DBT* dbt) { assert(payload && len && dbt); if (__toku_lt_is_infinite(dbt)) *payload = (void*)dbt; - else { - assert(!dbt->data == !dbt->size); + else if (!dbt->size) { + *payload = NULL; + *len = 0; + } else { + assert(dbt->data); *payload = dbt->data; + *len = dbt->size; } - *len = dbt->size; } -static void __toku_init_point(toku_point* point, toku_lock_tree* tree, +inline static void __toku_init_point(toku_point* point, toku_lock_tree* tree, const DBT* key, const DBT* data) { assert(point && tree && key); assert(!tree->duplicates == !data); @@ -463,14 +466,14 @@ static void __toku_init_point(toku_point* point, toku_lock_tree* tree, } } -static void __toku_init_query(toku_range* query, +inline static void __toku_init_query(toku_range* query, toku_point* left, toku_point* right) { query->left = left; query->right = right; query->data = NULL; } -static void __toku_init_insert(toku_range* to_insert, +inline static void __toku_init_insert(toku_range* to_insert, toku_point* left, toku_point* right, DB_TXN* txn) { to_insert->left = left; @@ -480,12 +483,12 @@ static void __toku_init_insert(toku_range* to_insert, /* Returns whether the point already exists as an endpoint of the given range. */ -static BOOL __toku_lt_p_independent(toku_point* point, toku_range* range) { +inline static BOOL __toku_lt_p_independent(toku_point* point, toku_range* range) { assert(point && range); return point != range->left && point != range->right; } -static int __toku_lt_extend_extreme(toku_lock_tree* tree,toku_range* to_insert, +inline static int __toku_lt_extend_extreme(toku_lock_tree* tree,toku_range* to_insert, BOOL* alloc_left, BOOL* alloc_right, u_int32_t numfound) { assert(to_insert && tree && alloc_left && alloc_right); @@ -517,7 +520,7 @@ static int __toku_lt_extend_extreme(toku_lock_tree* tree,toku_range* to_insert, return 0; } -static int __toku_lt_alloc_extreme(toku_lock_tree* tree, toku_range* to_insert, +inline static int __toku_lt_alloc_extreme(toku_lock_tree* tree, toku_range* to_insert, BOOL alloc_left, BOOL* alloc_right) { assert(to_insert && alloc_right); BOOL copy_left = FALSE; @@ -547,7 +550,7 @@ static int __toku_lt_alloc_extreme(toku_lock_tree* tree, toku_range* to_insert, return 0; } -static int __toku_lt_delete_overlapping_ranges(toku_lock_tree* tree, +inline static int __toku_lt_delete_overlapping_ranges(toku_lock_tree* tree, toku_range_tree* rt, u_int32_t numfound) { assert(tree && rt); @@ -561,7 +564,7 @@ static int __toku_lt_delete_overlapping_ranges(toku_lock_tree* tree, return 0; } -static int __toku_lt_free_points(toku_lock_tree* tree, toku_range* to_insert, +inline static int __toku_lt_free_points(toku_lock_tree* tree, toku_range* to_insert, u_int32_t numfound, toku_range_tree *rt) { assert(tree && to_insert); assert(numfound <= tree->buflen); @@ -591,7 +594,7 @@ static int __toku_lt_free_points(toku_lock_tree* tree, toku_range* to_insert, } /* Consolidate the new range and all the overlapping ranges */ -static int __toku_consolidate(toku_lock_tree* tree, +inline static int __toku_consolidate(toku_lock_tree* tree, toku_range* query, toku_range* to_insert, DB_TXN* txn) { int r; @@ -656,7 +659,7 @@ static int __toku_consolidate(toku_lock_tree* tree, return 0; } -static void __toku_lt_init_full_query(toku_lock_tree* tree, toku_range* query, +inline static void __toku_lt_init_full_query(toku_lock_tree* tree, toku_range* query, toku_point* left, toku_point* right) { __toku_init_point(left, tree, (DBT*)toku_lt_neg_infinity, tree->duplicates ? (DBT*)toku_lt_neg_infinity : NULL); @@ -665,7 +668,7 @@ static void __toku_lt_init_full_query(toku_lock_tree* tree, toku_range* query, __toku_init_query(query, left, right); } -static int __toku_lt_free_contents_slow(toku_lock_tree* tree, +inline static int __toku_lt_free_contents_slow(toku_lock_tree* tree, toku_range_tree* rt, toku_range_tree* rtdel) { int r; @@ -693,7 +696,7 @@ static int __toku_lt_free_contents_slow(toku_lock_tree* tree, return r; } -static int __toku_lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt, +inline static int __toku_lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt, toku_range_tree *rtdel) { assert(tree); if (!rt) return 0; @@ -716,7 +719,7 @@ static int __toku_lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt, return r; } -static BOOL __toku_r_backwards(toku_range* range) { +inline static BOOL __toku_r_backwards(toku_range* range) { assert(range && range->left && range->right); toku_point* left = (toku_point*)range->left; toku_point* right = (toku_point*)range->right; @@ -728,7 +731,7 @@ static BOOL __toku_r_backwards(toku_range* range) { } -static int __toku_lt_preprocess(toku_lock_tree* tree, DB_TXN* txn, +inline static int __toku_lt_preprocess(toku_lock_tree* tree, DB_TXN* txn, const DBT* key_left, const DBT** pdata_left, const DBT* key_right, const DBT** pdata_right, toku_point* left, toku_point* right, @@ -744,12 +747,13 @@ static int __toku_lt_preprocess(toku_lock_tree* tree, DB_TXN* txn, if (tree->duplicates && key_right != data_right && __toku_lt_is_infinite(key_right)) return EINVAL; + int r; /* Verify that NULL keys have payload and size that are mutually consistent*/ - __toku_lt_verify_null_key(key_left); - __toku_lt_verify_null_key(data_left); - __toku_lt_verify_null_key(key_right); - __toku_lt_verify_null_key(data_right); + if ((r = __toku_lt_verify_null_key(key_left)) != 0) return r; + if ((r = __toku_lt_verify_null_key(data_left)) != 0) return r; + if ((r = __toku_lt_verify_null_key(key_right)) != 0) return r; + if ((r = __toku_lt_verify_null_key(data_right)) != 0) return r; __toku_init_point(left, tree, key_left, data_left); __toku_init_point(right, tree, key_right, data_right); @@ -758,12 +762,12 @@ static int __toku_lt_preprocess(toku_lock_tree* tree, DB_TXN* txn, if (__toku_r_backwards(query)) return EDOM; tree->dups_final = TRUE; - int r = __toku_lt_callback(tree, txn); + r = __toku_lt_callback(tree, txn); if (r!=0) return r; return 0; } -static int __toku_lt_get_border(toku_lock_tree* tree, BOOL in_borderwrite, +inline static int __toku_lt_get_border(toku_lock_tree* tree, BOOL in_borderwrite, toku_range* pred, toku_range* succ, BOOL* found_p, BOOL* found_s, toku_range* to_insert) { @@ -780,7 +784,7 @@ static int __toku_lt_get_border(toku_lock_tree* tree, BOOL in_borderwrite, return 0; } -static int __toku_lt_expand_border(toku_lock_tree* tree, toku_range* to_insert, +inline static int __toku_lt_expand_border(toku_lock_tree* tree, toku_range* to_insert, toku_range* pred, toku_range* succ, BOOL found_p, BOOL found_s) { assert(tree && to_insert && pred && succ); @@ -798,7 +802,7 @@ static int __toku_lt_expand_border(toku_lock_tree* tree, toku_range* to_insert, return 0; } -static int __toku_lt_split_border(toku_lock_tree* tree, toku_range* to_insert, +inline static int __toku_lt_split_border(toku_lock_tree* tree, toku_range* to_insert, toku_range* pred, toku_range* succ, BOOL found_p, BOOL found_s) { assert(tree && to_insert && pred && succ); @@ -850,7 +854,7 @@ static int __toku_lt_split_border(toku_lock_tree* tree, toku_range* to_insert, done with borderwrite. insert point,point into selfwrite. */ -static int __toku_lt_borderwrite_insert(toku_lock_tree* tree, +inline static int __toku_lt_borderwrite_insert(toku_lock_tree* tree, toku_range* query, toku_range* to_insert) { assert(tree && query && to_insert); @@ -1083,7 +1087,13 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn, toku_point left; toku_point right; toku_range query; - + + assert(tree); + if (key_left == key_right && + (data_left == data_right || !tree->duplicates)) { + return toku_lt_acquire_write_lock(tree, txn, key_left, data_left); + } + r = __toku_lt_preprocess(tree, txn, key_left, &data_left, key_right, &data_right, &left, &right, @@ -1096,7 +1106,7 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn, } -static int __toku_sweep_border(toku_lock_tree* tree, toku_range* range) { +inline static int __toku_sweep_border(toku_lock_tree* tree, toku_range* range) { assert(tree && range); toku_range_tree* borderwrite = tree->borderwrite; assert(borderwrite); @@ -1164,7 +1174,7 @@ static int __toku_sweep_border(toku_lock_tree* tree, toku_range* range) { If both found and pred.data=succ.data, merge pred and succ (expand?) free_points */ -static int __toku_lt_border_delete(toku_lock_tree* tree, toku_range_tree* rt) { +inline static int __toku_lt_border_delete(toku_lock_tree* tree, toku_range_tree* rt) { int r; assert(tree); if (!rt) return 0; diff --git a/src/ydb.c b/src/ydb.c index 1f58b93723b..f2eae7a01e3 100644 --- a/src/ydb.c +++ b/src/ydb.c @@ -514,9 +514,17 @@ static void toku_env_set_errpfx(DB_ENV * env, const char *errpfx) { static int toku_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) { HANDLE_PANICKED_ENV(env); + + u_int32_t change = 0; + if (flags & DB_AUTO_COMMIT) { + change |= DB_AUTO_COMMIT; + flags &= ~DB_AUTO_COMMIT; + } if (flags != 0 && onoff) { - return do_error(env, EINVAL, "TokuDB does not (yet) support any nonzero ENV flags\n"); + return do_error(env, EINVAL, "TokuDB does not (yet) support any nonzero ENV flags other than DB_AUTO_COMMIT\n"); } + if (onoff) env->i->open_flags |= change; + else env->i->open_flags &= ~change; return 0; } @@ -782,20 +790,21 @@ int db_env_create(DB_ENV ** envp, u_int32_t flags) { static int toku_txn_release_locks(DB_TXN* txn) { assert(txn); toku_lth* lth = txn->i->lth; - assert(lth); - int r; - int r2 = 0; - toku_lth_start_scan(lth); - toku_lock_tree* next = toku_lth_next(lth); - while (next) { - r = toku_lt_unlock(next, txn); - if (r!=0 && !r2) r2 = r; - next = toku_lth_next(lth); - } - toku_lth_close(lth); - txn->i->lth = NULL; - return r2; + int r = 0; + if (lth) { + toku_lth_start_scan(lth); + toku_lock_tree* next = toku_lth_next(lth); + int r2; + while (next) { + r2 = toku_lt_unlock(next, txn); + if (r2!=0 && !r) r = r2; + next = toku_lth_next(lth); + } + toku_lth_close(lth); + txn->i->lth = NULL; + } + return r; } static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) { @@ -864,6 +873,7 @@ static int locked_txn_abort(DB_TXN *txn) { static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { HANDLE_PANICKED_ENV(env); if (!toku_logger_is_open(env->i->logger)) return do_error(env, EINVAL, "Environment does not have logging enabled\n"); + if (!(env->i->open_flags & DB_INIT_TXN)) return do_error(env, EINVAL, "Environment does not have transactions enabled\n"); flags=flags; DB_TXN *MALLOC(result); if (result == 0) @@ -880,12 +890,16 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f return ENOMEM; } result->i->parent = stxn; - - int r = toku_lth_create(&result->i->lth, toku_malloc, toku_free, toku_realloc); - if (r!=0) { - toku_free(result->i); - toku_free(result); - return r; + + int r; + if (env->i->open_flags & DB_INIT_LOCK) { + r = toku_lth_create(&result->i->lth, + toku_malloc, toku_free, toku_realloc); + if (r!=0) { + toku_free(result->i); + toku_free(result); + return r; + } } r = toku_logger_txn_begin(stxn ? stxn->i->tokutxn : 0, &result->i->tokutxn, next_txn++, env->i->logger); @@ -1038,16 +1052,14 @@ static int get_main_cursor_flag(u_int32_t flag) { return flag; } -int brt_cursor_not_set(BRT_CURSOR cursor); - -static BOOL toku_c_uninitialized(DBC* c) { - return brt_cursor_not_set(c->i->c); -} - - +inline static BOOL toku_c_uninitialized(DBC* c) { + return toku_brt_cursor_uninitialized(c->i->c); +} static int toku_c_get_current_unconditional(DBC* c, DBT* key, DBT* data) { assert(!toku_c_uninitialized(c)); + memset(key, 0, sizeof(DBT)); + memset(data, 0, sizeof(DBT)); data->flags = key->flags = DB_DBT_MALLOC; TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; int r = toku_brt_cursor_get(c->i->c, key, data, DB_CURRENT_BINDING, txn); @@ -1102,7 +1114,6 @@ static int toku_c_get_pre_lock(DBC* c, DBT* key, DBT* data, u_int32_t* flag, r = toku_save_original_data(saved_key, key); break; } -//TODO: #warning "Verify our understanding of DB_GET_BOTH_RANGE IS CORRECT HERE" case (DB_GET_BOTH_RANGE): { if (!duplicates) { toku_swap_flag(flag, &get_flag, DB_GET_BOTH); goto get_both; } @@ -1199,7 +1210,6 @@ static int toku_c_get_post_lock(DBC* c, DBT* key, DBT* data, u_int32_t flag, data_r = found ? data : toku_lt_infinity; break; } -//TODO: #warning "Verify our understanding of DB_GET_BOTH_RANGE IS CORRECT HERE" case (DB_GET_BOTH_RANGE): { key_l = key_r = key; data_l = saved_data; @@ -1272,8 +1282,23 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag } static int toku_c_del_noassociate(DBC * c, u_int32_t flags) { - HANDLE_PANICKED_DB(c->dbp); - int r = toku_brt_cursor_delete(c->i->c, flags, c->i->txn ? c->i->txn->i->tokutxn : 0); + DB* db = c->dbp; + HANDLE_PANICKED_DB(db); + if (toku_c_uninitialized(c)) return EINVAL; + + int r; + if (db->i->lt) { + DBT saved_key; + DBT saved_data; + r = toku_c_get_current_unconditional(c, &saved_key, &saved_data); + if (r!=0) return r; + r = toku_lt_acquire_write_lock(db->i->lt, c->i->txn, + &saved_key, &saved_data); + if (saved_key.data) toku_free(saved_key.data); + if (saved_data.data) toku_free(saved_data.data); + if (r!=0) return r; + } + r = toku_brt_cursor_delete(c->i->c, flags, c->i->txn ? c->i->txn->i->tokutxn : 0); return r; } @@ -1488,6 +1513,12 @@ static int toku_db_del_noassociate(DB * db, DB_TXN * txn, DBT * key, u_int32_t f toku_free(search_val.data); } //Do the actual deleting. + if (db->i->lt) { + r = toku_lt_acquire_range_write_lock(db->i->lt, txn, + key, toku_lt_neg_infinity, + key, toku_lt_infinity); + if (r!=0) return r; + } r = toku_brt_delete(db->i->brt, key, txn ? txn->i->tokutxn : 0); return r; } @@ -2067,6 +2098,7 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, r = toku_db_get_noassociate(db, txn, key, toku_init_dbt(&testfordata), 0); if (r == 0) return DB_KEYEXIST; + if (r != DB_NOTFOUND) return r; } else if (flags != 0) { /* no other flags are currently supported */ return EINVAL; @@ -2077,6 +2109,7 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, r = toku_db_get_noassociate(db, txn, key, data, DB_GET_BOTH); if (r == 0) return DB_KEYEXIST; + if (r != DB_NOTFOUND) return r; #else return do_error(db->dbenv, EINVAL, "Tokudb requires that db->put specify DB_YESOVERWRITE or DB_NOOVERWRITE on DB_DUPSORT databases"); #endif @@ -2244,37 +2277,117 @@ static int toku_db_fd(DB *db, int *fdp) { #if _THREAD_SAFE +//TODO: DB_AUTO_COMMIT. +//TODO: Nowait only conditionally? +//TODO: NOSYNC change to SYNC if DB_ENV has something in set_flags +inline static int toku_db_construct_autotxn(DB* db, DB_TXN **txn, BOOL* changed, + BOOL force_auto_commit) { + assert(db && txn && changed); + DB_ENV* env = db->dbenv; + if (*txn || !(env->i->open_flags & DB_INIT_TXN)) { + *changed = FALSE; + return 0; + } + BOOL nosync = !force_auto_commit && !(env->i->open_flags & DB_AUTO_COMMIT); + u_int32_t txn_flags = DB_TXN_NOWAIT | (nosync ? DB_TXN_NOSYNC : 0); + int r = toku_txn_begin(env, NULL, txn, txn_flags); + if (r!=0) return r; + *changed = TRUE; + return 0; +} + +inline static int toku_db_destruct_autotxn(DB_TXN *txn, int r, BOOL changed) { + if (!changed) return r; + if (r==0) return toku_txn_commit(txn, 0); + toku_txn_abort(txn); + return r; +} + +inline static int autotxn_db_associate(DB *primary, DB_TXN *txn, DB *secondary, + int (*callback)(DB *secondary, const DBT *key, const DBT *data, DBT *result), u_int32_t flags) { + BOOL changed; int r; + r = toku_db_construct_autotxn(primary, &txn, &changed, FALSE); + if (r!=0) return r; + r = toku_db_associate(primary, txn, secondary, callback, flags); + return toku_db_destruct_autotxn(txn, r, changed); +} + static int locked_db_associate (DB *primary, DB_TXN *txn, DB *secondary, int (*callback)(DB *secondary, const DBT *key, const DBT *data, DBT *result), u_int32_t flags) { - ydb_lock(); int r = toku_db_associate(primary, txn, secondary, callback, flags); ydb_unlock(); return r; + ydb_lock(); int r = autotxn_db_associate(primary, txn, secondary, callback, flags); ydb_unlock(); return r; } static int locked_db_close(DB * db, u_int32_t flags) { ydb_lock(); int r = toku_db_close(db, flags); ydb_unlock(); return r; } +//TODO: Something about the cursor with no txn.. EINVAL maybe? static int locked_db_cursor(DB *db, DB_TXN *txn, DBC **c, u_int32_t flags) { ydb_lock(); int r = toku_db_cursor(db, txn, c, flags); ydb_unlock(); return r; } +inline static int autotxn_db_del(DB* db, DB_TXN* txn, DBT* key, + u_int32_t flags) { + BOOL changed; int r; + r = toku_db_construct_autotxn(db, &txn, &changed, FALSE); + if (r!=0) return r; + r = toku_db_del(db, txn, key, flags); + return toku_db_destruct_autotxn(txn, r, changed); +} + static int locked_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) { - ydb_lock(); int r = toku_db_del(db, txn, key, flags); ydb_unlock(); return r; + ydb_lock(); int r = autotxn_db_del(db, txn, key, flags); ydb_unlock(); return r; +} + +inline static int autotxn_db_get(DB* db, DB_TXN* txn, DBT* key, DBT* data, + u_int32_t flags) { + BOOL changed; int r; + r = toku_db_construct_autotxn(db, &txn, &changed, FALSE); + if (r!=0) return r; + r = toku_db_get(db, txn, key, data, flags); + return toku_db_destruct_autotxn(txn, r, changed); } static int locked_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) { - ydb_lock(); int r = toku_db_get(db, txn, key, data, flags); ydb_unlock(); return r; + ydb_lock(); int r = autotxn_db_get(db, txn, key, data, flags); ydb_unlock(); return r; +} + +inline static int autotxn_db_pget(DB* db, DB_TXN* txn, DBT* key, DBT* pkey, + DBT* data, u_int32_t flags) { + BOOL changed; int r; + r = toku_db_construct_autotxn(db, &txn, &changed, FALSE); + if (r!=0) return r; + r = toku_db_pget(db, txn, key, pkey, data, flags); + return toku_db_destruct_autotxn(txn, r, changed); } static int locked_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_int32_t flags) { - ydb_lock(); int r = toku_db_pget(db, txn, key, pkey, data, flags); ydb_unlock(); return r; + ydb_lock(); int r = autotxn_db_pget(db, txn, key, pkey, data, flags); ydb_unlock(); return r; +} + +inline static int autotxn_db_open(DB* db, DB_TXN* txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) { + BOOL changed; int r; + r = toku_db_construct_autotxn(db, &txn, &changed, flags & DB_AUTO_COMMIT); + if (r!=0) return r; + r = toku_db_open(db, txn, fname, dbname, dbtype, flags & ~DB_AUTO_COMMIT, mode); + return toku_db_destruct_autotxn(txn, r, changed); } static int locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) { ydb_lock(); int r = toku_db_open(db, txn, fname, dbname, dbtype, flags, mode); ydb_unlock(); return r; } +inline static int autotxn_db_put(DB* db, DB_TXN* txn, DBT* key, DBT* data, + u_int32_t flags) { + BOOL changed; int r; + r = toku_db_construct_autotxn(db, &txn, &changed, FALSE); + if (r!=0) return r; + r = toku_db_put(db, txn, key, data, flags); + return toku_db_destruct_autotxn(txn, r, changed); +} + static int locked_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) { - ydb_lock(); int r = toku_db_put(db, txn, key, data, flags); ydb_unlock(); return r; + ydb_lock(); int r = autotxn_db_put(db, txn, key, data, flags); ydb_unlock(); return r; } static int locked_db_remove(DB * db, const char *fname, const char *dbname, u_int32_t flags) { -- 2.30.9