Commit fb4bb998 authored by Rich Prohaska's avatar Rich Prohaska

verify the pma split key. addresses #277

git-svn-id: file:///svn/tokudb@1913 c7de825b-a66e-492c-adef-691d508d4ae1
parent 70de046e
......@@ -1056,10 +1056,9 @@ static void test_pma_compare_fun (int wrong_endian_p) {
}
static void test_pma_split_n(int n) {
PMA pmaa, pmac;
if (verbose) printf("test_pma_split_n:%d\n", n);
int error;
int i;
int na, nc;
u_int32_t expect_fingerprint = 0;
......@@ -1068,86 +1067,117 @@ static void test_pma_split_n(int n) {
u_int32_t crand = random();
u_int32_t csum = 0;
if (verbose) printf("test_pma_split_n:%d\n", n);
struct tuple {
uint64_t k;
uint64_t v;
} tuples[n];
error = toku_pma_create(&pmaa, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
error = toku_pma_create(&pmac, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
PMA pmaa;
error = toku_pma_create(&pmaa, toku_default_compare_fun, null_db, null_filenum, n*(sizeof (struct tuple) + 2*sizeof (int))); assert(error == 0);
PMA pmac;
error = toku_pma_create(&pmac, toku_default_compare_fun, null_db, null_filenum, n*(sizeof (struct tuple) + 2*sizeof (int))); assert(error == 0);
/* insert some kv pairs */
int i;
for (i=0; i<n; i++) {
char k[5]; int v;
tuples[i].k = htonl((random() & 0xffff0000) + i);
tuples[i].v = htonl(i);
}
sprintf(k, "%4.4d", i);
v = i;
do_insert(pmaa, k, strlen(k)+1, &v, sizeof v, arand, &asum, &expect_fingerprint);
/* insert some kv pairs */
for (i=0; i<n; i++) {
do_insert(pmaa, &tuples[i].k, sizeof tuples[i].k, &tuples[i].v, sizeof tuples[i].v, arand, &asum, &expect_fingerprint);
toku_pma_verify(pmaa);
}
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
DBT splitk;
error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
0,
&splitk,
null_diskoff, pmac, 0, crand, &csum, (LSN*)0);
assert(error == 0);
/* verify the order of the pma's */
toku_pma_verify(pmaa);
toku_pma_verify(pmac);
toku_pma_verify_fingerprint(pmaa, arand, asum);
toku_pma_verify_fingerprint(pmac, crand, csum);
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
na = toku_pma_n_entries(pmaa);
if (verbose) { printf("c:"); toku_print_pma(pmac); }
nc = toku_pma_n_entries(pmac);
int na = toku_pma_n_entries(pmaa);
int nc = toku_pma_n_entries(pmac);
assert(na + nc == n);
error = toku_pma_free(&pmaa);
assert(error == 0);
error = toku_pma_free(&pmac);
assert(error == 0);
int compare_key(PMA pma, DBT *k, struct kv_pair *split) {
DBT d;
return pma->compare_fun(pma->db, k, toku_fill_dbt(&d, kv_pair_key(split), kv_pair_keylen(split)));
}
/* verify the split key */
DBT key; memset(&key, 0, sizeof key); key.flags = DB_DBT_REALLOC;
PMA_CURSOR cursor;
if (na > 0) {
error = toku_pma_cursor(pmaa, &cursor, &skey, &sval); assert(error == 0);
error = toku_pma_cursor_set_position_last(cursor); assert(error == 0);
error = toku_pma_cursor_get_current(cursor, &key, 0, 0); assert(error == 0);
assert(compare_key(pmaa, &key, splitk.data) <= 0);
error = toku_pma_cursor_free(&cursor); assert(error == 0);
}
if (nc > 0) {
error = toku_pma_cursor(pmac, &cursor, &skey, &sval); assert(error == 0);
error = toku_pma_cursor_set_position_first(cursor); assert(error == 0);
error = toku_pma_cursor_get_current(cursor, &key, 0, 0); assert(error == 0);
assert(compare_key(pmaa, &key, splitk.data) > 0);
error = toku_pma_cursor_free(&cursor); assert(error == 0);
}
if (key.data) toku_free(key.data);
if (splitk.data) toku_free(splitk.data);
error = toku_pma_free(&pmaa); assert(error == 0);
error = toku_pma_free(&pmac); assert(error == 0);
}
static void test_pma_dup_split_n(int n, int dup_mode) {
PMA pmaa, pmac;
if (verbose) printf("test_pma_dup_split_n:%d %d\n", n, dup_mode);
int error;
int i;
int na, nc;
u_int32_t expect_asum = 0;
u_int32_t expect_fingerprint = 0;
u_int32_t arand = random();
u_int32_t asum = 0;
u_int32_t crand = random();
u_int32_t csum = 0;
if (verbose) printf("test_pma_dup_split_n:%d %d\n", n, dup_mode);
struct tuple {
uint64_t k;
uint64_t v;
} tuples[n];
error = toku_pma_create(&pmaa, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
PMA pmaa;
error = toku_pma_create(&pmaa, toku_default_compare_fun, null_db, null_filenum, n*(sizeof (struct tuple) + 2*sizeof (int))); assert(error == 0);
toku_pma_set_dup_mode(pmaa, dup_mode);
toku_pma_set_dup_compare(pmaa, toku_default_compare_fun);
error = toku_pma_create(&pmac, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
PMA pmac;
error = toku_pma_create(&pmac, toku_default_compare_fun, null_db, null_filenum, n*(sizeof (struct tuple) + 2*sizeof (int))); assert(error == 0);
toku_pma_set_dup_mode(pmac, dup_mode);
toku_pma_set_dup_compare(pmac, toku_default_compare_fun);
int i;
for (i=0; i<n; i++) {
tuples[i].k = htonl((random() & 0xffff0000) + i);
tuples[i].v = htonl(i);
}
/* insert some kv pairs */
int dupkey = random();
for (i=0; i<n; i++) {
int v = i;
do_insert(pmaa, &dupkey, sizeof dupkey, &v, sizeof v, arand, &asum, &expect_asum);
do_insert(pmaa, &tuples[i].k, sizeof tuples[i].k, &tuples[i].v, sizeof tuples[i].v, arand, &asum, &expect_fingerprint);
toku_pma_verify(pmaa);
}
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
DBT splitk;
error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
&splitk,
......@@ -1158,29 +1188,44 @@ static void test_pma_dup_split_n(int n, int dup_mode) {
toku_pma_verify_fingerprint(pmaa, arand, asum);
toku_pma_verify_fingerprint(pmac, crand, csum);
if (0) { printf("a:"); toku_print_pma(pmaa); }
na = toku_pma_n_entries(pmaa);
if (0) { printf("c:"); toku_print_pma(pmac); }
nc = toku_pma_n_entries(pmac);
int na = toku_pma_n_entries(pmaa);
int nc = toku_pma_n_entries(pmac);
assert(na + nc == n);
if (n > 0) {
int kk, vv;
struct kv_pair *kv = splitk.data;
assert(kv_pair_keylen(kv) == sizeof kk);
memcpy(&kk, kv_pair_key(kv), kv_pair_keylen(kv));
assert(kk == dupkey);
if (dup_mode & TOKU_DB_DUPSORT)
assert(kv_pair_vallen(kv) == sizeof vv);
int compare_key_val(PMA pma, DBT *k, DBT *v, struct kv_pair *split) {
DBT d;
int cmp = pma->compare_fun(pma->db, k, toku_fill_dbt(&d, kv_pair_key(split), kv_pair_keylen(split)));
if (cmp == 0)
cmp = pma->dup_compare_fun(pma->db, v, toku_fill_dbt(&d, kv_pair_val(split), kv_pair_vallen(split)));
return cmp;
}
if (splitk.data) toku_free(splitk.data);
/* verify the split key */
DBT key; memset(&key, 0, sizeof key); key.flags = DB_DBT_REALLOC;
DBT val; memset(&val, 0, sizeof val); val.flags = DB_DBT_REALLOC;
PMA_CURSOR cursor;
if (na > 0) {
error = toku_pma_cursor(pmaa, &cursor, &skey, &sval); assert(error == 0);
error = toku_pma_cursor_set_position_last(cursor); assert(error == 0);
error = toku_pma_cursor_get_current(cursor, &key, &val, 0); assert(error == 0);
assert(compare_key_val(pmaa, &key, &val, splitk.data) <= 0);
error = toku_pma_cursor_free(&cursor); assert(error == 0);
}
if (nc > 0) {
error = toku_pma_cursor(pmac, &cursor, &skey, &sval); assert(error == 0);
error = toku_pma_cursor_set_position_first(cursor); assert(error == 0);
error = toku_pma_cursor_get_current(cursor, &key, &val, 0); assert(error == 0);
assert(compare_key_val(pmaa, &key, &val, splitk.data) > 0);
error = toku_pma_cursor_free(&cursor); assert(error == 0);
}
assert(na + nc == n);
if (key.data) toku_free(key.data);
if (val.data) toku_free(val.data);
error = toku_pma_free(&pmaa);
assert(error == 0);
error = toku_pma_free(&pmac);
assert(error == 0);
if (splitk.data) toku_free(splitk.data);
error = toku_pma_free(&pmaa); assert(error == 0);
error = toku_pma_free(&pmac); assert(error == 0);
}
static void test_pma_split_varkey(void) {
......@@ -1305,9 +1350,11 @@ static void test_pma_split(void) {
test_pma_split_n(4); local_memory_check_all_free();
test_pma_split_n(8); local_memory_check_all_free();
test_pma_split_n(9); local_memory_check_all_free();
test_pma_split_n(1000); local_memory_check_all_free();
test_pma_dup_split_n(0, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free();
test_pma_dup_split_n(1, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free();
test_pma_dup_split_n(9, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free();
test_pma_dup_split_n(1000, TOKU_DB_DUP+TOKU_DB_DUPSORT); local_memory_check_all_free();
test_pma_split_varkey(); local_memory_check_all_free();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment