Commit 21ccfb87 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #394

Merge branch tokudb.394 back into trunk

git-svn-id: file:///svn/tokudb@3841 c7de825b-a66e-492c-adef-691d508d4ae1
parent a7b1545a
...@@ -2506,13 +2506,35 @@ int show_brt_blocknumbers (BRT brt) { ...@@ -2506,13 +2506,35 @@ int show_brt_blocknumbers (BRT brt) {
#endif #endif
int toku_brt_dbt_set_key(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) { int toku_brt_dbt_set_both(BRT brt, DBT* key, DBT* key_source,
int r = toku_dbt_set_value(ybt, val, vallen, &brt->skey); DBT* val, DBT* val_source) {
int r = toku_dbt_set_two_values(key, key_source->data, key_source->size, &brt->skey,
val, val_source->data, val_source->size, &brt->sval);
return r; return r;
} }
int toku_brt_dbt_set_value(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) { int toku_brt_dbt_set_three(BRT brt_primary, BRT brt_secondary,
int r = toku_dbt_set_value(ybt, val, vallen, &brt->sval); DBT* key, DBT* key_source,
DBT* pkey, DBT* pkey_source,
DBT* val, DBT* val_source) {
int r = toku_dbt_set_three_values(key, key_source->data, key_source->size, &brt_secondary->skey,
pkey, pkey_source->data, pkey_source->size, &brt_secondary->sval,
val, val_source->data, val_source->size, &brt_primary->sval);
return r;
}
int toku_brt_dbt_set(DBT* key, DBT* key_source) {
int r = toku_dbt_set_value(key, key_source->data, key_source->size, NULL);
return r;
}
int toku_brt_dbt_set_key(BRT brt, DBT* key, DBT* key_source) {
int r = toku_dbt_set_value(key, key_source->data, key_source->size, &brt->skey);
return r;
}
int toku_brt_dbt_set_val(BRT brt, DBT* val, DBT* val_source) {
int r = toku_dbt_set_value(val, val_source->data, val_source->size, &brt->sval);
return r; return r;
} }
...@@ -2685,11 +2707,16 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT ...@@ -2685,11 +2707,16 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT
} }
} }
got_a_good_value: got_a_good_value:
if (newkey) { if (newkey && newval) {
r = toku_dbt_set_two_values(newkey, le_latest_key(le), le_latest_keylen(le), &brt->skey,
newval, le_latest_val(le), le_latest_vallen(le), &brt->sval);
if (r!=0) return r;
}
else if (newkey) {
r = toku_dbt_set_value(newkey, le_latest_key(le), le_latest_keylen(le), &brt->skey); r = toku_dbt_set_value(newkey, le_latest_key(le), le_latest_keylen(le), &brt->skey);
if (r!=0) return r; if (r!=0) return r;
} }
if (newval) { else if (newval) {
r = toku_dbt_set_value(newval, le_latest_val(le), le_latest_vallen(le), &brt->sval); r = toku_dbt_set_value(newval, le_latest_val(le), le_latest_vallen(le), &brt->sval);
if (r!=0) return r; if (r!=0) return r;
} }
...@@ -2776,6 +2803,11 @@ static inline void brt_cursor_set_key_val(BRT_CURSOR cursor, DBT *newkey, DBT *n ...@@ -2776,6 +2803,11 @@ static inline void brt_cursor_set_key_val(BRT_CURSOR cursor, DBT *newkey, DBT *n
cursor->val = *newval; memset(newval, 0, sizeof *newval); cursor->val = *newval; memset(newval, 0, sizeof *newval);
} }
/* Used to restore the state of a cursor. */
void brt_cursor_set_key_val_manually(BRT_CURSOR cursor, DBT* key, DBT* val) {
brt_cursor_set_key_val(cursor, key, val);
}
int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) { int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) {
BRT_CURSOR cursor = toku_malloc(sizeof *cursor); BRT_CURSOR cursor = toku_malloc(sizeof *cursor);
if (cursor == 0) if (cursor == 0)
...@@ -2816,13 +2848,38 @@ static inline int compare_kv_xy(BRT brt, DBT *k, DBT *v, DBT *x, DBT *y) { ...@@ -2816,13 +2848,38 @@ static inline int compare_kv_xy(BRT brt, DBT *k, DBT *v, DBT *x, DBT *y) {
static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) { static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) {
int r = 0; int r = 0;
if (key) void** key_staticp = cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey;
r = toku_dbt_set_value(key, cursor->key.data, cursor->key.size, cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey); void** val_staticp = cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval;
if (r == 0 && val) r = toku_dbt_set_two_values(key, cursor->key.data, cursor->key.size, key_staticp,
r = toku_dbt_set_value(val, cursor->val.data, cursor->val.size, cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval); val, cursor->val.data, cursor->val.size, val_staticp);
return r; return r;
} }
int toku_brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) {
int r = brt_cursor_copyout(cursor, key, val);
return r;
}
/* Used to save the state of a cursor. */
int brt_cursor_save_key_val(BRT_CURSOR cursor, DBT* key, DBT* val) {
if (brt_cursor_not_set(cursor)) {
if (key) { *key = cursor->key; }
if (val) { *val = cursor->val; }
return 0;
}
else {
assert(!key || key->flags == DB_DBT_MALLOC);
assert(!val || val->flags == DB_DBT_MALLOC);
int r;
if ((r = brt_cursor_copyout(cursor, key, val))) { return r; }
/* An initialized cursor cannot have NULL key->data or
* val->data. */
assert(key==NULL || key->data!=NULL);
assert(val==NULL || val->data!=NULL);
return 0;
}
}
static int brt_cursor_compare_set(brt_search_t *search, DBT *x, DBT *y) { static int brt_cursor_compare_set(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context; BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return min xy: kv <= xy */ return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return min xy: kv <= xy */
......
...@@ -59,8 +59,16 @@ int brtenv_checkpoint (BRTENV env); ...@@ -59,8 +59,16 @@ int brtenv_checkpoint (BRTENV env);
extern int toku_brt_do_push_cmd; // control whether push occurs eagerly. extern int toku_brt_do_push_cmd; // control whether push occurs eagerly.
int toku_brt_dbt_set_key (BRT, DBT*, bytevec val, ITEMLEN vallen); int toku_brt_dbt_set_three(BRT brt_primary, BRT brt_secondary,
int toku_brt_dbt_set_value (BRT, DBT*, bytevec val, ITEMLEN vallen); DBT* key, DBT* key_source,
DBT* pkey, DBT* pkey_source,
DBT* val, DBT* val_source);
int toku_brt_dbt_set_both(BRT brt, DBT* key, DBT* key_source,
DBT* val, DBT* val_source);
int toku_brt_dbt_set_key(BRT brt, DBT* key, DBT* key_source);
int toku_brt_dbt_set_val(BRT brt, DBT* val, DBT* val_source);
int toku_brt_dbt_set(DBT* key, DBT* key_source);
int toku_brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val);
int toku_brt_get_fd(BRT, int *); int toku_brt_get_fd(BRT, int *);
......
...@@ -150,10 +150,8 @@ static int toku_pma_cursor_get_current(PMA_CURSOR cursor, DBT *key, DBT *val, in ...@@ -150,10 +150,8 @@ static int toku_pma_cursor_get_current(PMA_CURSOR cursor, DBT *key, DBT *val, in
if (r != 0) if (r != 0)
return r; return r;
if (key) r = toku_dbt_set_two_values(key, cursor->key.data, cursor->key.size, cursor->sskey,
r = toku_dbt_set_value(key, cursor->key.data, cursor->key.size, cursor->sskey); val, cursor->val.data, cursor->val.size, cursor->ssval);
if (val && r == 0)
r = toku_dbt_set_value(val, cursor->val.data, cursor->val.size, cursor->ssval);
return r; return r;
} }
......
...@@ -16,44 +16,115 @@ DBT *toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) { ...@@ -16,44 +16,115 @@ DBT *toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) {
return dbt; return dbt;
} }
int toku_dbt_set_value (DBT *ybt, bytevec val, ITEMLEN vallen, void **staticptrp) { static inline int dbt_set_preprocess(DBT* ybt, ITEMLEN len, void** staticptrp, void** tmp_data) {
int r = ENOSYS; int r = ENOSYS;
if (ybt->flags==DB_DBT_MALLOC) { if (ybt) {
domalloc: if (ybt->flags==DB_DBT_USERMEM) {
ybt->data = toku_malloc((size_t)vallen); if (ybt->ulen < len) {
if (!ybt->data && vallen > 0) { r = errno; goto cleanup; } ybt->size = len;
} else if (ybt->flags==DB_DBT_REALLOC) { r = DB_BUFFER_SMALL;
if (ybt->data==0) goto domalloc; goto cleanup;
/* tmp is used to prevent a memory leak if realloc fails */ }
void* tmp = toku_realloc(ybt->data, (size_t)vallen);
if (!tmp && vallen > 0) { r = errno; goto cleanup; }
ybt->data = tmp;
} else if (ybt->flags==DB_DBT_USERMEM) {
ybt->size = vallen;
if (ybt->ulen < vallen) { r = DB_BUFFER_SMALL; goto cleanup; }
} else {
if (staticptrp==0) return -1;
void *staticptr=*staticptrp;
//void *old=staticptr;
if (staticptr==0) {
staticptr = toku_malloc((size_t)vallen);
if (!staticptr && vallen > 0) { r = errno; goto cleanup; }
} else {
/* tmp is used to prevent a memory leak if realloc fails */
void* tmp = toku_realloc(staticptr, vallen);
if (!tmp && vallen > 0) { r = errno; goto cleanup; }
staticptr = tmp;
} }
//if (old!=staticptr) printf("%s:%d MALLOC --> %p\n", __FILE__, __LINE__, staticptr); else if (ybt->flags==DB_DBT_MALLOC || ybt->flags==DB_DBT_REALLOC || ybt->flags==0) {
*staticptrp = staticptr; if (ybt->flags==0 && staticptrp==NULL) { r = -1; goto cleanup; }
ybt->data = vallen > 0 ? staticptr : 0; *tmp_data = toku_malloc(len);
if (!*tmp_data && len > 0) { r = errno; goto cleanup; }
}
else { r = EINVAL; goto cleanup; }
} }
ybt->size = vallen; r = 0;
if (ybt->size>0) { cleanup:
memcpy(ybt->data, val, (size_t)vallen); return r;
}
static inline void dbt_set_copy(DBT* ybt, bytevec data, ITEMLEN len, void** staticptrp, void* tmp_data) {
if (ybt) {
if (ybt->flags==DB_DBT_REALLOC && ybt->data) toku_free(ybt->data);
else if (ybt->flags==0) {
toku_free(*staticptrp);
*staticptrp = tmp_data;
}
if (ybt->flags!=DB_DBT_USERMEM) {
if (ybt->flags!=0 || len>0) ybt->data = tmp_data;
else ybt->data = NULL;
}
if ((ybt->size = len) > 0) memcpy(ybt->data, data, (size_t)len);
} }
}
/* Atomically set three dbts, such that they either both succeed, or
* there is no side effect. */
int toku_dbt_set_three_values(
DBT* ybt1, bytevec ybt1_data, ITEMLEN ybt1_len, void** ybt1_staticptrp,
DBT* ybt2, bytevec ybt2_data, ITEMLEN ybt2_len, void** ybt2_staticptrp,
DBT* ybt3, bytevec ybt3_data, ITEMLEN ybt3_len, void** ybt3_staticptrp) {
int r = ENOSYS;
void* tmp_ybt1_data = NULL;
void* tmp_ybt2_data = NULL;
void* tmp_ybt3_data = NULL;
/* Do all mallocs and check for all possible errors. */
if ((r = dbt_set_preprocess(ybt1, ybt1_len, ybt1_staticptrp, &tmp_ybt1_data))) goto cleanup;
if ((r = dbt_set_preprocess(ybt2, ybt2_len, ybt2_staticptrp, &tmp_ybt2_data))) goto cleanup;
if ((r = dbt_set_preprocess(ybt3, ybt3_len, ybt3_staticptrp, &tmp_ybt3_data))) goto cleanup;
/* Copy/modify atomically the dbts. */
dbt_set_copy(ybt1, ybt1_data, ybt1_len, ybt1_staticptrp, tmp_ybt1_data);
dbt_set_copy(ybt2, ybt2_data, ybt2_len, ybt2_staticptrp, tmp_ybt2_data);
dbt_set_copy(ybt3, ybt3_data, ybt3_len, ybt3_staticptrp, tmp_ybt3_data);
r = 0; r = 0;
cleanup: cleanup:
if (r!=0) {
if (tmp_ybt1_data) toku_free(tmp_ybt1_data);
if (tmp_ybt2_data) toku_free(tmp_ybt2_data);
if (tmp_ybt3_data) toku_free(tmp_ybt3_data);
}
return r;
}
/* Atomically set two dbts, such that they either both succeed, or
* there is no side effect. */
int toku_dbt_set_two_values(
DBT* ybt1, bytevec ybt1_data, ITEMLEN ybt1_len, void** ybt1_staticptrp,
DBT* ybt2, bytevec ybt2_data, ITEMLEN ybt2_len, void** ybt2_staticptrp) {
int r = ENOSYS;
void* tmp_ybt1_data = NULL;
void* tmp_ybt2_data = NULL;
/* Do all mallocs and check for all possible errors. */
if ((r = dbt_set_preprocess(ybt1, ybt1_len, ybt1_staticptrp, &tmp_ybt1_data))) goto cleanup;
if ((r = dbt_set_preprocess(ybt2, ybt2_len, ybt2_staticptrp, &tmp_ybt2_data))) goto cleanup;
/* Copy/modify atomically the dbts. */
dbt_set_copy(ybt1, ybt1_data, ybt1_len, ybt1_staticptrp, tmp_ybt1_data);
dbt_set_copy(ybt2, ybt2_data, ybt2_len, ybt2_staticptrp, tmp_ybt2_data);
r = 0;
cleanup:
if (r!=0) {
if (tmp_ybt1_data) toku_free(tmp_ybt1_data);
if (tmp_ybt2_data) toku_free(tmp_ybt2_data);
}
return r;
}
int toku_dbt_set_value(DBT* ybt1, bytevec ybt1_data, ITEMLEN ybt1_len, void** ybt1_staticptrp) {
int r = ENOSYS;
void* tmp_ybt1_data = NULL;
/* Do all mallocs and check for all possible errors. */
if ((r = dbt_set_preprocess(ybt1, ybt1_len, ybt1_staticptrp, &tmp_ybt1_data))) goto cleanup;
/* Copy/modify atomically the dbts. */
dbt_set_copy(ybt1, ybt1_data, ybt1_len, ybt1_staticptrp, tmp_ybt1_data);
r = 0;
cleanup:
if (r!=0) {
if (tmp_ybt1_data) toku_free(tmp_ybt1_data);
}
return r; return r;
} }
...@@ -11,7 +11,13 @@ ...@@ -11,7 +11,13 @@
DBT* toku_init_dbt (DBT *); DBT* toku_init_dbt (DBT *);
DBT *toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len); DBT *toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len);
int toku_dbt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp); int toku_dbt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp);
int toku_dbt_set_two_values(DBT* key, bytevec key_data, ITEMLEN key_len, void** key_staticptrp,
DBT* val, bytevec val_data, ITEMLEN val_len, void** val_staticptrp);
int toku_dbt_set_three_values(
DBT* ybt1, bytevec ybt1_data, ITEMLEN ybt1_len, void** ybt1_staticptrp,
DBT* ybt2, bytevec ybt2_data, ITEMLEN ybt2_len, void** ybt2_staticptrp,
DBT* ybt3, bytevec ybt3_data, ITEMLEN ybt3_len, void** ybt3_staticptrp);
#endif #endif
...@@ -96,7 +96,6 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag); ...@@ -96,7 +96,6 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag);
static int toku_c_del(DBC *c, u_int32_t flags); static int toku_c_del(DBC *c, u_int32_t flags);
static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags); static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags);
static int toku_c_close(DBC * c); static int toku_c_close(DBC * c);
static int toku_save_original_data(DBT* dst, DBT* src);
/* misc */ /* misc */
static char *construct_full_name(const char *dir, const char *fname); static char *construct_full_name(const char *dir, const char *fname);
...@@ -1111,31 +1110,58 @@ static inline DB_TXN* toku_txn_ancestor(DB_TXN* txn) { ...@@ -1111,31 +1110,58 @@ static inline DB_TXN* toku_txn_ancestor(DB_TXN* txn) {
static int toku_txn_add_lt(DB_TXN* txn, toku_lock_tree* lt); static int toku_txn_add_lt(DB_TXN* txn, toku_lock_tree* lt);
static void toku_c_get_fix_flags(DBC* c, u_int32_t* flag) { /* c_get has many subfunctions with lots of parameters
assert(c && flag); * this structure exists to simplify it. */
DB* db = c->dbp; typedef struct {
DBC* c; // The cursor
u_int32_t get_flag = get_main_cursor_flag(*flag); DB* db; // db the cursor is iterating over
unsigned int brtflags; DB_TXN* txn_anc; // The (root) ancestor of the transaction
toku_brt_get_flags(db->i->brt, &brtflags); TXNID id_anc;
BOOL duplicates = (brtflags & TOKU_DB_DUPSORT) != 0; DBT cursor_key; // Original position of cursor (key portion)
DBT cursor_val; // Original position of cursor (val portion)
switch (get_flag) { DBT tmp_key; // Temporary key to protect out param
case (DB_NEXT): { DBT tmp_val; // Temporary val to protect out param
if (!duplicates) { DBT tmp_dat; // Temporary data val to protect out param
toku_swap_flag(flag, &get_flag, DB_NEXT_NODUP); u_int32_t flag; // The c_get flag
u_int32_t op; // The operation portion of the c_get flag
BOOL cursor_is_write; // Whether op can change position of cursor
BOOL cursor_was_saved; // Whether we saved the cursor yet.
BOOL key_is_read;
BOOL key_is_write;
BOOL val_is_read;
BOOL val_is_write;
BOOL dat_is_read;
BOOL dat_is_write;
BOOL duplicates;
BOOL cursor_malloced;
BOOL tmp_key_malloced;
BOOL tmp_val_malloced;
BOOL tmp_dat_malloced;
} C_GET_VARS;
static void toku_c_get_fix_flags(C_GET_VARS* g) {
g->op = get_main_cursor_flag(g->flag);
switch (g->op) {
case (DB_NEXT):
case (DB_NEXT_NODUP): {
if (toku_c_uninitialized(g->c)) toku_swap_flag(&g->flag, &g->op, DB_FIRST);
else if (!g->duplicates && g->op == DB_NEXT) {
toku_swap_flag(&g->flag, &g->op, DB_NEXT_NODUP);
} }
break; break;
} }
case (DB_PREV): { case (DB_PREV):
if (!duplicates) { case (DB_PREV_NODUP): {
toku_swap_flag(flag, &get_flag, DB_PREV_NODUP); if (toku_c_uninitialized(g->c)) toku_swap_flag(&g->flag, &g->op, DB_LAST);
else if (!g->duplicates && g->op == DB_PREV) {
toku_swap_flag(&g->flag, &g->op, DB_PREV_NODUP);
} }
break; break;
} }
case (DB_GET_BOTH_RANGE): { case (DB_GET_BOTH_RANGE): {
if (!duplicates) { if (!g->duplicates) {
toku_swap_flag(flag, &get_flag, DB_GET_BOTH); toku_swap_flag(&g->flag, &g->op, DB_GET_BOTH);
} }
break; break;
} }
...@@ -1145,214 +1171,357 @@ static void toku_c_get_fix_flags(DBC* c, u_int32_t* flag) { ...@@ -1145,214 +1171,357 @@ static void toku_c_get_fix_flags(DBC* c, u_int32_t* flag) {
} }
} }
static int toku_c_get_pre_lock(DBC* c, DBT* key, DBT* data, u_int32_t* flag, static int toku_c_get_pre_acquire_lock_if_possible(C_GET_VARS* g, DBT* key, DBT* data) {
DBT* saved_key, DBT* saved_data) { int r = ENOSYS;
assert(saved_key && saved_data && flag); toku_lock_tree* lt = g->db->i->lt;
DB* db = c->dbp; if (!lt) { r = 0; goto cleanup; }
if (!db->i->lt) return 0;
saved_key->data = NULL;
saved_data->data = NULL;
DB_TXN* txn = c->i->txn;
u_int32_t get_flag = get_main_cursor_flag(*flag); /* We know what to lock ahead of time. */
unsigned int brtflags; if (g->op == DB_GET_BOTH ||
toku_brt_get_flags(db->i->brt, &brtflags); (g->op == DB_SET && !g->duplicates)) {
BOOL duplicates = (brtflags & TOKU_DB_DUPSORT) != 0; r = toku_txn_add_lt(g->txn_anc, lt);
DB_TXN* txn_anc = NULL; if (r!=0) goto cleanup;
TXNID id_anc = 0; r = toku_lt_acquire_read_lock(lt, g->db, g->id_anc, key, data);
if (r!=0) goto cleanup;
}
r = 0;
cleanup:
return r;
}
int r = 0; static int toku_c_get_describe_inputs(C_GET_VARS* g) {
switch (get_flag) { int r = ENOSYS;
case (DB_CURRENT): /* Default is FALSE. */
case (DB_SET): switch (g->op) {
case (DB_FIRST): case DB_SET:
case (DB_LAST): { g->key_is_read = TRUE;
/* The above cases have all their code in toku_c_get_post_lock. */ g->val_is_write = TRUE;
break;
}
case (DB_GET_BOTH): {
txn_anc = toku_txn_ancestor(txn);
r = toku_txn_add_lt(txn_anc, db->i->lt);
if (r!=0) return r;
id_anc = toku_txn_get_txnid(txn_anc->i->tokutxn);
r = toku_lt_acquire_read_lock(db->i->lt, db, id_anc, key, data);
break; break;
} case DB_SET_RANGE:
case (DB_SET_RANGE): { g->key_is_read = TRUE;
r = toku_save_original_data(saved_key, key); g->key_is_write = TRUE;
g->val_is_write = TRUE;
break; break;
} case DB_GET_BOTH:
case (DB_GET_BOTH_RANGE): { g->key_is_read = TRUE;
assert(duplicates); g->val_is_read = TRUE;
r = toku_save_original_data(saved_data, data);
break; break;
} case DB_GET_BOTH_RANGE:
case (DB_NEXT): assert(g->duplicates);
case (DB_NEXT_NODUP): { g->key_is_read = TRUE;
r = toku_uninitialized_swap(c, saved_key, saved_data, flag, g->val_is_read = TRUE;
&get_flag, DB_FIRST); g->val_is_write = TRUE;
break; break;
} case DB_CURRENT:
case (DB_PREV): case DB_FIRST:
case (DB_PREV_NODUP): { case DB_LAST:
r = toku_uninitialized_swap(c, saved_key, saved_data, flag, case DB_NEXT:
&get_flag, DB_LAST); case DB_NEXT_NODUP:
break; case DB_NEXT_DUP:
} case DB_PREV:
#ifdef DB_PREV_DUP case DB_PREV_NODUP:
case (DB_PREV_DUP): #if defined(DB_PREV_DUP)
case DB_PREV_DUP:
#endif #endif
case (DB_NEXT_DUP): { g->key_is_write = TRUE;
if (!duplicates || toku_c_uninitialized(c)) r = EINVAL; g->val_is_write = TRUE;
else r = toku_c_get_current_unconditional(c, saved_key, saved_data);
break; break;
} default:
default: {
//TODO: Output an error.
r = EINVAL; r = EINVAL;
break; goto cleanup;
}
} }
if (g->op != DB_CURRENT) g->cursor_is_write = TRUE;
r = 0;
cleanup:
return r; return r;
} }
static int toku_c_get_post_lock(DBC* c, DBT* key, DBT* data, u_int32_t flag, static int toku_c_pget_describe_inputs(C_GET_VARS* g) {
int r_last, DBT* saved_key, DBT* saved_data) { int r = toku_c_get_describe_inputs(g);
assert(saved_key && saved_data); g->dat_is_read = FALSE;
DB* db = c->dbp; g->dat_is_write = TRUE;
if (!db->i->lt) return r_last; return r;
int r = 0; }
if (r_last && r_last != DB_NOTFOUND && r_last != DB_KEYEMPTY) {
r = r_last; static int toku_c_pget_save_inputs(C_GET_VARS* g, DBT* key, DBT* val, DBT* dat) {
goto cleanup; int r = ENOSYS;
}
DB_TXN* txn = c->i->txn; /*
u_int32_t get_flag = get_main_cursor_flag(flag); * Readonly: Copy original struct
if (r_last == DB_KEYEMPTY) { * Writeonly: Set to 0 (already done), DB_DBT_REALLOC, copy back later.
assert(get_flag == DB_CURRENT); * ReadWrite: Make a copy of original in new memory, copy back later
return r_last; * */
if (key) {
assert(g->key_is_read || g->key_is_write);
if (g->key_is_read && !g->key_is_write) g->tmp_key = *key;
else {
/* g->key_is_write */
g->tmp_key.flags = DB_DBT_REALLOC;
if (g->key_is_read &&
(r = toku_brt_dbt_set(&g->tmp_key, key))) goto cleanup;
g->tmp_key_malloced = TRUE;
}
}
if (val) {
assert(g->val_is_read || g->val_is_write);
if (g->val_is_read && !g->val_is_write) g->tmp_val = *val;
else {
/* g->val_is_write */
g->tmp_val.flags = DB_DBT_REALLOC;
if (g->val_is_read &&
(r = toku_brt_dbt_set(&g->tmp_val, val))) goto cleanup;
g->tmp_val_malloced = TRUE;
}
} }
assert(r_last == DB_NOTFOUND || r_last == 0); if (dat) {
BOOL found = r_last == 0; assert(g->dat_is_read || g->dat_is_write);
if (g->dat_is_read && !g->dat_is_write) g->tmp_dat = *dat;
else {
/* g->dat_is_write */
g->tmp_dat.flags = DB_DBT_REALLOC;
if (g->dat_is_read &&
(r = toku_brt_dbt_set(&g->tmp_dat, dat))) goto cleanup;
g->tmp_dat_malloced = TRUE;
}
}
r = 0;
cleanup:
if (r!=0) {
/* Cleanup memory allocated if necessary. */
if (g->tmp_key.data && g->tmp_key_malloced) toku_free(g->tmp_key.data);
if (g->tmp_val.data && g->tmp_val_malloced) toku_free(g->tmp_val.data);
if (g->tmp_dat.data && g->tmp_dat_malloced) toku_free(g->tmp_dat.data);
}
return r;
}
static int toku_c_get_save_inputs(C_GET_VARS* g, DBT* key, DBT* val) {
int r = toku_c_pget_save_inputs(g, key, val, NULL);
return r;
}
static int toku_c_get_post_lock(C_GET_VARS* g, BOOL found, DBT* orig_key, DBT* orig_val) {
int r = ENOSYS;
toku_lock_tree* lt = g->db->i->lt;
if (!lt) { r = 0; goto cleanup; }
BOOL lock = TRUE; BOOL lock = TRUE;
const DBT* key_l; const DBT* key_l;
const DBT* key_r; const DBT* key_r;
const DBT* data_l; const DBT* val_l;
const DBT* data_r; const DBT* val_r;
switch (get_flag) { switch (g->op) {
case (DB_CURRENT): { case (DB_CURRENT):
/* No locking necessary. You already own a lock by virtue /* No locking necessary. You already own a lock by virtue
of having a cursor pointing to this. */ of having a cursor pointing to this. */
lock = FALSE; lock = FALSE;
break; break;
} case (DB_SET):
case (DB_SET): { if (!g->duplicates) {
key_l = key_r = key; /* We acquired the lock before the cursor op. */
data_l = toku_lt_neg_infinity; lock = FALSE;
data_r = found ? data : toku_lt_infinity; break;
}
key_l = key_r = &g->tmp_key;
val_l = toku_lt_neg_infinity;
val_r = found ? &g->tmp_val : toku_lt_infinity;
break; break;
} case (DB_GET_BOTH):
case (DB_GET_BOTH): { /* We acquired the lock before the cursor op. */
/* All done in toku_c_get_pre_lock. */
lock = FALSE; lock = FALSE;
break; break;
} case (DB_FIRST):
case (DB_FIRST): { key_l = val_l = toku_lt_neg_infinity;
key_l = data_l = toku_lt_neg_infinity; key_r = found ? &g->tmp_key : toku_lt_infinity;
key_r = found ? key : toku_lt_infinity; val_r = found ? &g->tmp_val : toku_lt_infinity;
data_r = found ? data : toku_lt_infinity;
break; break;
} case (DB_LAST):
case (DB_LAST): { key_l = found ? &g->tmp_key : toku_lt_neg_infinity;
key_l = found ? key : toku_lt_neg_infinity; val_l = found ? &g->tmp_val : toku_lt_neg_infinity;
data_l = found ? data : toku_lt_neg_infinity; key_r = val_r = toku_lt_infinity;
key_r = data_r = toku_lt_infinity;
break; break;
} case (DB_SET_RANGE):
case (DB_SET_RANGE): { key_l = orig_key;
key_l = saved_key; val_l = toku_lt_neg_infinity;
data_l = toku_lt_neg_infinity; key_r = found ? &g->tmp_key : toku_lt_infinity;
key_r = found ? key : toku_lt_infinity; val_r = found ? &g->tmp_val : toku_lt_infinity;
data_r = found ? data : toku_lt_infinity;
break; break;
} case (DB_GET_BOTH_RANGE):
case (DB_GET_BOTH_RANGE): { key_l = key_r = &g->tmp_key;
key_l = key_r = key; val_l = orig_val;
data_l = saved_data; val_r = found ? &g->tmp_val : toku_lt_infinity;
data_r = found ? data : toku_lt_infinity;
break; break;
}
case (DB_NEXT): case (DB_NEXT):
case (DB_NEXT_NODUP): { case (DB_NEXT_NODUP):
assert(!toku_c_uninitialized(c)); assert(!toku_c_uninitialized(g->c));
key_l = saved_key; key_l = &g->cursor_key;
data_l = saved_data; val_l = &g->cursor_val;
key_r = found ? key : toku_lt_infinity; key_r = found ? &g->tmp_key : toku_lt_infinity;
data_r = found ? data : toku_lt_infinity; val_r = found ? &g->tmp_val : toku_lt_infinity;
break; break;
}
case (DB_PREV): case (DB_PREV):
case (DB_PREV_NODUP): { case (DB_PREV_NODUP):
assert(!toku_c_uninitialized(c)); assert(!toku_c_uninitialized(g->c));
key_l = found ? key : toku_lt_neg_infinity; key_l = found ? &g->tmp_key : toku_lt_neg_infinity;
data_l = found ? data : toku_lt_neg_infinity; val_l = found ? &g->tmp_val : toku_lt_neg_infinity;
key_r = saved_key; key_r = &g->cursor_key;
data_r = saved_data; val_r = &g->cursor_val;
break; break;
} case (DB_NEXT_DUP):
case (DB_NEXT_DUP): { assert(!toku_c_uninitialized(g->c));
assert(!toku_c_uninitialized(c)); key_l = key_r = &g->cursor_key;
key_l = key_r = saved_key; val_l = &g->cursor_val;
data_l = saved_data; val_r = found ? &g->tmp_val : toku_lt_infinity;
data_r = found ? data : toku_lt_infinity;
break; break;
}
#ifdef DB_PREV_DUP #ifdef DB_PREV_DUP
case (DB_PREV_DUP): { case (DB_PREV_DUP):
assert(!toku_c_uninitialized(c)); assert(!toku_c_uninitialized(g->c));
key_l = key_r = saved_key; key_l = key_r = &g->cursor_key;
data_l = found ? data : toku_lt_neg_infinity; val_l = found ? &g->tmp_val : toku_lt_neg_infinity;
data_r = saved_data; val_r = &g->cursor_val;
break; break;
}
#endif #endif
default: { default:
r = EINVAL; r = EINVAL;
lock = FALSE; lock = FALSE;
break; goto cleanup;
}
} }
if (lock) { if (lock) {
DB_TXN* txn_anc = toku_txn_ancestor(txn); if ((r = toku_txn_add_lt(g->txn_anc, lt))) goto cleanup;
r = toku_txn_add_lt(txn_anc, db->i->lt); r = toku_lt_acquire_range_read_lock(lt, g->db, g->id_anc, key_l, val_l,
if (r!=0) { goto cleanup; } key_r, val_r);
TXNID id_anc = toku_txn_get_txnid(txn_anc->i->tokutxn); if (r!=0) goto cleanup;
r = toku_lt_acquire_range_read_lock(db->i->lt, db, id_anc,
key_l, data_l,
key_r, data_r);
} }
r = 0;
cleanup: cleanup:
if (saved_key->data) toku_free(saved_key->data); return r;
if (saved_data->data) toku_free(saved_data->data);
return r ? r : r_last;
} }
/* Used to save the state of a cursor. */
int brt_cursor_save_key_val(BRT_CURSOR cursor, DBT* key, DBT* val);
/* Used to restore the state of a cursor. */
void brt_cursor_set_key_val_manually(BRT_CURSOR cursor, DBT* key, DBT* val);
static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag) { static int toku_c_get_save_cursor(C_GET_VARS* g) {
HANDLE_PANICKED_DB(c->dbp); int r = ENOSYS;
DBT saved_key; if (!g->cursor_is_write) { r = 0; goto cleanup; }
DBT saved_data; if (!toku_c_uninitialized(g->c)) {
g->cursor_key.flags = DB_DBT_MALLOC;
g->cursor_val.flags = DB_DBT_MALLOC;
}
if ((r = brt_cursor_save_key_val(g->c->i->c, &g->cursor_key, &g->cursor_val))) goto cleanup;
if (!toku_c_uninitialized(g->c)) g->cursor_malloced = TRUE;
g->cursor_was_saved = TRUE;
r = 0;
cleanup:
return r;
}
int r; static int toku_c_pget_save_cursor(C_GET_VARS* g) {
toku_c_get_fix_flags(c, &flag); return toku_c_get_save_cursor(g);
r = toku_c_get_pre_lock(c, key, data, &flag, &saved_key, &saved_data); }
if (r!=0) return r;
static int toku_c_pget_assign_outputs(C_GET_VARS* g, DBT* key, DBT* val, DBT* dat) {
int r = ENOSYS;
DBT* write_key = g->key_is_write ? key : NULL;
DBT* write_val = g->val_is_write ? val : NULL;
DBT* write_dat = g->dat_is_write ? dat : NULL;
/* In the case of a non-associated database, we call it a
* 'secondary' here. */
DB* pdb = g->db->i->primary;
BRT primary = pdb ? pdb->i->brt : NULL;
BRT secondary = g->db->i->brt;
r = toku_brt_dbt_set_three(primary, secondary,
write_key, &g->tmp_key,
write_val, &g->tmp_val,
write_dat, &g->tmp_dat);
if (r!=0) goto cleanup;
r = 0;
cleanup:
return r;
}
static int toku_c_get_assign_outputs(C_GET_VARS* g, DBT* key, DBT* val) {
int r = ENOSYS;
DBT* write_key = g->key_is_write ? key : NULL;
DBT* write_val = g->val_is_write ? val : NULL;
r = toku_brt_dbt_set_both(g->db->i->brt,
write_key, &g->tmp_key,
write_val, &g->tmp_val);
if (r!=0) goto cleanup;
r = 0;
cleanup:
return r;
}
static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * val, u_int32_t flag) {
HANDLE_PANICKED_DB(c->dbp);
int r = ENOSYS;
int r_cursor_op = 0;
C_GET_VARS g;
memset(&g, 0, sizeof(g));
/* Initialize variables. */
g.c = c;
g.db = c->dbp;
g.flag = flag;
if (c->i->txn) {
g.txn_anc = toku_txn_ancestor(c->i->txn);
g.id_anc = toku_txn_get_txnid(g.txn_anc->i->tokutxn);
}
unsigned int brtflags;
toku_brt_get_flags(g.db->i->brt, &brtflags);
g.duplicates = (brtflags & TOKU_DB_DUPSORT) != 0;
/* Standardize the op flag. */
toku_c_get_fix_flags(&g);
/* If we know what to lock before the cursor op, lock now. */
if ((r = toku_c_get_pre_acquire_lock_if_possible(&g, key, val))) goto cleanup;
/* Determine whether the key and val parameters are read, write,
* or both. */
if ((r = toku_c_get_describe_inputs(&g))) goto cleanup;
/* Save the cursor position if the op can modify the cursor position. */
if ((r = toku_c_get_save_cursor(&g))) goto cleanup;
/* Save key and value to temporary local versions. */
if ((r = toku_c_get_save_inputs(&g, key, val))) goto cleanup;
/* Run the cursor operation on the brt. */
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
r = toku_brt_cursor_get(c->i->c, key, data, flag, txn); r_cursor_op = r = toku_brt_cursor_get(c->i->c, &g.tmp_key, &g.tmp_val, g.flag, txn);
r = toku_c_get_post_lock(c, key, data, flag, r, &saved_key, &saved_data); /* Only DB_CURRENT should possibly retun DB_KEYEMPTY,
* and DB_CURRENT requires no locking. */
if (r==DB_KEYEMPTY) { assert(g.op==DB_CURRENT); goto cleanup; }
/* If we do not find what the query wants, a lock can still fail
* 'first'. */
if (r!=0 && r!=DB_NOTFOUND) goto cleanup;
/* If we have not yet locked, lock now. */
BOOL found = r_cursor_op==0;
r = toku_c_get_post_lock(&g, found, key, val);
if (r!=0) goto cleanup;
/* if found, write the outputs to the output parameters. */
if (found && (r = toku_c_get_assign_outputs(&g, key, val))) goto cleanup;
r = r_cursor_op;
cleanup:
if (g.cursor_was_saved && g.cursor_malloced) {
/* We saved the cursor. We either need to restore it, or free
* the saved version. */
if (r!=0 && r!=DB_NOTFOUND) {
/* Failure since 0 and DB_NOTFOUND are 'successes';
* Restore the cursor. */
brt_cursor_set_key_val_manually(c->i->c, &g.cursor_key, &g.cursor_val);
/* cursor_key/val will be zeroed out. */
}
else {
/* Delete the saved cursor. */
if (g.cursor_key.data) toku_free(g.cursor_key.data);
if (g.cursor_val.data) toku_free(g.cursor_val.data);
}
}
/* Cleanup temporary keys. */
if (g.tmp_key.data && g.tmp_key_malloced) toku_free(g.tmp_key.data);
if (g.tmp_val.data && g.tmp_val_malloced) toku_free(g.tmp_val.data);
return r; return r;
} }
...@@ -1381,112 +1550,97 @@ static int toku_c_del_noassociate(DBC * c, u_int32_t flags) { ...@@ -1381,112 +1550,97 @@ static int toku_c_del_noassociate(DBC * c, u_int32_t flags) {
return r; return r;
} }
static int toku_save_original_data(DBT* dst, DBT* src) {
int r;
*dst = *src;
#ifdef DB_DBT_PARTIAL
#error toku_c_pget does not properly handle DB_DBT_PARTIAL
#endif
//We may use this multiple times, we'll free only once at the end.
dst->flags = DB_DBT_REALLOC;
//Not using DB_DBT_USERMEM.
dst->ulen = 0;
if (src->size) {
if (!src->data) return EINVAL;
dst->data = toku_malloc(src->size);
if (!dst->data) {
r = ENOMEM;
return r;
}
memcpy(dst->data, src->data, src->size);
}
else dst->data = NULL;
return 0;
}
static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) { static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) {
HANDLE_PANICKED_DB(c->dbp);
DB *pdb = c->dbp->i->primary;
/* c_pget does not work on a primary. */
if (!pdb) return EINVAL;
/* If data and primary_key are both zeroed,
* the temporary storage used to fill in data
* is different in the two cases because they
* come from different trees.
* Make sure they realy are different trees. */
assert(c->dbp->i->brt!=pdb->i->brt);
assert(c->dbp!=pdb);
int r; int r;
int r2; C_GET_VARS g;
int r3; memset(&g, 0, sizeof(g));
DB *db = c->dbp; /* Initialize variables. */
HANDLE_PANICKED_DB(db); g.c = c;
DB *pdb = db->i->primary; g.db = c->dbp;
g.flag = flag;
if (!pdb) return EINVAL; //c_pget does not work on a primary. unsigned int brtflags;
// If data and primary_key are both zeroed, the temporary storage used to fill in data is different in the two cases because they come from different trees. toku_brt_get_flags(g.db->i->brt, &brtflags);
assert(db->i->brt!=pdb->i->brt); // Make sure they realy are different trees. g.duplicates = (brtflags & TOKU_DB_DUPSORT) != 0;
assert(db!=pdb);
/* Standardize the op flag. */
DBT copied_key; toku_c_get_fix_flags(&g);
DBT copied_pkey; /* Determine whether the key, val, and data, parameters are read, write,
DBT copied_data; * or both. */
//Store original pointers. if ((r = toku_c_pget_describe_inputs(&g))) goto cleanup;
DBT* o_key = key;
DBT* o_pkey = pkey; /* The 'key' from C_GET_VARS is the secondary key, and the 'val'
DBT* o_data = data; * from C_GET_VARS is the primary key. The 'data' parameter here
//Use copied versions for everything until/if success. * is ALWAYS write-only */
key = &copied_key;
pkey = &copied_pkey; /* Save the cursor position if the op can modify the cursor position. */
data = &copied_data; if ((r = toku_c_pget_save_cursor(&g))) goto cleanup;;
if (0) { if (0) {
delete_silently_and_retry: delete_silently_and_retry:
//Free any old data. /* Free all old 'saved' elements, return to pristine state. */
toku_free(key->data); if (g.tmp_key.data && g.tmp_key_malloced) toku_free(g.tmp_key.data);
toku_free(pkey->data); memset(&g.tmp_key, 0, sizeof(g.tmp_key));
toku_free(data->data); g.tmp_key_malloced = FALSE;
//Silently delete and re-run.
r = toku_c_del_noassociate(c, 0); if (g.tmp_val.data && g.tmp_val_malloced) toku_free(g.tmp_val.data);
if (r != 0) return r; memset(&g.tmp_val, 0, sizeof(g.tmp_val));
} g.tmp_val_malloced = FALSE;
if (0) {
died0: if (g.tmp_dat.data && g.tmp_dat_malloced) toku_free(g.tmp_dat.data);
return r; memset(&g.tmp_dat, 0, sizeof(g.tmp_dat));
} g.tmp_dat_malloced = FALSE;
//Need to save all the original data. /* Silently delete and re-run. */
r = toku_save_original_data(&copied_key, o_key); if (r!=0) goto died0; if ((r = toku_c_del_noassociate(c, 0))) goto cleanup;
if (0) {
died1:
toku_free(key->data);
goto died0;
} }
r = toku_save_original_data(&copied_pkey, o_pkey); if (r!=0) goto died1; /* Save the inputs. */
if (0) { if ((r = toku_c_pget_save_inputs(&g, key, pkey, data))) goto cleanup;
died2:
toku_free(pkey->data); if ((r = toku_c_get_noassociate(c, &g.tmp_key, &g.tmp_val, flag))) goto cleanup;
goto died1;
r = toku_db_get(pdb, c->i->txn, &g.tmp_val, &g.tmp_dat, 0);
if (r==DB_NOTFOUND) goto delete_silently_and_retry;
if (r!=0) goto cleanup;
r = verify_secondary_key(g.db, &g.tmp_val, &g.tmp_dat, &g.tmp_key);
if (r==DB_SECONDARY_BAD) goto delete_silently_and_retry;
if (r!=0) goto cleanup;
/* Atomically assign all 3 outputs. */
if ((r = toku_c_pget_assign_outputs(&g, key, pkey, data))) goto cleanup;
r = 0;
cleanup:
if (g.cursor_was_saved && g.cursor_malloced) {
/* We saved the cursor. We either need to restore it, or free
* the saved version. */
if (r!=0) {
/* Restore the cursor. */
brt_cursor_set_key_val_manually(c->i->c, &g.cursor_key, &g.cursor_val);
/* cursor_key/val will be zeroed out. */
}
else {
/* Delete the saved cursor. */
if (g.cursor_key.data) toku_free(g.cursor_key.data);
if (g.cursor_val.data) toku_free(g.cursor_val.data);
}
} }
r = toku_save_original_data(&copied_data, o_data); if (r!=0) goto died2; /* Cleanup temporary keys. */
if (0) { if (g.tmp_key.data && g.tmp_key_malloced) toku_free(g.tmp_key.data);
died3: if (g.tmp_val.data && g.tmp_val_malloced) toku_free(g.tmp_val.data);
toku_free(data->data); if (g.tmp_dat.data && g.tmp_dat_malloced) toku_free(g.tmp_dat.data);
goto died2; return r;
}
r = toku_c_get_noassociate(c, key, pkey, flag);
if (r != 0) goto died3;
r = toku_db_get(pdb, c->i->txn, pkey, data, 0);
if (r == DB_NOTFOUND) goto delete_silently_and_retry;
if (r != 0) goto died3;
r = verify_secondary_key(db, pkey, data, key);
if (r == DB_SECONDARY_BAD) goto delete_silently_and_retry;
if (r != 0) goto died3;
//Copy everything and return.
assert(r==0);
r = toku_brt_dbt_set_key(db->i->brt, o_key, key->data, key->size);
r2 = toku_brt_dbt_set_key(pdb->i->brt, o_pkey, pkey->data, pkey->size);
r3 = toku_brt_dbt_set_value(pdb->i->brt, o_data, data->data, data->size);
//Cleanup.
toku_free(key->data);
toku_free(pkey->data);
toku_free(data->data);
if (r!=0) return r;
if (r2!=0) return r2;
return r3;
} }
static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) { static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
...@@ -1657,31 +1811,43 @@ static int toku_c_del(DBC * c, u_int32_t flags) { ...@@ -1657,31 +1811,43 @@ static int toku_c_del(DBC * c, u_int32_t flags) {
memset(&pkey, 0, sizeof(pkey)); memset(&pkey, 0, sizeof(pkey));
memset(&data, 0, sizeof(data)); memset(&data, 0, sizeof(data));
pkey.flags = DB_DBT_REALLOC;
data.flags = DB_DBT_REALLOC;
if (db->i->primary == 0) { if (db->i->primary == 0) {
pdb = db; pdb = db;
r = toku_c_get(c, &pkey, &data, DB_CURRENT); r = toku_c_get(c, &pkey, &data, DB_CURRENT);
if (r != 0) goto assoc_cleanup;
} else { } else {
DBT skey; DBT skey;
pdb = db->i->primary; pdb = db->i->primary;
memset(&skey, 0, sizeof(skey)); memset(&skey, 0, sizeof(skey));
skey.flags = DB_DBT_MALLOC;
r = toku_c_pget(c, &skey, &pkey, &data, DB_CURRENT); r = toku_c_pget(c, &skey, &pkey, &data, DB_CURRENT);
if (r!=0) goto assoc_cleanup;
if (skey.data) toku_free(skey.data);
} }
if (r != 0) return r;
for (h = list_head(&pdb->i->associated); h != &pdb->i->associated; h = h->next) { for (h = list_head(&pdb->i->associated); h != &pdb->i->associated; h = h->next) {
struct __toku_db_internal *dbi = list_struct(h, struct __toku_db_internal, associated); struct __toku_db_internal *dbi = list_struct(h, struct __toku_db_internal, associated);
if (dbi->db == db) continue; //Skip current db (if its primary or secondary) if (dbi->db == db) continue; //Skip current db (if its primary or secondary)
r = do_associated_deletes(c->i->txn, &pkey, &data, dbi->db); r = do_associated_deletes(c->i->txn, &pkey, &data, dbi->db);
if (r!=0) return r; if (r!=0) goto assoc_cleanup;
} }
if (db->i->primary != 0) { if (db->i->primary != 0) {
//If this is a secondary, we did not delete from the primary. //If this is a secondary, we did not delete from the primary.
//Primaries cannot have duplicates, (noncursor) del is safe. //Primaries cannot have duplicates, (noncursor) del is safe.
r = toku_db_del_noassociate(pdb, c->i->txn, &pkey, DB_DELETE_ANY); r = toku_db_del_noassociate(pdb, c->i->txn, &pkey, DB_DELETE_ANY);
if (r!=0) return r; if (r!=0) goto assoc_cleanup;
} }
assoc_cleanup:
if (pkey.data) toku_free(pkey.data);
if (data.data) toku_free(data.data);
if (r!=0) goto cleanup;
} }
r = toku_c_del_noassociate(c, flags); r = toku_c_del_noassociate(c, flags);
if (r!=0) goto cleanup;
r = 0;
cleanup:
return r; return r;
} }
...@@ -1827,12 +1993,16 @@ static int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) { ...@@ -1827,12 +1993,16 @@ static int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
u_int32_t brtflags; u_int32_t brtflags;
memset(&data, 0, sizeof(data)); memset(&data, 0, sizeof(data));
data.flags = DB_DBT_REALLOC;
toku_brt_get_flags(db->i->brt, &brtflags); toku_brt_get_flags(db->i->brt, &brtflags);
if (brtflags & TOKU_DB_DUPSORT) { if (brtflags & TOKU_DB_DUPSORT) {
int r2; int r2;
DBC *dbc; DBC *dbc;
BOOL found = FALSE; BOOL found = FALSE;
DBT tmp_key;
memset(&tmp_key, 0, sizeof(tmp_key));
tmp_key.flags = DB_DBT_REALLOC;
/* If we are deleting all copies from a secondary with duplicates, /* If we are deleting all copies from a secondary with duplicates,
* We have to make certain we cascade all the deletes. */ * We have to make certain we cascade all the deletes. */
...@@ -1845,13 +2015,20 @@ static int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) { ...@@ -1845,13 +2015,20 @@ static int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
r = toku_c_del(dbc, 0); r = toku_c_del(dbc, 0);
if (r==0) found = TRUE; if (r==0) found = TRUE;
if (r!=0 && r!=DB_KEYEMPTY) break; if (r!=0 && r!=DB_KEYEMPTY) break;
r = toku_c_get_noassociate(dbc, key, &data, DB_NEXT_DUP); /* key is an input-only parameter. it can have flags such as
* DB_DBT_MALLOC. If this were the case, we would clobber the
* contents of key as well as possibly have a memory leak if we
* pass it to toku_c_get_noassociate below. We use a temporary
* junk variable to avoid this. */
r = toku_c_get_noassociate(dbc, &tmp_key, &data, DB_NEXT_DUP);
if (r == DB_NOTFOUND) { if (r == DB_NOTFOUND) {
//If we deleted at least one we're happy. Quit out. //If we deleted at least one we're happy. Quit out.
if (found) r = 0; if (found) r = 0;
break; break;
} }
} }
if (data.data) toku_free(data.data);
if (tmp_key.data) toku_free(tmp_key.data);
r2 = toku_c_close(dbc); r2 = toku_c_close(dbc);
if (r != 0) return r; if (r != 0) return r;
...@@ -2202,9 +2379,13 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, ...@@ -2202,9 +2379,13 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
} else if (flags == DB_NOOVERWRITE) { } else if (flags == DB_NOOVERWRITE) {
/* check if the key already exists */ /* check if the key already exists */
DBT testfordata; DBT testfordata;
r = toku_db_get_noassociate(db, txn, key, toku_init_dbt(&testfordata), 0); memset(&testfordata, 0, sizeof(testfordata));
if (r == 0) testfordata.flags = DB_DBT_MALLOC;
r = toku_db_get_noassociate(db, txn, key, &testfordata, 0);
if (r == 0) {
if (testfordata.data) toku_free(testfordata.data);
return DB_KEYEXIST; return DB_KEYEXIST;
}
if (r != DB_NOTFOUND) return r; if (r != DB_NOTFOUND) return r;
} else if (flags != 0) { } else if (flags != 0) {
/* no other flags are currently supported */ /* no other flags are currently supported */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment