Commit 5fad7d0b authored by Yoni Fogel's avatar Yoni Fogel

Addresses #350

DB->get, DB->pget, DBC->c_get, DBC->c_pget almost done.
Remainder: associated dbs need DBC->c_del and DB->del to lock properly.
Rest is done.

Associated dbs (using any of the above calls) may get stuck half way due to locking.
We are not currently optimizing for them, but it will lock everything necessary
(Once DB->del and DB->c_del lock properly)

Some tests are written, but not all of them.

Currently test_db_txn_locks.c fails due to abort not working properly.


git-svn-id: file:///svn/tokudb@2210 c7de825b-a66e-492c-adef-691d508d4ae1
parent c6875ae9
......@@ -1999,7 +1999,7 @@ static inline void brt_cursor_cleanup(BRT_CURSOR cursor) {
dbt_cleanup(&cursor->val);
}
static inline int brt_cursor_not_set(BRT_CURSOR cursor) {
inline int brt_cursor_not_set(BRT_CURSOR cursor) {
return cursor->key.data == 0 || cursor->val.data == 0;
}
......
......@@ -206,6 +206,12 @@ void test(u_int32_t dup_flags) {
close_dbs();
/* ********************************************************************** */
setup_dbs(dup_flags);
put(TRUE, 'a', 1, 1);
early_abort('a');
cget(TRUE, FALSE, 'b', 1, 1);
close_dbs();
/* ********************************************************************** */
setup_dbs(dup_flags);
cget(TRUE, FALSE, 'a', 1, 1);
cget(TRUE, FALSE, 'b', 1, 1);
put(FALSE, 'a', 1, 1);
......@@ -222,6 +228,6 @@ void test(u_int32_t dup_flags) {
int main() {
test(0);
test(DB_DUP | DB_DUPSORT);
// test(DB_DUP | DB_DUPSORT);
return 0;
}
......@@ -111,6 +111,7 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag);
static int toku_c_del(DBC *c, u_int32_t flags);
static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags);
static int toku_c_close(DBC * c);
static int toku_save_original_data(DBT* dst, DBT* src);
/* misc */
static char *construct_full_name(const char *dir, const char *fname);
......@@ -1022,18 +1023,6 @@ static int verify_secondary_key(DB *secondary, DBT *pkey, DBT *data, DBT *skey)
return r;
}
static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
HANDLE_PANICKED_DB(c->dbp);
int r = toku_brt_cursor_get(c->i->c, key, data, flag, c->i->txn ? c->i->txn->i->tokutxn : 0);
return r;
}
static int toku_c_del_noassociate(DBC * c, u_int32_t flags) {
HANDLE_PANICKED_DB(c->dbp);
int r = toku_brt_cursor_delete(c->i->c, flags, c->i->txn ? c->i->txn->i->tokutxn : 0);
return r;
}
//Get the main portion of a cursor flag (excluding the bitwise or'd components).
static int get_main_cursor_flag(u_int32_t flag) {
#ifdef DB_READ_UNCOMMITTED
......@@ -1049,7 +1038,246 @@ static int get_main_cursor_flag(u_int32_t flag) {
return flag;
}
static int toku_c_pget_save_original_data(DBT* dst, DBT* src) {
int brt_cursor_not_set(BRT_CURSOR cursor);
static BOOL toku_c_uninitialized(DBC* c) {
return brt_cursor_not_set(c->i->c);
}
static int toku_c_get_current_unconditional(DBC* c, DBT* key, DBT* data) {
assert(!toku_c_uninitialized(c));
data->flags = key->flags = DB_DBT_MALLOC;
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
int r = toku_brt_cursor_get(c->i->c, key, data, DB_CURRENT_BINDING, txn);
return r;
}
static inline void toku_swap_flag(u_int32_t* flag, u_int32_t* get_flag,
u_int32_t new_flag) {
*flag -= *get_flag;
*get_flag = new_flag;
*flag += *get_flag;
}
static inline int toku_uninitialized_swap(DBC* c, DBT* key, DBT* data,
u_int32_t* flag, u_int32_t* get_flag,
u_int32_t new_flag) {
/* DB_FIRST/DB_LAST do nothing in pre_lock so we can skip the goto. */
if (toku_c_uninitialized(c)) toku_swap_flag(flag, get_flag, new_flag);
else return toku_c_get_current_unconditional(c, key, data);
return 0;
}
static int toku_c_get_pre_lock(DBC* c, DBT* key, DBT* data, u_int32_t* flag,
DBT* saved_key, DBT* saved_data) {
assert(saved_key && saved_data && flag);
DB* db = c->dbp;
if (!db->i->lt) return 0;
saved_key->data = NULL;
saved_data->data = NULL;
DB_TXN* txn = c->i->txn;
u_int32_t get_flag = get_main_cursor_flag(*flag);
unsigned int brtflags;
toku_brt_get_flags(db->i->brt, &brtflags);
BOOL duplicates = (brtflags & TOKU_DB_DUPSORT) != 0;
int r = 0;
switch (get_flag) {
case (DB_CURRENT):
case (DB_SET):
case (DB_FIRST):
case (DB_LAST): {
/* The above cases have all their code in toku_c_get_post_lock. */
break;
}
case (DB_GET_BOTH): {
get_both:
r = toku_lt_acquire_read_lock(db->i->lt, txn, key, data);
break;
}
case (DB_SET_RANGE): {
r = toku_save_original_data(saved_key, key);
break;
}
//TODO: #warning "Verify our understanding of DB_GET_BOTH_RANGE IS CORRECT HERE"
case (DB_GET_BOTH_RANGE): {
if (!duplicates) {
toku_swap_flag(flag, &get_flag, DB_GET_BOTH); goto get_both; }
r = toku_save_original_data(saved_data, data);
break;
}
case (DB_NEXT):
case (DB_NEXT_NODUP): {
r = toku_uninitialized_swap(c, saved_key, saved_data, flag,
&get_flag, DB_FIRST);
break;
}
case (DB_PREV):
case (DB_PREV_NODUP): {
r = toku_uninitialized_swap(c, saved_key, saved_data, flag,
&get_flag, DB_LAST);
break;
}
#ifdef DB_PREV_DUP
case (DB_PREV_DUP):
#endif
case (DB_NEXT_DUP): {
if (!duplicates || toku_c_uninitialized(c)) r = EINVAL;
else r = toku_c_get_current_unconditional(c, saved_key, saved_data);
}
default: {
//TODO: Output an error.
r = EINVAL;
break;
}
}
return r;
}
static int toku_c_get_post_lock(DBC* c, DBT* key, DBT* data, u_int32_t flag,
int r_last, DBT* saved_key, DBT* saved_data) {
assert(saved_key && saved_data);
DB* db = c->dbp;
if (!db->i->lt) return r_last;
int r = 0;
if (r_last && r_last != DB_NOTFOUND && r_last != DB_KEYEMPTY) {
r = r_last;
goto cleanup;
}
DB_TXN* txn = c->i->txn;
u_int32_t get_flag = get_main_cursor_flag(flag);
if (r_last == DB_KEYEMPTY) {
assert(get_flag == DB_CURRENT);
return r_last;
}
assert(r_last == DB_NOTFOUND || r_last == 0);
BOOL found = r_last == 0;
BOOL lock = TRUE;
const DBT* key_l;
const DBT* key_r;
const DBT* data_l;
const DBT* data_r;
switch (get_flag) {
case (DB_CURRENT): {
/* No locking necessary. You already own a lock by virtue
of having a cursor pointing to this. */
lock = FALSE;
break;
}
case (DB_SET): {
key_l = key_r = key;
data_l = toku_lt_neg_infinity;
data_r = found ? data : toku_lt_infinity;
break;
}
case (DB_GET_BOTH): {
/* All done in toku_c_get_pre_lock. */
lock = FALSE;
break;
}
case (DB_FIRST): {
key_l = data_l = toku_lt_neg_infinity;
key_r = found ? key : toku_lt_infinity;
data_r = found ? data : toku_lt_infinity;
break;
}
case (DB_LAST): {
key_l = found ? key : toku_lt_neg_infinity;
data_l = found ? data : toku_lt_neg_infinity;
key_r = data_r = toku_lt_infinity;
break;
}
case (DB_SET_RANGE): {
key_l = saved_key;
data_l = toku_lt_neg_infinity;
key_r = found ? key : toku_lt_infinity;
data_r = found ? data : toku_lt_infinity;
break;
}
//TODO: #warning "Verify our understanding of DB_GET_BOTH_RANGE IS CORRECT HERE"
case (DB_GET_BOTH_RANGE): {
key_l = key_r = key;
data_l = saved_data;
data_r = found ? data : toku_lt_infinity;
break;
}
case (DB_NEXT):
case (DB_NEXT_NODUP): {
assert(!toku_c_uninitialized(c));
key_l = saved_key;
data_l = saved_data;
key_r = found ? key : toku_lt_infinity;
data_r = found ? data : toku_lt_infinity;
break;
}
case (DB_PREV):
case (DB_PREV_NODUP): {
assert(!toku_c_uninitialized(c));
key_l = found ? key : toku_lt_neg_infinity;
data_l = found ? data : toku_lt_neg_infinity;
key_r = saved_key;
data_r = saved_data;
break;
}
case (DB_NEXT_DUP): {
assert(!toku_c_uninitialized(c));
key_l = key_r = saved_key;
data_l = saved_data;
data_r = found ? data : toku_lt_infinity;
break;
}
#ifdef DB_PREV_DUP
case (DB_PREV_DUP): {
assert(!toku_c_uninitialized(c));
key_l = key_r = saved_key;
data_l = found ? data : toku_lt_neg_infinity;
data_r = saved_data;
break;
}
#endif
default: {
r = EINVAL;
lock = FALSE;
break;
}
}
if (lock) r = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key_l, data_l,
key_r, data_r);
cleanup:
if (saved_key->data) toku_free(saved_key->data);
if (saved_data->data) toku_free(saved_data->data);
return r ? r : r_last;
}
static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
HANDLE_PANICKED_DB(c->dbp);
DBT saved_key;
DBT saved_data;
int r;
r = toku_c_get_pre_lock(c, key, data, &flag, &saved_key, &saved_data);
if (r!=0) return r;
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
r = toku_brt_cursor_get(c->i->c, key, data, flag, txn);
r = toku_c_get_post_lock(c, key, data, flag, r, &saved_key, &saved_data);
return r;
}
static int toku_c_del_noassociate(DBC * c, u_int32_t flags) {
HANDLE_PANICKED_DB(c->dbp);
int r = toku_brt_cursor_delete(c->i->c, flags, c->i->txn ? c->i->txn->i->tokutxn : 0);
return r;
}
static int toku_save_original_data(DBT* dst, DBT* src) {
int r;
*dst = *src;
......@@ -1113,19 +1341,19 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag)
return r;
}
//Need to save all the original data.
r = toku_c_pget_save_original_data(&copied_key, o_key); if (r!=0) goto died0;
r = toku_save_original_data(&copied_key, o_key); if (r!=0) goto died0;
if (0) {
died1:
toku_free(key->data);
goto died0;
}
r = toku_c_pget_save_original_data(&copied_pkey, o_pkey); if (r!=0) goto died1;
r = toku_save_original_data(&copied_pkey, o_pkey); if (r!=0) goto died1;
if (0) {
died2:
toku_free(pkey->data);
goto died1;
}
r = toku_c_pget_save_original_data(&copied_data, o_data); if (r!=0) goto died2;
r = toku_save_original_data(&copied_data, o_data); if (r!=0) goto died2;
if (0) {
died3:
toku_free(data->data);
......@@ -1158,141 +1386,12 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag)
static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
DB *db = c->dbp;
DB_TXN *txn = c->i->txn;
HANDLE_PANICKED_DB(db);
u_int32_t get_flag = get_main_cursor_flag(flag);
int r;
if (db->i->primary==0) {
r = toku_c_get_noassociate(c, key, data, flag);
if (db->i->lt) {
if (r != DB_NOTFOUND && r != 0 && r != DB_KEYEMPTY) return r;
int r2 = 0;
switch (get_flag) {
case (DB_SET): {
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key, toku_lt_neg_infinity,
key, toku_lt_infinity);
}
else {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key, toku_lt_neg_infinity,
key, data);
}
break;
}
}
if (r2!=0) return r2;
}
}
/*
These should be done but were not tested prior to commit.
case (DB_CURRENT): {
// No locking necessary. You already own a lock by virtue
// of having a cursor pointing to this.
break;
}
case (DB_FIRST): {
int r2;
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
toku_lt_neg_infinity, toku_lt_neg_infinity,
toku_lt_infinity, toku_lt_infinity);
}
else if (r == 0) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
toku_lt_neg_infinity, toku_lt_neg_infinity,
key, data);
}
else return r;
break;
}
case (DB_LAST): {
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
toku_lt_neg_infinity, toku_lt_neg_infinity,
toku_lt_infinity, toku_lt_infinity);
}
else if (r == 0) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key, data,
toku_lt_infinity, toku_lt_infinity);
}
else return r;
break;
}
case (DB_SET): {
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key, toku_lt_neg_infinity,
key, toku_lt_infinity);
}
else if (r == 0) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key, toku_lt_neg_infinity,
key, data);
}
else return r;
break;
}
case (DB_GET_BOTH): {
if (r != DB_NOTFOUND && r != 0) return r;
r2 = toku_lt_acquire_read_lock(db->i->lt, txn, key, data);
break;
}
*/
/*
These are not ready and are just notes.
case (DB_GET_BOTH_RANGE): {
barf();
//Not ready yet.
break;
}
case (DB_NEXT): {
post_get_DB_NEXT:
//TODO: Need 'am I initialized' function, and if not, goto post_get_DB_FIRST
//TODO: Need get old data function. MUST BE CALLED BEFORE CGET
break;
}
case (DB_PREV): {
//TODO: Need 'am I initialized' function
//TODO: Need get old data function. MUST BE CALLED BEFORE CGET
break;
}
case (DB_SET_RANGE): {
//TODO: Need to save key_in
if (r == DB_NOTFOUND) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key_in, toku_lt_neg_infinity,
toku_lt_infinity, toku_lt_infinity);
}
else if (r == 0) {
r2 = toku_lt_acquire_range_read_lock(db->i->lt, txn,
key_in, toku_lt_neg_infinity,
key, data);
}
else return r;
break;
}
case (DB_NEXT_NODUP): {
goto post_get_DB_NEXT;
}
case (DB_PREV_NODUP): {
goto post_get_DB_PREV;
}
case (DB_NEXT_DUP): {
//Not ready yet./not needed for MySQL.
barf();
break;
}
default: {
barf();
assert(FALSE);
}
*/
else {
// It's a c_get on a secondary.
DBT primary_key;
......@@ -1300,11 +1399,9 @@ These are not ready and are just notes.
/* It is an error to use the DB_GET_BOTH or DB_GET_BOTH_RANGE flag on a
* cursor that has been opened on a secondary index handle.
*/
if ((get_flag == DB_GET_BOTH)
#ifdef DB_GET_BOTH_RANGE
|| (get_flag == DB_GET_BOTH_RANGE)
#endif
) return EINVAL;
u_int32_t get_flag = get_main_cursor_flag(flag);
if ((get_flag == DB_GET_BOTH) ||
(get_flag == DB_GET_BOTH_RANGE)) return EINVAL;
memset(&primary_key, 0, sizeof(primary_key));
r = toku_c_pget(c, key, &primary_key, data, flag);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment