Commit 9abb92dc authored by Yoni Fogel's avatar Yoni Fogel

Addresses #394

Merge branch tokudb.394 back into trunk

git-svn-id: file:///svn/tokudb@3841 c7de825b-a66e-492c-adef-691d508d4ae1
parent fd51763c
......@@ -2506,13 +2506,35 @@ int show_brt_blocknumbers (BRT brt) {
#endif
int toku_brt_dbt_set_key(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) {
int r = toku_dbt_set_value(ybt, val, vallen, &brt->skey);
int toku_brt_dbt_set_both(BRT brt, DBT* key, DBT* key_source,
DBT* val, DBT* val_source) {
int r = toku_dbt_set_two_values(key, key_source->data, key_source->size, &brt->skey,
val, val_source->data, val_source->size, &brt->sval);
return r;
}
int toku_brt_dbt_set_value(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) {
int r = toku_dbt_set_value(ybt, val, vallen, &brt->sval);
int toku_brt_dbt_set_three(BRT brt_primary, BRT brt_secondary,
DBT* key, DBT* key_source,
DBT* pkey, DBT* pkey_source,
DBT* val, DBT* val_source) {
int r = toku_dbt_set_three_values(key, key_source->data, key_source->size, &brt_secondary->skey,
pkey, pkey_source->data, pkey_source->size, &brt_secondary->sval,
val, val_source->data, val_source->size, &brt_primary->sval);
return r;
}
int toku_brt_dbt_set(DBT* key, DBT* key_source) {
int r = toku_dbt_set_value(key, key_source->data, key_source->size, NULL);
return r;
}
int toku_brt_dbt_set_key(BRT brt, DBT* key, DBT* key_source) {
int r = toku_dbt_set_value(key, key_source->data, key_source->size, &brt->skey);
return r;
}
int toku_brt_dbt_set_val(BRT brt, DBT* val, DBT* val_source) {
int r = toku_dbt_set_value(val, val_source->data, val_source->size, &brt->sval);
return r;
}
......@@ -2685,11 +2707,16 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT
}
}
got_a_good_value:
if (newkey) {
if (newkey && newval) {
r = toku_dbt_set_two_values(newkey, le_latest_key(le), le_latest_keylen(le), &brt->skey,
newval, le_latest_val(le), le_latest_vallen(le), &brt->sval);
if (r!=0) return r;
}
else if (newkey) {
r = toku_dbt_set_value(newkey, le_latest_key(le), le_latest_keylen(le), &brt->skey);
if (r!=0) return r;
}
if (newval) {
else if (newval) {
r = toku_dbt_set_value(newval, le_latest_val(le), le_latest_vallen(le), &brt->sval);
if (r!=0) return r;
}
......@@ -2776,6 +2803,11 @@ static inline void brt_cursor_set_key_val(BRT_CURSOR cursor, DBT *newkey, DBT *n
cursor->val = *newval; memset(newval, 0, sizeof *newval);
}
/* Used to restore the state of a cursor. */
void brt_cursor_set_key_val_manually(BRT_CURSOR cursor, DBT* key, DBT* val) {
brt_cursor_set_key_val(cursor, key, val);
}
int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) {
BRT_CURSOR cursor = toku_malloc(sizeof *cursor);
if (cursor == 0)
......@@ -2816,13 +2848,38 @@ static inline int compare_kv_xy(BRT brt, DBT *k, DBT *v, DBT *x, DBT *y) {
static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) {
int r = 0;
if (key)
r = toku_dbt_set_value(key, cursor->key.data, cursor->key.size, cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey);
if (r == 0 && val)
r = toku_dbt_set_value(val, cursor->val.data, cursor->val.size, cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval);
void** key_staticp = cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey;
void** val_staticp = cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval;
r = toku_dbt_set_two_values(key, cursor->key.data, cursor->key.size, key_staticp,
val, cursor->val.data, cursor->val.size, val_staticp);
return r;
}
int toku_brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) {
int r = brt_cursor_copyout(cursor, key, val);
return r;
}
/* Used to save the state of a cursor. */
int brt_cursor_save_key_val(BRT_CURSOR cursor, DBT* key, DBT* val) {
if (brt_cursor_not_set(cursor)) {
if (key) { *key = cursor->key; }
if (val) { *val = cursor->val; }
return 0;
}
else {
assert(!key || key->flags == DB_DBT_MALLOC);
assert(!val || val->flags == DB_DBT_MALLOC);
int r;
if ((r = brt_cursor_copyout(cursor, key, val))) { return r; }
/* An initialized cursor cannot have NULL key->data or
* val->data. */
assert(key==NULL || key->data!=NULL);
assert(val==NULL || val->data!=NULL);
return 0;
}
}
static int brt_cursor_compare_set(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return min xy: kv <= xy */
......
......@@ -59,8 +59,16 @@ int brtenv_checkpoint (BRTENV env);
extern int toku_brt_do_push_cmd; // control whether push occurs eagerly.
int toku_brt_dbt_set_key (BRT, DBT*, bytevec val, ITEMLEN vallen);
int toku_brt_dbt_set_value (BRT, DBT*, bytevec val, ITEMLEN vallen);
int toku_brt_dbt_set_three(BRT brt_primary, BRT brt_secondary,
DBT* key, DBT* key_source,
DBT* pkey, DBT* pkey_source,
DBT* val, DBT* val_source);
int toku_brt_dbt_set_both(BRT brt, DBT* key, DBT* key_source,
DBT* val, DBT* val_source);
int toku_brt_dbt_set_key(BRT brt, DBT* key, DBT* key_source);
int toku_brt_dbt_set_val(BRT brt, DBT* val, DBT* val_source);
int toku_brt_dbt_set(DBT* key, DBT* key_source);
int toku_brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val);
int toku_brt_get_fd(BRT, int *);
......
......@@ -150,10 +150,8 @@ static int toku_pma_cursor_get_current(PMA_CURSOR cursor, DBT *key, DBT *val, in
if (r != 0)
return r;
if (key)
r = toku_dbt_set_value(key, cursor->key.data, cursor->key.size, cursor->sskey);
if (val && r == 0)
r = toku_dbt_set_value(val, cursor->val.data, cursor->val.size, cursor->ssval);
r = toku_dbt_set_two_values(key, cursor->key.data, cursor->key.size, cursor->sskey,
val, cursor->val.data, cursor->val.size, cursor->ssval);
return r;
}
......
......@@ -16,44 +16,115 @@ DBT *toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) {
return dbt;
}
int toku_dbt_set_value (DBT *ybt, bytevec val, ITEMLEN vallen, void **staticptrp) {
static inline int dbt_set_preprocess(DBT* ybt, ITEMLEN len, void** staticptrp, void** tmp_data) {
int r = ENOSYS;
if (ybt->flags==DB_DBT_MALLOC) {
domalloc:
ybt->data = toku_malloc((size_t)vallen);
if (!ybt->data && vallen > 0) { r = errno; goto cleanup; }
} else if (ybt->flags==DB_DBT_REALLOC) {
if (ybt->data==0) goto domalloc;
/* tmp is used to prevent a memory leak if realloc fails */
void* tmp = toku_realloc(ybt->data, (size_t)vallen);
if (!tmp && vallen > 0) { r = errno; goto cleanup; }
ybt->data = tmp;
} else if (ybt->flags==DB_DBT_USERMEM) {
ybt->size = vallen;
if (ybt->ulen < vallen) { r = DB_BUFFER_SMALL; goto cleanup; }
} else {
if (staticptrp==0) return -1;
void *staticptr=*staticptrp;
//void *old=staticptr;
if (staticptr==0) {
staticptr = toku_malloc((size_t)vallen);
if (!staticptr && vallen > 0) { r = errno; goto cleanup; }
} else {
/* tmp is used to prevent a memory leak if realloc fails */
void* tmp = toku_realloc(staticptr, vallen);
if (!tmp && vallen > 0) { r = errno; goto cleanup; }
staticptr = tmp;
}
//if (old!=staticptr) printf("%s:%d MALLOC --> %p\n", __FILE__, __LINE__, staticptr);
*staticptrp = staticptr;
ybt->data = vallen > 0 ? staticptr : 0;
}
ybt->size = vallen;
if (ybt->size>0) {
memcpy(ybt->data, val, (size_t)vallen);
if (ybt) {
if (ybt->flags==DB_DBT_USERMEM) {
if (ybt->ulen < len) {
ybt->size = len;
r = DB_BUFFER_SMALL;
goto cleanup;
}
}
else if (ybt->flags==DB_DBT_MALLOC || ybt->flags==DB_DBT_REALLOC || ybt->flags==0) {
if (ybt->flags==0 && staticptrp==NULL) { r = -1; goto cleanup; }
*tmp_data = toku_malloc(len);
if (!*tmp_data && len > 0) { r = errno; goto cleanup; }
}
else { r = EINVAL; goto cleanup; }
}
r = 0;
cleanup:
return r;
}
static inline void dbt_set_copy(DBT* ybt, bytevec data, ITEMLEN len, void** staticptrp, void* tmp_data) {
if (ybt) {
if (ybt->flags==DB_DBT_REALLOC && ybt->data) toku_free(ybt->data);
else if (ybt->flags==0) {
toku_free(*staticptrp);
*staticptrp = tmp_data;
}
if (ybt->flags!=DB_DBT_USERMEM) {
if (ybt->flags!=0 || len>0) ybt->data = tmp_data;
else ybt->data = NULL;
}
if ((ybt->size = len) > 0) memcpy(ybt->data, data, (size_t)len);
}
}
/* Atomically set three dbts, such that they either both succeed, or
* there is no side effect. */
int toku_dbt_set_three_values(
DBT* ybt1, bytevec ybt1_data, ITEMLEN ybt1_len, void** ybt1_staticptrp,
DBT* ybt2, bytevec ybt2_data, ITEMLEN ybt2_len, void** ybt2_staticptrp,
DBT* ybt3, bytevec ybt3_data, ITEMLEN ybt3_len, void** ybt3_staticptrp) {
int r = ENOSYS;
void* tmp_ybt1_data = NULL;
void* tmp_ybt2_data = NULL;
void* tmp_ybt3_data = NULL;
/* Do all mallocs and check for all possible errors. */
if ((r = dbt_set_preprocess(ybt1, ybt1_len, ybt1_staticptrp, &tmp_ybt1_data))) goto cleanup;
if ((r = dbt_set_preprocess(ybt2, ybt2_len, ybt2_staticptrp, &tmp_ybt2_data))) goto cleanup;
if ((r = dbt_set_preprocess(ybt3, ybt3_len, ybt3_staticptrp, &tmp_ybt3_data))) goto cleanup;
/* Copy/modify atomically the dbts. */
dbt_set_copy(ybt1, ybt1_data, ybt1_len, ybt1_staticptrp, tmp_ybt1_data);
dbt_set_copy(ybt2, ybt2_data, ybt2_len, ybt2_staticptrp, tmp_ybt2_data);
dbt_set_copy(ybt3, ybt3_data, ybt3_len, ybt3_staticptrp, tmp_ybt3_data);
r = 0;
cleanup:
if (r!=0) {
if (tmp_ybt1_data) toku_free(tmp_ybt1_data);
if (tmp_ybt2_data) toku_free(tmp_ybt2_data);
if (tmp_ybt3_data) toku_free(tmp_ybt3_data);
}
return r;
}
/* Atomically set two dbts, such that they either both succeed, or
* there is no side effect. */
int toku_dbt_set_two_values(
DBT* ybt1, bytevec ybt1_data, ITEMLEN ybt1_len, void** ybt1_staticptrp,
DBT* ybt2, bytevec ybt2_data, ITEMLEN ybt2_len, void** ybt2_staticptrp) {
int r = ENOSYS;
void* tmp_ybt1_data = NULL;
void* tmp_ybt2_data = NULL;
/* Do all mallocs and check for all possible errors. */
if ((r = dbt_set_preprocess(ybt1, ybt1_len, ybt1_staticptrp, &tmp_ybt1_data))) goto cleanup;
if ((r = dbt_set_preprocess(ybt2, ybt2_len, ybt2_staticptrp, &tmp_ybt2_data))) goto cleanup;
/* Copy/modify atomically the dbts. */
dbt_set_copy(ybt1, ybt1_data, ybt1_len, ybt1_staticptrp, tmp_ybt1_data);
dbt_set_copy(ybt2, ybt2_data, ybt2_len, ybt2_staticptrp, tmp_ybt2_data);
r = 0;
cleanup:
if (r!=0) {
if (tmp_ybt1_data) toku_free(tmp_ybt1_data);
if (tmp_ybt2_data) toku_free(tmp_ybt2_data);
}
return r;
}
int toku_dbt_set_value(DBT* ybt1, bytevec ybt1_data, ITEMLEN ybt1_len, void** ybt1_staticptrp) {
int r = ENOSYS;
void* tmp_ybt1_data = NULL;
/* Do all mallocs and check for all possible errors. */
if ((r = dbt_set_preprocess(ybt1, ybt1_len, ybt1_staticptrp, &tmp_ybt1_data))) goto cleanup;
/* Copy/modify atomically the dbts. */
dbt_set_copy(ybt1, ybt1_data, ybt1_len, ybt1_staticptrp, tmp_ybt1_data);
r = 0;
cleanup:
if (r!=0) {
if (tmp_ybt1_data) toku_free(tmp_ybt1_data);
}
return r;
}
......@@ -11,6 +11,12 @@
DBT* toku_init_dbt (DBT *);
DBT *toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len);
int toku_dbt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp);
int toku_dbt_set_two_values(DBT* key, bytevec key_data, ITEMLEN key_len, void** key_staticptrp,
DBT* val, bytevec val_data, ITEMLEN val_len, void** val_staticptrp);
int toku_dbt_set_three_values(
DBT* ybt1, bytevec ybt1_data, ITEMLEN ybt1_len, void** ybt1_staticptrp,
DBT* ybt2, bytevec ybt2_data, ITEMLEN ybt2_len, void** ybt2_staticptrp,
DBT* ybt3, bytevec ybt3_data, ITEMLEN ybt3_len, void** ybt3_staticptrp);
......
......@@ -96,7 +96,6 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag);
static int toku_c_del(DBC *c, u_int32_t flags);
static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags);
static int toku_c_close(DBC * c);
static int toku_save_original_data(DBT* dst, DBT* src);
/* misc */
static char *construct_full_name(const char *dir, const char *fname);
......@@ -1111,31 +1110,58 @@ static inline DB_TXN* toku_txn_ancestor(DB_TXN* txn) {
static int toku_txn_add_lt(DB_TXN* txn, toku_lock_tree* lt);
static void toku_c_get_fix_flags(DBC* c, u_int32_t* flag) {
assert(c && flag);
DB* db = c->dbp;
u_int32_t get_flag = get_main_cursor_flag(*flag);
unsigned int brtflags;
toku_brt_get_flags(db->i->brt, &brtflags);
BOOL duplicates = (brtflags & TOKU_DB_DUPSORT) != 0;
switch (get_flag) {
case (DB_NEXT): {
if (!duplicates) {
toku_swap_flag(flag, &get_flag, DB_NEXT_NODUP);
/* c_get has many subfunctions with lots of parameters
* this structure exists to simplify it. */
typedef struct {
DBC* c; // The cursor
DB* db; // db the cursor is iterating over
DB_TXN* txn_anc; // The (root) ancestor of the transaction
TXNID id_anc;
DBT cursor_key; // Original position of cursor (key portion)
DBT cursor_val; // Original position of cursor (val portion)
DBT tmp_key; // Temporary key to protect out param
DBT tmp_val; // Temporary val to protect out param
DBT tmp_dat; // Temporary data val to protect out param
u_int32_t flag; // The c_get flag
u_int32_t op; // The operation portion of the c_get flag
BOOL cursor_is_write; // Whether op can change position of cursor
BOOL cursor_was_saved; // Whether we saved the cursor yet.
BOOL key_is_read;
BOOL key_is_write;
BOOL val_is_read;
BOOL val_is_write;
BOOL dat_is_read;
BOOL dat_is_write;
BOOL duplicates;
BOOL cursor_malloced;
BOOL tmp_key_malloced;
BOOL tmp_val_malloced;
BOOL tmp_dat_malloced;
} C_GET_VARS;
static void toku_c_get_fix_flags(C_GET_VARS* g) {
g->op = get_main_cursor_flag(g->flag);
switch (g->op) {
case (DB_NEXT):
case (DB_NEXT_NODUP): {
if (toku_c_uninitialized(g->c)) toku_swap_flag(&g->flag, &g->op, DB_FIRST);
else if (!g->duplicates && g->op == DB_NEXT) {
toku_swap_flag(&g->flag, &g->op, DB_NEXT_NODUP);
}
break;
}
case (DB_PREV): {
if (!duplicates) {
toku_swap_flag(flag, &get_flag, DB_PREV_NODUP);
case (DB_PREV):
case (DB_PREV_NODUP): {
if (toku_c_uninitialized(g->c)) toku_swap_flag(&g->flag, &g->op, DB_LAST);
else if (!g->duplicates && g->op == DB_PREV) {
toku_swap_flag(&g->flag, &g->op, DB_PREV_NODUP);
}
break;
}
case (DB_GET_BOTH_RANGE): {
if (!duplicates) {
toku_swap_flag(flag, &get_flag, DB_GET_BOTH);
if (!g->duplicates) {
toku_swap_flag(&g->flag, &g->op, DB_GET_BOTH);
}
break;
}
......@@ -1145,214 +1171,357 @@ static void toku_c_get_fix_flags(DBC* c, u_int32_t* flag) {
}
}
static int toku_c_get_pre_lock(DBC* c, DBT* key, DBT* data, u_int32_t* flag,
DBT* saved_key, DBT* saved_data) {
assert(saved_key && saved_data && flag);
DB* db = c->dbp;
if (!db->i->lt) return 0;
saved_key->data = NULL;
saved_data->data = NULL;
DB_TXN* txn = c->i->txn;
static int toku_c_get_pre_acquire_lock_if_possible(C_GET_VARS* g, DBT* key, DBT* data) {
int r = ENOSYS;
toku_lock_tree* lt = g->db->i->lt;
if (!lt) { r = 0; goto cleanup; }
u_int32_t get_flag = get_main_cursor_flag(*flag);
unsigned int brtflags;
toku_brt_get_flags(db->i->brt, &brtflags);
BOOL duplicates = (brtflags & TOKU_DB_DUPSORT) != 0;
DB_TXN* txn_anc = NULL;
TXNID id_anc = 0;
/* We know what to lock ahead of time. */
if (g->op == DB_GET_BOTH ||
(g->op == DB_SET && !g->duplicates)) {
r = toku_txn_add_lt(g->txn_anc, lt);
if (r!=0) goto cleanup;
r = toku_lt_acquire_read_lock(lt, g->db, g->id_anc, key, data);
if (r!=0) goto cleanup;
}
r = 0;
cleanup:
return r;
}
int r = 0;
switch (get_flag) {
case (DB_CURRENT):
case (DB_SET):
case (DB_FIRST):
case (DB_LAST): {
/* The above cases have all their code in toku_c_get_post_lock. */
static int toku_c_get_describe_inputs(C_GET_VARS* g) {
int r = ENOSYS;
/* Default is FALSE. */
switch (g->op) {
case DB_SET:
g->key_is_read = TRUE;
g->val_is_write = TRUE;
break;
}
case (DB_GET_BOTH): {
txn_anc = toku_txn_ancestor(txn);
r = toku_txn_add_lt(txn_anc, db->i->lt);
if (r!=0) return r;
id_anc = toku_txn_get_txnid(txn_anc->i->tokutxn);
r = toku_lt_acquire_read_lock(db->i->lt, db, id_anc, key, data);
case DB_SET_RANGE:
g->key_is_read = TRUE;
g->key_is_write = TRUE;
g->val_is_write = TRUE;
break;
}
case (DB_SET_RANGE): {
r = toku_save_original_data(saved_key, key);
case DB_GET_BOTH:
g->key_is_read = TRUE;
g->val_is_read = TRUE;
break;
}
case (DB_GET_BOTH_RANGE): {
assert(duplicates);
r = toku_save_original_data(saved_data, data);
case DB_GET_BOTH_RANGE:
assert(g->duplicates);
g->key_is_read = TRUE;
g->val_is_read = TRUE;
g->val_is_write = TRUE;
break;
}
case (DB_NEXT):
case (DB_NEXT_NODUP): {
r = toku_uninitialized_swap(c, saved_key, saved_data, flag,
&get_flag, DB_FIRST);
case DB_CURRENT:
case DB_FIRST:
case DB_LAST:
case DB_NEXT:
case DB_NEXT_NODUP:
case DB_NEXT_DUP:
case DB_PREV:
case DB_PREV_NODUP:
#if defined(DB_PREV_DUP)
case DB_PREV_DUP:
#endif
g->key_is_write = TRUE;
g->val_is_write = TRUE;
break;
default:
r = EINVAL;
goto cleanup;
}
case (DB_PREV):
case (DB_PREV_NODUP): {
r = toku_uninitialized_swap(c, saved_key, saved_data, flag,
&get_flag, DB_LAST);
break;
if (g->op != DB_CURRENT) g->cursor_is_write = TRUE;
r = 0;
cleanup:
return r;
}
static int toku_c_pget_describe_inputs(C_GET_VARS* g) {
int r = toku_c_get_describe_inputs(g);
g->dat_is_read = FALSE;
g->dat_is_write = TRUE;
return r;
}
static int toku_c_pget_save_inputs(C_GET_VARS* g, DBT* key, DBT* val, DBT* dat) {
int r = ENOSYS;
/*
* Readonly: Copy original struct
* Writeonly: Set to 0 (already done), DB_DBT_REALLOC, copy back later.
* ReadWrite: Make a copy of original in new memory, copy back later
* */
if (key) {
assert(g->key_is_read || g->key_is_write);
if (g->key_is_read && !g->key_is_write) g->tmp_key = *key;
else {
/* g->key_is_write */
g->tmp_key.flags = DB_DBT_REALLOC;
if (g->key_is_read &&
(r = toku_brt_dbt_set(&g->tmp_key, key))) goto cleanup;
g->tmp_key_malloced = TRUE;
}
#ifdef DB_PREV_DUP
case (DB_PREV_DUP):
#endif
case (DB_NEXT_DUP): {
if (!duplicates || toku_c_uninitialized(c)) r = EINVAL;
else r = toku_c_get_current_unconditional(c, saved_key, saved_data);
break;
}
default: {
//TODO: Output an error.
r = EINVAL;
break;
if (val) {
assert(g->val_is_read || g->val_is_write);
if (g->val_is_read && !g->val_is_write) g->tmp_val = *val;
else {
/* g->val_is_write */
g->tmp_val.flags = DB_DBT_REALLOC;
if (g->val_is_read &&
(r = toku_brt_dbt_set(&g->tmp_val, val))) goto cleanup;
g->tmp_val_malloced = TRUE;
}
}
if (dat) {
assert(g->dat_is_read || g->dat_is_write);
if (g->dat_is_read && !g->dat_is_write) g->tmp_dat = *dat;
else {
/* g->dat_is_write */
g->tmp_dat.flags = DB_DBT_REALLOC;
if (g->dat_is_read &&
(r = toku_brt_dbt_set(&g->tmp_dat, dat))) goto cleanup;
g->tmp_dat_malloced = TRUE;
}
}
r = 0;
cleanup:
if (r!=0) {
/* Cleanup memory allocated if necessary. */
if (g->tmp_key.data && g->tmp_key_malloced) toku_free(g->tmp_key.data);
if (g->tmp_val.data && g->tmp_val_malloced) toku_free(g->tmp_val.data);
if (g->tmp_dat.data && g->tmp_dat_malloced) toku_free(g->tmp_dat.data);
}
return r;
}
static int toku_c_get_post_lock(DBC* c, DBT* key, DBT* data, u_int32_t flag,
int r_last, DBT* saved_key, DBT* saved_data) {
assert(saved_key && saved_data);
DB* db = c->dbp;
if (!db->i->lt) return r_last;
int r = 0;
if (r_last && r_last != DB_NOTFOUND && r_last != DB_KEYEMPTY) {
r = r_last;
goto cleanup;
}
static int toku_c_get_save_inputs(C_GET_VARS* g, DBT* key, DBT* val) {
int r = toku_c_pget_save_inputs(g, key, val, NULL);
return r;
}
DB_TXN* txn = c->i->txn;
u_int32_t get_flag = get_main_cursor_flag(flag);
if (r_last == DB_KEYEMPTY) {
assert(get_flag == DB_CURRENT);
return r_last;
}
assert(r_last == DB_NOTFOUND || r_last == 0);
BOOL found = r_last == 0;
static int toku_c_get_post_lock(C_GET_VARS* g, BOOL found, DBT* orig_key, DBT* orig_val) {
int r = ENOSYS;
toku_lock_tree* lt = g->db->i->lt;
if (!lt) { r = 0; goto cleanup; }
BOOL lock = TRUE;
const DBT* key_l;
const DBT* key_r;
const DBT* data_l;
const DBT* data_r;
switch (get_flag) {
case (DB_CURRENT): {
const DBT* val_l;
const DBT* val_r;
switch (g->op) {
case (DB_CURRENT):
/* No locking necessary. You already own a lock by virtue
of having a cursor pointing to this. */
lock = FALSE;
break;
}
case (DB_SET): {
key_l = key_r = key;
data_l = toku_lt_neg_infinity;
data_r = found ? data : toku_lt_infinity;
case (DB_SET):
if (!g->duplicates) {
/* We acquired the lock before the cursor op. */
lock = FALSE;
break;
}
case (DB_GET_BOTH): {
/* All done in toku_c_get_pre_lock. */
key_l = key_r = &g->tmp_key;
val_l = toku_lt_neg_infinity;
val_r = found ? &g->tmp_val : toku_lt_infinity;
break;
case (DB_GET_BOTH):
/* We acquired the lock before the cursor op. */
lock = FALSE;
break;
}
case (DB_FIRST): {
key_l = data_l = toku_lt_neg_infinity;
key_r = found ? key : toku_lt_infinity;
data_r = found ? data : toku_lt_infinity;
case (DB_FIRST):
key_l = val_l = toku_lt_neg_infinity;
key_r = found ? &g->tmp_key : toku_lt_infinity;
val_r = found ? &g->tmp_val : toku_lt_infinity;
break;
}
case (DB_LAST): {
key_l = found ? key : toku_lt_neg_infinity;
data_l = found ? data : toku_lt_neg_infinity;
key_r = data_r = toku_lt_infinity;
case (DB_LAST):
key_l = found ? &g->tmp_key : toku_lt_neg_infinity;
val_l = found ? &g->tmp_val : toku_lt_neg_infinity;
key_r = val_r = toku_lt_infinity;
break;
}
case (DB_SET_RANGE): {
key_l = saved_key;
data_l = toku_lt_neg_infinity;
key_r = found ? key : toku_lt_infinity;
data_r = found ? data : toku_lt_infinity;
case (DB_SET_RANGE):
key_l = orig_key;
val_l = toku_lt_neg_infinity;
key_r = found ? &g->tmp_key : toku_lt_infinity;
val_r = found ? &g->tmp_val : toku_lt_infinity;
break;
}
case (DB_GET_BOTH_RANGE): {
key_l = key_r = key;
data_l = saved_data;
data_r = found ? data : toku_lt_infinity;
case (DB_GET_BOTH_RANGE):
key_l = key_r = &g->tmp_key;
val_l = orig_val;
val_r = found ? &g->tmp_val : toku_lt_infinity;
break;
}
case (DB_NEXT):
case (DB_NEXT_NODUP): {
assert(!toku_c_uninitialized(c));
key_l = saved_key;
data_l = saved_data;
key_r = found ? key : toku_lt_infinity;
data_r = found ? data : toku_lt_infinity;
case (DB_NEXT_NODUP):
assert(!toku_c_uninitialized(g->c));
key_l = &g->cursor_key;
val_l = &g->cursor_val;
key_r = found ? &g->tmp_key : toku_lt_infinity;
val_r = found ? &g->tmp_val : toku_lt_infinity;
break;
}
case (DB_PREV):
case (DB_PREV_NODUP): {
assert(!toku_c_uninitialized(c));
key_l = found ? key : toku_lt_neg_infinity;
data_l = found ? data : toku_lt_neg_infinity;
key_r = saved_key;
data_r = saved_data;
case (DB_PREV_NODUP):
assert(!toku_c_uninitialized(g->c));
key_l = found ? &g->tmp_key : toku_lt_neg_infinity;
val_l = found ? &g->tmp_val : toku_lt_neg_infinity;
key_r = &g->cursor_key;
val_r = &g->cursor_val;
break;
}
case (DB_NEXT_DUP): {
assert(!toku_c_uninitialized(c));
key_l = key_r = saved_key;
data_l = saved_data;
data_r = found ? data : toku_lt_infinity;
case (DB_NEXT_DUP):
assert(!toku_c_uninitialized(g->c));
key_l = key_r = &g->cursor_key;
val_l = &g->cursor_val;
val_r = found ? &g->tmp_val : toku_lt_infinity;
break;
}
#ifdef DB_PREV_DUP
case (DB_PREV_DUP): {
assert(!toku_c_uninitialized(c));
key_l = key_r = saved_key;
data_l = found ? data : toku_lt_neg_infinity;
data_r = saved_data;
case (DB_PREV_DUP):
assert(!toku_c_uninitialized(g->c));
key_l = key_r = &g->cursor_key;
val_l = found ? &g->tmp_val : toku_lt_neg_infinity;
val_r = &g->cursor_val;
break;
}
#endif
default: {
default:
r = EINVAL;
lock = FALSE;
break;
}
goto cleanup;
}
if (lock) {
DB_TXN* txn_anc = toku_txn_ancestor(txn);
r = toku_txn_add_lt(txn_anc, db->i->lt);
if (r!=0) { goto cleanup; }
TXNID id_anc = toku_txn_get_txnid(txn_anc->i->tokutxn);
r = toku_lt_acquire_range_read_lock(db->i->lt, db, id_anc,
key_l, data_l,
key_r, data_r);
if ((r = toku_txn_add_lt(g->txn_anc, lt))) goto cleanup;
r = toku_lt_acquire_range_read_lock(lt, g->db, g->id_anc, key_l, val_l,
key_r, val_r);
if (r!=0) goto cleanup;
}
r = 0;
cleanup:
if (saved_key->data) toku_free(saved_key->data);
if (saved_data->data) toku_free(saved_data->data);
return r ? r : r_last;
return r;
}
/* Used to save the state of a cursor. */
int brt_cursor_save_key_val(BRT_CURSOR cursor, DBT* key, DBT* val);
/* Used to restore the state of a cursor. */
void brt_cursor_set_key_val_manually(BRT_CURSOR cursor, DBT* key, DBT* val);
static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
HANDLE_PANICKED_DB(c->dbp);
DBT saved_key;
DBT saved_data;
static int toku_c_get_save_cursor(C_GET_VARS* g) {
int r = ENOSYS;
if (!g->cursor_is_write) { r = 0; goto cleanup; }
if (!toku_c_uninitialized(g->c)) {
g->cursor_key.flags = DB_DBT_MALLOC;
g->cursor_val.flags = DB_DBT_MALLOC;
}
if ((r = brt_cursor_save_key_val(g->c->i->c, &g->cursor_key, &g->cursor_val))) goto cleanup;
if (!toku_c_uninitialized(g->c)) g->cursor_malloced = TRUE;
g->cursor_was_saved = TRUE;
r = 0;
cleanup:
return r;
}
int r;
toku_c_get_fix_flags(c, &flag);
r = toku_c_get_pre_lock(c, key, data, &flag, &saved_key, &saved_data);
if (r!=0) return r;
static int toku_c_pget_save_cursor(C_GET_VARS* g) {
return toku_c_get_save_cursor(g);
}
static int toku_c_pget_assign_outputs(C_GET_VARS* g, DBT* key, DBT* val, DBT* dat) {
int r = ENOSYS;
DBT* write_key = g->key_is_write ? key : NULL;
DBT* write_val = g->val_is_write ? val : NULL;
DBT* write_dat = g->dat_is_write ? dat : NULL;
/* In the case of a non-associated database, we call it a
* 'secondary' here. */
DB* pdb = g->db->i->primary;
BRT primary = pdb ? pdb->i->brt : NULL;
BRT secondary = g->db->i->brt;
r = toku_brt_dbt_set_three(primary, secondary,
write_key, &g->tmp_key,
write_val, &g->tmp_val,
write_dat, &g->tmp_dat);
if (r!=0) goto cleanup;
r = 0;
cleanup:
return r;
}
static int toku_c_get_assign_outputs(C_GET_VARS* g, DBT* key, DBT* val) {
int r = ENOSYS;
DBT* write_key = g->key_is_write ? key : NULL;
DBT* write_val = g->val_is_write ? val : NULL;
r = toku_brt_dbt_set_both(g->db->i->brt,
write_key, &g->tmp_key,
write_val, &g->tmp_val);
if (r!=0) goto cleanup;
r = 0;
cleanup:
return r;
}
static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * val, u_int32_t flag) {
HANDLE_PANICKED_DB(c->dbp);
int r = ENOSYS;
int r_cursor_op = 0;
C_GET_VARS g;
memset(&g, 0, sizeof(g));
/* Initialize variables. */
g.c = c;
g.db = c->dbp;
g.flag = flag;
if (c->i->txn) {
g.txn_anc = toku_txn_ancestor(c->i->txn);
g.id_anc = toku_txn_get_txnid(g.txn_anc->i->tokutxn);
}
unsigned int brtflags;
toku_brt_get_flags(g.db->i->brt, &brtflags);
g.duplicates = (brtflags & TOKU_DB_DUPSORT) != 0;
/* Standardize the op flag. */
toku_c_get_fix_flags(&g);
/* If we know what to lock before the cursor op, lock now. */
if ((r = toku_c_get_pre_acquire_lock_if_possible(&g, key, val))) goto cleanup;
/* Determine whether the key and val parameters are read, write,
* or both. */
if ((r = toku_c_get_describe_inputs(&g))) goto cleanup;
/* Save the cursor position if the op can modify the cursor position. */
if ((r = toku_c_get_save_cursor(&g))) goto cleanup;
/* Save key and value to temporary local versions. */
if ((r = toku_c_get_save_inputs(&g, key, val))) goto cleanup;
/* Run the cursor operation on the brt. */
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
r = toku_brt_cursor_get(c->i->c, key, data, flag, txn);
r = toku_c_get_post_lock(c, key, data, flag, r, &saved_key, &saved_data);
r_cursor_op = r = toku_brt_cursor_get(c->i->c, &g.tmp_key, &g.tmp_val, g.flag, txn);
/* Only DB_CURRENT should possibly retun DB_KEYEMPTY,
* and DB_CURRENT requires no locking. */
if (r==DB_KEYEMPTY) { assert(g.op==DB_CURRENT); goto cleanup; }
/* If we do not find what the query wants, a lock can still fail
* 'first'. */
if (r!=0 && r!=DB_NOTFOUND) goto cleanup;
/* If we have not yet locked, lock now. */
BOOL found = r_cursor_op==0;
r = toku_c_get_post_lock(&g, found, key, val);
if (r!=0) goto cleanup;
/* if found, write the outputs to the output parameters. */
if (found && (r = toku_c_get_assign_outputs(&g, key, val))) goto cleanup;
r = r_cursor_op;
cleanup:
if (g.cursor_was_saved && g.cursor_malloced) {
/* We saved the cursor. We either need to restore it, or free
* the saved version. */
if (r!=0 && r!=DB_NOTFOUND) {
/* Failure since 0 and DB_NOTFOUND are 'successes';
* Restore the cursor. */
brt_cursor_set_key_val_manually(c->i->c, &g.cursor_key, &g.cursor_val);
/* cursor_key/val will be zeroed out. */
}
else {
/* Delete the saved cursor. */
if (g.cursor_key.data) toku_free(g.cursor_key.data);
if (g.cursor_val.data) toku_free(g.cursor_val.data);
}
}
/* Cleanup temporary keys. */
if (g.tmp_key.data && g.tmp_key_malloced) toku_free(g.tmp_key.data);
if (g.tmp_val.data && g.tmp_val_malloced) toku_free(g.tmp_val.data);
return r;
}
......@@ -1381,112 +1550,97 @@ static int toku_c_del_noassociate(DBC * c, u_int32_t flags) {
return r;
}
static int toku_save_original_data(DBT* dst, DBT* src) {
static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) {
HANDLE_PANICKED_DB(c->dbp);
DB *pdb = c->dbp->i->primary;
/* c_pget does not work on a primary. */
if (!pdb) return EINVAL;
/* If data and primary_key are both zeroed,
* the temporary storage used to fill in data
* is different in the two cases because they
* come from different trees.
* Make sure they realy are different trees. */
assert(c->dbp->i->brt!=pdb->i->brt);
assert(c->dbp!=pdb);
int r;
C_GET_VARS g;
memset(&g, 0, sizeof(g));
/* Initialize variables. */
g.c = c;
g.db = c->dbp;
g.flag = flag;
unsigned int brtflags;
toku_brt_get_flags(g.db->i->brt, &brtflags);
g.duplicates = (brtflags & TOKU_DB_DUPSORT) != 0;
*dst = *src;
#ifdef DB_DBT_PARTIAL
#error toku_c_pget does not properly handle DB_DBT_PARTIAL
#endif
//We may use this multiple times, we'll free only once at the end.
dst->flags = DB_DBT_REALLOC;
//Not using DB_DBT_USERMEM.
dst->ulen = 0;
if (src->size) {
if (!src->data) return EINVAL;
dst->data = toku_malloc(src->size);
if (!dst->data) {
r = ENOMEM;
return r;
}
memcpy(dst->data, src->data, src->size);
}
else dst->data = NULL;
return 0;
}
/* Standardize the op flag. */
toku_c_get_fix_flags(&g);
/* Determine whether the key, val, and data, parameters are read, write,
* or both. */
if ((r = toku_c_pget_describe_inputs(&g))) goto cleanup;
static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) {
int r;
int r2;
int r3;
DB *db = c->dbp;
HANDLE_PANICKED_DB(db);
DB *pdb = db->i->primary;
if (!pdb) return EINVAL; //c_pget does not work on a primary.
// If data and primary_key are both zeroed, the temporary storage used to fill in data is different in the two cases because they come from different trees.
assert(db->i->brt!=pdb->i->brt); // Make sure they realy are different trees.
assert(db!=pdb);
DBT copied_key;
DBT copied_pkey;
DBT copied_data;
//Store original pointers.
DBT* o_key = key;
DBT* o_pkey = pkey;
DBT* o_data = data;
//Use copied versions for everything until/if success.
key = &copied_key;
pkey = &copied_pkey;
data = &copied_data;
/* The 'key' from C_GET_VARS is the secondary key, and the 'val'
* from C_GET_VARS is the primary key. The 'data' parameter here
* is ALWAYS write-only */
/* Save the cursor position if the op can modify the cursor position. */
if ((r = toku_c_pget_save_cursor(&g))) goto cleanup;;
if (0) {
delete_silently_and_retry:
//Free any old data.
toku_free(key->data);
toku_free(pkey->data);
toku_free(data->data);
//Silently delete and re-run.
r = toku_c_del_noassociate(c, 0);
if (r != 0) return r;
}
if (0) {
died0:
return r;
}
//Need to save all the original data.
r = toku_save_original_data(&copied_key, o_key); if (r!=0) goto died0;
if (0) {
died1:
toku_free(key->data);
goto died0;
}
r = toku_save_original_data(&copied_pkey, o_pkey); if (r!=0) goto died1;
if (0) {
died2:
toku_free(pkey->data);
goto died1;
}
r = toku_save_original_data(&copied_data, o_data); if (r!=0) goto died2;
if (0) {
died3:
toku_free(data->data);
goto died2;
/* Free all old 'saved' elements, return to pristine state. */
if (g.tmp_key.data && g.tmp_key_malloced) toku_free(g.tmp_key.data);
memset(&g.tmp_key, 0, sizeof(g.tmp_key));
g.tmp_key_malloced = FALSE;
if (g.tmp_val.data && g.tmp_val_malloced) toku_free(g.tmp_val.data);
memset(&g.tmp_val, 0, sizeof(g.tmp_val));
g.tmp_val_malloced = FALSE;
if (g.tmp_dat.data && g.tmp_dat_malloced) toku_free(g.tmp_dat.data);
memset(&g.tmp_dat, 0, sizeof(g.tmp_dat));
g.tmp_dat_malloced = FALSE;
/* Silently delete and re-run. */
if ((r = toku_c_del_noassociate(c, 0))) goto cleanup;
}
/* Save the inputs. */
if ((r = toku_c_pget_save_inputs(&g, key, pkey, data))) goto cleanup;
r = toku_c_get_noassociate(c, key, pkey, flag);
if (r != 0) goto died3;
r = toku_db_get(pdb, c->i->txn, pkey, data, 0);
if (r == DB_NOTFOUND) goto delete_silently_and_retry;
if (r != 0) goto died3;
r = verify_secondary_key(db, pkey, data, key);
if (r == DB_SECONDARY_BAD) goto delete_silently_and_retry;
if (r != 0) goto died3;
if ((r = toku_c_get_noassociate(c, &g.tmp_key, &g.tmp_val, flag))) goto cleanup;
//Copy everything and return.
assert(r==0);
r = toku_db_get(pdb, c->i->txn, &g.tmp_val, &g.tmp_dat, 0);
if (r==DB_NOTFOUND) goto delete_silently_and_retry;
if (r!=0) goto cleanup;
r = toku_brt_dbt_set_key(db->i->brt, o_key, key->data, key->size);
r2 = toku_brt_dbt_set_key(pdb->i->brt, o_pkey, pkey->data, pkey->size);
r3 = toku_brt_dbt_set_value(pdb->i->brt, o_data, data->data, data->size);
r = verify_secondary_key(g.db, &g.tmp_val, &g.tmp_dat, &g.tmp_key);
if (r==DB_SECONDARY_BAD) goto delete_silently_and_retry;
if (r!=0) goto cleanup;
//Cleanup.
toku_free(key->data);
toku_free(pkey->data);
toku_free(data->data);
if (r!=0) return r;
if (r2!=0) return r2;
return r3;
/* Atomically assign all 3 outputs. */
if ((r = toku_c_pget_assign_outputs(&g, key, pkey, data))) goto cleanup;
r = 0;
cleanup:
if (g.cursor_was_saved && g.cursor_malloced) {
/* We saved the cursor. We either need to restore it, or free
* the saved version. */
if (r!=0) {
/* Restore the cursor. */
brt_cursor_set_key_val_manually(c->i->c, &g.cursor_key, &g.cursor_val);
/* cursor_key/val will be zeroed out. */
}
else {
/* Delete the saved cursor. */
if (g.cursor_key.data) toku_free(g.cursor_key.data);
if (g.cursor_val.data) toku_free(g.cursor_val.data);
}
}
/* Cleanup temporary keys. */
if (g.tmp_key.data && g.tmp_key_malloced) toku_free(g.tmp_key.data);
if (g.tmp_val.data && g.tmp_val_malloced) toku_free(g.tmp_val.data);
if (g.tmp_dat.data && g.tmp_dat_malloced) toku_free(g.tmp_dat.data);
return r;
}
static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
......@@ -1657,31 +1811,43 @@ static int toku_c_del(DBC * c, u_int32_t flags) {
memset(&pkey, 0, sizeof(pkey));
memset(&data, 0, sizeof(data));
pkey.flags = DB_DBT_REALLOC;
data.flags = DB_DBT_REALLOC;
if (db->i->primary == 0) {
pdb = db;
r = toku_c_get(c, &pkey, &data, DB_CURRENT);
if (r != 0) goto assoc_cleanup;
} else {
DBT skey;
pdb = db->i->primary;
memset(&skey, 0, sizeof(skey));
skey.flags = DB_DBT_MALLOC;
r = toku_c_pget(c, &skey, &pkey, &data, DB_CURRENT);
if (r!=0) goto assoc_cleanup;
if (skey.data) toku_free(skey.data);
}
if (r != 0) return r;
for (h = list_head(&pdb->i->associated); h != &pdb->i->associated; h = h->next) {
struct __toku_db_internal *dbi = list_struct(h, struct __toku_db_internal, associated);
if (dbi->db == db) continue; //Skip current db (if its primary or secondary)
r = do_associated_deletes(c->i->txn, &pkey, &data, dbi->db);
if (r!=0) return r;
if (r!=0) goto assoc_cleanup;
}
if (db->i->primary != 0) {
//If this is a secondary, we did not delete from the primary.
//Primaries cannot have duplicates, (noncursor) del is safe.
r = toku_db_del_noassociate(pdb, c->i->txn, &pkey, DB_DELETE_ANY);
if (r!=0) return r;
if (r!=0) goto assoc_cleanup;
}
assoc_cleanup:
if (pkey.data) toku_free(pkey.data);
if (data.data) toku_free(data.data);
if (r!=0) goto cleanup;
}
r = toku_c_del_noassociate(c, flags);
if (r!=0) goto cleanup;
r = 0;
cleanup:
return r;
}
......@@ -1827,12 +1993,16 @@ static int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
u_int32_t brtflags;
memset(&data, 0, sizeof(data));
data.flags = DB_DBT_REALLOC;
toku_brt_get_flags(db->i->brt, &brtflags);
if (brtflags & TOKU_DB_DUPSORT) {
int r2;
DBC *dbc;
BOOL found = FALSE;
DBT tmp_key;
memset(&tmp_key, 0, sizeof(tmp_key));
tmp_key.flags = DB_DBT_REALLOC;
/* If we are deleting all copies from a secondary with duplicates,
* We have to make certain we cascade all the deletes. */
......@@ -1845,13 +2015,20 @@ static int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
r = toku_c_del(dbc, 0);
if (r==0) found = TRUE;
if (r!=0 && r!=DB_KEYEMPTY) break;
r = toku_c_get_noassociate(dbc, key, &data, DB_NEXT_DUP);
/* key is an input-only parameter. it can have flags such as
* DB_DBT_MALLOC. If this were the case, we would clobber the
* contents of key as well as possibly have a memory leak if we
* pass it to toku_c_get_noassociate below. We use a temporary
* junk variable to avoid this. */
r = toku_c_get_noassociate(dbc, &tmp_key, &data, DB_NEXT_DUP);
if (r == DB_NOTFOUND) {
//If we deleted at least one we're happy. Quit out.
if (found) r = 0;
break;
}
}
if (data.data) toku_free(data.data);
if (tmp_key.data) toku_free(tmp_key.data);
r2 = toku_c_close(dbc);
if (r != 0) return r;
......@@ -2202,9 +2379,13 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
} else if (flags == DB_NOOVERWRITE) {
/* check if the key already exists */
DBT testfordata;
r = toku_db_get_noassociate(db, txn, key, toku_init_dbt(&testfordata), 0);
if (r == 0)
memset(&testfordata, 0, sizeof(testfordata));
testfordata.flags = DB_DBT_MALLOC;
r = toku_db_get_noassociate(db, txn, key, &testfordata, 0);
if (r == 0) {
if (testfordata.data) toku_free(testfordata.data);
return DB_KEYEXIST;
}
if (r != DB_NOTFOUND) return r;
} else if (flags != 0) {
/* no other flags are currently supported */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment