Commit caad2144 authored by Rich Prohaska's avatar Rich Prohaska

use TOKU_DB_DUPSORT internally

git-svn-id: file:///svn/tokudb@664 c7de825b-a66e-492c-adef-691d508d4ae1
parent 70fc7b56
...@@ -58,6 +58,7 @@ struct brtnode { ...@@ -58,6 +58,7 @@ struct brtnode {
} u; } u;
}; };
/* pivot flags (must fit in 8 bits) */
enum { enum {
BRT_PIVOT_PRESENT_L = 1, BRT_PIVOT_PRESENT_L = 1,
BRT_PIVOT_PRESENT_R = 2, BRT_PIVOT_PRESENT_R = 2,
......
...@@ -347,7 +347,7 @@ int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int flags, ...@@ -347,7 +347,7 @@ int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int flags,
goto died1; goto died1;
} }
pma_set_dup_mode(result->u.l.buffer, flags); pma_set_dup_mode(result->u.l.buffer, flags);
if (flags & DB_DUPSORT) pma_set_dup_compare(result->u.l.buffer, dup_compare); if (flags & TOKU_DB_DUPSORT) pma_set_dup_compare(result->u.l.buffer, dup_compare);
//printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer); //printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer);
#define BRT_USE_PMA_BULK_INSERT 1 #define BRT_USE_PMA_BULK_INSERT 1
#if BRT_USE_PMA_BULK_INSERT #if BRT_USE_PMA_BULK_INSERT
......
...@@ -117,7 +117,7 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE ...@@ -117,7 +117,7 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck, unsigned int cl, DB *db) { static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck, unsigned int cl, DB *db) {
int cmp; int cmp;
DBT mydbt; DBT mydbt;
if (brt->flags & DB_DUPSORT) { if (brt->flags & TOKU_DB_DUPSORT) {
bytevec k; unsigned int kl; bytevec k; unsigned int kl;
bytevec d; unsigned int dl; bytevec d; unsigned int dl;
assert(cl >= sizeof dl); assert(cl >= sizeof dl);
...@@ -320,8 +320,8 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height) ...@@ -320,8 +320,8 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
} else { } else {
int r = pma_create(&n->u.l.buffer, t->compare_fun, n->nodesize); int r = pma_create(&n->u.l.buffer, t->compare_fun, n->nodesize);
assert(r==0); assert(r==0);
pma_set_dup_mode(n->u.l.buffer, t->flags & (DB_DUP+DB_DUPSORT)); pma_set_dup_mode(n->u.l.buffer, t->flags & (TOKU_DB_DUP+TOKU_DB_DUPSORT));
if (t->flags & DB_DUPSORT) if (t->flags & TOKU_DB_DUPSORT)
pma_set_dup_compare(n->u.l.buffer, t->dup_compare); pma_set_dup_compare(n->u.l.buffer, t->dup_compare);
static int rcount=0; static int rcount=0;
//printf("%s:%d n PMA= %p (rcount=%d)\n", __FILE__, __LINE__, n->u.l.buffer, rcount); //printf("%s:%d n PMA= %p (rcount=%d)\n", __FILE__, __LINE__, n->u.l.buffer, rcount);
...@@ -721,7 +721,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -721,7 +721,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
; ;
} else if (cmp > 0) { } else if (cmp > 0) {
tochildnum = childnum+1; tochild = childb; tochildnum = childnum+1; tochild = childb;
} else if (t->flags & DB_DUP) { } else if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[childnum] & BRT_PIVOT_PRESENT_R) { if (node->u.n.pivotflags[childnum] & BRT_PIVOT_PRESENT_R) {
tochildnum = childnum+1; tochild = childb; tochildnum = childnum+1; tochild = childb;
} }
...@@ -981,7 +981,7 @@ static unsigned int brtnode_right_child (BRTNODE node, DBT *k, DBT *data, BRT t, ...@@ -981,7 +981,7 @@ static unsigned int brtnode_right_child (BRTNODE node, DBT *k, DBT *data, BRT t,
} else if (cmp > 0) { } else if (cmp > 0) {
if (maybe != -1) goto foundkeymatch; if (maybe != -1) goto foundkeymatch;
return i+1; return i+1;
} else if (t->flags & DB_DUP) { } else if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R) if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R)
return i+1; return i+1;
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L)
...@@ -1009,7 +1009,7 @@ static unsigned int brtnode_left_child (BRTNODE node , DBT *k, DBT *d, BRT t, DB ...@@ -1009,7 +1009,7 @@ static unsigned int brtnode_left_child (BRTNODE node , DBT *k, DBT *d, BRT t, DB
int cmp = brt_compare_pivot(t, k, d, node->u.n.childkeys[i], node->u.n.childkeylens[i], db); int cmp = brt_compare_pivot(t, k, d, node->u.n.childkeys[i], node->u.n.childkeylens[i], db);
if (cmp > 0) continue; if (cmp > 0) continue;
if (cmp < 0) return i; if (cmp < 0) return i;
if (t->flags & DB_DUP) { if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L)
return i; return i;
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R) if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R)
...@@ -1117,7 +1117,7 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -1117,7 +1117,7 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
if (debug) printf("%s:%d %*sDoing hash_insert\n", __FILE__, __LINE__, debug, ""); if (debug) printf("%s:%d %*sDoing hash_insert\n", __FILE__, __LINE__, debug, "");
verify_counts(node); verify_counts(node);
if (found) { if (found) {
if (!(t->flags & DB_DUP)) { if (!(t->flags & TOKU_DB_DUP)) {
//printf("%s:%d found and deleting\n", __FILE__, __LINE__); //printf("%s:%d found and deleting\n", __FILE__, __LINE__);
node->local_fingerprint -= node->rand4fingerprint * toku_calccrc32_cmd(anytype, k->data, k->size, olddata, olddatalen); node->local_fingerprint -= node->rand4fingerprint * toku_calccrc32_cmd(anytype, k->data, k->size, olddata, olddatalen);
int r = toku_hash_delete(node->u.n.htables[childnum], k->data, k->size); int r = toku_hash_delete(node->u.n.htables[childnum], k->data, k->size);
...@@ -1296,7 +1296,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -1296,7 +1296,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
} else if (cmp < 0) { } else if (cmp < 0) {
delchild_append(i); delchild_append(i);
break; break;
} else if (t->flags & DB_DUPSORT) { } else if (t->flags & TOKU_DB_DUPSORT) {
delchild_append(i); delchild_append(i);
delchild_append(i+1); delchild_append(i+1);
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) { if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) {
...@@ -1307,7 +1307,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -1307,7 +1307,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_R; node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_R;
node->dirty = 1; node->dirty = 1;
} }
} else if (t->flags & DB_DUP) { } else if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) { if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) {
delchild_append(i); delchild_append(i);
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_L; node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_L;
...@@ -1848,7 +1848,7 @@ int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v, DB *db, BRTNODE paren ...@@ -1848,7 +1848,7 @@ int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v, DB *db, BRTNODE paren
int type; int type;
if (toku_hash_find (node->u.n.htables[childnum], k->data, k->size, &hanswer, &hanswerlen, &type)==0) { if (toku_hash_find (node->u.n.htables[childnum], k->data, k->size, &hanswer, &hanswerlen, &type)==0) {
if (type == BRT_INSERT) { if (type == BRT_INSERT) {
if ((brt->flags & DB_DUP)) { if ((brt->flags & TOKU_DB_DUP)) {
result = brt_lookup_node(brt, node->u.n.children[childnum], k, v, db, node); result = brt_lookup_node(brt, node->u.n.children[childnum], k, v, db, node);
if (result != 0) { if (result != 0) {
ybt_set_value(v, hanswer, hanswerlen, &brt->sval); ybt_set_value(v, hanswer, hanswerlen, &brt->sval);
...@@ -1861,7 +1861,7 @@ int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v, DB *db, BRTNODE paren ...@@ -1861,7 +1861,7 @@ int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v, DB *db, BRTNODE paren
result = 0; result = 0;
} }
} else if (type == BRT_DELETE) { } else if (type == BRT_DELETE) {
if ((brt->flags & DB_DUP) && toku_hash_find_idx (node->u.n.htables[childnum], k->data, k->size, 1, &hanswer, &hanswerlen, &type) == 0) { if ((brt->flags & TOKU_DB_DUP) && toku_hash_find_idx (node->u.n.htables[childnum], k->data, k->size, 1, &hanswer, &hanswerlen, &type) == 0) {
assert(type == BRT_INSERT); assert(type == BRT_INSERT);
ybt_set_value(v, hanswer, hanswerlen, &brt->sval); ybt_set_value(v, hanswer, hanswerlen, &brt->sval);
result = 0; result = 0;
......
...@@ -14,6 +14,12 @@ int open_brt (const char *fname, const char *dbname, int is_create, BRT *, int n ...@@ -14,6 +14,12 @@ int open_brt (const char *fname, const char *dbname, int is_create, BRT *, int n
int brt_create(BRT *); int brt_create(BRT *);
int brt_set_flags(BRT, int flags); int brt_set_flags(BRT, int flags);
enum {
TOKU_DB_DUP = 1,
TOKU_DB_DUPSORT = 2,
};
int brt_set_nodesize(BRT, int nodesize); int brt_set_nodesize(BRT, int nodesize);
int brt_set_bt_compare(BRT, int (*bt_compare)(DB *, const DBT*, const DBT*)); int brt_set_bt_compare(BRT, int (*bt_compare)(DB *, const DBT*, const DBT*));
int brt_set_dup_compare(BRT, int (*dup_compare)(DB *, const DBT*, const DBT*)); int brt_set_dup_compare(BRT, int (*dup_compare)(DB *, const DBT*, const DBT*));
......
...@@ -1180,9 +1180,9 @@ void test_pma_split(void) { ...@@ -1180,9 +1180,9 @@ void test_pma_split(void) {
test_pma_split_n(4); memory_check_all_free(); test_pma_split_n(4); memory_check_all_free();
test_pma_split_n(8); memory_check_all_free(); test_pma_split_n(8); memory_check_all_free();
test_pma_split_n(9); memory_check_all_free(); test_pma_split_n(9); memory_check_all_free();
test_pma_dup_split_n(0, DB_DUP); memory_check_all_free(); test_pma_dup_split_n(0, TOKU_DB_DUP); memory_check_all_free();
test_pma_dup_split_n(1, DB_DUP); memory_check_all_free(); test_pma_dup_split_n(1, TOKU_DB_DUP); memory_check_all_free();
test_pma_dup_split_n(9, DB_DUP); memory_check_all_free(); test_pma_dup_split_n(9, TOKU_DB_DUP); memory_check_all_free();
test_pma_split_varkey(); memory_check_all_free(); test_pma_split_varkey(); memory_check_all_free();
test_pma_split_cursor(); memory_check_all_free(); test_pma_split_cursor(); memory_check_all_free();
} }
...@@ -2048,7 +2048,7 @@ void test_dup_key_insert(int n) { ...@@ -2048,7 +2048,7 @@ void test_dup_key_insert(int n) {
assert(r == 0); assert(r == 0);
pma_verify(pma, null_db); pma_verify(pma, null_db);
r = pma_set_dup_mode(pma, DB_DUP); r = pma_set_dup_mode(pma, TOKU_DB_DUP);
assert(r == 0); assert(r == 0);
...@@ -2138,7 +2138,7 @@ void test_dup_key_delete(int n, int mode) { ...@@ -2138,7 +2138,7 @@ void test_dup_key_delete(int n, int mode) {
r = pma_set_dup_mode(pma, mode); r = pma_set_dup_mode(pma, mode);
assert(r == 0); assert(r == 0);
if (mode & DB_DUPSORT) { if (mode & TOKU_DB_DUPSORT) {
r = pma_set_dup_compare(pma, default_compare_fun); r = pma_set_dup_compare(pma, default_compare_fun);
assert(r == 0); assert(r == 0);
} }
...@@ -2236,7 +2236,7 @@ void test_dupsort_key_insert(int n, int dup_data) { ...@@ -2236,7 +2236,7 @@ void test_dupsort_key_insert(int n, int dup_data) {
assert(r == 0); assert(r == 0);
pma_verify(pma, null_db); pma_verify(pma, null_db);
r = pma_set_dup_mode(pma, DB_DUP+DB_DUPSORT); r = pma_set_dup_mode(pma, TOKU_DB_DUP+TOKU_DB_DUPSORT);
assert(r == 0); assert(r == 0);
r = pma_set_dup_compare(pma, default_compare_fun); r = pma_set_dup_compare(pma, default_compare_fun);
...@@ -2333,7 +2333,7 @@ void test_dup_key_lookup(int n, int mode) { ...@@ -2333,7 +2333,7 @@ void test_dup_key_lookup(int n, int mode) {
r = pma_set_dup_mode(pma, mode); r = pma_set_dup_mode(pma, mode);
assert(r == 0); assert(r == 0);
if (mode & DB_DUPSORT) { if (mode & TOKU_DB_DUPSORT) {
r = pma_set_dup_compare(pma, default_compare_fun); r = pma_set_dup_compare(pma, default_compare_fun);
assert(r == 0); assert(r == 0);
} }
...@@ -2380,16 +2380,16 @@ void test_dup() { ...@@ -2380,16 +2380,16 @@ void test_dup() {
test_dup_key_insert(0); memory_check_all_free(); test_dup_key_insert(0); memory_check_all_free();
test_dup_key_insert(2); memory_check_all_free(); test_dup_key_insert(2); memory_check_all_free();
test_dup_key_insert(1000); memory_check_all_free(); test_dup_key_insert(1000); memory_check_all_free();
test_dup_key_delete(0, DB_DUP); memory_check_all_free(); test_dup_key_delete(0, TOKU_DB_DUP); memory_check_all_free();
test_dup_key_delete(1000, DB_DUP); memory_check_all_free(); test_dup_key_delete(1000, TOKU_DB_DUP); memory_check_all_free();
test_dupsort_key_insert(2, 0); memory_check_all_free(); test_dupsort_key_insert(2, 0); memory_check_all_free();
test_dupsort_key_insert(1000, 0); memory_check_all_free(); test_dupsort_key_insert(1000, 0); memory_check_all_free();
test_dupsort_key_insert(2, 1); memory_check_all_free(); test_dupsort_key_insert(2, 1); memory_check_all_free();
test_dupsort_key_insert(1000, 1); memory_check_all_free(); test_dupsort_key_insert(1000, 1); memory_check_all_free();
test_dup_key_delete(0, DB_DUP+DB_DUPSORT); memory_check_all_free(); test_dup_key_delete(0, TOKU_DB_DUP+TOKU_DB_DUPSORT); memory_check_all_free();
test_dup_key_delete(1000, DB_DUP+DB_DUPSORT); memory_check_all_free(); test_dup_key_delete(1000, TOKU_DB_DUP+TOKU_DB_DUPSORT); memory_check_all_free();
test_dup_key_lookup(32, DB_DUP); memory_check_all_free(); test_dup_key_lookup(32, TOKU_DB_DUP); memory_check_all_free();
test_dup_key_lookup(32, DB_DUP+DB_DUPSORT); memory_check_all_free(); test_dup_key_lookup(32, TOKU_DB_DUP+TOKU_DB_DUPSORT); memory_check_all_free();
} }
void pma_tests (void) { void pma_tests (void) {
......
...@@ -622,13 +622,13 @@ int pma_set_compare(PMA pma, pma_compare_fun_t compare_fun) { ...@@ -622,13 +622,13 @@ int pma_set_compare(PMA pma, pma_compare_fun_t compare_fun) {
} }
int pma_set_dup_mode(PMA pma, int dup_mode) { int pma_set_dup_mode(PMA pma, int dup_mode) {
assert(dup_mode == 0 || dup_mode == DB_DUP || dup_mode == (DB_DUP+DB_DUPSORT)); assert(dup_mode == 0 || dup_mode == TOKU_DB_DUP || dup_mode == (TOKU_DB_DUP+TOKU_DB_DUPSORT));
pma->dup_mode = dup_mode; pma->dup_mode = dup_mode;
return 0; return 0;
} }
int pma_set_dup_compare(PMA pma, pma_compare_fun_t dup_compare_fun) { int pma_set_dup_compare(PMA pma, pma_compare_fun_t dup_compare_fun) {
assert(pma->dup_mode & DB_DUPSORT); assert(pma->dup_mode & TOKU_DB_DUPSORT);
pma->dup_compare_fun = dup_compare_fun; pma->dup_compare_fun = dup_compare_fun;
return 0; return 0;
} }
...@@ -734,7 +734,7 @@ int pma_cursor_get_current(PMA_CURSOR c, DBT *key, DBT *val) { ...@@ -734,7 +734,7 @@ int pma_cursor_get_current(PMA_CURSOR c, DBT *key, DBT *val) {
int pma_cursor_set_key(PMA_CURSOR c, DBT *key, DB *db) { int pma_cursor_set_key(PMA_CURSOR c, DBT *key, DB *db) {
PMA pma = c->pma; PMA pma = c->pma;
int here, found; int here, found;
if (pma->dup_mode & DB_DUP) { if (pma->dup_mode & TOKU_DB_DUP) {
here = __pma_left_search(pma, key, db, 0, pma->N, &found); here = __pma_left_search(pma, key, db, 0, pma->N, &found);
} else } else
here = pmainternal_find(pma, key, db); here = pmainternal_find(pma, key, db);
...@@ -775,7 +775,7 @@ int pma_cursor_set_both(PMA_CURSOR c, DBT *key, DBT *val, DB *db) { ...@@ -775,7 +775,7 @@ int pma_cursor_set_both(PMA_CURSOR c, DBT *key, DBT *val, DB *db) {
int pma_cursor_set_range(PMA_CURSOR c, DBT *key, DB *db) { int pma_cursor_set_range(PMA_CURSOR c, DBT *key, DB *db) {
PMA pma = c->pma; PMA pma = c->pma;
int here, found; int here, found;
if (pma->dup_mode & DB_DUP) if (pma->dup_mode & TOKU_DB_DUP)
here = __pma_left_search(pma, key, db, 0, pma->N, &found); here = __pma_left_search(pma, key, db, 0, pma->N, &found);
else else
here = pmainternal_find(pma, key, db); here = pmainternal_find(pma, key, db);
...@@ -885,7 +885,7 @@ int pmainternal_make_space_at (PMA pma, int idx) { ...@@ -885,7 +885,7 @@ int pmainternal_make_space_at (PMA pma, int idx) {
enum pma_errors pma_lookup (PMA pma, DBT *k, DBT *v, DB *db) { enum pma_errors pma_lookup (PMA pma, DBT *k, DBT *v, DB *db) {
int here, found; int here, found;
if (pma->dup_mode & DB_DUP) { if (pma->dup_mode & TOKU_DB_DUP) {
here = __pma_left_search(pma, k, db, 0, pma->N, &found); here = __pma_left_search(pma, k, db, 0, pma->N, &found);
} else } else
here = pmainternal_find(pma, k, db); here = pmainternal_find(pma, k, db);
...@@ -938,11 +938,11 @@ int pma_free (PMA *pmap) { ...@@ -938,11 +938,11 @@ int pma_free (PMA *pmap) {
int pma_insert (PMA pma, DBT *k, DBT *v, DB* db, TOKUTXN txn, DISKOFF diskoff, u_int32_t rand4fingerprint, u_int32_t *fingerprint) { int pma_insert (PMA pma, DBT *k, DBT *v, DB* db, TOKUTXN txn, DISKOFF diskoff, u_int32_t rand4fingerprint, u_int32_t *fingerprint) {
int found, idx; int found, idx;
if (pma->dup_mode & DB_DUPSORT) { if (pma->dup_mode & TOKU_DB_DUPSORT) {
idx = __pma_dup_search(pma, k, v, db, 0, pma->N, &found); idx = __pma_dup_search(pma, k, v, db, 0, pma->N, &found);
if (found) if (found)
idx += 1; idx += 1;
} else if (pma->dup_mode & DB_DUP) { } else if (pma->dup_mode & TOKU_DB_DUP) {
idx = __pma_right_search(pma, k, db, 0, pma->N, &found); idx = __pma_right_search(pma, k, db, 0, pma->N, &found);
if (found) if (found)
idx += 1; idx += 1;
...@@ -1039,7 +1039,7 @@ int pma_delete (PMA pma, DBT *k, DB *db, u_int32_t rand4sem, u_int32_t *fingerpr ...@@ -1039,7 +1039,7 @@ int pma_delete (PMA pma, DBT *k, DB *db, u_int32_t rand4sem, u_int32_t *fingerpr
if (!deleted_size) if (!deleted_size)
deleted_size = &my_deleted_size; deleted_size = &my_deleted_size;
*deleted_size = 0; *deleted_size = 0;
if (pma->dup_mode & DB_DUP) if (pma->dup_mode & TOKU_DB_DUP)
return pma_delete_dup(pma, k, db, rand4sem, fingerprint, deleted_size); return pma_delete_dup(pma, k, db, rand4sem, fingerprint, deleted_size);
else else
return pma_delete_nodup(pma, k, db, rand4sem, fingerprint, deleted_size); return pma_delete_nodup(pma, k, db, rand4sem, fingerprint, deleted_size);
...@@ -1136,11 +1136,11 @@ int pma_insert_or_replace (PMA pma, DBT *k, DBT *v, ...@@ -1136,11 +1136,11 @@ int pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size); //printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
int r; int r;
int idx, found; int idx, found;
if (pma->dup_mode & DB_DUPSORT) { if (pma->dup_mode & TOKU_DB_DUPSORT) {
idx = __pma_dup_search(pma, k, v, db, 0, pma->N, &found); idx = __pma_dup_search(pma, k, v, db, 0, pma->N, &found);
if (found) if (found)
idx += 1; idx += 1;
} else if (pma->dup_mode & DB_DUP) { } else if (pma->dup_mode & TOKU_DB_DUP) {
idx = __pma_right_search(pma, k, db, 0, pma->N, &found); idx = __pma_right_search(pma, k, db, 0, pma->N, &found);
if (found) if (found)
idx += 1; idx += 1;
...@@ -1301,7 +1301,7 @@ static void __pma_relocate_kvpairs(PMA pma) { ...@@ -1301,7 +1301,7 @@ static void __pma_relocate_kvpairs(PMA pma) {
static int __pma_compare_kv(PMA pma, struct kv_pair *a, struct kv_pair *b, DB *db) { static int __pma_compare_kv(PMA pma, struct kv_pair *a, struct kv_pair *b, DB *db) {
DBT dbta, dbtb; DBT dbta, dbtb;
int cmp = pma->compare_fun(db, fill_dbt(&dbta, kv_pair_key(a), kv_pair_keylen(a)), fill_dbt(&dbtb, kv_pair_key(b), kv_pair_keylen(b))); int cmp = pma->compare_fun(db, fill_dbt(&dbta, kv_pair_key(a), kv_pair_keylen(a)), fill_dbt(&dbtb, kv_pair_key(b), kv_pair_keylen(b)));
if (cmp == 0 && (pma->dup_mode & DB_DUPSORT)) { if (cmp == 0 && (pma->dup_mode & TOKU_DB_DUPSORT)) {
cmp = pma->dup_compare_fun(db, fill_dbt(&dbta, kv_pair_val(a), kv_pair_vallen(b)), fill_dbt(&dbtb, kv_pair_val(b), kv_pair_vallen(b))); cmp = pma->dup_compare_fun(db, fill_dbt(&dbta, kv_pair_val(a), kv_pair_vallen(b)), fill_dbt(&dbtb, kv_pair_val(b), kv_pair_vallen(b)));
} }
return cmp; return cmp;
...@@ -1380,7 +1380,7 @@ int pma_split(PMA origpma, unsigned int *origpma_size, DBT *splitk, DB *db, ...@@ -1380,7 +1380,7 @@ int pma_split(PMA origpma, unsigned int *origpma_size, DBT *splitk, DB *db,
if (splitk) { if (splitk) {
struct kv_pair *a = pairs[spliti-1].pair; struct kv_pair *a = pairs[spliti-1].pair;
if (origpma->dup_mode & DB_DUPSORT) { if (origpma->dup_mode & TOKU_DB_DUPSORT) {
int kl = kv_pair_keylen(a); int kl = kv_pair_keylen(a);
int vl = kv_pair_vallen(a); int vl = kv_pair_vallen(a);
splitk->size = (sizeof vl) + kl + vl; splitk->size = (sizeof vl) + kl + vl;
...@@ -1519,9 +1519,9 @@ void pma_verify(PMA pma, DB *db) { ...@@ -1519,9 +1519,9 @@ void pma_verify(PMA pma, DB *db) {
int r = pma->compare_fun(db, &kv_dbt, &nextkv_dbt); int r = pma->compare_fun(db, &kv_dbt, &nextkv_dbt);
if (pma->dup_mode == 0) if (pma->dup_mode == 0)
assert(r < 0); assert(r < 0);
else if (pma->dup_mode & DB_DUP) else if (pma->dup_mode & TOKU_DB_DUP)
assert(r <= 0); assert(r <= 0);
if (r == 0 && (pma->dup_mode & DB_DUPSORT)) { if (r == 0 && (pma->dup_mode & TOKU_DB_DUPSORT)) {
fill_dbt(&kv_dbt, kv_pair_val(kv), kv_pair_vallen(kv)); fill_dbt(&kv_dbt, kv_pair_val(kv), kv_pair_vallen(kv));
fill_dbt(&nextkv_dbt, kv_pair_val(nextkv), kv_pair_vallen(nextkv)); fill_dbt(&nextkv_dbt, kv_pair_val(nextkv), kv_pair_vallen(nextkv));
r = pma->dup_compare_fun(db, &kv_dbt, &nextkv_dbt); r = pma->dup_compare_fun(db, &kv_dbt, &nextkv_dbt);
......
...@@ -24,7 +24,7 @@ int pma_create(PMA *, pma_compare_fun_t compare_fun, int maxsize); ...@@ -24,7 +24,7 @@ int pma_create(PMA *, pma_compare_fun_t compare_fun, int maxsize);
int pma_set_compare(PMA pma, pma_compare_fun_t compare_fun); int pma_set_compare(PMA pma, pma_compare_fun_t compare_fun);
/* set the duplicate mode /* set the duplicate mode
0 -> no duplications, DB_DUP, DB_DUPSORT */ 0 -> no duplications, TOKU_DB_DUP, TOKU_DB_DUPSORT */
int pma_set_dup_mode(PMA pma, int mode); int pma_set_dup_mode(PMA pma, int mode);
/* set the duplicate compare function */ /* set the duplicate compare function */
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <ctype.h> #include <ctype.h>
#include <unistd.h> #include <unistd.h>
#include "brt.h"
#include "cachetable.h" #include "cachetable.h"
#include "log.h" #include "log.h"
#include "memory.h" #include "memory.h"
...@@ -699,7 +700,12 @@ int __toku_db_set_dup_compare(DB *db, int (*dup_compare)(DB *, const DBT *, cons ...@@ -699,7 +700,12 @@ int __toku_db_set_dup_compare(DB *db, int (*dup_compare)(DB *, const DBT *, cons
} }
int __toku_db_set_flags(DB * db, u_int32_t flags) { int __toku_db_set_flags(DB * db, u_int32_t flags) {
int r= brt_set_flags(db->i->brt, flags); u_int32_t tflags = 0;
if (flags & DB_DUP)
tflags += TOKU_DB_DUP;
if (flags & DB_DUPSORT)
tflags += TOKU_DB_DUPSORT;
int r= brt_set_flags(db->i->brt, tflags);
return r; return r;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment