Commit b5ef4b0e authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Simplified splitting code (don't split fifo. Instead push everything to the...

Simplified splitting code (don't split fifo.  Instead push everything to the child.)  Tests run.  Addresses #1195.

git-svn-id: file:///svn/toku/tokudb.1195@7453 c7de825b-a66e-492c-adef-691d508d4ae1
parent 8195b25b
...@@ -1142,74 +1142,6 @@ static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -1142,74 +1142,6 @@ static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
return r; return r;
} }
int toku_brt_get_fd(BRT brt, int *fdp) {
*fdp = toku_cachefile_fd(brt->cf);
return 0;
}
int toku_brt_reopen(BRT brt, const char *fname, const char *fname_in_env, TOKUTXN txn) {
int r;
// create a new file
int fd = -1;
r = brt_open_file(brt, fname, fname_in_env, TRUE, txn, &fd);
if (r != 0) return r;
// set the cachefile
r = toku_cachefile_set_fd(brt->cf, fd, fname_in_env);
assert(r == 0);
brt->h = 0; // set_fd should close the header
toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(brt->cf));
// init the tree header
r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h);
if (r == -1) {
r = brt_alloc_init_header(brt, NULL, txn);
}
return r;
}
int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags) {
int i;
int found = -1;
assert(flags == 0);
assert(brt->h);
assert(brt->h->n_named_roots>=0);
for (i = 0; i < brt->h->n_named_roots; i++) {
if (strcmp(brt->h->names[i], dbname) == 0) {
found = i;
break;
}
}
if (found == -1) {
//Should not be possible.
return ENOENT;
}
//Free old db name
toku_free(brt->h->names[found]);
//TODO: Free Diskblocks including root
for (i = found + 1; i < brt->h->n_named_roots; i++) {
brt->h->names[i - 1] = brt->h->names[i];
brt->h->roots[i - 1] = brt->h->roots[i];
brt->h->root_hashes[i - 1] = brt->h->root_hashes[i];
}
brt->h->n_named_roots--;
brt->h->dirty = 1;
// Q: What if n_named_roots becomes 0? A: Don't do anything. an empty list of named roots is OK.
XREALLOC_N(brt->h->n_named_roots, brt->h->names);
XREALLOC_N(brt->h->n_named_roots, brt->h->roots);
XREALLOC_N(brt->h->n_named_roots, brt->h->root_hashes);
return 0;
}
int toku_brt_flush (BRT brt) {
return toku_cachefile_flush(brt->cf);
}
//strcmp(key,"hello387")==0; //strcmp(key,"hello387")==0;
static int push_something_simple(BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT_CMD cmd, TOKULOGGER logger) { static int push_something_simple(BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT_CMD cmd, TOKULOGGER logger) {
...@@ -1395,176 +1327,6 @@ static inline void brt_split_init(BRT_SPLIT *split) { ...@@ -1395,176 +1327,6 @@ static inline void brt_split_init(BRT_SPLIT *split) {
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, OMTCURSOR); static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, OMTCURSOR);
BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c) {
return brt_cursor_not_set(c);
}
DBT *brt_cursor_peek_prev_key(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the previous key.
// Requires: The caller may not modify that DBT or the memory at which it points.
{
return &cursor->prevkey;
}
DBT *brt_cursor_peek_prev_val(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the previous val
// Requires: The caller may not modify that DBT or the memory at which it points.
{
return &cursor->prevval;
}
void brt_cursor_peek_current(BRT_CURSOR cursor, const DBT **pkey, const DBT **pval)
// Effect: Retrieves a pointer to the DBTs for the current key and value.
// Requires: The caller may not modify the DBTs or the memory at which they points.
{
if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, &cursor->val);
*pkey = &cursor->key;
*pval = &cursor->val;
}
DBT *brt_cursor_peek_current_key(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the current key.
// Requires: The caller may not modify that DBT or the memory at which it points.
{
if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, NULL);
return &cursor->key;
}
DBT *brt_cursor_peek_current_val(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the current val
// Requires: The caller may not modify that DBT or the memory at which it points.
{
if (cursor->current_in_omt) load_dbts_from_omt(cursor, NULL, &cursor->val);
return &cursor->val;
}
int toku_brt_dbt_set(DBT* key, DBT* key_source) {
int r = toku_dbt_set_value(key, (bytevec*)&key_source->data, key_source->size, NULL, FALSE);
return r;
}
int toku_brt_cursor_dbts_set(BRT_CURSOR cursor,
DBT* key, DBT* key_source, BOOL key_disposable,
DBT* val, DBT* val_source, BOOL val_disposable) {
void** key_staticp = cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey;
void** val_staticp = cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval;
int r;
r = toku_dbt_set_two_values(key, (bytevec*)&key_source->data, key_source->size, key_staticp, key_disposable,
val, (bytevec*)&val_source->data, val_source->size, val_staticp, val_disposable);
return r;
}
int toku_brt_cursor_dbts_set_with_dat(BRT_CURSOR cursor, BRT pdb,
DBT* key, DBT* key_source, BOOL key_disposable,
DBT* val, DBT* val_source, BOOL val_disposable,
DBT* dat, DBT* dat_source, BOOL dat_disposable) {
void** key_staticp = cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey;
void** val_staticp = cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval;
void** dat_staticp = &pdb->sval;
int r;
r = toku_dbt_set_three_values(key, (bytevec*)&key_source->data, key_source->size, key_staticp, key_disposable,
val, (bytevec*)&val_source->data, val_source->size, val_staticp, val_disposable,
dat, (bytevec*)&dat_source->data, dat_source->size, dat_staticp, dat_disposable);
return r;
}
void brt_cursor_restore_state_from_prev(BRT_CURSOR cursor) {
toku_omt_cursor_invalidate(cursor->omtcursor);
swap_cursor_dbts(cursor);
}
int toku_brt_cursor_peek_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
{
assert(cursor->brt->h);
u_int64_t h_counter = cursor->brt->h->root_put_counter;
if (h_counter != cursor->root_put_counter) return -1;
}
OMTVALUE le;
u_int32_t index = 0;
int r = toku_omt_cursor_current_index(cursor->omtcursor, &index);
assert(r==0);
OMT omt = toku_omt_cursor_get_omt(cursor->omtcursor);
get_prev:;
if (index>0) {
r = toku_omt_fetch(omt, --index, &le, NULL);
if (r==0) {
if (le_is_provdel(le)) goto get_prev;
toku_fill_dbt(outkey, le_latest_key(le), le_latest_keylen(le));
toku_fill_dbt(outval, le_latest_val(le), le_latest_vallen(le));
return 0;
}
}
}
return -1;
}
int toku_brt_cursor_peek_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
{
assert(cursor->brt->h);
u_int64_t h_counter = cursor->brt->h->root_put_counter;
if (h_counter != cursor->root_put_counter) return -1;
}
OMTVALUE le;
u_int32_t index = UINT32_MAX;
int r = toku_omt_cursor_current_index(cursor->omtcursor, &index);
assert(r==0);
OMT omt = toku_omt_cursor_get_omt(cursor->omtcursor);
get_next:;
if (++index<toku_omt_size(omt)) {
r = toku_omt_fetch(omt, index, &le, NULL);
if (r==0) {
if (le_is_provdel(le)) goto get_next;
toku_fill_dbt(outkey, le_latest_key(le), le_latest_keylen(le));
toku_fill_dbt(outval, le_latest_val(le), le_latest_vallen(le));
return 0;
}
}
}
return -1;
}
int toku_brt_cursor_after(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) {
TOKULOGGER logger = toku_txn_logger(txn);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
int toku_brt_cursor_before(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) {
TOKULOGGER logger = toku_txn_logger(txn);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, key, val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_heavi(brt_search_t *search, DBT *x, DBT *y) {
HEAVI_WRAPPER wrapper = search->context;
int r = wrapper->h(x, y, wrapper->extra_h);
// wrapper->r_h must have the same signus as the final chosen element.
// it is initialized to -1 or 1. 0's are closer to the min (max) that we
// want so once we hit 0 we keep it.
if (r==0) wrapper->r_h = 0;
return (search->direction&BRT_SEARCH_LEFT) ? r>=0 : r<=0;
}
//We pass in toku_dbt_fake to the search functions, since it will not pass the
//key(or val) to the heaviside function if key(or val) is NULL.
//It is not used for anything else,
//the actual 'extra' information for the heaviside function is inside the
//wrapper.
static const DBT __toku_dbt_fake;
static const DBT* const toku_dbt_fake = &__toku_dbt_fake;
int toku_brt_cursor_get_heavi (BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn, int direction, HEAVI_WRAPPER wrapper) {
TOKULOGGER logger = toku_txn_logger(txn);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_heavi,
direction < 0 ? BRT_SEARCH_RIGHT : BRT_SEARCH_LEFT,
(DBT*)toku_dbt_fake,
cursor->brt->flags & TOKU_DB_DUPSORT ? (DBT*)toku_dbt_fake : NULL,
wrapper);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
int toku_brt_height_of_root(BRT brt, int *height) { int toku_brt_height_of_root(BRT brt, int *height) {
// for an open brt, return the current height. // for an open brt, return the current height.
int r; int r;
...@@ -1583,11 +1345,3 @@ int toku_brt_height_of_root(BRT brt, int *height) { ...@@ -1583,11 +1345,3 @@ int toku_brt_height_of_root(BRT brt, int *height) {
return 0; return 0;
} }
int toku_brt_get_cursor_count (BRT brt) {
int n = 0;
struct list *list;
for (list = brt->cursors.next; list != &brt->cursors; list = list->next)
n += 1;
return n;
}
...@@ -213,6 +213,10 @@ nonleaf_node_is_gorged (BRTNODE node) { ...@@ -213,6 +213,10 @@ nonleaf_node_is_gorged (BRTNODE node) {
static int static int
brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum reactivity *re, BOOL *did_io); brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum reactivity *re, BOOL *did_io);
static int
flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum reactivity *child_re, BOOL *did_io);
int toku_brt_debug_mode = 0; int toku_brt_debug_mode = 0;
//#define SLOW //#define SLOW
...@@ -960,24 +964,10 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl ...@@ -960,24 +964,10 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
return 0; return 0;
} }
static int
insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v, int type, TXNID xid)
{
unsigned int n_bytes_added = BRT_CMD_OVERHEAD + KEY_VALUE_OVERHEAD + k->size + v->size;
int r = toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, xid);
if (r!=0) return r;
// printf("%s:%d fingerprint %08x -> ", __FILE__, __LINE__, node->local_fingerprint);
node->local_fingerprint += node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xid, k->data, k->size, v->data, v->size);
// printf(" %08x\n", node->local_fingerprint);
BNC_NBYTESINBUF(node,childnum) += n_bytes_added;
node->u.n.n_bytes_in_buffers += n_bytes_added;
node->dirty = 1;
return 0;
}
/* NODE is a node with a child. /* NODE is a node with a child.
* childnum was split into two nodes childa, and childb. childa is the same as the original child. childb is a new child. * childnum was split into two nodes childa, and childb. childa is the same as the original child. childb is a new child.
* We must slide things around, & move things from the old table to the new tables. * We must slide things around, & move things from the old table to the new tables.
* Requires: the CHILDNUMth buffer of node is empty.
* We don't push anything down to children. We split the node, and things land wherever they land. * We don't push anything down to children. We split the node, and things land wherever they land.
* We must delete the old buffer (but the old child is already deleted.) * We must delete the old buffer (but the old child is already deleted.)
* On return, the new children are unpinned. * On return, the new children are unpinned.
...@@ -992,6 +982,7 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -992,6 +982,7 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
assert(0 <= childnum && childnum < node->u.n.n_children); assert(0 <= childnum && childnum < node->u.n.n_children);
FIFO old_h = BNC_BUFFER(node,childnum); FIFO old_h = BNC_BUFFER(node,childnum);
int old_count = BNC_NBYTESINBUF(node, childnum); int old_count = BNC_NBYTESINBUF(node, childnum);
assert(old_count==0);
int cnum; int cnum;
int r; int r;
...@@ -1034,16 +1025,6 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -1034,16 +1025,6 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
BNC_NBYTESINBUF(node, childnum+1) = 0; BNC_NBYTESINBUF(node, childnum+1) = 0;
// Remove all the cmds from the local fingerprint. Some may get added in again when we try to push to the child. // Remove all the cmds from the local fingerprint. Some may get added in again when we try to push to the child.
FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid,
{
u_int32_t old_fingerprint = node->local_fingerprint;
u_int32_t new_fingerprint = old_fingerprint - node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xid, skey, skeylen, sval, svallen);
if (t->txn_that_created != xid) {
r = toku_log_brtdeq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum);
assert(r==0);
}
node->local_fingerprint = new_fingerprint;
});
//verify_local_fingerprint_nonleaf(node); //verify_local_fingerprint_nonleaf(node);
...@@ -1074,55 +1055,7 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -1074,55 +1055,7 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->u.n.n_bytes_in_buffers -= old_count; /* By default, they are all removed. We might add them back in. */ node->u.n.n_bytes_in_buffers -= old_count; /* By default, they are all removed. We might add them back in. */
/* Keep pushing to the children, but not if the children would require a pushdown */ /* Keep pushing to the children, but not if the children would require a pushdown */
FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid, {
DBT skd; DBT svd;
toku_fill_dbt(&skd, skey, skeylen);
toku_fill_dbt(&svd, sval, svallen);
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
int pusha = 0; int pushb = 0;
switch (type) {
case BRT_INSERT:
case BRT_DELETE_BOTH:
case BRT_DELETE_ANY:
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY:
if ((type!=BRT_DELETE_ANY && type!=BRT_ABORT_ANY && type!=BRT_COMMIT_ANY) || 0==(t->flags&TOKU_DB_DUPSORT)) {
// If it's an INSERT or DELETE_BOTH or there are no duplicates then we just put the command into one subtree
int cmp = brt_compare_pivot(t, &skd, &svd, splitk->data);
if (cmp <= 0) pusha = 1;
else pushb = 1;
} else {
assert((type==BRT_DELETE_ANY || type==BRT_ABORT_ANY || type==BRT_COMMIT_ANY) && t->flags&TOKU_DB_DUPSORT);
// It is a DELETE or ABORT_ANY and it's a DUPSORT database,
// in which case if the comparison function comes up 0 we must write the command to both children. (See #201)
int cmp = brt_compare_pivot(t, &skd, 0, splitk->data);
if (cmp<=0) pusha=1;
if (cmp>=0) pushb=1; // Could be that both pusha and pushb are set
}
if (pusha) {
r=insert_to_buffer_in_nonleaf(node, childnum, &skd, &svd, type, xid);
}
if (pushb) {
r=insert_to_buffer_in_nonleaf(node, childnum+1, &skd, &svd, type, xid);
}
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
if (r!=0) printf("r=%d\n", r);
assert(r==0);
goto ok;
case BRT_NONE:
// Don't have to do anything in this case, can just drop the command
goto ok;
}
printf("Bad type %d\n", type); // Don't use default: because I want a compiler warning if I forget a enum case, and I want a runtime error if the type isn't one of the expected ones.
assert(0);
ok: /*nothing*/;
});
toku_fifo_free(&old_h); toku_fifo_free(&old_h);
//verify_local_fingerprint_nonleaf(childa); //verify_local_fingerprint_nonleaf(childa);
...@@ -1152,6 +1085,14 @@ brt_split_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger) ...@@ -1152,6 +1085,14 @@ brt_split_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger)
} }
assert(node->height>0); assert(node->height>0);
BRTNODE child; BRTNODE child;
if (BNC_NBYTESINBUF(node, childnum)>0) {
// I don't think this can happen, but it's easy to handle. Flush the child, and if no longer fissible, then return.
enum reactivity re = RE_STABLE;
BOOL did_io = FALSE;
int r = flush_this_child(t, node, childnum, logger, &re, &did_io);
if (r != 0) return r;
if (re != RE_FISSIBLE) return 0;
}
{ {
void *childnode_v; void *childnode_v;
int r = toku_cachetable_get_and_pin(t->cf, int r = toku_cachetable_get_and_pin(t->cf,
...@@ -1848,7 +1789,10 @@ static void find_heaviest_child (BRTNODE node, int *childnum) { ...@@ -1848,7 +1789,10 @@ static void find_heaviest_child (BRTNODE node, int *childnum) {
} }
static int static int
flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum reactivity *child_re, BOOL *did_io) { flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum reactivity *child_re, BOOL *did_io)
// Effect: Push everything in the CHILDNUMth buffer of node down into the child.
// The child could end up reactive, and this function doesn't fix that.
{
assert(node->height>0); assert(node->height>0);
BLOCKNUM targetchild = BNC_BLOCKNUM(node, childnum); BLOCKNUM targetchild = BNC_BLOCKNUM(node, childnum);
assert(targetchild.b>=0 && targetchild.b<t->h->unused_blocks.b); // This assertion could fail in a concurrent setting since another process might have bumped unused memory. assert(targetchild.b>=0 && targetchild.b<t->h->unused_blocks.b); // This assertion could fail in a concurrent setting since another process might have bumped unused memory.
...@@ -2511,6 +2455,70 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char ...@@ -2511,6 +2455,70 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
return 0; return 0;
} }
int toku_brt_reopen(BRT brt, const char *fname, const char *fname_in_env, TOKUTXN txn) {
int r;
// create a new file
int fd = -1;
r = brt_open_file(brt, fname, fname_in_env, TRUE, txn, &fd);
if (r != 0) return r;
// set the cachefile
r = toku_cachefile_set_fd(brt->cf, fd, fname_in_env);
assert(r == 0);
brt->h = 0; // set_fd should close the header
toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(brt->cf));
// init the tree header
r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h);
if (r == -1) {
r = brt_alloc_init_header(brt, NULL, txn);
}
return r;
}
int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags) {
int i;
int found = -1;
assert(flags == 0);
assert(brt->h);
assert(brt->h->n_named_roots>=0);
for (i = 0; i < brt->h->n_named_roots; i++) {
if (strcmp(brt->h->names[i], dbname) == 0) {
found = i;
break;
}
}
if (found == -1) {
//Should not be possible.
return ENOENT;
}
//Free old db name
toku_free(brt->h->names[found]);
//TODO: Free Diskblocks including root
for (i = found + 1; i < brt->h->n_named_roots; i++) {
brt->h->names[i - 1] = brt->h->names[i];
brt->h->roots[i - 1] = brt->h->roots[i];
brt->h->root_hashes[i - 1] = brt->h->root_hashes[i];
}
brt->h->n_named_roots--;
brt->h->dirty = 1;
// Q: What if n_named_roots becomes 0? A: Don't do anything. an empty list of named roots is OK.
XREALLOC_N(brt->h->n_named_roots, brt->h->names);
XREALLOC_N(brt->h->n_named_roots, brt->h->roots);
XREALLOC_N(brt->h->n_named_roots, brt->h->root_hashes);
return 0;
}
int toku_brt_get_fd(BRT brt, int *fdp) {
*fdp = toku_cachefile_fd(brt->cf);
return 0;
}
int toku_brt_set_flags(BRT brt, unsigned int flags) { int toku_brt_set_flags(BRT brt, unsigned int flags) {
brt->did_set_flags = 1; brt->did_set_flags = 1;
brt->flags = flags; brt->flags = flags;
...@@ -2612,6 +2620,11 @@ int toku_brt_create(BRT *brt_ptr) { ...@@ -2612,6 +2620,11 @@ int toku_brt_create(BRT *brt_ptr) {
*brt_ptr = brt; *brt_ptr = brt;
return 0; return 0;
} }
int toku_brt_flush (BRT brt) {
return toku_cachefile_flush(brt->cf);
}
/* ************* CURSORS ********************* */ /* ************* CURSORS ********************* */
static inline void dbt_cleanup(DBT *dbt) { static inline void dbt_cleanup(DBT *dbt) {
...@@ -3384,6 +3397,184 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, T ...@@ -3384,6 +3397,184 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, T
return r; return r;
} }
DBT *brt_cursor_peek_prev_key(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the previous key.
// Requires: The caller may not modify that DBT or the memory at which it points.
{
return &cursor->prevkey;
}
DBT *brt_cursor_peek_prev_val(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the previous val
// Requires: The caller may not modify that DBT or the memory at which it points.
{
return &cursor->prevval;
}
void brt_cursor_peek_current(BRT_CURSOR cursor, const DBT **pkey, const DBT **pval)
// Effect: Retrieves a pointer to the DBTs for the current key and value.
// Requires: The caller may not modify the DBTs or the memory at which they points.
{
if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, &cursor->val);
*pkey = &cursor->key;
*pval = &cursor->val;
}
DBT *brt_cursor_peek_current_key(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the current key.
// Requires: The caller may not modify that DBT or the memory at which it points.
{
if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, NULL);
return &cursor->key;
}
DBT *brt_cursor_peek_current_val(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the current val
// Requires: The caller may not modify that DBT or the memory at which it points.
{
if (cursor->current_in_omt) load_dbts_from_omt(cursor, NULL, &cursor->val);
return &cursor->val;
}
int toku_brt_cursor_peek_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
{
assert(cursor->brt->h);
u_int64_t h_counter = cursor->brt->h->root_put_counter;
if (h_counter != cursor->root_put_counter) return -1;
}
OMTVALUE le;
u_int32_t index = 0;
int r = toku_omt_cursor_current_index(cursor->omtcursor, &index);
assert(r==0);
OMT omt = toku_omt_cursor_get_omt(cursor->omtcursor);
get_prev:;
if (index>0) {
r = toku_omt_fetch(omt, --index, &le, NULL);
if (r==0) {
if (le_is_provdel(le)) goto get_prev;
toku_fill_dbt(outkey, le_latest_key(le), le_latest_keylen(le));
toku_fill_dbt(outval, le_latest_val(le), le_latest_vallen(le));
return 0;
}
}
}
return -1;
}
int toku_brt_cursor_peek_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
{
assert(cursor->brt->h);
u_int64_t h_counter = cursor->brt->h->root_put_counter;
if (h_counter != cursor->root_put_counter) return -1;
}
OMTVALUE le;
u_int32_t index = UINT32_MAX;
int r = toku_omt_cursor_current_index(cursor->omtcursor, &index);
assert(r==0);
OMT omt = toku_omt_cursor_get_omt(cursor->omtcursor);
get_next:;
if (++index<toku_omt_size(omt)) {
r = toku_omt_fetch(omt, index, &le, NULL);
if (r==0) {
if (le_is_provdel(le)) goto get_next;
toku_fill_dbt(outkey, le_latest_key(le), le_latest_keylen(le));
toku_fill_dbt(outval, le_latest_val(le), le_latest_vallen(le));
return 0;
}
}
}
return -1;
}
static int brt_cursor_compare_heavi(brt_search_t *search, DBT *x, DBT *y) {
HEAVI_WRAPPER wrapper = search->context;
int r = wrapper->h(x, y, wrapper->extra_h);
// wrapper->r_h must have the same signus as the final chosen element.
// it is initialized to -1 or 1. 0's are closer to the min (max) that we
// want so once we hit 0 we keep it.
if (r==0) wrapper->r_h = 0;
return (search->direction&BRT_SEARCH_LEFT) ? r>=0 : r<=0;
}
//We pass in toku_dbt_fake to the search functions, since it will not pass the
//key(or val) to the heaviside function if key(or val) is NULL.
//It is not used for anything else,
//the actual 'extra' information for the heaviside function is inside the
//wrapper.
static const DBT __toku_dbt_fake;
static const DBT* const toku_dbt_fake = &__toku_dbt_fake;
int toku_brt_cursor_get_heavi (BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn, int direction, HEAVI_WRAPPER wrapper) {
TOKULOGGER logger = toku_txn_logger(txn);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_heavi,
direction < 0 ? BRT_SEARCH_RIGHT : BRT_SEARCH_LEFT,
(DBT*)toku_dbt_fake,
cursor->brt->flags & TOKU_DB_DUPSORT ? (DBT*)toku_dbt_fake : NULL,
wrapper);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c) {
return brt_cursor_not_set(c);
}
int toku_brt_cursor_before(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) {
TOKULOGGER logger = toku_txn_logger(txn);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, key, val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
int toku_brt_cursor_after(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) {
TOKULOGGER logger = toku_txn_logger(txn);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
void brt_cursor_restore_state_from_prev(BRT_CURSOR cursor) {
toku_omt_cursor_invalidate(cursor->omtcursor);
swap_cursor_dbts(cursor);
}
int toku_brt_get_cursor_count (BRT brt) {
int n = 0;
struct list *list;
for (list = brt->cursors.next; list != &brt->cursors; list = list->next)
n += 1;
return n;
}
int toku_brt_dbt_set(DBT* key, DBT* key_source) {
int r = toku_dbt_set_value(key, (bytevec*)&key_source->data, key_source->size, NULL, FALSE);
return r;
}
int toku_brt_cursor_dbts_set(BRT_CURSOR cursor,
DBT* key, DBT* key_source, BOOL key_disposable,
DBT* val, DBT* val_source, BOOL val_disposable) {
void** key_staticp = cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey;
void** val_staticp = cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval;
int r;
r = toku_dbt_set_two_values(key, (bytevec*)&key_source->data, key_source->size, key_staticp, key_disposable,
val, (bytevec*)&val_source->data, val_source->size, val_staticp, val_disposable);
return r;
}
int toku_brt_cursor_dbts_set_with_dat(BRT_CURSOR cursor, BRT pdb,
DBT* key, DBT* key_source, BOOL key_disposable,
DBT* val, DBT* val_source, BOOL val_disposable,
DBT* dat, DBT* dat_source, BOOL dat_disposable) {
void** key_staticp = cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey;
void** val_staticp = cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval;
void** dat_staticp = &pdb->sval;
int r;
r = toku_dbt_set_three_values(key, (bytevec*)&key_source->data, key_source->size, key_staticp, key_disposable,
val, (bytevec*)&val_source->data, val_source->size, val_staticp, val_disposable,
dat, (bytevec*)&dat_source->data, dat_source->size, dat_staticp, dat_disposable);
return r;
}
/* ********************************* lookup **************************************/ /* ********************************* lookup **************************************/
int toku_brt_lookup (BRT brt, DBT *k, DBT *v) { int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment