Commit 1620ec58 authored by Rich Prohaska's avatar Rich Prohaska

remove app_private; merge in dup pma;

git-svn-id: file:///svn/tokudb@567 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0e463512
......@@ -57,6 +57,13 @@ struct brtnode {
} u;
};
enum {
BRT_PIVOT_PRESENT_L = 1,
BRT_PIVOT_PRESENT_R = 2,
BRT_PIVOT_TRUNC = 4,
BRT_PIVOT_FRONT_COMPRESS = 8,
};
struct brt_header {
int dirty;
unsigned int nodesize;
......@@ -106,10 +113,6 @@ void brtnode_free (BRTNODE *node);
//int write_brt_header (int fd, struct brt_header *header);
static inline void brtnode_set_dirty(BRTNODE node) {
node->dirty = 1;
}
#if 1
#define DEADBEEF ((void*)0xDEADBEEF)
#else
......
This diff is collapsed.
This diff is collapsed.
......@@ -270,7 +270,7 @@ static void do_insert (PMA pma, const void *key, int keylen, const void *data, i
static void do_delete (PMA pma, const void *key, int keylen, const void *data, int datalen, u_int32_t rand4fingerprint, u_int32_t *sum, u_int32_t *expect_fingerprint) {
DBT k;
assert(*sum==*expect_fingerprint);
int r = pma_delete(pma, fill_dbt(&k, key, keylen), 0, rand4fingerprint, sum);
int r = pma_delete(pma, fill_dbt(&k, key, keylen), 0, rand4fingerprint, sum, 0);
assert(r==BRT_OK);
add_fingerprint_and_check(-rand4fingerprint, *sum, expect_fingerprint, key, keylen, data, datalen); // negative rand4 means subtract.
pma_verify_fingerprint(pma, rand4fingerprint, *sum);
......@@ -297,7 +297,7 @@ static void test_pma_random_pick (void) {
assert(keylen==6); assert(vallen==6);
assert(strcmp(key,"hello")==0);
assert(strcmp(val,"there")==0);
r = pma_delete(pma, fill_dbt(&k, "nothello", 9), 0, rand4fingerprint, &sum);
r = pma_delete(pma, fill_dbt(&k, "nothello", 9), 0, rand4fingerprint, &sum, 0);
assert(r==DB_NOTFOUND);
assert(sum==expect_fingerprint); // didn't change because nothing was deleted.
......@@ -844,7 +844,7 @@ void test_pma_split_n(int n) {
printf("a:"); print_pma(pmaa);
error = pma_split(pmaa, 0, pmab, 0, brand, &bsum, pmac, 0, crand, &csum);
error = pma_split(pmaa, 0, 0, null_db, pmab, 0, brand, &bsum, pmac, 0, crand, &csum);
assert(error == 0);
pma_verify(pmaa, null_db);
pma_verify(pmab, null_db);
......@@ -870,6 +870,83 @@ void test_pma_split_n(int n) {
assert(error == 0);
}
void test_pma_dup_split_n(int n, int dup_mode) {
PMA pmaa, pmab, pmac;
int error;
int i;
int na, nb, nc;
u_int32_t rand4sum = random();
u_int32_t sum = 0;
u_int32_t expect_sum = 0;
u_int32_t brand = random();
u_int32_t bsum = 0;
u_int32_t crand = random();
u_int32_t csum = 0;
printf("test_pma_dup_split_n:%d %d\n", n, dup_mode);
error = pma_create(&pmaa, default_compare_fun, 0);
assert(error == 0);
pma_set_dup_mode(pmaa, dup_mode);
error = pma_create(&pmab, default_compare_fun, 0);
assert(error == 0);
pma_set_dup_mode(pmab, dup_mode);
error = pma_create(&pmac, default_compare_fun, 0);
assert(error == 0);
pma_set_dup_mode(pmac, dup_mode);
/* insert some kv pairs */
int dupkey = random();
for (i=0; i<n; i++) {
int v = i;
do_insert(pmaa, &dupkey, sizeof dupkey, &v, sizeof v, rand4sum, &sum, &expect_sum);
pma_verify(pmaa, null_db);
}
printf("a:"); print_pma(pmaa);
DBT splitk;
error = pma_split(pmaa, 0, &splitk, null_db, pmab, 0, brand, &bsum, pmac, 0, crand, &csum);
assert(error == 0);
pma_verify(pmaa, null_db);
pma_verify(pmab, null_db);
pma_verify(pmac, null_db);
pma_verify_fingerprint(pmab, brand, bsum);
pma_verify_fingerprint(pmac, crand, csum);
if (0) { printf("a:"); print_pma(pmaa); }
na = pma_n_entries(pmaa);
if (0) { printf("b:"); print_pma(pmab); }
nb = pma_n_entries(pmab);
if (0) { printf("c:"); print_pma(pmac); }
nc = pma_n_entries(pmac);
if (na > 0) {
int kk;
assert(splitk.size == sizeof kk);
memcpy(&kk, splitk.data, splitk.size);
assert(kk == dupkey);
if (nb > 0) assert(splitk.flags & BRT_PIVOT_PRESENT_L);
if (nc > 0) assert(splitk.flags & BRT_PIVOT_PRESENT_R);
}
if (splitk.data) toku_free(splitk.data);
assert(na == 0);
assert(nb + nc == n);
error = pma_free(&pmaa);
assert(error == 0);
error = pma_free(&pmab);
assert(error == 0);
error = pma_free(&pmac);
assert(error == 0);
}
void test_pma_split_varkey(void) {
char *keys[] = {
"this", "is", "a", "key", "this is a really really big key", "zz", 0 };
......@@ -905,7 +982,7 @@ void test_pma_split_varkey(void) {
printf("a:"); print_pma(pmaa);
error = pma_split(pmaa, 0, pmab, 0, brand, &bsum, pmac, 0, crand, &csum);
error = pma_split(pmaa, 0, 0, null_db, pmab, 0, brand, &bsum, pmac, 0, crand, &csum);
assert(error == 0);
pma_verify(pmaa, null_db);
pma_verify(pmab, null_db);
......@@ -1049,7 +1126,7 @@ void test_pma_split_cursor(void) {
// print_cursor("cursorc", cursorc);
assert_cursor_val(cursorc, 16);
error = pma_split(pmaa, 0, pmab, 0, brand, &bsum, pmac, 0, crand, &csum);
error = pma_split(pmaa, 0, 0, null_db, pmab, 0, brand, &bsum, pmac, 0, crand, &csum);
assert(error == 0);
pma_verify_fingerprint(pmab, brand, bsum);
......@@ -1103,6 +1180,9 @@ void test_pma_split(void) {
test_pma_split_n(4); memory_check_all_free();
test_pma_split_n(8); memory_check_all_free();
test_pma_split_n(9); memory_check_all_free();
test_pma_dup_split_n(0, DB_DUP); memory_check_all_free();
test_pma_dup_split_n(1, DB_DUP); memory_check_all_free();
test_pma_dup_split_n(9, DB_DUP); memory_check_all_free();
test_pma_split_varkey(); memory_check_all_free();
test_pma_split_cursor(); memory_check_all_free();
}
......@@ -1510,7 +1590,7 @@ void test_pma_double_delete() {
k = 1;
fill_dbt(&key, &k, sizeof k);
error = pma_delete(pma, &key, 0, rand4fingerprint, &sum);
error = pma_delete(pma, &key, 0, rand4fingerprint, &sum, 0);
assert(error == DB_NOTFOUND);
assert(sum == expect_fingerprint);
......@@ -2085,7 +2165,7 @@ void test_dup_key_delete(int n, int mode) {
}
k = htonl(2);
r = pma_delete(pma, fill_dbt(&key, &k, sizeof k), null_db, rand4fingerprint, &sum);
r = pma_delete(pma, fill_dbt(&key, &k, sizeof k), null_db, rand4fingerprint, &sum, 0);
if (r != 0) assert(n == 0);
expect_fingerprint = sum_before_all_the_duplicates;
assert(sum == expect_fingerprint);
......@@ -2140,9 +2220,10 @@ void test_dup_key_delete(int n, int mode) {
assert(r == 0);
}
/* insert n duplicate keys */
void test_dupsort_key_insert(int n) {
printf("test_dup_key_insert:%d\n", n);
/* insert n duplicate keys with random data
verify that the data is sorted */
void test_dupsort_key_insert(int n, int dup_data) {
printf("test_dupsort_key_insert:%d %d\n", n, dup_data);
PMA pma;
int r;
......@@ -2172,13 +2253,14 @@ void test_dupsort_key_insert(int n) {
do_insert(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
pma_verify(pma, null_db);
k = htonl(2);
int values[n];
int i;
for (i=0; i<n; i++)
values[i] = (i==0 || dup_data) ? (int) htonl(random()) : values[i-1];
/* insert 2->n-i */
for (i=0; i<n; i++) {
k = htonl(2);
values[i] = htonl(random());
do_insert(pma, &k, sizeof k, &values[i], sizeof values[i], rand4fingerprint, &sum, &expect_fingerprint);
pma_verify(pma, null_db);
}
......@@ -2188,7 +2270,6 @@ void test_dupsort_key_insert(int n) {
r = pma_cursor(pma, &cursor);
assert(r == 0);
k = htonl(2);
fill_dbt(&key, &k, sizeof k);
r = pma_cursor_set_key(cursor, &key, 0);
if (r != 0) {
......@@ -2301,8 +2382,10 @@ void test_dup() {
test_dup_key_insert(1000); memory_check_all_free();
test_dup_key_delete(0, DB_DUP); memory_check_all_free();
test_dup_key_delete(1000, DB_DUP); memory_check_all_free();
test_dupsort_key_insert(2); memory_check_all_free();
test_dupsort_key_insert(1000); memory_check_all_free();
test_dupsort_key_insert(2, 0); memory_check_all_free();
test_dupsort_key_insert(1000, 0); memory_check_all_free();
test_dupsort_key_insert(2, 1); memory_check_all_free();
test_dupsort_key_insert(1000, 1); memory_check_all_free();
test_dup_key_delete(0, DB_DUP+DB_DUPSORT); memory_check_all_free();
test_dup_key_delete(1000, DB_DUP+DB_DUPSORT); memory_check_all_free();
test_dup_key_lookup(32, DB_DUP); memory_check_all_free();
......
......@@ -984,7 +984,7 @@ static int pma_next_key(PMA pma, DBT *k, DB *db, int here, int n, int *found) {
return here;
}
static int pma_delete_dup (PMA pma, DBT *k, DB *db, u_int32_t rand4sem, u_int32_t *fingerprint) {
static int pma_delete_dup (PMA pma, DBT *k, DB *db, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
/* find the left most matching key in the pma */
int found, lefthere;
lefthere = __pma_left_search(pma, k, db, 0, pma->N, &found);
......@@ -993,6 +993,7 @@ static int pma_delete_dup (PMA pma, DBT *k, DB *db, u_int32_t rand4sem, u_int32_
struct kv_pair *kv = pma->pairs[righthere];
if (kv_pair_valid(kv)) {
/* mark the pair as deleted */
*deleted_size += KEY_VALUE_OVERHEAD + kv_pair_keylen(kv) + kv_pair_vallen(kv);
*fingerprint -= rand4sem*toku_calccrc32_kvpair (kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
pma->pairs[righthere] = kv_pair_set_deleted(kv);
if (__pma_count_cursor_refs(pma, righthere) == 0) {
......@@ -1011,13 +1012,14 @@ static int pma_delete_dup (PMA pma, DBT *k, DB *db, u_int32_t rand4sem, u_int32_
return found ? BRT_OK : DB_NOTFOUND;
}
static int pma_delete_nodup (PMA pma, DBT *k, DB *db, u_int32_t rand4sem, u_int32_t *fingerprint) {
static int pma_delete_nodup (PMA pma, DBT *k, DB *db, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
int idx = pmainternal_find(pma, k, db);
struct kv_pair *kv = pma->pairs[idx];
if (!kv_pair_valid(kv)) {
if (0) printf("%s:%d l=%d r=%d\n", __FILE__, __LINE__, idx, DB_NOTFOUND);
return DB_NOTFOUND;
}
*deleted_size = KEY_VALUE_OVERHEAD + kv_pair_keylen(kv) + kv_pair_vallen(kv);
*fingerprint -= rand4sem*toku_calccrc32_kvpair (kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
pma->pairs[idx] = kv_pair_set_deleted(kv);
if (__pma_count_cursor_refs(pma, idx) == 0)
......@@ -1025,11 +1027,15 @@ static int pma_delete_nodup (PMA pma, DBT *k, DB *db, u_int32_t rand4sem, u_int3
return BRT_OK;
}
int pma_delete (PMA pma, DBT *k, DB *db, u_int32_t rand4sem, u_int32_t *fingerprint) {
int pma_delete (PMA pma, DBT *k, DB *db, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
u_int32_t my_deleted_size;
if (!deleted_size)
deleted_size = &my_deleted_size;
*deleted_size = 0;
if (pma->dup_mode & DB_DUP)
return pma_delete_dup(pma, k, db, rand4sem, fingerprint);
return pma_delete_dup(pma, k, db, rand4sem, fingerprint, deleted_size);
else
return pma_delete_nodup(pma, k, db, rand4sem, fingerprint);
return pma_delete_nodup(pma, k, db, rand4sem, fingerprint, deleted_size);
}
void __pma_delete_resume(PMA pma, int here) {
......@@ -1121,30 +1127,41 @@ int pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
DB *db, TOKUTXN txn, DISKOFF diskoff,
u_int32_t rand4fingerprint, u_int32_t *fingerprint) {
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
int idx = pmainternal_find(pma, k, db);
struct kv_pair *kv;
int r;
if (idx < pma_index_limit(pma) && (kv = pma->pairs[idx])) {
DBT k2;
// printf("%s:%d\n", __FILE__, __LINE__);
kv = kv_pair_ptr(kv);
if (0==pma->compare_fun(db, k, fill_dbt(&k2, kv->key, kv->keylen))) {
if (!kv_pair_deleted(pma->pairs[idx])) {
*replaced_v_size = kv->vallen;
*fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
r=tokulogger_log_phys_add_or_delete_in_leaf(db, txn, diskoff, 0, kv);
if (r!=0) return r;
}
if (v->size == (unsigned int) kv_pair_vallen(kv)) {
memcpy(kv_pair_val(kv), v->data, v->size);
} else {
pma_mfree_kv_pair(pma, kv);
pma->pairs[idx] = pma_malloc_kv_pair(pma, k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
int idx, found;
if (pma->dup_mode & DB_DUPSORT) {
idx = __pma_dup_search(pma, k, v, db, 0, pma->N, &found);
if (found)
idx += 1;
} else if (pma->dup_mode & DB_DUP) {
idx = __pma_right_search(pma, k, db, 0, pma->N, &found);
if (found)
idx += 1;
} else {
idx = pmainternal_find(pma, k, db);
struct kv_pair *kv;
if (idx < pma_index_limit(pma) && (kv = pma->pairs[idx])) {
DBT k2;
// printf("%s:%d\n", __FILE__, __LINE__);
kv = kv_pair_ptr(kv);
if (0==pma->compare_fun(db, k, fill_dbt(&k2, kv->key, kv->keylen))) {
if (!kv_pair_deleted(pma->pairs[idx])) {
*replaced_v_size = kv->vallen;
*fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
r=tokulogger_log_phys_add_or_delete_in_leaf(db, txn, diskoff, 0, kv);
if (r!=0) return r;
}
if (v->size == (unsigned int) kv_pair_vallen(kv)) {
memcpy(kv_pair_val(kv), v->data, v->size);
} else {
pma_mfree_kv_pair(pma, kv);
pma->pairs[idx] = pma_malloc_kv_pair(pma, k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
}
r = tokulogger_log_phys_add_or_delete_in_leaf(db, txn, diskoff, 0, pma->pairs[idx]);
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
return r;
}
r = tokulogger_log_phys_add_or_delete_in_leaf(db, txn, diskoff, 0, pma->pairs[idx]);
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
return r;
}
}
if (kv_pair_inuse(pma->pairs[idx])) {
......@@ -1274,7 +1291,7 @@ static void __pma_relocate_kvpairs(PMA pma) {
#endif
int pma_split(PMA origpma, unsigned int *origpma_size,
int pma_split(PMA origpma, unsigned int *origpma_size, DBT *splitk, DB *db,
PMA leftpma, unsigned int *leftpma_size, u_int32_t leftrand4fp, u_int32_t *leftfingerprint,
PMA rightpma, unsigned int *rightpma_size, u_int32_t rightrand4fp, u_int32_t *rightfingerprint) {
int error;
......@@ -1289,8 +1306,11 @@ int pma_split(PMA origpma, unsigned int *origpma_size,
/* extract the pairs */
npairs = pma_n_entries(origpma);
if (npairs == 0)
if (npairs == 0) {
if (splitk)
memset(splitk, 0, sizeof *splitk);
return 0;
}
assert(pma_n_entries(leftpma) == 0);
assert(pma_n_entries(rightpma) == 0);
......@@ -1342,6 +1362,22 @@ int pma_split(PMA origpma, unsigned int *origpma_size,
*rightfingerprint += rightrand4fp * sum;
}
if (splitk) {
struct kv_pair *kv = pairs[spliti-1].pair;
assert(kv_pair_valid(kv));
splitk->size = kv_pair_keylen(kv);
splitk->data = memdup(kv_pair_key(kv), splitk->size);
splitk->flags = BRT_PIVOT_PRESENT_L;
if (spliti < npairs) {
kv = pairs[spliti].pair;
DBT k2;
int cmp = origpma->compare_fun(db, splitk, fill_dbt(&k2, kv_pair_key(kv), kv_pair_keylen(kv)));
if (cmp == 0) {
splitk->flags += BRT_PIVOT_PRESENT_R;
}
}
}
/* put the first half of pairs into the left pma */
n = spliti;
error = __pma_resize_array(leftpma, n + n/4, 0);
......@@ -1376,34 +1412,6 @@ int pma_split(PMA origpma, unsigned int *origpma_size,
return 0;
}
int pma_get_last(PMA pma, DBT *key, DBT *val) {
int position;
struct kv_pair *pair;
void *v; int vlen;
position = pma->N - 1;
while ((pair = pma->pairs[position]) == 0) {
if (position > 0)
position--;
else
return DB_NOTFOUND;
}
if (key) {
v = kv_pair_key(pair);
vlen = kv_pair_keylen(pair);
fill_dbt(key, memdup(v, vlen), vlen);
}
if (val) {
v = kv_pair_val(pair);
vlen = kv_pair_vallen(pair);
fill_dbt(val, memdup(v, vlen), vlen);
}
return 0;
}
static void __pma_bulk_cleanup(struct pma *pma, struct kv_pair_tag *pairs, int n) {
int i;
......@@ -1514,6 +1522,7 @@ void pma_verify(PMA pma, DB *db) {
}
#endif
}
void pma_verify_fingerprint (PMA pma, u_int32_t rand4fingerprint, u_int32_t fingerprint) {
u_int32_t actual_fingerprint=0;
PMA_ITERATE(pma, kv, kl, dv, dl,
......
......@@ -47,7 +47,7 @@ enum pma_errors pma_insert (PMA, DBT*, DBT*, DB*, TOKUTXN txn, DISKOFF, u_int32_
/* This returns an error if the key is NOT present. */
int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
/* This returns an error if the key is NOT present. */
int pma_delete (PMA, DBT *, DB*, u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/);
int pma_delete (PMA, DBT *, DB*, u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/, u_int32_t *deleted_size);
int pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
int *replaced_v_size, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
......@@ -69,7 +69,7 @@ enum pma_errors pma_lookup (PMA, DBT*, DBT*, DB*);
* leftpma - the pma assigned keys <= pivot key
* rightpma - the pma assigned keys > pivot key
*/
int pma_split(PMA origpma, unsigned int *origpma_size,
int pma_split(PMA origpma, unsigned int *origpma_size, DBT *splitk, DB *db,
PMA leftpma, unsigned int *leftpma_size, u_int32_t leftrand4sum, u_int32_t *leftfingerprint,
PMA rightpma, unsigned int *rightpma_size, u_int32_t rightrand4sum, u_int32_t *rightfingerprint);
......@@ -112,9 +112,6 @@ int pma_cursor_set_range(PMA_CURSOR c, DBT *key, DB *db);
/* delete the key value pair under the cursor, return the size of the pair */
int pma_cursor_delete_under(PMA_CURSOR c, int *kvsize);
/* get the last key and value in the pma */
int pma_get_last(PMA pma, DBT *key, DBT *val);
int pma_random_pick(PMA, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen);
int pma_index_limit(PMA);
......
......@@ -16,15 +16,6 @@ DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) {
return dbt;
}
DBT *fill_dbt_ap(DBT *dbt, bytevec k, ITEMLEN len, void *app_private __attribute__((unused))) {
fill_dbt(dbt, k, len);
#if USE_DBT_APP_PRIVATE
dbt->app_private=app_private;
#endif
return dbt;
}
int ybt_set_value (DBT *ybt, bytevec val, ITEMLEN vallen, void **staticptrp) {
if (ybt->flags==DB_DBT_MALLOC) {
domalloc:
......
......@@ -8,25 +8,8 @@
DBT* init_dbt (DBT *);
DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len);
DBT *fill_dbt_ap(DBT *dbt, bytevec k, ITEMLEN len, void *app_private);
int ybt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp);
#ifndef USE_DBT_APP_PRIVATE
#define USE_DBT_APP_PRIVATE 0
#endif
static inline void *dbt_get_app_private(DBT *dbt __attribute__((unused))) {
#if USE_DBT_APP_PRIVATE
return dbt->app_private;
#else
return 0;
#endif
}
static inline void dbt_set_app_private(DBT *dbt __attribute__((unused)), void *ap __attribute__((unused))) {
#if USE_DBT_APP_PRIVATE
dbt->app_private = ap;
#endif
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment