Commit 7c65449c authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1031

Implements lazy updates for next_shortcut, and prev_shortcut.


git-svn-id: file:///svn/tokudb@5221 c7de825b-a66e-492c-adef-691d508d4ae1
parent 945b2dda
...@@ -205,6 +205,7 @@ unsigned int toku_brtnode_pivot_key_len (BRTNODE, struct kv_pair *); // Given th ...@@ -205,6 +205,7 @@ unsigned int toku_brtnode_pivot_key_len (BRTNODE, struct kv_pair *); // Given th
struct brt_cursor { struct brt_cursor {
struct list cursors_link; struct list cursors_link;
BRT brt; BRT brt;
BOOL current_in_omt, prev_in_omt;
DBT key, val; // The key-value pair that the cursor currently points to DBT key, val; // The key-value pair that the cursor currently points to
DBT prevkey, prevval; // The key-value pair that the cursor pointed to previously. (E.g., when we do a DB_NEXT) DBT prevkey, prevval; // The key-value pair that the cursor pointed to previously. (E.g., when we do a DB_NEXT)
int is_temporary_cursor; // If it is a temporary cursor then use the following skey and sval to return tokudb-managed values in dbts. Otherwise use the brt's skey and skval. int is_temporary_cursor; // If it is a temporary cursor then use the following skey and sval to return tokudb-managed values in dbts. Otherwise use the brt's skey and skval.
......
...@@ -3104,6 +3104,45 @@ BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c) { ...@@ -3104,6 +3104,45 @@ BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c) {
return brt_cursor_not_set(c); return brt_cursor_not_set(c);
} }
static inline void load_dbts_from_omt(BRT_CURSOR c, DBT *key, DBT *val) {
OMTVALUE le;
int r = toku_omt_cursor_current(c->omtcursor, &le);
assert(r==0);
if (key) {
key->data = le_latest_key(le);
key->size = le_latest_keylen(le);
}
if (val) {
val->data = le_latest_val(le);
val->size = le_latest_vallen(le);
}
}
static void brt_cursor_invalidate_callback(OMTCURSOR UU(omt_c), void *extra) {
BRT_CURSOR cursor = extra;
if (cursor->current_in_omt) {
assert(cursor->key.flags==DB_DBT_REALLOC);
assert(cursor->val.flags==DB_DBT_REALLOC);
DBT key,val;
int r;
load_dbts_from_omt(cursor, toku_init_dbt(&key), toku_init_dbt(&val));
//Make certain not to try to free the omt's memory.
toku_init_dbt(&cursor->key)->flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->val)->flags = DB_DBT_REALLOC;
r = toku_dbt_set_two_values(&cursor->key, (bytevec*)&key.data, key.size, NULL, FALSE,
&cursor->val, (bytevec*)&val.data, val.size, NULL, FALSE);
//TODO: Find some way to deal with ENOMEM here.
assert(r==0);
cursor->current_in_omt = FALSE;
}
if (cursor->prev_in_omt) {
toku_init_dbt(&cursor->prevkey)->flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->prevval)->flags = DB_DBT_REALLOC;
cursor->prev_in_omt = FALSE;
}
}
int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) { int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) {
BRT_CURSOR cursor = toku_malloc(sizeof *cursor); BRT_CURSOR cursor = toku_malloc(sizeof *cursor);
if (cursor == 0) if (cursor == 0)
...@@ -3113,21 +3152,29 @@ int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) { ...@@ -3113,21 +3152,29 @@ int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) {
toku_init_dbt(&cursor->val); cursor->val.flags = DB_DBT_REALLOC; toku_init_dbt(&cursor->val); cursor->val.flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->prevkey); cursor->prevkey.flags = DB_DBT_REALLOC; toku_init_dbt(&cursor->prevkey); cursor->prevkey.flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->prevval); cursor->prevval.flags = DB_DBT_REALLOC; toku_init_dbt(&cursor->prevval); cursor->prevval.flags = DB_DBT_REALLOC;
cursor->current_in_omt = FALSE;
cursor->prev_in_omt = FALSE;
list_push(&brt->cursors, &cursor->cursors_link); list_push(&brt->cursors, &cursor->cursors_link);
cursor->is_temporary_cursor=is_temporary_cursor; cursor->is_temporary_cursor=is_temporary_cursor;
cursor->skey = cursor->sval = 0; cursor->skey = cursor->sval = 0;
int r = toku_omt_cursor_create(&cursor->omtcursor); int r = toku_omt_cursor_create(&cursor->omtcursor);
assert(r==0); assert(r==0);
toku_omt_cursor_set_invalidate_callback(cursor->omtcursor,
brt_cursor_invalidate_callback, cursor);
cursor->root_put_counter=0; cursor->root_put_counter=0;
*cursorptr = cursor; *cursorptr = cursor;
return 0; return 0;
} }
int toku_brt_cursor_close(BRT_CURSOR cursor) { int toku_brt_cursor_close(BRT_CURSOR cursor) {
if (!cursor->current_in_omt) {
dbt_cleanup(&cursor->key); dbt_cleanup(&cursor->key);
dbt_cleanup(&cursor->val); dbt_cleanup(&cursor->val);
}
if (!cursor->prev_in_omt) {
dbt_cleanup(&cursor->prevkey); dbt_cleanup(&cursor->prevkey);
dbt_cleanup(&cursor->prevval); dbt_cleanup(&cursor->prevval);
}
if (cursor->skey) toku_free(cursor->skey); if (cursor->skey) toku_free(cursor->skey);
if (cursor->sval) toku_free(cursor->sval); if (cursor->sval) toku_free(cursor->sval);
list_remove(&cursor->cursors_link); list_remove(&cursor->cursors_link);
...@@ -3154,6 +3201,7 @@ DBT *brt_cursor_peek_current_key(BRT_CURSOR cursor) ...@@ -3154,6 +3201,7 @@ DBT *brt_cursor_peek_current_key(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the current key. // Effect: Return a pointer to a DBT for the current key.
// Requires: The caller may not modify that DBT or the memory at which it points. // Requires: The caller may not modify that DBT or the memory at which it points.
{ {
if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, NULL);
return &cursor->key; return &cursor->key;
} }
...@@ -3161,6 +3209,7 @@ DBT *brt_cursor_peek_current_val(BRT_CURSOR cursor) ...@@ -3161,6 +3209,7 @@ DBT *brt_cursor_peek_current_val(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the current val // Effect: Return a pointer to a DBT for the current val
// Requires: The caller may not modify that DBT or the memory at which it points. // Requires: The caller may not modify that DBT or the memory at which it points.
{ {
if (cursor->current_in_omt) load_dbts_from_omt(cursor, NULL, &cursor->val);
return &cursor->val; return &cursor->val;
} }
...@@ -3183,6 +3232,8 @@ static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) { ...@@ -3183,6 +3232,8 @@ static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) {
int r = 0; int r = 0;
void** key_staticp = cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey; void** key_staticp = cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey;
void** val_staticp = cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval; void** val_staticp = cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval;
if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, &cursor->val);
r = toku_dbt_set_two_values(key, (bytevec*)&cursor->key.data, cursor->key.size, key_staticp, FALSE, r = toku_dbt_set_two_values(key, (bytevec*)&cursor->key.data, cursor->key.size, key_staticp, FALSE,
val, (bytevec*)&cursor->val.data, cursor->val.size, val_staticp, FALSE); val, (bytevec*)&cursor->val.data, cursor->val.size, val_staticp, FALSE);
return r; return r;
...@@ -3243,6 +3294,7 @@ static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outva ...@@ -3243,6 +3294,7 @@ static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outva
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_REALLOC; DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_REALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_REALLOC; DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_REALLOC;
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt); brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter); r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0) if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0)
...@@ -3274,6 +3326,7 @@ static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outke ...@@ -3274,6 +3326,7 @@ static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outke
assert(cursor->prevkey.flags == DB_DBT_REALLOC); assert(cursor->prevkey.flags == DB_DBT_REALLOC);
assert(cursor->prevval.flags == DB_DBT_REALLOC); assert(cursor->prevval.flags == DB_DBT_REALLOC);
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter); int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) { if (r == 0) {
swap_cursor_dbts(cursor); swap_cursor_dbts(cursor);
...@@ -3287,6 +3340,7 @@ static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, D ...@@ -3287,6 +3340,7 @@ static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, D
assert(cursor->prevkey.flags == DB_DBT_REALLOC); assert(cursor->prevkey.flags == DB_DBT_REALLOC);
assert(cursor->prevval.flags == DB_DBT_REALLOC); assert(cursor->prevval.flags == DB_DBT_REALLOC);
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter); int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) { if (r == 0) {
if (compare_kv_xy(cursor->brt, search->k, search->v, &cursor->prevkey, &cursor->prevval) == 0) { if (compare_kv_xy(cursor->brt, search->k, search->v, &cursor->prevkey, &cursor->prevval) == 0) {
...@@ -3304,6 +3358,7 @@ static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT ...@@ -3304,6 +3358,7 @@ static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT
assert(cursor->prevkey.flags == DB_DBT_REALLOC); assert(cursor->prevkey.flags == DB_DBT_REALLOC);
assert(cursor->prevval.flags == DB_DBT_REALLOC); assert(cursor->prevval.flags == DB_DBT_REALLOC);
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter); int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) { if (r == 0) {
if (compare_k_x(cursor->brt, search->k, &cursor->prevkey) == 0) { if (compare_k_x(cursor->brt, search->k, &cursor->prevkey) == 0) {
...@@ -3335,6 +3390,16 @@ static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) { ...@@ -3335,6 +3390,16 @@ static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) {
return compare_kv_xy(brt, search->k, search->v, x, y) < 0; /* return min xy: kv < xy */ return compare_kv_xy(brt, search->k, search->v, x, y) < 0; /* return min xy: kv < xy */
} }
static void save_omtcursor_current_in_prev(BRT_CURSOR cursor) {
if (!cursor->prev_in_omt) {
//Free the data.
if (cursor->prevkey.data) toku_free(cursor->prevkey.data);
if (cursor->prevval.data) toku_free(cursor->prevval.data);
cursor->prev_in_omt = TRUE;
}
load_dbts_from_omt(cursor, &cursor->prevkey, &cursor->prevval);
}
static int brt_cursor_next_shortcut (BRT_CURSOR cursor, DBT *outkey, DBT *outval) static int brt_cursor_next_shortcut (BRT_CURSOR cursor, DBT *outkey, DBT *outval)
// Effect: If possible, increment the cursor and return the key-value pair // Effect: If possible, increment the cursor and return the key-value pair
// (i.e., the next one from what the cursor pointed to before.) // (i.e., the next one from what the cursor pointed to before.)
...@@ -3350,24 +3415,29 @@ static int brt_cursor_next_shortcut (BRT_CURSOR cursor, DBT *outkey, DBT *outval ...@@ -3350,24 +3415,29 @@ static int brt_cursor_next_shortcut (BRT_CURSOR cursor, DBT *outkey, DBT *outval
if (h_counter != cursor->root_put_counter) return -1; if (h_counter != cursor->root_put_counter) return -1;
} }
OMTVALUE le; OMTVALUE le;
get_next:; //Save current value in prev.
int r = toku_omt_cursor_next(cursor->omtcursor, &le); save_omtcursor_current_in_prev(cursor);
if (r==0) {
if (le_is_provdel(le)) goto get_next;
assert(cursor->prevkey.flags == DB_DBT_REALLOC);
assert(cursor->prevval.flags == DB_DBT_REALLOC);
bytevec keyb = le_latest_key(le); u_int32_t index;
bytevec valb = le_latest_val(le); u_int32_t size = toku_omt_size(toku_omt_cursor_get_omt(cursor->omtcursor));
r = toku_dbt_set_two_values(&cursor->prevkey, &keyb, le_latest_keylen(le), NULL, FALSE, int r = toku_omt_cursor_current_index(cursor->omtcursor, &index);
&cursor->prevval, &valb, le_latest_vallen(le), NULL, FALSE); assert(r==0);
while (index+1 < size) {
r = toku_omt_cursor_next(cursor->omtcursor, &le);
assert(r==0); assert(r==0);
index++;
if (le_is_provdel(le)) continue;
swap_cursor_dbts(cursor); //Free old current if necessary.
if (!cursor->current_in_omt) {
if (cursor->key.data) toku_free(cursor->key.data);
if (cursor->val.data) toku_free(cursor->val.data);
cursor->current_in_omt = TRUE;
}
return brt_cursor_copyout(cursor, outkey, outval); return brt_cursor_copyout(cursor, outkey, outval);
} }
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
} }
return -1; return -1;
} }
...@@ -3505,24 +3575,28 @@ static int brt_cursor_prev_shortcut (BRT_CURSOR cursor, DBT *outkey, DBT *outval ...@@ -3505,24 +3575,28 @@ static int brt_cursor_prev_shortcut (BRT_CURSOR cursor, DBT *outkey, DBT *outval
if (h_counter != cursor->root_put_counter) return -1; if (h_counter != cursor->root_put_counter) return -1;
} }
OMTVALUE le; OMTVALUE le;
get_prev:; //Save current value in prev.
int r = toku_omt_cursor_prev(cursor->omtcursor, &le); save_omtcursor_current_in_prev(cursor);
if (r==0) {
if (le_is_provdel(le)) goto get_prev;
assert(cursor->prevkey.flags == DB_DBT_REALLOC);
assert(cursor->prevval.flags == DB_DBT_REALLOC);
bytevec keyb = le_latest_key(le); u_int32_t index = 0;
bytevec valb = le_latest_val(le); int r = toku_omt_cursor_current_index(cursor->omtcursor, &index);
r = toku_dbt_set_two_values(&cursor->prevkey, &keyb, le_latest_keylen(le), NULL, FALSE, assert(r==0);
&cursor->prevval, &valb, le_latest_vallen(le), NULL, FALSE); while (index>0) {
r = toku_omt_cursor_prev(cursor->omtcursor, &le);
assert(r==0); assert(r==0);
index--;
if (le_is_provdel(le)) continue;
swap_cursor_dbts(cursor); //Free old current if necessary.
if (!cursor->current_in_omt) {
if (cursor->key.data) toku_free(cursor->key.data);
if (cursor->val.data) toku_free(cursor->val.data);
cursor->current_in_omt = TRUE;
}
return brt_cursor_copyout(cursor, outkey, outval); return brt_cursor_copyout(cursor, outkey, outval);
} }
brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
} }
return -1; return -1;
} }
......
...@@ -628,7 +628,7 @@ int toku_omt_cursor_prev (OMTCURSOR c, OMTVALUE *v) { ...@@ -628,7 +628,7 @@ int toku_omt_cursor_prev (OMTCURSOR c, OMTVALUE *v) {
} }
c->index--; c->index--;
int r = toku_omt_fetch(c->omt, c->index, v, NULL); int r = toku_omt_fetch(c->omt, c->index, v, NULL);
if (r!=0) toku_omt_cursor_invalidate(c); assert(r==0);
return r; return r;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment