Commit ee3d071c authored by Rich Prohaska's avatar Rich Prohaska

fix 2 dup search bugs

git-svn-id: file:///svn/tokudb@642 c7de825b-a66e-492c-adef-691d508d4ae1
parent 76e5b4c2
...@@ -1712,8 +1712,7 @@ int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKE ...@@ -1712,8 +1712,7 @@ int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKE
*rootp=newroot_diskoff; *rootp=newroot_diskoff;
brt->h->dirty=1; brt->h->dirty=1;
initialize_brtnode (brt, newroot, newroot_diskoff, nodea->height+1); initialize_brtnode (brt, newroot, newroot_diskoff, nodea->height+1);
printf("new_root %lld %d %lld %lld\n", newroot_diskoff, newroot->height, nodea->thisnodename, nodeb->thisnodename); // printf("new_root %lld %d %lld %lld\n", newroot_diskoff, newroot->height, nodea->thisnodename, nodeb->thisnodename);
newroot->parent_brtnode=0; newroot->parent_brtnode=0;
newroot->u.n.n_children=2; newroot->u.n.n_children=2;
//printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey); //printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey);
...@@ -1843,11 +1842,24 @@ int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v, DB *db, BRTNODE paren ...@@ -1843,11 +1842,24 @@ int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v, DB *db, BRTNODE paren
int type; int type;
if (toku_hash_find (node->u.n.htables[childnum], k->data, k->size, &hanswer, &hanswerlen, &type)==0) { if (toku_hash_find (node->u.n.htables[childnum], k->data, k->size, &hanswer, &hanswerlen, &type)==0) {
if (type == BRT_INSERT) { if (type == BRT_INSERT) {
if ((brt->flags & DB_DUP)) {
result = brt_lookup_node(brt, node->u.n.children[childnum], k, v, db, node);
if (result != 0) {
ybt_set_value(v, hanswer, hanswerlen, &brt->sval);
result = 0;
}
} else {
//printf("Found %d bytes\n", *vallen); //printf("Found %d bytes\n", *vallen);
ybt_set_value(v, hanswer, hanswerlen, &brt->sval); ybt_set_value(v, hanswer, hanswerlen, &brt->sval);
//printf("%s:%d Returning %p\n", __FILE__, __LINE__, v->data); //printf("%s:%d Returning %p\n", __FILE__, __LINE__, v->data);
result = 0; result = 0;
}
} else if (type == BRT_DELETE) { } else if (type == BRT_DELETE) {
if ((brt->flags & DB_DUP) && toku_hash_find_idx (node->u.n.htables[childnum], k->data, k->size, 1, &hanswer, &hanswerlen, &type) == 0) {
assert(type == BRT_INSERT);
ybt_set_value(v, hanswer, hanswerlen, &brt->sval);
result = 0;
} else
result = DB_NOTFOUND; result = DB_NOTFOUND;
} else { } else {
result = EINVAL; result = EINVAL;
......
...@@ -88,13 +88,33 @@ static void hash_find_internal (HASHTABLE tab, unsigned int hash, const unsigned ...@@ -88,13 +88,33 @@ static void hash_find_internal (HASHTABLE tab, unsigned int hash, const unsigned
*dup_ptr = 0; *dup_ptr = 0;
} }
int toku_hash_find_idx (HASHTABLE tab, bytevec key, ITEMLEN keylen, int idx, bytevec *data, ITEMLEN *datalen, int *type) {
HASHDUP dup, *prev;
hash_find_internal(tab, hash_key (key, keylen), key, keylen, &dup, &prev);
if (dup==0) {
return -1;
} else {
HASHELT he = hashelt_list_peek(&dup->kdlist);
int i;
for (i=0; i<idx; i++) {
he = he->next;
if (he == 0)
return -2;
}
*data = &he->keyval[he->keylen];
*datalen = he->vallen;
*type = he->type;
return 0;
}
}
int toku_hash_find (HASHTABLE tab, bytevec key, ITEMLEN keylen, bytevec *data, ITEMLEN *datalen, int *type) { int toku_hash_find (HASHTABLE tab, bytevec key, ITEMLEN keylen, bytevec *data, ITEMLEN *datalen, int *type) {
HASHDUP dup, *prev; HASHDUP dup, *prev;
hash_find_internal(tab, hash_key (key, keylen), key, keylen, &dup, &prev); hash_find_internal(tab, hash_key (key, keylen), key, keylen, &dup, &prev);
if (dup==0) { if (dup==0) {
return -1; return -1;
} else { } else {
HASHELT he = dup->kdlist.head; HASHELT he = hashelt_list_peek(&dup->kdlist);
*data = &he->keyval[he->keylen]; *data = &he->keyval[he->keylen];
*datalen = he->vallen; *datalen = he->vallen;
*type = he->type; *type = he->type;
...@@ -228,7 +248,7 @@ int toku_hashtable_random_pick(HASHTABLE h, bytevec *key, ITEMLEN *keylen, bytev ...@@ -228,7 +248,7 @@ int toku_hashtable_random_pick(HASHTABLE h, bytevec *key, ITEMLEN *keylen, bytev
if (usei>=h->arraysize) usei=0; if (usei>=h->arraysize) usei=0;
HASHDUP dup=h->array[usei]; HASHDUP dup=h->array[usei];
if (dup) { if (dup) {
HASHELT he = dup->kdlist.head; assert(he); HASHELT he = hashelt_list_peek(&dup->kdlist); assert(he);
*key = &he->keyval[0]; *key = &he->keyval[0];
*keylen = he->keylen; *keylen = he->keylen;
*data = &he->keyval[he->keylen]; *data = &he->keyval[he->keylen];
......
...@@ -18,7 +18,10 @@ int toku_hashtable_set_dups (HASHTABLE, unsigned int allow_dups); ...@@ -18,7 +18,10 @@ int toku_hashtable_set_dups (HASHTABLE, unsigned int allow_dups);
/* Return 0 if the key is found in the hashtable, -1 otherwise. */ /* Return 0 if the key is found in the hashtable, -1 otherwise. */
/* Warning: The data returned points to the internals of the hashtable. It is set to "const" to try to prevent you from messing it up. */ /* Warning: The data returned points to the internals of the hashtable. It is set to "const" to try to prevent you from messing it up. */
int toku_hash_find (HASHTABLE tab, bytevec key, ITEMLEN keylen, bytevec*data, ITEMLEN *datalen, int *type); int toku_hash_find (HASHTABLE tab, bytevec key, ITEMLEN keylen, bytevec *data, ITEMLEN *datalen, int *type);
/* match on key, index on duplicates */
int toku_hash_find_idx (HASHTABLE tab, bytevec key, ITEMLEN keylen, int idx, bytevec *data, ITEMLEN *datalen, int *type);
/* Insert the key/data pair into the hash table. /* Insert the key/data pair into the hash table.
If the key is not in the hash table then insert it. If the key is not in the hash table then insert it.
......
...@@ -747,6 +747,251 @@ void test_walk_empty(int n, int dup_mode) { ...@@ -747,6 +747,251 @@ void test_walk_empty(int n, int dup_mode) {
assert(r == 0); assert(r == 0);
} }
void test_icdi_search(int n, int dup_mode) {
printf("test_icdi_search:%d %d\n", n, dup_mode);
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = "test.dup.insert.brt";
int r;
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
/* insert n duplicates */
int i;
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = htonl(i);
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
r = db->get(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), 0);
assert(r == 0);
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == htonl(0));
free(val.data);
}
/* reopen the database to force nonleaf buffering */
r = db->close(db, 0);
assert(r == 0);
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
assert(r == 0);
int k = htonl(n/2);
DBT key;
r = db->del(db, null_txn, dbt_init(&key, &k, sizeof k), 0);
assert(r == 0);
/* insert n duplicates */
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = htonl(n+i);
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
r = db->get(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), 0);
assert(r == 0);
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == htonl(n));
free(val.data);
}
DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0);
assert(r == 0);
for (i=0; i<n; i++) {
expect(cursor, htonl(n/2), htonl(n+i));
}
r = cursor->c_close(cursor);
assert(r == 0);
r = db->close(db, 0);
assert(r == 0);
}
void test_ici_search(int n, int dup_mode) {
printf("test_ici_search:%d %d\n", n, dup_mode);
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = "test.dup.insert.brt";
int r;
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
/* insert n duplicates */
int i;
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = htonl(i);
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
r = db->get(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), 0);
assert(r == 0);
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == htonl(0));
free(val.data);
}
/* reopen the database to force nonleaf buffering */
r = db->close(db, 0);
assert(r == 0);
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
assert(r == 0);
/* insert n duplicates */
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = htonl(n+i);
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
r = db->get(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), 0);
assert(r == 0);
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == htonl(0));
free(val.data);
}
DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0);
assert(r == 0);
for (i=0; i<2*n; i++) {
expect(cursor, htonl(n/2), htonl(i));
}
r = cursor->c_close(cursor);
assert(r == 0);
r = db->close(db, 0);
assert(r == 0);
}
void db_insert(DB *db, int k, int v) {
DB_TXN * const null_txn = 0;
DBT key, val;
int r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
void expect_db_lookup(DB *db, int k, int v) {
DB_TXN * const null_txn = 0;
DBT key, val;
int r = db->get(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), 0);
assert(r == 0);
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == v);
free(val.data);
}
void test_i0i1ci0_search(int n, int dup_mode) {
printf("test_i0i1ci0_search:%d %d\n", n, dup_mode);
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = "test.dup.insert.brt";
int r;
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
/* insert <0,0> */
db_insert(db, 0, 0);
/* insert n duplicates */
int i;
for (i=0; i<n; i++) {
int k = htonl(1);
int v = htonl(i);
db_insert(db, k, v);
expect_db_lookup(db, k, htonl(0));
}
/* reopen the database to force nonleaf buffering */
r = db->close(db, 0);
assert(r == 0);
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
assert(r == 0);
/* insert <0,1> */
db_insert(db, 0, 1);
/* verify dup search digs deep into the tree */
expect_db_lookup(db, 0, 0);
r = db->close(db, 0);
assert(r == 0);
}
int main() { int main() {
int i; int i;
...@@ -773,7 +1018,6 @@ int main() { ...@@ -773,7 +1018,6 @@ int main() {
test_dup_delete(i, DB_DUP + DB_DUPSORT); test_dup_delete(i, DB_DUP + DB_DUPSORT);
} }
/* test dup delete insert */ /* test dup delete insert */
for (i = 1; i <= (1<<16); i *= 2) { for (i = 1; i <= (1<<16); i *= 2) {
test_dup_delete_insert(i, DB_DUP); test_dup_delete_insert(i, DB_DUP);
...@@ -783,5 +1027,12 @@ int main() { ...@@ -783,5 +1027,12 @@ int main() {
test_all_dup_delete_insert(i); test_all_dup_delete_insert(i);
} }
/* test dup search */
for (i = 1; i <= (1<<16); i *= 2) {
test_ici_search(i, DB_DUP);
test_icdi_search(i, DB_DUP);
test_i0i1ci0_search(i, DB_DUP);
}
return 0; return 0;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment