Commit 0ef4099d authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

More work on making compare_fun

git-svn-id: file:///svn/tokudb@35 c7de825b-a66e-492c-adef-691d508d4ae1
parent c78194a0
...@@ -683,7 +683,60 @@ void test_cursor_next (void) { ...@@ -683,7 +683,60 @@ void test_cursor_next (void) {
} }
static int nonce;
DB nonce_db;
DBT *fill_b(DBT *x, char *key, unsigned int keylen) {
fill_dbt(x, key, keylen);
x->app_private = &nonce;
return x;
}
int wrong_compare_fun(DB *db, DBT *a, DBT *b) {
unsigned int i;
unsigned char *ad=a->data;
unsigned char *bd=b->data;
unsigned int siz=a->size;
assert(a->size==b->size);
assert(a->app_private == &nonce); // a must have the nonce in it, but I don't care if b does.
assert(db==&nonce_db); // make sure the db was passed down correctly
for (i=0; i<siz; i++) {
if (ad[siz-1-i]<bd[siz-1-i]) return -1;
if (ad[siz-1-i]>bd[siz-1-i]) return +1;
}
return 0;
}
static void test_wrongendian_compare (void) {
const char *n="testbrt.brt";
CACHETABLE ct;
BRT brt;
// BRT_CURSOR cursor;
int r;
DBT kbt, vbt;
int i;
unlink(n);
memory_check_all_free();
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = open_brt(n, 0, 1, &brt, 1<<12, ct, wrong_compare_fun); assert(r==0);
for (i=0; i<1000; i++) {
r = brt_insert(brt, fill_b(&kbt, (void*)&i, sizeof(i)), fill_dbt(&vbt, (void*)&i, sizeof(i)), &nonce_db);
assert(r==0);
}
r = close_brt(brt);
r = cachetable_close(ct); assert(r==0);
memory_check_all_free();
}
static void brt_blackbox_test (void) { static void brt_blackbox_test (void) {
test_wrongendian_compare(); memory_check_all_free();
test_read_what_was_written(); memory_check_all_free(); printf("did read_what_was_written\n"); test_read_what_was_written(); memory_check_all_free(); printf("did read_what_was_written\n");
test_cursor_next(); memory_check_all_free(); test_cursor_next(); memory_check_all_free();
test_multiple_dbs_many(); memory_check_all_free(); test_multiple_dbs_many(); memory_check_all_free();
......
...@@ -279,7 +279,7 @@ static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v ...@@ -279,7 +279,7 @@ static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v
} }
int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, DB *db) { int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, void *app_private, DB *db) {
int did_split=0; int did_split=0;
BRTNODE A,B; BRTNODE A,B;
assert(node->height==0); assert(node->height==0);
...@@ -298,13 +298,13 @@ int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl ...@@ -298,13 +298,13 @@ int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
({ ({
DBT k,v; DBT k,v;
if (!did_split) { if (!did_split) {
insert_to_buffer_in_leaf(A, fill_dbt(&k, key, keylen), fill_dbt(&v, val, vallen), db); insert_to_buffer_in_leaf(A, fill_dbt_ap(&k, key, keylen, app_private), fill_dbt(&v, val, vallen), db);
if (A->u.l.n_bytes_in_buffer *2 >= node->u.l.n_bytes_in_buffer) { if (A->u.l.n_bytes_in_buffer *2 >= node->u.l.n_bytes_in_buffer) {
fill_dbt(splitk, memdup(key, keylen), keylen); fill_dbt(splitk, memdup(key, keylen), keylen);
did_split=1; did_split=1;
} }
} else { } else {
insert_to_buffer_in_leaf(B, fill_dbt(&k, key, keylen), fill_dbt(&v, val, vallen), db); insert_to_buffer_in_leaf(B, fill_dbt_ap(&k, key, keylen, app_private), fill_dbt(&v, val, vallen), db);
} }
})); }));
assert(node->height>0 || node->u.l.buffer!=0); assert(node->height>0 || node->u.l.buffer!=0);
...@@ -561,6 +561,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -561,6 +561,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
DBT *childsplitk, /* the data in the childsplitk is alloc'd and is consumed by this call. */ DBT *childsplitk, /* the data in the childsplitk is alloc'd and is consumed by this call. */
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk, DBT *splitk,
void *app_private,
DB *db) { DB *db) {
assert(node->height>0); assert(node->height>0);
HASHTABLE old_h = node->u.n.htables[childnum]; HASHTABLE old_h = node->u.n.htables[childnum];
...@@ -610,7 +611,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -610,7 +611,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
/* Keep pushing to the children, but not if the children would require a pushdown */ /* Keep pushing to the children, but not if the children would require a pushdown */
HASHTABLE_ITERATE(old_h, skey, skeylen, sval, svallen, ({ HASHTABLE_ITERATE(old_h, skey, skeylen, sval, svallen, ({
DBT skd, svd; DBT skd, svd;
fill_dbt(&skd, skey, skeylen); skd.app_private=childsplitk->app_private; fill_dbt_ap(&skd, skey, skeylen, app_private);
fill_dbt(&svd, sval, svallen); fill_dbt(&svd, sval, svallen);
if (t->compare_fun(db, &skd, childsplitk)<=0) { if (t->compare_fun(db, &skd, childsplitk)<=0) {
r=push_kvpair_down_only_if_it_wont_push_more_else_put_here(t, node, childa, &skd, &svd, childnum, db); r=push_kvpair_down_only_if_it_wont_push_more_else_put_here(t, node, childa, &skd, &svd, childnum, db);
...@@ -656,6 +657,7 @@ static int push_some_kvpairs_down (BRT t, BRTNODE node, int childnum, ...@@ -656,6 +657,7 @@ static int push_some_kvpairs_down (BRT t, BRTNODE node, int childnum,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk, DBT *splitk,
int debug, int debug,
void *app_private,
DB *db) { DB *db) {
void *childnode_v; void *childnode_v;
BRTNODE child; BRTNODE child;
...@@ -690,7 +692,7 @@ static int push_some_kvpairs_down (BRT t, BRTNODE node, int childnum, ...@@ -690,7 +692,7 @@ static int push_some_kvpairs_down (BRT t, BRTNODE node, int childnum,
if (debug) printf("%s:%d %*spush down %s\n", __FILE__, __LINE__, debug, "", (char*)key); if (debug) printf("%s:%d %*spush down %s\n", __FILE__, __LINE__, debug, "", (char*)key);
r = push_a_kvpair_down (t, node, child, childnum, r = push_a_kvpair_down (t, node, child, childnum,
fill_dbt(&hk, key, keylen), fill_dbt(&hv, val, vallen), fill_dbt_ap(&hk, key, keylen, app_private), fill_dbt(&hv, val, vallen),
&child_did_split, &childa, &childb, &child_did_split, &childa, &childb,
&childsplitk, &childsplitk,
db); db);
...@@ -710,7 +712,8 @@ static int push_some_kvpairs_down (BRT t, BRTNODE node, int childnum, ...@@ -710,7 +712,8 @@ static int push_some_kvpairs_down (BRT t, BRTNODE node, int childnum,
if (debug) printf("%s:%d %*shandle split splitkey=%s\n", __FILE__, __LINE__, debug, "", (char*)childsplitk.data); if (debug) printf("%s:%d %*shandle split splitkey=%s\n", __FILE__, __LINE__, debug, "", (char*)childsplitk.data);
r=handle_split_of_child (t, node, childnum, r=handle_split_of_child (t, node, childnum,
childa, childb, &childsplitk, childa, childb, &childsplitk,
did_split, nodea, nodeb, splitk, db); did_split, nodea, nodeb, splitk,
app_private, db);
return r; /* Don't do any more pushing if the child splits. */ return r; /* Don't do any more pushing if the child splits. */
} }
} }
...@@ -728,7 +731,7 @@ int debugp1 (int debug) { ...@@ -728,7 +731,7 @@ int debugp1 (int debug) {
return debug ? debug+1 : 0; return debug ? debug+1 : 0;
} }
static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int debug, DB *db) static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int debug, void *app_private, DB *db)
/* If the buffer is too full, then push down. Possibly the child will split. That may make us split. */ /* If the buffer is too full, then push down. Possibly the child will split. That may make us split. */
{ {
assert(node->height>0); assert(node->height>0);
...@@ -744,7 +747,7 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE ...@@ -744,7 +747,7 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE
find_heaviest_child(node, &childnum); find_heaviest_child(node, &childnum);
if (0) printf("%s:%d %*spush some down from %lld into %lld (child %d)\n", __FILE__, __LINE__, debug, "", node->thisnodename, node->u.n.children[childnum], childnum); if (0) printf("%s:%d %*spush some down from %lld into %lld (child %d)\n", __FILE__, __LINE__, debug, "", node->thisnodename, node->u.n.children[childnum], childnum);
assert(node->u.n.children[childnum]!=0); assert(node->u.n.children[childnum]!=0);
int r = push_some_kvpairs_down(t, node, childnum, did_split, nodea, nodeb, splitk, debugp1(debug), db); int r = push_some_kvpairs_down(t, node, childnum, did_split, nodea, nodeb, splitk, debugp1(debug), app_private, db);
if (r!=0) return r; if (r!=0) return r;
assert(*did_split==0 || *did_split==1); assert(*did_split==0 || *did_split==1);
if (debug) printf("%s:%d %*sdid push_some_kvpairs_down did_split=%d\n", __FILE__, __LINE__, debug, "", *did_split); if (debug) printf("%s:%d %*sdid push_some_kvpairs_down did_split=%d\n", __FILE__, __LINE__, debug, "", *did_split);
...@@ -781,7 +784,7 @@ static int brt_leaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v, ...@@ -781,7 +784,7 @@ static int brt_leaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v,
node->u.l.n_bytes_in_buffer += k->size + v->size + KEY_VALUE_OVERHEAD; node->u.l.n_bytes_in_buffer += k->size + v->size + KEY_VALUE_OVERHEAD;
// If it doesn't fit, then split the leaf. // If it doesn't fit, then split the leaf.
if (serialize_brtnode_size(node) > node->nodesize) { if (serialize_brtnode_size(node) > node->nodesize) {
int r = brtleaf_split (t, node, nodea, nodeb, splitk, db); int r = brtleaf_split (t, node, nodea, nodeb, splitk, k->app_private, db);
if (r!=0) return r; if (r!=0) return r;
//printf("%s:%d splitkey=%s\n", __FILE__, __LINE__, (char*)*splitkey); //printf("%s:%d splitkey=%s\n", __FILE__, __LINE__, (char*)*splitkey);
split_count++; split_count++;
...@@ -842,7 +845,8 @@ static int brt_nonleaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v, ...@@ -842,7 +845,8 @@ static int brt_nonleaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v,
if (child_did_split) { if (child_did_split) {
r=handle_split_of_child(t, node, childnum, r=handle_split_of_child(t, node, childnum,
childa, childb, &childsplitk, childa, childb, &childsplitk,
did_split, nodea, nodeb, splitk, db); did_split, nodea, nodeb, splitk,
k->app_private, db);
if (r!=0) return r; if (r!=0) return r;
} else { } else {
cachetable_unpin(t->cf, child->thisnodename, 1); cachetable_unpin(t->cf, child->thisnodename, 1);
...@@ -872,7 +876,7 @@ static int brt_nonleaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v, ...@@ -872,7 +876,7 @@ static int brt_nonleaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v,
} }
if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, ""); if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
int r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), db); int r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), k->app_private, db);
if (r!=0) return r; if (r!=0) return r;
if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, ""); if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
if (*did_split) { if (*did_split) {
......
...@@ -455,7 +455,7 @@ enum pma_errors pma_lookup (PMA pma, DBT *k, DBT *v, DB *db) { ...@@ -455,7 +455,7 @@ enum pma_errors pma_lookup (PMA pma, DBT *k, DBT *v, DB *db) {
int l = pmainternal_find(pma, k, db); int l = pmainternal_find(pma, k, db);
assert(0<=l ); assert(l<=pma_index_limit(pma)); assert(0<=l ); assert(l<=pma_index_limit(pma));
if (l==pma_index_limit(pma)) return DB_NOTFOUND; if (l==pma_index_limit(pma)) return DB_NOTFOUND;
if (pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[l].key,pma->pairs[l].keylen))==0) { if (pma->pairs[l].key!=0 && pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[l].key,pma->pairs[l].keylen))==0) {
return ybt_set_value(v, pma->pairs[l].val, pma->pairs[l].vallen, &pma->sval); return ybt_set_value(v, pma->pairs[l].val, pma->pairs[l].vallen, &pma->sval);
} else { } else {
return DB_NOTFOUND; return DB_NOTFOUND;
......
...@@ -16,6 +16,12 @@ DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) { ...@@ -16,6 +16,12 @@ DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) {
return dbt; return dbt;
} }
DBT *fill_dbt_ap(DBT *dbt, bytevec k, ITEMLEN len, void *app_private) {
fill_dbt(dbt, k, len);
dbt->app_private=app_private;
return dbt;
}
int ybt_set_value (DBT *ybt, bytevec val, ITEMLEN vallen, void **staticptrp) { int ybt_set_value (DBT *ybt, bytevec val, ITEMLEN vallen, void **staticptrp) {
if (ybt->flags==DB_DBT_MALLOC) { if (ybt->flags==DB_DBT_MALLOC) {
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
DBT* init_dbt (DBT *); DBT* init_dbt (DBT *);
DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len); DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len);
DBT *fill_dbt_ap(DBT *dbt, bytevec k, ITEMLEN len, void *app_private);
int ybt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp); int ybt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp);
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment