Commit fba743d2 authored by Rich Prohaska's avatar Rich Prohaska

remove the unsorted duplicates code from the pma. addresses #247

git-svn-id: file:///svn/tokudb@1537 c7de825b-a66e-492c-adef-691d508d4ae1
parent e4f197a8
......@@ -931,12 +931,15 @@ static void test_pma_dup_split_n(int n, int dup_mode) {
error = toku_pma_create(&pmaa, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
toku_pma_set_dup_mode(pmaa, dup_mode);
toku_pma_set_dup_compare(pmaa, toku_default_compare_fun);
error = toku_pma_create(&pmab, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
toku_pma_set_dup_mode(pmab, dup_mode);
toku_pma_set_dup_compare(pmab, toku_default_compare_fun);
error = toku_pma_create(&pmac, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
toku_pma_set_dup_mode(pmac, dup_mode);
toku_pma_set_dup_compare(pmac, toku_default_compare_fun);
/* insert some kv pairs */
int dupkey = random();
......@@ -1231,9 +1234,9 @@ static void test_pma_split(void) {
test_pma_split_n(4); local_memory_check_all_free();
test_pma_split_n(8); local_memory_check_all_free();
test_pma_split_n(9); local_memory_check_all_free();
test_pma_dup_split_n(0, TOKU_DB_DUP); local_memory_check_all_free();
test_pma_dup_split_n(1, TOKU_DB_DUP); local_memory_check_all_free();
test_pma_dup_split_n(9, TOKU_DB_DUP); local_memory_check_all_free();
test_pma_dup_split_n(0, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free();
test_pma_dup_split_n(1, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free();
test_pma_dup_split_n(9, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free();
test_pma_split_varkey(); local_memory_check_all_free();
test_pma_split_cursor(); local_memory_check_all_free();
}
......@@ -2234,93 +2237,6 @@ static void test_nodup_key_insert(int n) {
assert(r == 0);
}
/* insert n duplicate keys */
static void test_dup_key_insert(int n) {
if (verbose) printf("test_dup_key_insert:%d\n", n);
PMA pma;
int r;
u_int32_t rand4fingerprint = random();
u_int32_t sum = 0;
u_int32_t expect_fingerprint = 0;
r = toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, (n + 2) * (8 + sizeof (int) + sizeof (int)));
assert(r == 0);
toku_pma_verify(pma);
r = toku_pma_set_dup_mode(pma, TOKU_DB_DUP);
assert(r == 0);
DBT key, val;
int k, v;
/* insert 1->1, 3->3 */
k = htonl(1); v = 1;
do_insert(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
toku_pma_verify(pma);
k = htonl(3); v = 3;
do_insert(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
toku_pma_verify(pma);
int i;
/* insert 2->0, 2->1, .. 2->n-1 */
for (i=0; i<n; i++) {
k = htonl(2);
v = i;
do_insert(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
toku_pma_verify(pma);
}
/* cursor walk from key k should find values 0, 1, .. n-1 */
PMA_CURSOR cursor;
r = toku_pma_cursor(pma, &cursor, &skey, &sval);
assert(r == 0);
k = htonl(2);
toku_fill_dbt(&key, &k, sizeof k);
r = toku_pma_cursor_set_both(cursor, &key, 0);
if (r != 0) {
assert(n == 0);
} else {
i = 0;
while (1) {
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
r = toku_pma_cursor_get_current(cursor, &key, &val, 0);
assert(r == 0);
int kk;
assert(key.size == sizeof kk);
memcpy(&kk, key.data, key.size);
if (k != kk) {
toku_free(key.data);
toku_free(val.data);
break;
}
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == i);
toku_free(key.data);
toku_free(val.data);
i += 1;
r = toku_pma_cursor_set_position_next(cursor);
if (r != 0)
break;
}
assert(i == n);
}
r = toku_pma_cursor_free(&cursor);
assert(r == 0);
r = toku_pma_free(&pma);
assert(r == 0);
}
/* insert n duplicate keys, delete key, verify all keys are deleted */
static void test_dup_key_delete(int n, int mode) {
if (verbose) printf("test_dup_key_delete:%d %x\n", n, mode);
......@@ -2578,20 +2494,10 @@ static void test_dup_key_lookup(int n, int mode) {
static void test_dup() {
test_nodup_key_insert(2); local_memory_check_all_free();
test_dup_key_insert(0); local_memory_check_all_free();
test_dup_key_insert(2); local_memory_check_all_free();
test_dup_key_insert(1000); local_memory_check_all_free();
test_dup_key_delete(0, TOKU_DB_DUP); local_memory_check_all_free();
test_dup_key_delete(1000, TOKU_DB_DUP); local_memory_check_all_free();
test_dupsort_key_insert(2, 0); local_memory_check_all_free();
test_dupsort_key_insert(1000, 0); local_memory_check_all_free();
#if PMA_DUP_DUP
test_dupsort_key_insert(2, 1); local_memory_check_all_free();
test_dupsort_key_insert(1000, 1); local_memory_check_all_free();
#endif
test_dup_key_delete(0, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free();
test_dup_key_delete(1000, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free();
test_dup_key_lookup(32, TOKU_DB_DUP); local_memory_check_all_free();
test_dup_key_lookup(32, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free();
}
......@@ -2620,7 +2526,6 @@ static void pma_tests (void) {
test_pma_cursor_set_key(); local_memory_check_all_free();
test_pma_cursor_set_range(); local_memory_check_all_free();
test_pma_cursor_delete_under(); local_memory_check_all_free();
test_pma_cursor_delete_under_mode(3, TOKU_DB_DUP); local_memory_check_all_free();
test_pma_cursor_delete_under_mode(3, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free();
test_pma_cursor_set_both(); local_memory_check_all_free();
test_dup();
......
......@@ -649,7 +649,8 @@ int toku_pma_set_compare(PMA pma, pma_compare_fun_t compare_fun) {
}
int toku_pma_set_dup_mode(PMA pma, int dup_mode) {
assert(dup_mode == 0 || dup_mode == TOKU_DB_DUP || dup_mode == (TOKU_DB_DUP+TOKU_DB_DUPSORT));
if (!(dup_mode == 0 || dup_mode == (TOKU_DB_DUP+TOKU_DB_DUPSORT)))
return EINVAL;
pma->dup_mode = dup_mode;
return 0;
}
......@@ -830,25 +831,19 @@ int toku_pma_cursor_set_range_both(PMA_CURSOR c, DBT *key, DBT *val) {
static void pma_cursor_key_last(PMA_CURSOR c, int *lastkeymatch) {
*lastkeymatch = 1;
PMA pma = c->pma;
if (pma->dup_mode & TOKU_DB_DUP) {
if (pma->dup_mode & TOKU_DB_DUPSORT) {
int here, found;
/* get the current key */
here = c->position; assert(0 <= here && here < (int) pma->N);
struct kv_pair *kv = kv_pair_ptr(pma->pairs[here]);
DBT currentkey; toku_fill_dbt(&currentkey, kv_pair_key(kv), kv_pair_keylen(kv));
DBT currentval, *v;
if (pma->dup_mode & TOKU_DB_DUPSORT) {
toku_fill_dbt(&currentval, kv_pair_val(kv), kv_pair_vallen(kv));
v = &currentval;
} else
v = 0;
DBT currentval; toku_fill_dbt(&currentval, kv_pair_val(kv), kv_pair_vallen(kv));
/* check if the next key == current key */
here = c->position+1;
for (;;) {
here = pma_next_key(pma, &currentkey, v, here, pma->N, &found);
here = pma_next_key(pma, &currentkey, &currentval, here, pma->N, &found);
if (!found) break;
if (kv_pair_valid(pma->pairs[here])) {
*lastkeymatch = 0; /* next key == current key */
......@@ -859,7 +854,7 @@ static void pma_cursor_key_last(PMA_CURSOR c, int *lastkeymatch) {
/* check if the prev key == current key */
here = c->position-1;
for (;;) {
here = pma_prev_key(pma, &currentkey, v, here, pma->N, &found);
here = pma_prev_key(pma, &currentkey, &currentval, here, pma->N, &found);
if (!found) break;
if (kv_pair_valid(pma->pairs[here])) {
*lastkeymatch = 0; /* prev key == current key */
......@@ -963,7 +958,7 @@ int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset
enum pma_errors toku_pma_lookup (PMA pma, DBT *k, DBT *v) {
unsigned int here;
int found;
if (pma->dup_mode & TOKU_DB_DUP) {
if (pma->dup_mode & TOKU_DB_DUPSORT) {
here = pma_left_search(pma, k, 0, 0, pma->N, &found);
} else
here = toku_pmainternal_find(pma, k);
......@@ -1020,9 +1015,6 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK
if (pma->dup_mode & TOKU_DB_DUPSORT) {
idx = pma_right_search(pma, k, v, 0, pma->N, &found);
if (found) return BRT_ALREADY_THERE;
} else if (pma->dup_mode & TOKU_DB_DUP) {
idx = pma_right_search(pma, k, 0, 0, pma->N, &found);
if (found) idx += 1;
} else {
idx = toku_pmainternal_find(pma, k);
if (idx < toku_pma_index_limit(pma) && pma->pairs[idx]) {
......@@ -1114,7 +1106,7 @@ int toku_pma_delete (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fin
if (!deleted_size)
deleted_size = &my_deleted_size;
*deleted_size = 0;
if (pma->dup_mode & TOKU_DB_DUP)
if (pma->dup_mode & TOKU_DB_DUPSORT)
return pma_delete_dup(pma, k, v, rand4sem, fingerprint, deleted_size);
else
return pma_delete_nodup(pma, k, v, rand4sem, fingerprint, deleted_size);
......@@ -1212,16 +1204,9 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
int found;
if (pma->dup_mode & TOKU_DB_DUPSORT) {
idx = pma_right_search(pma, k, v, 0, pma->N, &found);
#if PMA_DUP_DUP
if (found) idx += 1;
#else
if (found) {
kv = kv_pair_ptr(pma->pairs[idx]); goto replaceit;
}
#endif
} else if (pma->dup_mode & TOKU_DB_DUP) {
idx = pma_right_search(pma, k, 0, 0, pma->N, &found);
if (found) idx += 1;
} else {
idx = toku_pmainternal_find(pma, k);
if (idx < toku_pma_index_limit(pma) && (kv = pma->pairs[idx])) {
......@@ -1596,7 +1581,7 @@ void toku_pma_verify(PMA pma) {
int r = pma->compare_fun(pma->db, &kv_dbt, &nextkv_dbt);
if (pma->dup_mode == 0)
assert(r < 0);
else if (pma->dup_mode & TOKU_DB_DUP)
else if (pma->dup_mode & TOKU_DB_DUPSORT)
assert(r <= 0);
if (r == 0 && (pma->dup_mode & TOKU_DB_DUPSORT)) {
toku_fill_dbt(&kv_dbt, kv_pair_val(kv), kv_pair_vallen(kv));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment