Commit 9756a5df authored by Rich Prohaska

add dupsort insert, delete, lookup

git-svn-id: file:///svn/tokudb@634 c7de825b-a66e-492c-adef-691d508d4ae1
parent 5f03add2
@@ -114,6 +114,26 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
node->dirty=1;
}
static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck, unsigned int cl, DB *db) {
int cmp;
DBT mydbt;
if (brt->flags & DB_DUPSORT) {
bytevec k; unsigned int kl;
bytevec d; unsigned int dl;
assert(cl >= sizeof dl);
memcpy(&dl, ck, sizeof dl);
assert(cl >= dl + (sizeof dl));
kl = cl - dl - (sizeof dl);
k = ck + (sizeof dl);
d = ck + (sizeof dl) + kl;
cmp = brt->compare_fun(db, key, fill_dbt(&mydbt, k, kl));
if (cmp == 0 && data != 0)
cmp = brt->dup_compare(db, data, fill_dbt(&mydbt, d, dl));
} else
cmp = brt->compare_fun(db, key, fill_dbt(&mydbt, ck, cl));
return cmp;
}
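Aside: the DB_DUPSORT pivot that brt_compare_pivot unpacks is a single flat buffer laid out as [sizeof(unsigned int) bytes holding the data length dl][kl key bytes][dl data bytes], so cl == sizeof(dl) + kl + dl. The following is a minimal sketch of a helper that builds such a buffer under that assumption; pack_dupsort_pivot is hypothetical and not a function in this commit.
#include <stdlib.h>
#include <string.h>
/* hypothetical helper: pack a key/data pair into the layout brt_compare_pivot expects */
static void *pack_dupsort_pivot(const void *key, unsigned int keylen,
                                const void *data, unsigned int datalen,
                                unsigned int *packedlen) {
    unsigned int len = sizeof datalen + keylen + datalen;
    unsigned char *buf = malloc(len);
    if (buf == 0) return 0;
    memcpy(buf, &datalen, sizeof datalen);                /* data length first */
    memcpy(buf + sizeof datalen, key, keylen);            /* then the key bytes */
    memcpy(buf + sizeof datalen + keylen, data, datalen); /* then the data bytes */
    if (packedlen) *packedlen = len;
    return buf;
}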
void brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnode_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p __attribute__((__unused__))) {
BRTNODE brtnode = brtnode_v;
@@ -299,8 +319,11 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
n->u.n.n_bytes_in_hashtables = 0;
} else {
int r = pma_create(&n->u.l.buffer, t->compare_fun, n->nodesize);
static int rcount=0;
assert(r==0);
pma_set_dup_mode(n->u.l.buffer, t->flags & (DB_DUP+DB_DUPSORT));
if (t->flags & DB_DUPSORT)
pma_set_dup_compare(n->u.l.buffer, t->dup_compare);
static int rcount=0;
//printf("%s:%d n PMA= %p (rcount=%d)\n", __FILE__, __LINE__, n->u.l.buffer, rcount);
rcount++;
n->u.l.n_bytes_in_buffer = 0;
@@ -691,11 +714,19 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
BRT_CMD brtcmd;
brtcmd.type = type; brtcmd.u.id.key = &skd; brtcmd.u.id.val = &svd; brtcmd.u.id.db = db;
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
if (t->compare_fun(db, &skd, childsplitk)<=0) {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childa, &brtcmd, childnum, txn);
} else {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childb, &brtcmd, childnum+1, txn);
}
int tochildnum = childnum;
BRTNODE tochild = childa;
int cmp = brt_compare_pivot(t, &skd, &svd, childsplitk->data, childsplitk->size, db);
if (cmp < 0) {
;
} else if (cmp > 0) {
tochildnum = childnum+1; tochild = childb;
} else if (t->flags & DB_DUP) {
if (node->u.n.pivotflags[childnum] & BRT_PIVOT_PRESENT_R) {
tochildnum = childnum+1; tochild = childb;
}
}
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, tochild, &brtcmd, tochildnum, txn);
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
if (r!=0) return r;
}));
@@ -938,18 +969,61 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
return EINVAL;
}
static unsigned int brtnode_which_child (BRTNODE node , DBT *k, BRT t, DB *db) {
/* find the rightmost child into which the key/data will be inserted */
static unsigned int brtnode_right_child (BRTNODE node, DBT *k, DBT *data, BRT t, DB *db) {
assert(node->height>0);
int maybe = -1; /* last pivot that matched the key */
int i;
for (i=node->u.n.n_children-2; i >= 0; i--) {
int cmp = brt_compare_pivot(t, k, data, node->u.n.childkeys[i], node->u.n.childkeylens[i], db);
if (cmp < 0) {
continue;
} else if (cmp > 0) {
if (maybe != -1) goto foundkeymatch;
return i+1;
} else if (t->flags & DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R)
return i+1;
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L)
return i;
maybe = i;
} else
maybe = i;
}
maybe = 0;
foundkeymatch:
if (!(node->u.n.pivotflags[maybe] & BRT_PIVOT_PRESENT_L)) {
node->u.n.pivotflags[maybe] |= BRT_PIVOT_PRESENT_L;
node->dirty = 1;
}
return maybe;
}
/* find the leftmost child that may contain the key */
static unsigned int brtnode_left_child (BRTNODE node , DBT *k, DBT *d, BRT t, DB *db) {
int i;
assert(node->height>0);
for (i=0; i<node->u.n.n_children-1; i++) {
DBT k2;
if (t->compare_fun(db, k, fill_dbt(&k2, node->u.n.childkeys[i], node->u.n.childkeylens[i]))<=0) {
int cmp = brt_compare_pivot(t, k, d, node->u.n.childkeys[i], node->u.n.childkeylens[i], db);
if (cmp > 0) continue;
if (cmp < 0) return i;
if (t->flags & DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L)
return i;
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R)
return i+1;
}
return i;
}
return node->u.n.n_children-1;
}
static inline unsigned int brtnode_which_child (BRTNODE node , DBT *k, BRT t, DB *db) {
return brtnode_left_child(node, k, 0, t, db);
}
static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk, int debug, TOKUTXN txn, int childnum, int maybe) {
@@ -1002,7 +1076,7 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
int brt_do_push_cmd = 1;
static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk,
int debug,
@@ -1016,7 +1090,8 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
DBT *v = cmd->u.id.val;
DB *db = cmd->u.id.db;
childnum = brtnode_which_child(node, k, t, db);
childnum = brtnode_right_child(node, k, v, t, db);
//rfp printf("nonleaf_insert %d,%d -> %lld %d\n", htonl(*(int*)k->data), *(int*)v->data, node->thisnodename, childnum);
/* non-buffering mode when cursors are open on this child */
if (node->u.n.n_cursors[childnum] > 0) {
@@ -1042,6 +1117,104 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
if (debug) printf("%s:%d %*sDoing hash_insert\n", __FILE__, __LINE__, debug, "");
verify_counts(node);
if (found) {
if (!(t->flags & DB_DUP)) {
//printf("%s:%d found and deleting\n", __FILE__, __LINE__);
node->local_fingerprint -= node->rand4fingerprint * toku_calccrc32_cmd(anytype, k->data, k->size, olddata, olddatalen);
int r = toku_hash_delete(node->u.n.htables[childnum], k->data, k->size);
/* Be careful, olddata is now invalid because of the delete. */
int diff = k->size + olddatalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
assert(r==0);
node->u.n.n_bytes_in_hashtables -= diff;
node->u.n.n_bytes_in_hashtable[childnum] -= diff;
node->dirty = 1;
//printf("%s:%d deleted %d bytes\n", __FILE__, __LINE__, diff);
found = 0;
}
}
}
//verify_local_fingerprint_nonleaf(node);
/* if the child is in the cache table then push the cmd to it
otherwise just put it into this node's buffer */
if (!found && brt_do_push_cmd) {
int r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 1);
if (r == 0) {
//printf("%s:%d\n", __FILE__, __LINE__);
return r;
}
}
//verify_local_fingerprint_nonleaf(node);
{
int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
int r=toku_hash_insert(node->u.n.htables[childnum], k->data, k->size, v->data, v->size, type);
assert(r==0);
node->local_fingerprint += node->rand4fingerprint * toku_calccrc32_cmd(type, k->data, k->size, v->data, v->size);
node->u.n.n_bytes_in_hashtables += diff;
node->u.n.n_bytes_in_hashtable[childnum] += diff;
node->dirty = 1;
}
if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
//verify_local_fingerprint_nonleaf(node);
int r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), db, txn);
if (r!=0) return r;
if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
if (*did_split) {
assert(serialize_brtnode_size(*nodea)<=(*nodea)->nodesize);
assert(serialize_brtnode_size(*nodeb)<=(*nodeb)->nodesize);
assert((*nodea)->u.n.n_children>0);
assert((*nodeb)->u.n.n_children>0);
assert((*nodea)->u.n.children[(*nodea)->u.n.n_children-1]!=0);
assert((*nodeb)->u.n.children[(*nodeb)->u.n.n_children-1]!=0);
verify_counts(*nodea);
verify_counts(*nodeb);
} else {
assert(serialize_brtnode_size(node)<=node->nodesize);
verify_counts(node);
}
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
//} else {
// verify_local_fingerprint_nonleaf(node);
//}
return 0;
}
static int brt_nonleaf_delete_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, TOKUTXN txn, unsigned int childnum) {
//verify_local_fingerprint_nonleaf(node);
int found;
int type = cmd->type;
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
DB *db = cmd->u.id.db;
/* non-buffering mode when cursors are open on this child */
if (node->u.n.n_cursors[childnum] > 0) {
assert(node->u.n.n_bytes_in_hashtable[childnum] == 0);
int r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 0);
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
//} else {
// verify_local_fingerprint_nonleaf(node);
//}
return r;
}
//verify_local_fingerprint_nonleaf(node);
{
int anytype;
bytevec olddata;
ITEMLEN olddatalen;
found = !toku_hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen, &anytype);
//verify_local_fingerprint_nonleaf(node);
if (debug) printf("%s:%d %*sDoing hash_insert\n", __FILE__, __LINE__, debug, "");
verify_counts(node);
while (found) {
//printf("%s:%d found and deleting\n", __FILE__, __LINE__);
node->local_fingerprint -= node->rand4fingerprint * toku_calccrc32_cmd(anytype, k->data, k->size, olddata, olddatalen);
int r = toku_hash_delete(node->u.n.htables[childnum], k->data, k->size);
@@ -1052,6 +1225,7 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
node->u.n.n_bytes_in_hashtable[childnum] -= diff;
node->dirty = 1;
//printf("%s:%d deleted %d bytes\n", __FILE__, __LINE__, diff);
found = !toku_hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen, &anytype);
}
}
//verify_local_fingerprint_nonleaf(node);
@@ -1101,6 +1275,92 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
return 0;
}
/* delete in all subtrees starting from the leftmost one that contains the key */
static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug,
TOKUTXN txn) {
int r;
/* find all children that need a delete cmd */
int delchild[TREE_FANOUT], delidx = 0;
inline void delchild_append(int i) {
if (delidx == 0 || delchild[delidx-1] != i)
delchild[delidx++] = i;
}
int i;
for (i = 0; i < node->u.n.n_children-1; i++) {
int cmp = brt_compare_pivot(t, cmd->u.id.key, 0, node->u.n.childkeys[i], node->u.n.childkeylens[i], cmd->u.id.db);
if (cmp > 0) {
continue;
} else if (cmp < 0) {
delchild_append(i);
break;
} else if (t->flags & DB_DUPSORT) {
delchild_append(i);
delchild_append(i+1);
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) {
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_L;
node->dirty = 1;
}
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R) {
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_R;
node->dirty = 1;
}
} else if (t->flags & DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) {
delchild_append(i);
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_L;
node->dirty = 1;
}
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R) {
delchild_append(i+1);
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_R;
node->dirty = 1;
}
} else {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) {
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_L;
node->dirty = 1;
}
delchild_append(i);
break;
}
}
if (delidx == 0)
delchild_append(node->u.n.n_children-1);
/* issue the delete cmd to all of the children found previously */
for (i=0; i<delidx; i++) {
r = brt_nonleaf_delete_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, delchild[i]);
assert(r == 0);
}
/* post condition: for all pk(i) == k -> assert pf(i) == 0 */
for (i=0; i < node->u.n.n_children-1; i++) {
int cmp = brt_compare_pivot(t, cmd->u.id.key, 0, node->u.n.childkeys[i], node->u.n.childkeylens[i], cmd->u.id.db);
if (cmp == 0)
assert(node->u.n.pivotflags[i] == 0);
}
return 0;
}
static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk,
int debug,
TOKUTXN txn) {
if (cmd->type == BRT_INSERT)
return brt_nonleaf_insert_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn);
else if (cmd->type == BRT_DELETE)
return brt_nonleaf_delete_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn);
else
return EINVAL;
}
//static void verify_local_fingerprint_nonleaf (BRTNODE node) {
// u_int32_t fp=0;
@@ -1451,8 +1711,9 @@ int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKE
assert(newroot);
*rootp=newroot_diskoff;
brt->h->dirty=1;
// printf("new_root %lld\n", newroot_diskoff);
initialize_brtnode (brt, newroot, newroot_diskoff, nodea->height+1);
printf("new_root %lld %d %lld %lld\n", newroot_diskoff, newroot->height, nodea->thisnodename, nodeb->thisnodename);
newroot->parent_brtnode=0;
newroot->u.n.n_children=2;
//printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey);
@@ -1589,8 +1850,7 @@ int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v, DB *db, BRTNODE paren
} else if (type == BRT_DELETE) {
result = DB_NOTFOUND;
} else {
assert(0);
result = -1; // some versions of gcc complain
result = EINVAL;
}
//verify_local_fingerprint_nonleaf(node);
r = cachetable_unpin(brt->cf, off, 0, 0);
@@ -12,6 +12,46 @@
#include "yerror.h"
#include "hashfun.h"
static int hashelt_list_verify(struct hashelt_list *helist) {
HASHELT e = helist->head;
if (e == 0)
return helist->tail == 0;
while (e->next)
e = e->next;
return helist->tail == e;
}
static inline void hashelt_list_init(struct hashelt_list *helist) {
helist->head = helist->tail = 0;
}
static inline void hashelt_list_append(struct hashelt_list *helist, HASHELT e) {
assert(hashelt_list_verify(helist));
e->next = 0;
if (helist->tail)
helist->tail->next = e;
else
helist->head = e;
helist->tail = e;
assert(hashelt_list_verify(helist));
}
static inline HASHELT hashelt_list_pop(struct hashelt_list *helist) {
assert(hashelt_list_verify(helist));
HASHELT e = helist->head;
if (e) {
helist->head = e->next;
if (helist->head == 0)
helist->tail = 0;
assert(hashelt_list_verify(helist));
}
return e;
}
static inline HASHELT hashelt_list_peek(struct hashelt_list *helist) {
return helist->head;
}
int toku_hashtable_create (HASHTABLE *h) {
HASHTABLE MALLOC(tab);
unsigned int i;
@@ -22,31 +62,39 @@ int toku_hashtable_create (HASHTABLE *h) {
assert(sizeof(*tab->array)==sizeof(void*));
tab->array = toku_calloc(tab->arraysize, sizeof(*tab->array));
for (i=0; i<tab->arraysize; i++) tab->array[i]=0;
tab->allow_dups = 1;
*h=tab;
return 0;
}
static void hash_find_internal (HASHTABLE tab, unsigned int hash, const unsigned char *key, ITEMLEN keylen, HASHELT *hashelt, HASHELT **prev_ptr) {
int toku_hashtable_set_dups (HASHTABLE tab, unsigned int allow_dups) {
tab->allow_dups = allow_dups;
return 0;
}
static void hash_find_internal (HASHTABLE tab, unsigned int hash, const unsigned char *key, ITEMLEN keylen, HASHDUP *dup_ptr, HASHDUP **prev_ptr) {
unsigned int h = hash % tab->arraysize;
HASHELT he;
HASHELT *prev = &tab->array[h];
for (he=*prev; he; prev=&he->next, he=*prev) {
HASHDUP dup;
HASHDUP *prev = &tab->array[h];
for (dup=*prev; dup; prev=&dup->next, dup=*prev) {
HASHELT he = hashelt_list_peek(&dup->kdlist); assert(he);
if (keylen==he->keylen && memcmp(key, he->keyval, keylen)==0) {
*prev_ptr = prev;
*hashelt = he;
*dup_ptr = dup;
return;
}
}
*prev_ptr = prev;
*hashelt = 0;
*dup_ptr = 0;
}
int toku_hash_find (HASHTABLE tab, bytevec key, ITEMLEN keylen, bytevec *data, ITEMLEN *datalen, int *type) {
HASHELT he, *prev_ptr;
hash_find_internal(tab, hash_key (key, keylen), key, keylen, &he, &prev_ptr);
if (he==0) {
HASHDUP dup, *prev;
hash_find_internal(tab, hash_key (key, keylen), key, keylen, &dup, &prev);
if (dup==0) {
return -1;
} else {
HASHELT he = dup->kdlist.head;
*data = &he->keyval[he->keylen];
*datalen = he->vallen;
*type = he->type;
@@ -58,21 +106,22 @@ int toku_hash_rehash_everything (HASHTABLE tab, unsigned int primeindexdelta) {
int newprimeindex = primeindexdelta+tab->primeidx;
assert(newprimeindex>=0);
unsigned int newarraysize = get_prime(newprimeindex);
HASHELT *newarray = toku_calloc(newarraysize, sizeof(*tab->array));
HASHDUP *newarray = toku_calloc(newarraysize, sizeof(*tab->array));
unsigned int i;
//printf("%s:%d newarraysize=%d\n", __FILE__, __LINE__, newarraysize);
assert(newarray!=0);
tab->primeidx=newprimeindex;
for (i=0; i<newarraysize; i++) newarray[i]=0;
for (i=0; i<tab->arraysize; i++) {
HASHELT he;
while ((he=tab->array[i])!=0) {
HASHDUP dup;
while ((dup=tab->array[i])!=0) {
HASHELT he = hashelt_list_peek(&dup->kdlist); assert(he);
//unsigned int hk = hash_key((unsigned char *)he->key, he->keylen);
unsigned int h = he->hash%newarraysize;
//assert(he->hash==hk);
tab->array[i] = he->next;
he->next = newarray[h];
newarray[h] = he;
tab->array[i] = dup->next;
dup->next = newarray[h];
newarray[h] = dup;
}
}
toku_free(tab->array);
@@ -87,15 +136,18 @@ int toku_hash_insert (HASHTABLE tab, const void *key, ITEMLEN keylen, const void
{
unsigned int hk = hash_key (key,keylen);
unsigned int h = hk%tab->arraysize;
{
HASHELT he,*prev_ptr;
hash_find_internal(tab, hk, key, keylen, &he, &prev_ptr);
if (he!=0) {
HASHDUP dup,*prev_ptr;
hash_find_internal(tab, hk, key, keylen, &dup, &prev_ptr);
if (dup == 0) {
dup = toku_malloc(sizeof *dup);
assert(dup);
hashelt_list_init(&dup->kdlist);
dup->next = tab->array[h];
tab->array[h]=dup;
} else if (!tab->allow_dups)
return BRT_ALREADY_THERE;
}
}
{
/* Add the key/value pair; if the key is already present, it is appended to that key's dup list. */
HASHELT he=toku_malloc(sizeof(*he)+keylen+vallen);
assert(he); // ?????
he->type = type;
@@ -103,30 +155,63 @@ int toku_hash_insert (HASHTABLE tab, const void *key, ITEMLEN keylen, const void
he->vallen = vallen;
memmove(&he->keyval[0], key, keylen);
memmove(&he->keyval[keylen], val, vallen);
he->hash = hk;
he->next = tab->array[h];
tab->array[h]=he;
hashelt_list_append(&dup->kdlist, he);
tab->n_keys++;
if (tab->n_keys > tab->arraysize) {
return toku_hash_rehash_everything(tab, +1);
}
return BRT_OK;
}
}
int toku_hash_delete (HASHTABLE tab, const void *key, ITEMLEN keylen) {
HASHELT he, *prev_ptr;
HASHDUP dup, *prev_ptr;
//printf("%s:%d deleting %s (bucket %d)\n", __FILE__, __LINE__, key, hash_key(key,keylen)%tab->arraysize);
hash_find_internal(tab, hash_key (key, keylen), key, keylen, &dup, &prev_ptr);
if (dup==0) return DB_NOTFOUND;
else {
assert(*prev_ptr==dup);
HASHELT he = hashelt_list_pop(&dup->kdlist);
assert(he);
//printf("%s:%d deleting %s %s\n", __FILE__, __LINE__, he->key, he->val);
toku_free_n(he, sizeof(*he)+he->keylen+he->vallen);
tab->n_keys--;
if (!hashelt_list_peek(&dup->kdlist)) {
/* the dup list is now empty, so unlink it from the hash chain */
*prev_ptr = dup->next;
toku_free_n(dup, sizeof *dup);
}
if ((tab->n_keys * 4 < tab->arraysize) && tab->primeidx>0) {
return toku_hash_rehash_everything(tab, -1);
}
return BRT_OK;
}
}
int toku_hash_delete_all (HASHTABLE tab, const void *key, ITEMLEN keylen) {
HASHDUP dup, *prev_ptr;
//printf("%s:%d deleting %s (bucket %d)\n", __FILE__, __LINE__, key, hash_key(key,keylen)%tab->arraysize);
hash_find_internal(tab, hash_key (key, keylen), key, keylen, &he, &prev_ptr);
if (he==0) return DB_NOTFOUND;
hash_find_internal(tab, hash_key (key, keylen), key, keylen, &dup, &prev_ptr);
if (dup==0) return DB_NOTFOUND;
else {
assert(*prev_ptr==dup);
/* delete the dups from the hash list */
*prev_ptr = dup->next;
/* delete all of the kd pairs in the dup list */
HASHELT he;
while ((he = hashelt_list_pop(&dup->kdlist)) != 0 ) {
//printf("%s:%d deleting %s %s\n", __FILE__, __LINE__, he->key, he->val);
assert(*prev_ptr==he);
*prev_ptr = he->next;
//printf("Freeing %s %s\n", he->key, he->val);
toku_free_n(he, sizeof(*he)+he->keylen+he->vallen);
tab->n_keys--;
}
toku_free_n(dup, sizeof *dup);
if ((tab->n_keys * 4 < tab->arraysize) && tab->primeidx>0) {
return toku_hash_rehash_everything(tab, -1);
@@ -141,8 +226,9 @@ int toku_hashtable_random_pick(HASHTABLE h, bytevec *key, ITEMLEN *keylen, bytev
unsigned int usei = (*randomnumber)%h->arraysize;
for (i=0; i<h->arraysize; i++, usei++) {
if (usei>=h->arraysize) usei=0;
HASHELT he=h->array[usei];
if (he) {
HASHDUP dup=h->array[usei];
if (dup) {
HASHELT he = dup->kdlist.head; assert(he);
*key = &he->keyval[0];
*keylen = he->keylen;
*data = &he->keyval[he->keylen];
@@ -220,7 +306,13 @@ void toku_hashtable_free(HASHTABLE *tab) {
void toku_hashtable_clear(HASHTABLE tab) {
unsigned int i;
for (i=0; i<tab->arraysize; i++) {
hasheltlist_free(tab->array[i]);
HASHDUP dup = tab->array[i];
while (dup) {
HASHDUP nextdup = dup->next;
hasheltlist_free(hashelt_list_peek(&dup->kdlist));
toku_free_n(dup, sizeof *dup);
dup = nextdup;
}
tab->array[i]=0;
}
tab->n_keys = 0;
@@ -5,20 +5,37 @@
/* Hash table with chaining. */
/* The keys and values are byte sequences. */
/* The keys and values are malloc'd by the hashtable. */
/* Duplicate keys are allowed by default and are stored in a FIFO list */
typedef struct hashtable *HASHTABLE;
int toku_hashtable_create (HASHTABLE*);
/* Configure the hash table for duplicate keys.
allow_dups != 0 -> duplicates allowed, allow_dups == 0 -> no duplicates */
int toku_hashtable_set_dups (HASHTABLE, unsigned int allow_dups);
/* Return 0 if the key is found in the hashtable, -1 otherwise. */
/* Warning: The data returned points to the internals of the hashtable. It is set to "const" to try to prevent you from messing it up. */
int toku_hash_find (HASHTABLE tab, bytevec key, ITEMLEN keylen, bytevec*data, ITEMLEN *datalen, int *type);
/* Replace the key if it was already there. */
/* Insert the key/data pair into the hash table.
If the key is not in the hash table then insert it.
If the key already exists and duplicates are allowed then append it to the list of duplicates.
If the key already exists and duplicates are not allowed then return an error. */
int toku_hash_insert (HASHTABLE tab, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type);
/* It is OK to delete something that isn't there. */
/* Delete the first entry with the given key.
   It is OK to delete something that isn't there. */
int toku_hash_delete (HASHTABLE tab, const void *key, ITEMLEN keylen);
/* Delete all entries with the given key */
int toku_hash_delete_all (HASHTABLE tab, const void *key, ITEMLEN keylen);
void toku_hashtable_free(HASHTABLE *tab);
int toku_hashtable_n_entries(HASHTABLE);
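As a quick illustration of the API declared above, here is a minimal usage sketch; example_dup_usage is hypothetical and "hashtable.h" is the assumed name of this header (with the bytevec/ITEMLEN typedefs assumed to come with it).
#include <assert.h>
#include <string.h>
#include "hashtable.h"   /* assumed include name for the declarations above */

/* hypothetical example: duplicate keys accumulate in a per-key FIFO list */
static void example_dup_usage(void) {
    HASHTABLE h;
    int r = toku_hashtable_create(&h);                       assert(r == 0);
    r = toku_hashtable_set_dups(h, 1);                       assert(r == 0); /* allow duplicates */
    r = toku_hash_insert(h, "key", 3, "first", 5, 0);        assert(r == 0);
    r = toku_hash_insert(h, "key", 3, "second", 6, 0);       assert(r == 0); /* appended after "first" */
    bytevec data; ITEMLEN datalen; int type;
    r = toku_hash_find(h, "key", 3, &data, &datalen, &type); assert(r == 0);
    assert(type == 0);                                       /* type stored at insert time */
    assert(datalen == 5 && memcmp(data, "first", 5) == 0);   /* FIFO order: oldest duplicate first */
    r = toku_hash_delete_all(h, "key", 3);                   assert(r == 0); /* removes both entries */
    toku_hashtable_free(&h);
}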
@@ -29,35 +46,50 @@ int toku_hashtable_random_pick(HASHTABLE h, bytevec *key, ITEMLEN *keylen, bytev
typedef struct hashelt *HASHELT;
struct hashelt {
unsigned int hash;
HASHELT next;
unsigned int hash;
int type;
ITEMLEN keylen;
ITEMLEN vallen;
char keyval[]; /* the first KEYLEN bytes are the key. The next bytes are the value. */
};
struct hashelt_list {
HASHELT head;
HASHELT tail;
};
typedef struct hashdup *HASHDUP;
struct hashdup {
HASHDUP next;
struct hashelt_list kdlist;
};
struct hashtable {
HASHDUP *array;
unsigned int n_keys;
unsigned int arraysize;
unsigned int primeidx;
HASHELT *array;
unsigned int allow_dups;
};
/* You cannot add or delete elements from the hashtable while iterating. */
void toku_hashtable_iterate (HASHTABLE tab, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, void*), void*);
// If you don't want to use something, do something like use "key __attribute__((__unused__))" for keyvar.
#define HASHTABLE_ITERATE(table,keyvar,keylenvar,datavar,datalenvar,typevar,body) ({ \
unsigned int hi_counter; \
for (hi_counter=0; hi_counter<table->arraysize; hi_counter++) { \
HASHDUP hi_dup; \
for (hi_dup=table->array[hi_counter]; hi_dup; hi_dup=hi_dup->next) { \
HASHELT hi_he; \
for (hi_he=table->array[hi_counter]; hi_he; hi_he=hi_he->next) { \
for (hi_he=hi_dup->kdlist.head; hi_he; hi_he=hi_he->next) { \
const char *keyvar = &hi_he->keyval[0]; \
ITEMLEN keylenvar = hi_he->keylen; \
const char *datavar = &hi_he->keyval[hi_he->keylen]; \
ITEMLEN datalenvar = hi_he->vallen; \
int typevar = hi_he->type; \
body; \
}}})
}}}})
#endif
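The HASHTABLE_ITERATE macro above visits every bucket, every dup chain in the bucket, and every key/value pair in each dup's FIFO list. A hedged usage sketch follows; dump_table is hypothetical, "hashtable.h" is the assumed include name, and keys/values are assumed to be printable text.
#include <stdio.h>
#include "hashtable.h"   /* assumed include name */

/* hypothetical helper: print every stored pair, including all duplicates of a key */
static void dump_table(HASHTABLE t) {
    HASHTABLE_ITERATE(t, key, keylen, data, datalen, type,
        printf("key=%.*s val=%.*s type=%d\n",
               (int)keylen, key, (int)datalen, data, type));
}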
@@ -6,6 +6,7 @@
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
void verify_hash_instance (bytevec kv_v, ITEMLEN kl, bytevec dv_v, ITEMLEN dl,
int N, int *data, char *saw) {
@@ -139,10 +140,122 @@ void test1(void) {
toku_hashtable_free(&table);
}
void test_insert_nodup(int n) {
HASHTABLE t;
int r;
r = toku_hashtable_create(&t);
assert(r == 0);
toku_hashtable_set_dups(t, 0);
int keys[n], vals[n];
int i;
for (i=0; i<n; i++) {
keys[i] = htonl(i);
vals[i] = i;
r = toku_hash_insert(t, &keys[i], sizeof keys[i], &vals[i], sizeof vals[i], i);
assert(r == 0);
}
for (i=0; i<n; i++) {
bytevec data; ITEMLEN datalen; int type;
r = toku_hash_find(t, &keys[i], sizeof keys[i], &data, &datalen, &type);
assert(r == 0);
assert(datalen == sizeof vals[i]);
assert(type == i);
int vv;
memcpy(&vv, data, datalen);
assert(vv == vals[i]);
}
/* trying to insert duplicates should fail */
for (i=0; i<n; i++) {
keys[i] = htonl(i);
vals[i] = i;
r = toku_hash_insert(t, &keys[i], sizeof keys[i], &vals[i], sizeof vals[i], i);
assert(r != 0);
}
toku_hashtable_free(&t);
assert(t == 0);
}
void test_insert_dup(int n, int do_delete_all) {
HASHTABLE t;
int r;
r = toku_hashtable_create(&t);
assert(r == 0);
toku_hashtable_set_dups(t, 1);
int keys[n], vals[n];
int dupkey = n + n/2;
int i;
for (i=0; i<n; i++) {
keys[i] = htonl(i);
vals[i] = i;
r = toku_hash_insert(t, &keys[i], sizeof keys[i], &vals[i], sizeof vals[i], i);
assert(r == 0);
}
for (i=0; i<n; i++) {
int key = htonl(dupkey);
int val = i;
r = toku_hash_insert(t, &key, sizeof key, &val, sizeof val, i);
assert(r == 0);
}
for (i=0; i<n; i++) {
bytevec data; ITEMLEN datalen; int type;
r = toku_hash_find(t, &keys[i], sizeof keys[i], &data, &datalen, &type);
assert(r == 0);
assert(datalen == sizeof vals[i]);
assert(type == i);
int vv;
memcpy(&vv, data, datalen);
assert(vv == vals[i]);
}
for (i=0; ; i++) {
int key = htonl(dupkey);
bytevec data; ITEMLEN datalen; int type;
r = toku_hash_find(t, &key, sizeof key, &data, &datalen, &type);
if (r != 0) break;
assert(datalen == sizeof vals[i]);
assert(type == i);
int vv;
memcpy(&vv, data, datalen);
assert(vv == vals[i]);
if (do_delete_all)
r = toku_hash_delete_all(t, &key, sizeof key);
else
r = toku_hash_delete(t, &key, sizeof key);
assert(r == 0);
}
if (do_delete_all)
assert(i == 1);
else
assert(i == n);
toku_hashtable_free(&t);
assert(t == 0);
}
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
test_primes();
test0();
test1();
test_insert_nodup(1000);
test_insert_dup(1000, 0);
test_insert_dup(1000, 1);
malloc_cleanup();
return 0;
}
#include <stdlib.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <db.h>
/* verify that the dup flags are written and read from the database file correctly */
void test_dup_flags(int dup_flags) {
printf("test_dup_flags:%d\n", dup_flags);
// DIR is defined in the Makefile
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = "test.dup.flags.brt";
int r;
#define CKERR(r) if (r!=0) fprintf(stderr, "%s:%d error %d %s\n", __FILE__, __LINE__, r, db_strerror(r)); assert(r==0);
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_flags);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
r = db->close(db, 0);
assert(r == 0);
/* verify dup flags match */
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
//assert(r != 0);
r = db->close(db, 0);
assert(r == 0);
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_flags);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
assert(r == 0);
r = db->close(db, 0);
assert(r == 0);
/* verify nodesize match */
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_flags);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
assert(r == 0);
r = db->close(db, 0);
assert(r == 0);
}
DBT *dbt_init(DBT *dbt, void *data, u_int32_t size) {
memset(dbt, 0, sizeof *dbt);
dbt->data = data;
dbt->size = size;
return dbt;
}
DBT *dbt_init_malloc(DBT *dbt) {
memset(dbt, 0, sizeof *dbt);
dbt->flags = DB_DBT_MALLOC;
return dbt;
}
void expect(DBC *cursor, int k, int v) {
DBT key, val;
int r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_NEXT);
assert(r == 0);
assert(key.size == sizeof k);
int kk;
memcpy(&kk, key.data, key.size);
assert(val.size == sizeof v);
int vv;
memcpy(&vv, val.data, val.size);
if (kk != k || vv != v) printf("expect key %d got %d - %d %d\n", htonl(k), htonl(kk), htonl(v), htonl(vv));
assert(kk == k);
assert(vv == v);
free(key.data);
free(val.data);
}
/* verify that key insertions are stored in insert order */
void test_insert(int n, int dup_mode) {
printf("test_insert:%d %d\n", n, dup_mode);
int main() {
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = DIR "/test.dup.db";
const char * const fname = "test.insert.brt";
int r;
int i;
system("rm -rf " DIR);
r=mkdir(DIR, 0777); assert(r==0);
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0);
CKERR(r);
r = db->set_flags(db, DB_DUP);
CKERR(r);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
CKERR(r);
assert(r == 0);
int values[n];
for (i=0; i<n; i++)
values[i] = htonl(random());
int sortvalues[n];
for (i=0; i<n; i++)
sortvalues[i] = values[i];
int mycmp(const void *a, const void *b) {
return memcmp(a, b, sizeof (int));
}
qsort(sortvalues, n, sizeof sortvalues[0], mycmp);
/* insert n-1 unique keys {0, 1, ..., n-1} - {n/2} */
for (i=0; i<n; i++) {
if (i == n/2)
continue;
int k = htonl(i);
int v = values[i];
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
/* insert n duplicates */
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = values[i];
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
/* verify lookups */
for (i=0; i<n; i++) {
int k = htonl(i);
DBT key, val;
r = db->get(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), 0);
assert(r == 0);
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
if (i == n/2) {
if (dup_mode & DB_DUPSORT)
assert(vv == sortvalues[0]);
else if (dup_mode & DB_DUP)
assert(vv == values[0]);
else
assert(vv == values[n-1]);
} else
assert(vv == values[i]);
free(val.data);
}
/* verify the sort order with a cursor */
DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0);
assert(r == 0);
for (i=0; i<n/2; i++)
expect(cursor, htonl(i), values[i]);
if (dup_mode & DB_DUPSORT) {
for (i=0; i<n; i++)
expect(cursor, htonl(n/2), sortvalues[i]);
} else if (dup_mode & DB_DUP) {
for (i=0; i<n; i++)
expect(cursor, htonl(n/2), values[i]);
} else {
expect(cursor, htonl(n/2), values[n-1]);
}
for (i=(n/2)+1; i<n; i++)
expect(cursor, htonl(i), values[i]);
r = cursor->c_close(cursor);
assert(r == 0);
r = db->close(db, 0);
CKERR(r);
assert(r == 0);
}
/* verify dup flags match */
/* verify dup keys are buffered in order in non-leaf nodes */
void test_nonleaf_insert(int n, int dup_mode) {
printf("test_nonleaf_insert:%d %d\n", n, dup_mode);
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = "test.nonleaf.insert.brt";
int r;
int i;
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
int values[n];
for (i=0; i<n; i++)
values[i] = htonl(random());
int sortvalues[n];
for (i=0; i<n; i++)
sortvalues[i] = values[i];
int mycmp(const void *a, const void *b) {
return memcmp(a, b, sizeof (int));
}
qsort(sortvalues, n, sizeof sortvalues[0], mycmp);
/* insert n-1 unique keys {0, 1, ..., n-1} - {n/2} */
for (i=0; i<n; i++) {
if (i == n/2)
continue;
int k = htonl(i);
int v = values[i];
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
/* reopen the database to force nonleaf buffering */
r = db->close(db, 0);
assert(r == 0);
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
assert(r == 0);
/* insert n duplicates */
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = values[i];
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
/* verify lookups */
for (i=0; i<n; i++) {
int k = htonl(i);
DBT key, val;
r = db->get(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), 0);
assert(r == 0);
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
if (i == n/2) {
if (dup_mode & DB_DUPSORT)
assert(vv == sortvalues[0]);
else if (dup_mode & DB_DUP)
assert(vv == values[0]);
else
assert(vv == values[n-1]);
} else
assert(vv == values[i]);
free(val.data);
}
/* verify the sort order with a cursor */
DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0);
assert(r == 0);
for (i=0; i<n/2; i++)
expect(cursor, htonl(i), values[i]);
if (dup_mode & DB_DUPSORT) {
for (i=0; i<n; i++)
expect(cursor, htonl(n/2), sortvalues[i]);
} else if (dup_mode & DB_DUP) {
for (i=0; i<n; i++)
expect(cursor, htonl(n/2), values[i]);
} else {
expect(cursor, htonl(n/2), values[n-1]);
}
for (i=(n/2)+1; i<n; i++)
expect(cursor, htonl(i), values[i]);
r = cursor->c_close(cursor);
assert(r == 0);
r = db->close(db, 0);
assert(r == 0);
}
/* verify dup keys delete */
void test_dup_delete(int n, int dup_mode) {
printf("test_dup_delete:%d %d\n", n, dup_mode);
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = "test.dup.insert.brt";
int r;
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
/* insert n-1 unique keys {0, 1, ..., n-1} - {n/2} */
int i;
for (i=0; i<n; i++) {
if (i == n/2)
continue;
int k = htonl(i);
int v = htonl(n+i);
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
/* reopen the database to force nonleaf buffering */
r = db->close(db, 0);
assert(r == 0);
r = db_create(&db, null_env, 0);
CKERR(r);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
CKERR(r);
assert(r == 0);
/* insert n duplicates */
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = htonl(n+i);
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
r = db->get(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), 0);
assert(r == 0);
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == htonl(n));
free(val.data);
}
DBT key; int k = htonl(n/2);
r = db->del(db, null_txn, dbt_init(&key, &k, sizeof k), 0);
assert(r == 0);
/* verify lookup fails */
{
int k = htonl(n/2);
DBT key, val;
r = db->get(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), 0);
assert(r != 0);
}
/* verify all dups are removed using a cursor */
DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0);
assert(r == 0);
for (i=0; i<n/2; i++) {
expect(cursor, htonl(i), htonl(n+i));
}
for (i=(n/2)+1; i<n; i++) {
expect(cursor, htonl(i), htonl(n+i));
}
r = cursor->c_close(cursor);
assert(r == 0);
r = db->close(db, 0);
CKERR(r);
assert(r == 0);
}
void test_dup_delete_delete(int n) {
printf("test_dup_delete_delete:%d\n", n);
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = "test.dup.insert.brt";
int r;
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, DB_DUP);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
/* insert n-1 unique keys {0, 1, ..., n-1} - {n/2} */
int i;
for (i=0; i<n; i++) {
if (i == n/2)
continue;
int k = htonl(i);
int v = i;
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
/* reopen the database to force nonleaf buffering */
r = db->close(db, 0);
assert(r == 0);
r = db_create(&db, null_env, 0);
CKERR(r);
assert(r == 0);
r = db->set_flags(db, DB_DUP);
CKERR(r);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
CKERR(r);
assert(r == 0);
/* insert n duplicates */
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = i;
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
/* delete the dup key */
DBT key; int k = htonl(n/2);
r = db->del(db, null_txn, dbt_init(&key, &k, sizeof k), 0);
assert(r == 0);
/* delete again */
r = db->del(db, null_txn, dbt_init(&key, &k, sizeof k), 0);
assert(r == 0);
/* verify all dups are removed using a cursor */
DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0);
assert(r == 0);
for (i=0; i<n/2; i++) {
expect(cursor, htonl(i), i);
}
for (i=(n/2)+1; i<n; i++) {
expect(cursor, htonl(i), i);
}
r = cursor->c_close(cursor);
assert(r == 0);
r = db->close(db, 0);
CKERR(r);
assert(r == 0);
}
/* verify nodesize match */
void test_dup_delete_insert(int n, int dup_mode) {
printf("test_dup_delete_insert:%d %d\n", n, dup_mode);
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = "test.dup.insert.brt";
int r;
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
/* insert n-1 unique keys {0, 1, ..., n-1} - {n/2} */
int i;
for (i=0; i<n; i++) {
if (i == n/2)
continue;
int k = htonl(i);
int v = i;
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
/* reopen the database to force nonleaf buffering */
r = db->close(db, 0);
assert(r == 0);
r = db_create(&db, null_env, 0);
CKERR(r);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
assert(r == 0);
/* insert n duplicates */
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = htonl(i);
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
r = db->get(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), 0);
assert(r == 0);
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == htonl(0));
free(val.data);
}
int k = htonl(n/2);
DBT key;
r = db->del(db, null_txn, dbt_init(&key, &k, sizeof k), 0);
assert(r == 0);
/* insert n duplicates */
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = htonl(i);
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
r = db->get(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), 0);
assert(r == 0);
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == htonl(0));
free(val.data);
}
DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0);
assert(r == 0);
for (i=0; i<n/2; i++) {
expect(cursor, htonl(i), i);
}
for (i=0; i<n; i++) {
expect(cursor, htonl(n/2), htonl(i));
}
for (i=(n/2)+1; i<n; i++) {
expect(cursor, htonl(i), i);
}
r = cursor->c_close(cursor);
assert(r == 0);
r = db->close(db, 0);
assert(r == 0);
}
void test_all_dup_delete_insert(int n) {
printf("test_all_dup_delete_insert:%d\n", n);
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = "test.dup.insert.brt";
int r;
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, DB_DUP);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
int i;
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = i;
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
/* reopen the database to force nonleaf buffering */
r = db->close(db, 0);
assert(r == 0);
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, DB_DUP);
CKERR(r);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
assert(r == 0);
/* insert n duplicates */
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = n+i;
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
DBT key; int k = htonl(n/2);
r = db->del(db, null_txn, dbt_init(&key, &k, sizeof k), 0);
assert(r == 0);
/* insert n duplicates */
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = 2*n+i;
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0);
assert(r == 0);
for (i=0; i<n; i++) {
expect(cursor, htonl(n/2), 2*n+i);
}
r = cursor->c_close(cursor);
assert(r == 0);
r = db->close(db, 0);
assert(r == 0);
}
void test_walk_empty(int n, int dup_mode) {
printf("test_walk_empty:%d %d\n", n, dup_mode);
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = "test.dup.insert.brt";
int r;
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
CKERR(r);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
/* insert n duplicates */
int i;
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = htonl(i);
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
/* reopen the database to force nonleaf buffering */
r = db->close(db, 0);
assert(r == 0);
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, dup_mode);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
CKERR(r);
assert(r == 0);
/* insert n duplicates */
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = htonl(n+i);
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
{
DBT key; int k = htonl(n/2);
r = db->del(db, null_txn, dbt_init(&key, &k, sizeof k), 0);
assert(r == 0);
}
DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0);
assert(r == 0);
{
DBT key, val;
int r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_NEXT);
assert(r != 0);
}
r = cursor->c_close(cursor);
assert(r == 0);
r = db->close(db, 0);
CKERR(r);
assert(r == 0);
}
int main() {
int i;
/* test flags */
test_dup_flags(DB_DUP);
test_dup_flags(DB_DUP + DB_DUPSORT);
/* test simple insert */
for (i = 1; i <= (1<<16); i *= 2) {
test_insert(i, 0);
test_insert(i, DB_DUP);
test_insert(i, DB_DUP + DB_DUPSORT);
}
/* test buffered insert */
for (i = 1; i <= (1<<16); i *= 2) {
test_nonleaf_insert(i, 0);
test_nonleaf_insert(i, DB_DUP);
test_nonleaf_insert(i, DB_DUP + DB_DUPSORT);
}
/* test dup delete */
for (i = 1; i <= (1<<16); i *= 2) {
test_dup_delete(i, DB_DUP);
test_dup_delete(i, DB_DUP + DB_DUPSORT);
}
/* test dup delete insert */
for (i = 1; i <= (1<<16); i *= 2) {
test_dup_delete_insert(i, DB_DUP);
test_dup_delete_insert(i, DB_DUP + DB_DUPSORT);
test_walk_empty(i, DB_DUP);
test_walk_empty(i, DB_DUP + DB_DUPSORT);
test_all_dup_delete_insert(i);
}
return 0;
}