Commit 14a77b12 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

newbrt tests run. Still no deletes. But the code is smaller than it was...

newbrt tests run.  Still no deletes.  But the code is smaller than it was (2734 according to sloccount vs 3137 before, which is 13 percent reduction).  Addresses #1195.

git-svn-id: file:///svn/toku/tokudb.1195@7362 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3cd10a8b
......@@ -1591,102 +1591,6 @@ int toku_brt_cursor_get_heavi (BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKU
wrapper);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename, u_int32_t fullhash, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater) {
BRTNODE node;
{
void *node_v;
//assert(fullhash == toku_cachetable_hash(brt->cf, nodename));
int rr = toku_cachetable_get_and_pin(brt->cf, nodename, fullhash,
&node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
assert(rr == 0);
node = node_v;
assert(node->fullhash==fullhash);
}
if (node->height>0) {
int n_keys = node->u.n.n_children-1;
int compares[n_keys];
int i;
for (i=0; i<n_keys; i++) {
struct kv_pair *pivot = node->u.n.childkeys[i];
DBT dbt;
compares[i] = brt->compare_fun(brt->db, toku_fill_dbt(&dbt, kv_pair_key(pivot), kv_pair_keylen(pivot)), key);
}
for (i=0; i<node->u.n.n_children; i++) {
int prevcomp = (i==0) ? -1 : compares[i-1];
int nextcomp = (i+1 >= n_keys) ? 1 : compares[i];
int subest = BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i);
if (nextcomp < 0) {
// We're definitely looking too far to the left
*less += subest;
} else if (prevcomp > 0) {
// We're definitely looking too far to the right
*greater += subest;
} else if (prevcomp == 0 && nextcomp == 0) {
// We're looking at a subtree that contains all zeros
*equal += subest;
} else {
// nextcomp>=0 and prevcomp<=0, so something in the subtree could match
// but they are not both zero, so it's not the whole subtree, so we need to recurse
toku_brt_keyrange_internal(brt, BNC_BLOCKNUM(node, i), compute_child_fullhash(brt->cf, node, i), key, less, equal, greater);
}
}
} else {
BRT_CMD_S cmd = { BRT_INSERT, 0, .u.id={key,0}};
struct cmd_leafval_bessel_extra be = {brt, &cmd, 0};
u_int32_t idx;
int r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be, 0, &idx, NULL);
*less += idx;
if (r==0 && (brt->flags & TOKU_DB_DUP)) {
// There is something, and so we now want to find the rightmost extent.
u_int32_t idx2;
r = toku_omt_find(node->u.l.buffer, toku_cmd_leafval_bessel, &be, +1, 0, &idx2, NULL);
if (r==0) {
*greater += toku_omt_size(node->u.l.buffer)-idx2;
*equal += idx2-idx;
} else {
*equal += toku_omt_size(node->u.l.buffer)-idx;
}
//printf("%s:%d (%llu, %llu, %llu)\n", __FILE__, __LINE__, (unsigned long long)*less, (unsigned long long)*equal, (unsigned long long)*greater);
} else {
*greater += toku_omt_size(node->u.l.buffer)-idx;
if (r==0) {
(*greater)--;
(*equal)++;
}
}
}
{
int rr = toku_unpin_brtnode(brt, node);
assert(rr == 0);
}
}
int toku_brt_keyrange (BRT brt, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater) {
assert(brt->h);
u_int32_t fullhash;
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
*less = *equal = *greater = 0;
toku_brt_keyrange_internal (brt, *rootp, fullhash, key, less, equal, greater);
return 0;
}
int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) {
if ((flags & ~DB_DELETE_ANY) != 0)
return EINVAL;
if (brt_cursor_not_set(cursor))
return EINVAL;
int r = 0;
if (!(flags & DB_DELETE_ANY))
r = brt_cursor_current(cursor, DB_CURRENT, 0, 0, toku_txn_logger(txn));
if (r == 0) {
if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, &cursor->val);
r = toku_brt_delete_both(cursor->brt, &cursor->key, &cursor->val, txn);
}
return r;
}
int toku_brt_height_of_root(BRT brt, int *height) {
// for an open brt, return the current height.
int r;
......
......@@ -83,8 +83,8 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
DBT keydbt,valdbt;
BRT_CMD_S cmd = {BRT_INSERT, 0, .u.id={toku_fill_dbt(&keydbt, key, keylen),
toku_fill_dbt(&valdbt, val, vallen)}};
struct cmd_leafval_bessel_extra be = {brt, &cmd, node->flags & TOKU_DB_DUPSORT};
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be, &storeddatav, &idx, NULL);
struct cmd_leafval_heaviside_extra be = {brt, &cmd, node->flags & TOKU_DB_DUPSORT};
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_heaviside, &be, &storeddatav, &idx, NULL);
if (r==0) {
......@@ -96,7 +96,7 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
// Now put the new kv in.
toku_omt_set_at(node->u.l.buffer, leafentry, idx);
} else {
r = toku_omt_insert(node->u.l.buffer, leafentry, toku_cmd_leafval_bessel, &be, 0);
r = toku_omt_insert(node->u.l.buffer, leafentry, toku_cmd_leafval_heaviside, &be, 0);
assert(r==0);
}
......
......@@ -1648,7 +1648,7 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil
r = brtnode_put_cmd (t, child, cmd, logger, &re_array[childnum], did_io);
int rr = toku_unpin_brtnode(t, child);
assert(rr=0);
assert(rr==0);
return r;
}
......@@ -1889,7 +1889,9 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum react
return brt_leaf_put_cmd(t, node, cmd, logger, re);
} else {
enum reactivity *MALLOC_N(node->u.n.n_children, child_re);
{ int i; for (i=0; i<node->u.n.n_children; i++) child_re[i]=RE_STABLE; }
int r = brt_nonleaf_put_cmd(t, node, cmd, logger, child_re, did_io);
{ int i; for (i=0; i<node->u.n.n_children; i++) assert(child_re[i]<=RE_FISSIBLE); }
if (r!=0) goto return_r;
// Now we may have overfilled node. So we'll flush the heaviest child until we are happy.
while (!*did_io // Don't flush if we've done I/O.
......@@ -1901,8 +1903,10 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum react
}
// Now all those children may need fixing.
int i;
for (i=0; i<node->u.n.n_children; i++) {
int childnum = node->u.n.n_children - 1 -i;
int original_n_children = node->u.n.n_children;
for (i=0; i<original_n_children; i++) {
{ int j; for (j=0; j<original_n_children; j++) assert(child_re[j]<=RE_FISSIBLE); }
int childnum = original_n_children - 1 -i;
switch (child_re[childnum]) {
case RE_STABLE: goto next_child; // Could be a continue, but it seems fragile
case RE_FISSIBLE:
......@@ -1944,7 +1948,7 @@ static int push_something_at_root (BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT
// Note: During the initial descent, we may gorged many nonleaf nodes. We wish to flush only one nonleaf node at each level.
{
BRTNODE node = *nodep;
enum reactivity re;
enum reactivity re = RE_STABLE;
BOOL did_io = FALSE;
{
int r = brtnode_put_cmd(brt, node, cmd, logger, &re, &did_io);
......@@ -1957,7 +1961,7 @@ static int push_something_at_root (BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT
switch (re) {
case RE_STABLE:
return 0;
case RE_FUSIBLE:
case RE_FISSIBLE:
// The root node should split, so make a new root.
{
BRTNODE nodea,nodeb;
......@@ -1971,7 +1975,7 @@ static int push_something_at_root (BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT
}
return brt_init_new_root(brt, nodea, nodeb, splitk, rootp, logger, nodep);
}
case RE_FISSIBLE:
case RE_FUSIBLE:
return 0; // Cannot merge anything at the root, so return happy.
}
assert(0); // cannot happen
......@@ -2734,8 +2738,11 @@ static int heaviside_from_search_t (OMTVALUE lev, void *extra) {
abort(); return 0;
}
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor) {
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor) {
// Now we have to convert from brt_search_t to the heaviside function with a direction. What a pain...
*re = get_leaf_reactivity(node); // searching doesn't change the reactivity, so we can calculate it here.
int direction;
switch (search->direction) {
case BRT_SEARCH_LEFT: direction = +1; goto ok;
......@@ -2893,8 +2900,9 @@ brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *
{
if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, re, logger, omtcursor);
else
return brt_search_leaf_node(brt, node, search, newkey, newval, logger, omtcursor);
else {
return brt_search_leaf_node(brt, node, search, newkey, newval, re, logger, omtcursor);
}
}
static int
......@@ -2932,13 +2940,15 @@ toku_brt_search (BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULO
}
{
enum reactivity re;
enum reactivity re = RE_STABLE;
static int counter = 0;
counter++;
r = brt_search_node(brt, node, search, newkey, newval, &re, logger, omtcursor);
if (r!=0) goto return_r;
switch (re) {
case RE_STABLE: goto return_r;
case RE_FUSIBLE:
case RE_FISSIBLE:
// The root node should split, so make a new root.
{
BRTNODE nodea,nodeb;
......@@ -2953,7 +2963,7 @@ toku_brt_search (BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULO
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp, logger, &node);
goto return_r;
}
case RE_FISSIBLE:
case RE_FUSIBLE:
goto return_r; // Cannot merge anything at the root, so return happy.
}
assert(0); // cannot happen
......@@ -3338,6 +3348,123 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
return r;
}
/* ********************************* delete **************************************/
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
//{ unsigned i; printf("del %p keylen=%d key={", brt->db, key->size); for(i=0; i<key->size; i++) printf("%d,", ((char*)key->data)[i]); printf("} datalen=%d data={", val->size); for(i=0; i<val->size; i++) printf("%d,", ((char*)val->data)[i]); printf("}\n"); }
int r;
if (txn && (brt->txn_that_created != toku_txn_get_txnid(txn))) {
BYTESTRING keybs = {key->size, toku_memdup_in_rollback(txn, key->data, key->size)};
BYTESTRING databs = {val->size, toku_memdup_in_rollback(txn, val->data, val->size)};
toku_cachefile_refup(brt->cf);
r = toku_logger_save_rollback_cmddeleteboth(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs, databs);
if (r!=0) return r;
r = toku_txn_note_brt(txn, brt);
if (r!=0) return r;
}
BRT_CMD_S brtcmd = { BRT_DELETE_BOTH, toku_txn_get_txnid(txn), .u.id={key,val}};
r = toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) {
if ((flags & ~DB_DELETE_ANY) != 0)
return EINVAL;
if (brt_cursor_not_set(cursor))
return EINVAL;
int r = 0;
if (!(flags & DB_DELETE_ANY))
r = brt_cursor_current(cursor, DB_CURRENT, 0, 0, toku_txn_logger(txn));
if (r == 0) {
if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, &cursor->val);
r = toku_brt_delete_both(cursor->brt, &cursor->key, &cursor->val, txn);
}
return r;
}
/* ********************* keyrange ************************ */
static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename, u_int32_t fullhash, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater) {
BRTNODE node;
{
void *node_v;
//assert(fullhash == toku_cachetable_hash(brt->cf, nodename));
int rr = toku_cachetable_get_and_pin(brt->cf, nodename, fullhash,
&node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
assert(rr == 0);
node = node_v;
assert(node->fullhash==fullhash);
}
if (node->height>0) {
int n_keys = node->u.n.n_children-1;
int compares[n_keys];
int i;
for (i=0; i<n_keys; i++) {
struct kv_pair *pivot = node->u.n.childkeys[i];
DBT dbt;
compares[i] = brt->compare_fun(brt->db, toku_fill_dbt(&dbt, kv_pair_key(pivot), kv_pair_keylen(pivot)), key);
}
for (i=0; i<node->u.n.n_children; i++) {
int prevcomp = (i==0) ? -1 : compares[i-1];
int nextcomp = (i+1 >= n_keys) ? 1 : compares[i];
int subest = BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i);
if (nextcomp < 0) {
// We're definitely looking too far to the left
*less += subest;
} else if (prevcomp > 0) {
// We're definitely looking too far to the right
*greater += subest;
} else if (prevcomp == 0 && nextcomp == 0) {
// We're looking at a subtree that contains all zeros
*equal += subest;
} else {
// nextcomp>=0 and prevcomp<=0, so something in the subtree could match
// but they are not both zero, so it's not the whole subtree, so we need to recurse
toku_brt_keyrange_internal(brt, BNC_BLOCKNUM(node, i), compute_child_fullhash(brt->cf, node, i), key, less, equal, greater);
}
}
} else {
BRT_CMD_S cmd = { BRT_INSERT, 0, .u.id={key,0}};
struct cmd_leafval_heaviside_extra be = {brt, &cmd, 0};
u_int32_t idx;
int r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_heaviside, &be, 0, &idx, NULL);
*less += idx;
if (r==0 && (brt->flags & TOKU_DB_DUP)) {
// There is something, and so we now want to find the rightmost extent.
u_int32_t idx2;
r = toku_omt_find(node->u.l.buffer, toku_cmd_leafval_heaviside, &be, +1, 0, &idx2, NULL);
if (r==0) {
*greater += toku_omt_size(node->u.l.buffer)-idx2;
*equal += idx2-idx;
} else {
*equal += toku_omt_size(node->u.l.buffer)-idx;
}
//printf("%s:%d (%llu, %llu, %llu)\n", __FILE__, __LINE__, (unsigned long long)*less, (unsigned long long)*equal, (unsigned long long)*greater);
} else {
*greater += toku_omt_size(node->u.l.buffer)-idx;
if (r==0) {
(*greater)--;
(*equal)++;
}
}
}
{
int rr = toku_unpin_brtnode(brt, node);
assert(rr == 0);
}
}
int toku_brt_keyrange (BRT brt, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater) {
assert(brt->h);
u_int32_t fullhash;
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
*less = *equal = *greater = 0;
toku_brt_keyrange_internal (brt, *rootp, fullhash, key, less, equal, greater);
return 0;
}
/* ********************* debugging dump ************************ */
static int
toku_dump_brtnode (BRT brt, BLOCKNUM blocknum, int depth, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen) {
......
......@@ -4,7 +4,7 @@
#include "includes.h"
// execute the cachetable callbacks using a writer thread 0->no 1->yes
#define DO_WRITER_THREAD 1
#define DO_WRITER_THREAD 0
#if DO_WRITER_THREAD
static void *cachetable_writer(void *);
#endif
......
......@@ -1448,12 +1448,15 @@ static void brt_blackbox_test (void) {
test_brt_delete();
// This test doesn't make much sense any more. We'll have to do revised tests for this functionality.
#if 0
int old_brt_do_push_cmd = toku_brt_do_push_cmd;
toku_brt_do_push_cmd = 0;
test_brt_delete();
toku_brt_do_push_cmd = old_brt_do_push_cmd;
#endif
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment