Commit 0cf3ded1 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge the brt prefetch changes to the main trunk. closes #1183

git-svn-id: file:///svn/toku/tokudb@9135 c7de825b-a66e-492c-adef-691d508d4ae1
parent eefff9b2
...@@ -233,6 +233,7 @@ struct brt_cursor { ...@@ -233,6 +233,7 @@ struct brt_cursor {
struct list cursors_link; struct list cursors_link;
BRT brt; BRT brt;
BOOL current_in_omt, prev_in_omt; BOOL current_in_omt, prev_in_omt;
BOOL prefetching;
DBT key, val; // The key-value pair that the cursor currently points to DBT key, val; // The key-value pair that the cursor currently points to
DBT prevkey, prevval; // The key-value pair that the cursor pointed to previously. (E.g., when we do a DB_NEXT) DBT prevkey, prevval; // The key-value pair that the cursor pointed to previously. (E.g., when we do a DB_NEXT)
int is_temporary_cursor; // If it is a temporary cursor then use the following skey and sval to return tokudb-managed values in dbts. Otherwise use the brt's skey and skval. int is_temporary_cursor; // If it is a temporary cursor then use the following skey and sval to return tokudb-managed values in dbts. Otherwise use the brt's skey and skval.
......
...@@ -3216,6 +3216,7 @@ int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) { ...@@ -3216,6 +3216,7 @@ int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) {
toku_init_dbt(&cursor->prevval); cursor->prevval.flags = DB_DBT_REALLOC; toku_init_dbt(&cursor->prevval); cursor->prevval.flags = DB_DBT_REALLOC;
cursor->current_in_omt = FALSE; cursor->current_in_omt = FALSE;
cursor->prev_in_omt = FALSE; cursor->prev_in_omt = FALSE;
cursor->prefetching = FALSE;
list_push(&brt->cursors, &cursor->cursors_link); list_push(&brt->cursors, &cursor->cursors_link);
cursor->is_temporary_cursor=is_temporary_cursor; cursor->is_temporary_cursor=is_temporary_cursor;
cursor->skey = cursor->sval = 0; cursor->skey = cursor->sval = 0;
...@@ -3246,6 +3247,14 @@ int toku_brt_cursor_close(BRT_CURSOR cursor) { ...@@ -3246,6 +3247,14 @@ int toku_brt_cursor_close(BRT_CURSOR cursor) {
return 0; return 0;
} }
static inline void brt_cursor_set_prefetching(BRT_CURSOR cursor) {
cursor->prefetching = TRUE;
}
static inline BOOL brt_cursor_prefetching(BRT_CURSOR cursor) {
return cursor->prefetching;
}
static void save_omtcursor_current_in_prev(BRT_CURSOR cursor) { static void save_omtcursor_current_in_prev(BRT_CURSOR cursor) {
if (!cursor->prev_in_omt) { if (!cursor->prev_in_omt) {
//Free the data. //Free the data.
...@@ -3325,7 +3334,7 @@ static int heaviside_from_search_t (OMTVALUE lev, void *extra) { ...@@ -3325,7 +3334,7 @@ static int heaviside_from_search_t (OMTVALUE lev, void *extra) {
abort(); return 0; abort(); return 0;
} }
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor) { static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, BRT_CURSOR brtcursor) {
// Now we have to convert from brt_search_t to the heaviside function with a direction. What a pain... // Now we have to convert from brt_search_t to the heaviside function with a direction. What a pain...
*re = get_leaf_reactivity(node); // searching doesn't change the reactivity, so we can calculate it here. *re = get_leaf_reactivity(node); // searching doesn't change the reactivity, so we can calculate it here.
...@@ -3343,7 +3352,7 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT ...@@ -3343,7 +3352,7 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT
heaviside_from_search_t, heaviside_from_search_t,
search, search,
direction, direction,
&datav, &idx, omtcursor); &datav, &idx, brtcursor->omtcursor);
if (r!=0) return r; if (r!=0) return r;
LEAFENTRY le = datav; LEAFENTRY le = datav;
...@@ -3385,7 +3394,7 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT ...@@ -3385,7 +3394,7 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT
break; break;
} }
if (idx>=toku_omt_size(node->u.l.buffer)) continue; if (idx>=toku_omt_size(node->u.l.buffer)) continue;
r = toku_omt_fetch(node->u.l.buffer, idx, &datav, omtcursor); r = toku_omt_fetch(node->u.l.buffer, idx, &datav, brtcursor->omtcursor);
assert(r==0); // we just validated the index assert(r==0); // we just validated the index
le = datav; le = datav;
if (!le_is_provdel(le)) goto got_a_good_value; if (!le_is_provdel(le)) goto got_a_good_value;
...@@ -3405,10 +3414,35 @@ got_a_good_value: ...@@ -3405,10 +3414,35 @@ got_a_good_value:
} }
static int static int
brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor); brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, BRT_CURSOR brtcursor);
// the number of nodes to prefetch
#define TOKU_DO_PREFETCH 1
#if TOKU_DO_PREFETCH
static void
brt_node_maybe_prefetch(BRT brt, BRTNODE node, int childnum, BRT_CURSOR brtcursor) {
if (0) printf("%s:%d node %p height %d child %d of %d\n", __FUNCTION__, __LINE__, node, node->height, childnum, node->u.n.n_children);
// if the node is the parent of leaves and the cursor is prefetching
// then prefetch the next child if there is one
if (node->height == 1 && brt_cursor_prefetching(brtcursor)) {
int i;
for (i=0; i<TOKU_DO_PREFETCH; i++) {
int nextchildnum = childnum+i+1;
if (nextchildnum >= node->u.n.n_children)
break;
BLOCKNUM nextchildblocknum = BNC_BLOCKNUM(node, nextchildnum);
u_int32_t nextfullhash = compute_child_fullhash(brt->cf, node, nextchildnum);
toku_cachefile_prefetch(brt->cf, nextchildblocknum, nextfullhash,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
}
}
}
#endif
/* search in a node's child */ /* search in a node's child */
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *parent_re, TOKULOGGER logger, OMTCURSOR omtcursor, BOOL *did_react) static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *parent_re, TOKULOGGER logger, BRT_CURSOR brtcursor, BOOL *did_react)
// Effect: Search in a node's child. // Effect: Search in a node's child.
// If we change the shape, set *did_react = TRUE. Else set *did_react = FALSE. // If we change the shape, set *did_react = TRUE. Else set *did_react = FALSE.
{ {
...@@ -3436,10 +3470,17 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s ...@@ -3436,10 +3470,17 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childnode); verify_local_fingerprint_nonleaf(childnode);
enum reactivity child_re = RE_STABLE; enum reactivity child_re = RE_STABLE;
int r = brt_search_node(brt, childnode, search, newkey, newval, &child_re, logger, omtcursor); int r = brt_search_node(brt, childnode, search, newkey, newval, &child_re, logger, brtcursor);
// Even if r is reactive, we want to handle the maybe reactive child. // Even if r is reactive, we want to handle the maybe reactive child.
verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childnode); verify_local_fingerprint_nonleaf(childnode);
#if TOKU_DO_PREFETCH
// maybe prefetch the next child
if (r == 0)
brt_node_maybe_prefetch(brt, node, childnum, brtcursor);
#endif
{ {
int rr = toku_unpin_brtnode(brt, childnode); // unpin the childnode before handling the reactive child (because that may make the childnode disappear.) int rr = toku_unpin_brtnode(brt, childnode); // unpin the childnode before handling the reactive child (because that may make the childnode disappear.)
if (rr!=0) r = rr; if (rr!=0) r = rr;
...@@ -3458,14 +3499,13 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s ...@@ -3458,14 +3499,13 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
return r; return r;
} }
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor) { static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, BRT_CURSOR brtcursor) {
int count=0; int count=0;
again: again:
count++; count++;
verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
{ {
int c; int c;
/* binary search is overkill for a small array */ /* binary search is overkill for a small array */
...@@ -3484,7 +3524,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, ...@@ -3484,7 +3524,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) { brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
BOOL did_change_shape = FALSE; BOOL did_change_shape = FALSE;
verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
int r = brt_search_child(brt, node, child[c], search, newkey, newval, re, logger, omtcursor, &did_change_shape); int r = brt_search_child(brt, node, child[c], search, newkey, newval, re, logger, brtcursor, &did_change_shape);
assert(r != EAGAIN); assert(r != EAGAIN);
if (r == 0) return r; if (r == 0) return r;
if (did_change_shape) goto again; if (did_change_shape) goto again;
...@@ -3494,23 +3534,23 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, ...@@ -3494,23 +3534,23 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
/* check the first (left) or last (right) node if nothing has been found */ /* check the first (left) or last (right) node if nothing has been found */
BOOL did_change_shape; // ignore this BOOL did_change_shape; // ignore this
verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
return brt_search_child(brt, node, child[c], search, newkey, newval, re, logger, omtcursor, &did_change_shape); return brt_search_child(brt, node, child[c], search, newkey, newval, re, logger, brtcursor, &did_change_shape);
} }
} }
static int static int
brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor) brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, BRT_CURSOR brtcursor)
{ {
verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
if (node->height > 0) if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, re, logger, omtcursor); return brt_search_nonleaf_node(brt, node, search, newkey, newval, re, logger, brtcursor);
else { else {
return brt_search_leaf_node(brt, node, search, newkey, newval, re, logger, omtcursor); return brt_search_leaf_node(brt, node, search, newkey, newval, re, logger, brtcursor);
} }
} }
static int static int
toku_brt_search (BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor, u_int64_t *root_put_counter) toku_brt_search (BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, BRT_CURSOR brtcursor, u_int64_t *root_put_counter)
// Effect: Perform a search. Associate cursor with a leaf if possible. // Effect: Perform a search. Associate cursor with a leaf if possible.
// All searches are performed through this function. // All searches are performed through this function.
{ {
...@@ -3546,7 +3586,7 @@ toku_brt_search (BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULO ...@@ -3546,7 +3586,7 @@ toku_brt_search (BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULO
{ {
enum reactivity re = RE_STABLE; enum reactivity re = RE_STABLE;
//static int counter = 0; counter++; //static int counter = 0; counter++;
r = brt_search_node(brt, node, search, newkey, newval, &re, logger, omtcursor); r = brt_search_node(brt, node, search, newkey, newval, &re, logger, brtcursor);
if (r!=0) goto return_r; if (r!=0) goto return_r;
r = brt_handle_maybe_reactive_child_at_root(brt, rootp, &node, re, logger); r = brt_handle_maybe_reactive_child_at_root(brt, rootp, &node, re, logger);
...@@ -3565,7 +3605,7 @@ static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outke ...@@ -3565,7 +3605,7 @@ static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outke
assert(cursor->prevval.flags == DB_DBT_REALLOC); assert(cursor->prevval.flags == DB_DBT_REALLOC);
brt_cursor_invalidate_callback(cursor->omtcursor, cursor); brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter); int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor, &cursor->root_put_counter);
if (r == 0) { if (r == 0) {
swap_cursor_dbts(cursor); swap_cursor_dbts(cursor);
r = brt_cursor_copyout(cursor, outkey, outval); r = brt_cursor_copyout(cursor, outkey, outval);
...@@ -3599,7 +3639,7 @@ static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, D ...@@ -3599,7 +3639,7 @@ static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, D
assert(cursor->prevval.flags == DB_DBT_REALLOC); assert(cursor->prevval.flags == DB_DBT_REALLOC);
brt_cursor_invalidate_callback(cursor->omtcursor, cursor); brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter); int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor, &cursor->root_put_counter);
if (r == 0) { if (r == 0) {
if (compare_kv_xy(cursor->brt, search->k, search->v, &cursor->prevkey, &cursor->prevval) == 0) { if (compare_kv_xy(cursor->brt, search->k, search->v, &cursor->prevkey, &cursor->prevval) == 0) {
swap_cursor_dbts(cursor); swap_cursor_dbts(cursor);
...@@ -3626,7 +3666,7 @@ static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outva ...@@ -3626,7 +3666,7 @@ static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outva
brt_cursor_invalidate_callback(cursor->omtcursor, cursor); brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt); brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter); r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger, cursor, &cursor->root_put_counter);
if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0) if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0)
r = DB_KEYEMPTY; r = DB_KEYEMPTY;
dbt_cleanup(&newkey); dbt_cleanup(&newkey);
...@@ -3728,7 +3768,7 @@ static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT ...@@ -3728,7 +3768,7 @@ static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT
assert(cursor->prevval.flags == DB_DBT_REALLOC); assert(cursor->prevval.flags == DB_DBT_REALLOC);
brt_cursor_invalidate_callback(cursor->omtcursor, cursor); brt_cursor_invalidate_callback(cursor->omtcursor, cursor);
int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter); int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor, &cursor->root_put_counter);
if (r == 0) { if (r == 0) {
if (compare_k_x(cursor->brt, search->k, &cursor->prevkey) == 0) { if (compare_k_x(cursor->brt, search->k, &cursor->prevkey) == 0) {
swap_cursor_dbts(cursor); swap_cursor_dbts(cursor);
...@@ -3862,18 +3902,24 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, T ...@@ -3862,18 +3902,24 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, T
r = brt_cursor_first(cursor, key, val, logger); r = brt_cursor_first(cursor, key, val, logger);
else else
r = brt_cursor_next(cursor, key, val, logger); r = brt_cursor_next(cursor, key, val, logger);
if (r == 0)
brt_cursor_set_prefetching(cursor);
break; break;
case DB_NEXT_DUP: case DB_NEXT_DUP:
if (brt_cursor_not_set(cursor)) if (brt_cursor_not_set(cursor))
r = EINVAL; r = EINVAL;
else else
r = brt_cursor_next_dup(cursor, key, val, logger); r = brt_cursor_next_dup(cursor, key, val, logger);
if (r == 0)
brt_cursor_set_prefetching(cursor);
break; break;
case DB_NEXT_NODUP: case DB_NEXT_NODUP:
if (brt_cursor_not_set(cursor)) if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val, logger); r = brt_cursor_first(cursor, key, val, logger);
else else
r = brt_cursor_next_nodup(cursor, key, val, logger); r = brt_cursor_next_nodup(cursor, key, val, logger);
if (r == 0)
brt_cursor_set_prefetching(cursor);
break; break;
case DB_PREV: case DB_PREV:
if (brt_cursor_not_set(cursor)) if (brt_cursor_not_set(cursor))
......
...@@ -236,23 +236,30 @@ split_fields (char *line, char *fields[], int maxfields) { ...@@ -236,23 +236,30 @@ split_fields (char *line, char *fields[], int maxfields) {
return i; return i;
} }
static int
usage(const char *arg0) {
printf("Usage: %s [--nodata] [--interactive] brtfilename\n", arg0);
return 1;
}
int int
main (int argc, const char *argv[]) { main (int argc, const char *argv[]) {
const char *arg0 = argv[0]; const char *arg0 = argv[0];
static int interactive = 0; int interactive = 0;
argc--; argv++; argc--; argv++;
while (argc>1) { while (argc>0) {
if (strcmp(argv[0], "--nodata")==0) { if (strcmp(argv[0], "--nodata") == 0) {
dump_data = 0; dump_data = 0;
} else if (strcmp(argv[0], "--interactive") == 0) { } else if (strcmp(argv[0], "--interactive") == 0) {
interactive = 1; interactive = 1;
} else { } else if (strcmp(argv[0], "--help") == 0) {
printf("Usage: %s [--nodata] brtfilename\n", arg0); return usage(arg0);
exit(1); } else
} break;
argc--; argv++; argc--; argv++;
} }
assert(argc==1); if (argc != 1) return usage(arg0);
const char *n = argv[0]; const char *n = argv[0];
int f = open(n, O_RDONLY + O_BINARY); assert(f>=0); int f = open(n, O_RDONLY + O_BINARY); assert(f>=0);
struct brt_header *h; struct brt_header *h;
......
...@@ -578,6 +578,7 @@ static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) { ...@@ -578,6 +578,7 @@ static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) {
} }
p->state = CTPAIR_IDLE; p->state = CTPAIR_IDLE;
ctpair_write_unlock(&p->rwlock); ctpair_write_unlock(&p->rwlock);
if (0) printf("%s:%d %"PRId64" complete\n", __FUNCTION__, __LINE__, key.b);
return 0; return 0;
} }
...@@ -922,6 +923,7 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, ...@@ -922,6 +923,7 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
CACHETABLE_FLUSH_CALLBACK flush_callback, CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback, CACHETABLE_FETCH_CALLBACK fetch_callback,
void *extraargs) { void *extraargs) {
if (0) printf("%s:%d %"PRId64"\n", __FUNCTION__, __LINE__, key.b);
CACHETABLE ct = cf->cachetable; CACHETABLE ct = cf->cachetable;
cachetable_lock(ct); cachetable_lock(ct);
// lookup // lookup
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment