Commit 353948bf authored by Rich Prohaska's avatar Rich Prohaska

use TOKU_DB_DUPSORT internally

git-svn-id: file:///svn/tokudb@664 c7de825b-a66e-492c-adef-691d508d4ae1
parent 19e76243
......@@ -58,6 +58,7 @@ struct brtnode {
} u;
};
/* pivot flags (must fit in 8 bits) */
enum {
BRT_PIVOT_PRESENT_L = 1,
BRT_PIVOT_PRESENT_R = 2,
......
......@@ -347,7 +347,7 @@ int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int flags,
goto died1;
}
pma_set_dup_mode(result->u.l.buffer, flags);
if (flags & DB_DUPSORT) pma_set_dup_compare(result->u.l.buffer, dup_compare);
if (flags & TOKU_DB_DUPSORT) pma_set_dup_compare(result->u.l.buffer, dup_compare);
//printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer);
#define BRT_USE_PMA_BULK_INSERT 1
#if BRT_USE_PMA_BULK_INSERT
......
......@@ -117,7 +117,7 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck, unsigned int cl, DB *db) {
int cmp;
DBT mydbt;
if (brt->flags & DB_DUPSORT) {
if (brt->flags & TOKU_DB_DUPSORT) {
bytevec k; unsigned int kl;
bytevec d; unsigned int dl;
assert(cl >= sizeof dl);
......@@ -320,8 +320,8 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
} else {
int r = pma_create(&n->u.l.buffer, t->compare_fun, n->nodesize);
assert(r==0);
pma_set_dup_mode(n->u.l.buffer, t->flags & (DB_DUP+DB_DUPSORT));
if (t->flags & DB_DUPSORT)
pma_set_dup_mode(n->u.l.buffer, t->flags & (TOKU_DB_DUP+TOKU_DB_DUPSORT));
if (t->flags & TOKU_DB_DUPSORT)
pma_set_dup_compare(n->u.l.buffer, t->dup_compare);
static int rcount=0;
//printf("%s:%d n PMA= %p (rcount=%d)\n", __FILE__, __LINE__, n->u.l.buffer, rcount);
......@@ -721,7 +721,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
;
} else if (cmp > 0) {
tochildnum = childnum+1; tochild = childb;
} else if (t->flags & DB_DUP) {
} else if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[childnum] & BRT_PIVOT_PRESENT_R) {
tochildnum = childnum+1; tochild = childb;
}
......@@ -981,7 +981,7 @@ static unsigned int brtnode_right_child (BRTNODE node, DBT *k, DBT *data, BRT t,
} else if (cmp > 0) {
if (maybe != -1) goto foundkeymatch;
return i+1;
} else if (t->flags & DB_DUP) {
} else if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R)
return i+1;
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L)
......@@ -1009,7 +1009,7 @@ static unsigned int brtnode_left_child (BRTNODE node , DBT *k, DBT *d, BRT t, DB
int cmp = brt_compare_pivot(t, k, d, node->u.n.childkeys[i], node->u.n.childkeylens[i], db);
if (cmp > 0) continue;
if (cmp < 0) return i;
if (t->flags & DB_DUP) {
if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L)
return i;
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R)
......@@ -1117,7 +1117,7 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
if (debug) printf("%s:%d %*sDoing hash_insert\n", __FILE__, __LINE__, debug, "");
verify_counts(node);
if (found) {
if (!(t->flags & DB_DUP)) {
if (!(t->flags & TOKU_DB_DUP)) {
//printf("%s:%d found and deleting\n", __FILE__, __LINE__);
node->local_fingerprint -= node->rand4fingerprint * toku_calccrc32_cmd(anytype, k->data, k->size, olddata, olddatalen);
int r = toku_hash_delete(node->u.n.htables[childnum], k->data, k->size);
......@@ -1296,7 +1296,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
} else if (cmp < 0) {
delchild_append(i);
break;
} else if (t->flags & DB_DUPSORT) {
} else if (t->flags & TOKU_DB_DUPSORT) {
delchild_append(i);
delchild_append(i+1);
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) {
......@@ -1307,7 +1307,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_R;
node->dirty = 1;
}
} else if (t->flags & DB_DUP) {
} else if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) {
delchild_append(i);
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_L;
......@@ -1848,7 +1848,7 @@ int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v, DB *db, BRTNODE paren
int type;
if (toku_hash_find (node->u.n.htables[childnum], k->data, k->size, &hanswer, &hanswerlen, &type)==0) {
if (type == BRT_INSERT) {
if ((brt->flags & DB_DUP)) {
if ((brt->flags & TOKU_DB_DUP)) {
result = brt_lookup_node(brt, node->u.n.children[childnum], k, v, db, node);
if (result != 0) {
ybt_set_value(v, hanswer, hanswerlen, &brt->sval);
......@@ -1861,7 +1861,7 @@ int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v, DB *db, BRTNODE paren
result = 0;
}
} else if (type == BRT_DELETE) {
if ((brt->flags & DB_DUP) && toku_hash_find_idx (node->u.n.htables[childnum], k->data, k->size, 1, &hanswer, &hanswerlen, &type) == 0) {
if ((brt->flags & TOKU_DB_DUP) && toku_hash_find_idx (node->u.n.htables[childnum], k->data, k->size, 1, &hanswer, &hanswerlen, &type) == 0) {
assert(type == BRT_INSERT);
ybt_set_value(v, hanswer, hanswerlen, &brt->sval);
result = 0;
......
......@@ -14,6 +14,12 @@ int open_brt (const char *fname, const char *dbname, int is_create, BRT *, int n
int brt_create(BRT *);
int brt_set_flags(BRT, int flags);
enum {
TOKU_DB_DUP = 1,
TOKU_DB_DUPSORT = 2,
};
int brt_set_nodesize(BRT, int nodesize);
int brt_set_bt_compare(BRT, int (*bt_compare)(DB *, const DBT*, const DBT*));
int brt_set_dup_compare(BRT, int (*dup_compare)(DB *, const DBT*, const DBT*));
......
......@@ -1180,9 +1180,9 @@ void test_pma_split(void) {
test_pma_split_n(4); memory_check_all_free();
test_pma_split_n(8); memory_check_all_free();
test_pma_split_n(9); memory_check_all_free();
test_pma_dup_split_n(0, DB_DUP); memory_check_all_free();
test_pma_dup_split_n(1, DB_DUP); memory_check_all_free();
test_pma_dup_split_n(9, DB_DUP); memory_check_all_free();
test_pma_dup_split_n(0, TOKU_DB_DUP); memory_check_all_free();
test_pma_dup_split_n(1, TOKU_DB_DUP); memory_check_all_free();
test_pma_dup_split_n(9, TOKU_DB_DUP); memory_check_all_free();
test_pma_split_varkey(); memory_check_all_free();
test_pma_split_cursor(); memory_check_all_free();
}
......@@ -2048,7 +2048,7 @@ void test_dup_key_insert(int n) {
assert(r == 0);
pma_verify(pma, null_db);
r = pma_set_dup_mode(pma, DB_DUP);
r = pma_set_dup_mode(pma, TOKU_DB_DUP);
assert(r == 0);
......@@ -2138,7 +2138,7 @@ void test_dup_key_delete(int n, int mode) {
r = pma_set_dup_mode(pma, mode);
assert(r == 0);
if (mode & DB_DUPSORT) {
if (mode & TOKU_DB_DUPSORT) {
r = pma_set_dup_compare(pma, default_compare_fun);
assert(r == 0);
}
......@@ -2236,7 +2236,7 @@ void test_dupsort_key_insert(int n, int dup_data) {
assert(r == 0);
pma_verify(pma, null_db);
r = pma_set_dup_mode(pma, DB_DUP+DB_DUPSORT);
r = pma_set_dup_mode(pma, TOKU_DB_DUP+TOKU_DB_DUPSORT);
assert(r == 0);
r = pma_set_dup_compare(pma, default_compare_fun);
......@@ -2333,7 +2333,7 @@ void test_dup_key_lookup(int n, int mode) {
r = pma_set_dup_mode(pma, mode);
assert(r == 0);
if (mode & DB_DUPSORT) {
if (mode & TOKU_DB_DUPSORT) {
r = pma_set_dup_compare(pma, default_compare_fun);
assert(r == 0);
}
......@@ -2380,16 +2380,16 @@ void test_dup() {
test_dup_key_insert(0); memory_check_all_free();
test_dup_key_insert(2); memory_check_all_free();
test_dup_key_insert(1000); memory_check_all_free();
test_dup_key_delete(0, DB_DUP); memory_check_all_free();
test_dup_key_delete(1000, DB_DUP); memory_check_all_free();
test_dup_key_delete(0, TOKU_DB_DUP); memory_check_all_free();
test_dup_key_delete(1000, TOKU_DB_DUP); memory_check_all_free();
test_dupsort_key_insert(2, 0); memory_check_all_free();
test_dupsort_key_insert(1000, 0); memory_check_all_free();
test_dupsort_key_insert(2, 1); memory_check_all_free();
test_dupsort_key_insert(1000, 1); memory_check_all_free();
test_dup_key_delete(0, DB_DUP+DB_DUPSORT); memory_check_all_free();
test_dup_key_delete(1000, DB_DUP+DB_DUPSORT); memory_check_all_free();
test_dup_key_lookup(32, DB_DUP); memory_check_all_free();
test_dup_key_lookup(32, DB_DUP+DB_DUPSORT); memory_check_all_free();
test_dup_key_delete(0, TOKU_DB_DUP+TOKU_DB_DUPSORT); memory_check_all_free();
test_dup_key_delete(1000, TOKU_DB_DUP+TOKU_DB_DUPSORT); memory_check_all_free();
test_dup_key_lookup(32, TOKU_DB_DUP); memory_check_all_free();
test_dup_key_lookup(32, TOKU_DB_DUP+TOKU_DB_DUPSORT); memory_check_all_free();
}
void pma_tests (void) {
......
......@@ -622,13 +622,13 @@ int pma_set_compare(PMA pma, pma_compare_fun_t compare_fun) {
}
int pma_set_dup_mode(PMA pma, int dup_mode) {
assert(dup_mode == 0 || dup_mode == DB_DUP || dup_mode == (DB_DUP+DB_DUPSORT));
assert(dup_mode == 0 || dup_mode == TOKU_DB_DUP || dup_mode == (TOKU_DB_DUP+TOKU_DB_DUPSORT));
pma->dup_mode = dup_mode;
return 0;
}
int pma_set_dup_compare(PMA pma, pma_compare_fun_t dup_compare_fun) {
assert(pma->dup_mode & DB_DUPSORT);
assert(pma->dup_mode & TOKU_DB_DUPSORT);
pma->dup_compare_fun = dup_compare_fun;
return 0;
}
......@@ -734,7 +734,7 @@ int pma_cursor_get_current(PMA_CURSOR c, DBT *key, DBT *val) {
int pma_cursor_set_key(PMA_CURSOR c, DBT *key, DB *db) {
PMA pma = c->pma;
int here, found;
if (pma->dup_mode & DB_DUP) {
if (pma->dup_mode & TOKU_DB_DUP) {
here = __pma_left_search(pma, key, db, 0, pma->N, &found);
} else
here = pmainternal_find(pma, key, db);
......@@ -775,7 +775,7 @@ int pma_cursor_set_both(PMA_CURSOR c, DBT *key, DBT *val, DB *db) {
int pma_cursor_set_range(PMA_CURSOR c, DBT *key, DB *db) {
PMA pma = c->pma;
int here, found;
if (pma->dup_mode & DB_DUP)
if (pma->dup_mode & TOKU_DB_DUP)
here = __pma_left_search(pma, key, db, 0, pma->N, &found);
else
here = pmainternal_find(pma, key, db);
......@@ -885,7 +885,7 @@ int pmainternal_make_space_at (PMA pma, int idx) {
enum pma_errors pma_lookup (PMA pma, DBT *k, DBT *v, DB *db) {
int here, found;
if (pma->dup_mode & DB_DUP) {
if (pma->dup_mode & TOKU_DB_DUP) {
here = __pma_left_search(pma, k, db, 0, pma->N, &found);
} else
here = pmainternal_find(pma, k, db);
......@@ -938,11 +938,11 @@ int pma_free (PMA *pmap) {
int pma_insert (PMA pma, DBT *k, DBT *v, DB* db, TOKUTXN txn, DISKOFF diskoff, u_int32_t rand4fingerprint, u_int32_t *fingerprint) {
int found, idx;
if (pma->dup_mode & DB_DUPSORT) {
if (pma->dup_mode & TOKU_DB_DUPSORT) {
idx = __pma_dup_search(pma, k, v, db, 0, pma->N, &found);
if (found)
idx += 1;
} else if (pma->dup_mode & DB_DUP) {
} else if (pma->dup_mode & TOKU_DB_DUP) {
idx = __pma_right_search(pma, k, db, 0, pma->N, &found);
if (found)
idx += 1;
......@@ -1039,7 +1039,7 @@ int pma_delete (PMA pma, DBT *k, DB *db, u_int32_t rand4sem, u_int32_t *fingerpr
if (!deleted_size)
deleted_size = &my_deleted_size;
*deleted_size = 0;
if (pma->dup_mode & DB_DUP)
if (pma->dup_mode & TOKU_DB_DUP)
return pma_delete_dup(pma, k, db, rand4sem, fingerprint, deleted_size);
else
return pma_delete_nodup(pma, k, db, rand4sem, fingerprint, deleted_size);
......@@ -1136,11 +1136,11 @@ int pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
int r;
int idx, found;
if (pma->dup_mode & DB_DUPSORT) {
if (pma->dup_mode & TOKU_DB_DUPSORT) {
idx = __pma_dup_search(pma, k, v, db, 0, pma->N, &found);
if (found)
idx += 1;
} else if (pma->dup_mode & DB_DUP) {
} else if (pma->dup_mode & TOKU_DB_DUP) {
idx = __pma_right_search(pma, k, db, 0, pma->N, &found);
if (found)
idx += 1;
......@@ -1301,7 +1301,7 @@ static void __pma_relocate_kvpairs(PMA pma) {
static int __pma_compare_kv(PMA pma, struct kv_pair *a, struct kv_pair *b, DB *db) {
DBT dbta, dbtb;
int cmp = pma->compare_fun(db, fill_dbt(&dbta, kv_pair_key(a), kv_pair_keylen(a)), fill_dbt(&dbtb, kv_pair_key(b), kv_pair_keylen(b)));
if (cmp == 0 && (pma->dup_mode & DB_DUPSORT)) {
if (cmp == 0 && (pma->dup_mode & TOKU_DB_DUPSORT)) {
cmp = pma->dup_compare_fun(db, fill_dbt(&dbta, kv_pair_val(a), kv_pair_vallen(b)), fill_dbt(&dbtb, kv_pair_val(b), kv_pair_vallen(b)));
}
return cmp;
......@@ -1380,7 +1380,7 @@ int pma_split(PMA origpma, unsigned int *origpma_size, DBT *splitk, DB *db,
if (splitk) {
struct kv_pair *a = pairs[spliti-1].pair;
if (origpma->dup_mode & DB_DUPSORT) {
if (origpma->dup_mode & TOKU_DB_DUPSORT) {
int kl = kv_pair_keylen(a);
int vl = kv_pair_vallen(a);
splitk->size = (sizeof vl) + kl + vl;
......@@ -1519,9 +1519,9 @@ void pma_verify(PMA pma, DB *db) {
int r = pma->compare_fun(db, &kv_dbt, &nextkv_dbt);
if (pma->dup_mode == 0)
assert(r < 0);
else if (pma->dup_mode & DB_DUP)
else if (pma->dup_mode & TOKU_DB_DUP)
assert(r <= 0);
if (r == 0 && (pma->dup_mode & DB_DUPSORT)) {
if (r == 0 && (pma->dup_mode & TOKU_DB_DUPSORT)) {
fill_dbt(&kv_dbt, kv_pair_val(kv), kv_pair_vallen(kv));
fill_dbt(&nextkv_dbt, kv_pair_val(nextkv), kv_pair_vallen(nextkv));
r = pma->dup_compare_fun(db, &kv_dbt, &nextkv_dbt);
......
......@@ -24,7 +24,7 @@ int pma_create(PMA *, pma_compare_fun_t compare_fun, int maxsize);
int pma_set_compare(PMA pma, pma_compare_fun_t compare_fun);
/* set the duplicate mode
0 -> no duplications, DB_DUP, DB_DUPSORT */
0 -> no duplications, TOKU_DB_DUP, TOKU_DB_DUPSORT */
int pma_set_dup_mode(PMA pma, int mode);
/* set the duplicate compare function */
......
......@@ -15,6 +15,7 @@
#include <ctype.h>
#include <unistd.h>
#include "brt.h"
#include "cachetable.h"
#include "log.h"
#include "memory.h"
......@@ -699,7 +700,12 @@ int __toku_db_set_dup_compare(DB *db, int (*dup_compare)(DB *, const DBT *, cons
}
int __toku_db_set_flags(DB * db, u_int32_t flags) {
int r= brt_set_flags(db->i->brt, flags);
u_int32_t tflags = 0;
if (flags & DB_DUP)
tflags += TOKU_DB_DUP;
if (flags & DB_DUPSORT)
tflags += TOKU_DB_DUPSORT;
int r= brt_set_flags(db->i->brt, tflags);
return r;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment