Commit 1f6db92c authored by Rich Prohaska's avatar Rich Prohaska

first cut of delboth addresses #182

git-svn-id: file:///svn/tokudb@1478 c7de825b-a66e-492c-adef-691d508d4ae1
parent b3c59517
......@@ -209,6 +209,7 @@ enum brt_cmd_type {
BRT_NONE = 0,
BRT_INSERT = 1,
BRT_DELETE = 2,
BRT_DELETE_BOTH = 3,
};
struct brt_cmd {
......
......@@ -4,6 +4,7 @@
#include "brt.h"
#include "key.h"
#include "pma.h"
#include "brt-internal.h"
#include "memory.h"
#include <assert.h>
......@@ -2200,6 +2201,80 @@ static void test_insert_delete_lookup(int n) {
r = toku_cachetable_close(&ct); assert(r==0);
}
/* insert <0,0>, <0,1>, .. <0,n>
delete_both <0,i> for all even i
verify <0,i> exists for all odd i */
void test_brt_delete_both(int n) {
if (verbose) printf("test_brt_delete_both:%d\n", n);
BRT t;
int r;
CACHETABLE ct;
char fname[]="testbrt.brt";
int i;
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
unlink(fname);
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_flags(t, TOKU_DB_DUP + TOKU_DB_DUPSORT); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn);
assert(r==0);
DBT key, val;
int k, v;
for (i=0; i<n; i++) {
k = htonl(0); v = htonl(i);
r = toku_brt_insert(t, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0);
assert(r == 0);
}
for (i=0; i<n; i += 2) {
k = htonl(0); v = htonl(i);
r = toku_brt_delete_both(t, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v)); assert(r == 0);
}
#if 0
for (i=1; i<n; i += 2) {
k = htonl(0);
toku_fill_dbt(&key, &k, sizeof k);
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
r = toku_brt_lookup(t, &key, &val); assert(r == 0);
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == (int) htonl(i));
if (val.data) free(val.data);
r = toku_brt_delete_both(t, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &vv, sizeof vv)); assert(r == 0);
}
#endif
/* cursor should find only odd pairs */
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
for (i=1; ; i += 2) {
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
r = toku_brt_cursor_get(cursor, &key, &val, DB_NEXT, null_txn);
if (r != 0) break;
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == (int) htonl(i));
toku_free(key.data);
toku_free(val.data);
}
r = toku_brt_cursor_close(cursor); assert(r == 0);
r = toku_close_brt(t); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
}
static void test_brt_delete() {
test_brt_delete_empty(); toku_memory_check_all_free();
test_brt_delete_present(1); toku_memory_check_all_free();
......@@ -2217,7 +2292,7 @@ static void test_brt_delete() {
static void brt_blackbox_test (void) {
toku_memory_check = 1;
test_brt_delete_both(512); toku_memory_check_all_free();
test_wrongendian_compare(0, 2); toku_memory_check_all_free();
test_wrongendian_compare(1, 2); toku_memory_check_all_free();
test_wrongendian_compare(1, 257); toku_memory_check_all_free();
......
......@@ -199,16 +199,11 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *b
int toku_brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnode_pv, long *sizep, void*extraargs, LSN *written_lsn) {
BRT t =(BRT)extraargs;
BRTNODE *result=(BRTNODE*)brtnode_pv;
//printf("result=%p written_lsn=%p\n", result, written_lsn);
int r = toku_deserialize_brtnode_from(toku_cachefile_fd(cachefile), nodename, result, t->flags, t->nodesize,
t->compare_fun, t->dup_compare, t->db, toku_cachefile_filenum(t->cf));
if (r == 0)
*sizep = brtnode_size(*result);
//printf("result=%p written_lsn=%p\n", result, written_lsn);
//printf("disk_lsn=%lld\n", (*result)->disk_lsn.lsn);
//printf("ok\n");
*written_lsn = (*result)->disk_lsn;
//printf("ok\n");
//(*result)->parent_brtnode = 0; /* Don't know it right now. */
//printf("%s:%d installed %p (offset=%lld)\n", __FILE__, __LINE__, *result, nodename);
return r;
......@@ -332,7 +327,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
} else {
int r = toku_pma_create(&n->u.l.buffer, t->compare_fun, t->db, toku_cachefile_filenum(t->cf), n->nodesize);
assert(r==0);
toku_pma_set_dup_mode(n->u.l.buffer, t->flags & (TOKU_DB_DUP|TOKU_DB_DUPSORT));
toku_pma_set_dup_mode(n->u.l.buffer, t->flags & (TOKU_DB_DUP+TOKU_DB_DUPSORT));
toku_pma_set_dup_compare(n->u.l.buffer, t->dup_compare);
static int rcount=0;
//printf("%s:%d n PMA= %p (rcount=%d)\n", __FILE__, __LINE__, n->u.l.buffer, rcount);
......@@ -723,7 +718,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
int tochildnum = childnum;
BRTNODE tochild = childa;
if (type == BRT_INSERT) {
if (type == BRT_INSERT || type == BRT_DELETE_BOTH) {
int cmp = brt_compare_pivot(t, &skd, &svd, childsplitk->data);
if (cmp < 0) {
;
......@@ -975,7 +970,7 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
} else if (cmd->type == BRT_DELETE) {
u_int32_t delta;
int r = toku_pma_delete(node->u.l.buffer, cmd->u.id.key, node->rand4fingerprint, &node->local_fingerprint, &delta);
int r = toku_pma_delete(node->u.l.buffer, cmd->u.id.key, 0, node->rand4fingerprint, &node->local_fingerprint, &delta);
if (r == BRT_OK) {
node->u.l.n_bytes_in_buffer -= delta;
node->dirty = 1;
......@@ -983,8 +978,19 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
*did_split = 0;
return BRT_OK;
} else
} else if (cmd->type == BRT_DELETE_BOTH) {
u_int32_t delta;
int r = toku_pma_delete(node->u.l.buffer, cmd->u.id.key, cmd->u.id.val, node->rand4fingerprint, &node->local_fingerprint, &delta);
if (r == BRT_OK) {
node->u.l.n_bytes_in_buffer -= delta;
node->dirty = 1;
}
*did_split = 0;
return BRT_OK;
} else {
return EINVAL;
}
}
/* find the rightmost child that the key/data will be inserted */
......@@ -1076,7 +1082,7 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
}
if (child_did_split) {
if (0) printf("brt_nonleaf_insert child_split %p\n", child);
assert(cmd->type == BRT_INSERT || cmd->type == BRT_DELETE);
assert(cmd->type <= BRT_DELETE_BOTH);
r = handle_split_of_child(t, node, childnum,
childa, childb, &childsplitk,
did_split, nodea, nodeb, splitk,
......@@ -1362,13 +1368,12 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
return 0;
}
static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk,
int debug,
TOKUTXN txn) {
if (cmd->type == BRT_INSERT)
if (cmd->type == BRT_INSERT || cmd->type == BRT_DELETE_BOTH)
return brt_nonleaf_insert_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn);
else if (cmd->type == BRT_DELETE)
return brt_nonleaf_delete_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn);
......@@ -1931,6 +1936,7 @@ static int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v, BRTNODE parent
} else
result = DB_NOTFOUND;
} else {
assert(0);
result = EINVAL;
}
//verify_local_fingerprint_nonleaf(node);
......@@ -1984,6 +1990,17 @@ int toku_brt_delete(BRT brt, DBT *key) {
return r;
}
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val) {
int r;
BRT_CMD brtcmd;
brtcmd.type = BRT_DELETE_BOTH;
brtcmd.u.id.key = key;
brtcmd.u.id.val = val;
r = brt_root_put_cmd(brt, &brtcmd, 0);
return r;
}
int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, int recurse, BRTNODE parent_brtnode);
int toku_dump_brtnode (BRT brt, DISKOFF off, int depth, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, BRTNODE parent_brtnode) {
......
......@@ -27,6 +27,7 @@ int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags);
int toku_brt_insert (BRT, DBT *, DBT *, TOKUTXN);
int toku_brt_lookup (BRT brt, DBT *k, DBT *v);
int toku_brt_delete (BRT brt, DBT *k);
int toku_brt_delete_both (BRT brt, DBT *k, DBT *v);
int toku_close_brt (BRT);
int toku_dump_brt (BRT brt);
void brt_fsync (BRT); /* fsync, but don't clear the caches. */
......
......@@ -29,11 +29,8 @@ u_int32_t toku_calccrc32_cmd (int type, const void *key, int keylen, const void
}
u_int32_t toku_calccrc32_cmdstruct (BRT_CMD *cmd) {
switch (cmd->type) {
case BRT_NONE:
case BRT_INSERT:
case BRT_DELETE:
if (cmd->type <= BRT_DELETE_BOTH)
return toku_calccrc32_cmd (cmd->type, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size);
}
else
assert(0); /* Should not have come here. */
}
......@@ -289,13 +289,26 @@ static void do_insert (PMA pma, const void *key, int keylen, const void *data, i
toku_pma_verify_fingerprint(pma, rand4fingerprint, *sum);
}
static void do_delete (PMA pma, const void *key, int keylen, const void *data, int datalen, u_int32_t rand4fingerprint, u_int32_t *sum, u_int32_t *expect_fingerprint) {
static int do_delete (PMA pma, const void *key, int keylen, const void *data, int datalen, u_int32_t rand4fingerprint, u_int32_t *sum, u_int32_t *expect_fingerprint) {
DBT k;
assert(*sum==*expect_fingerprint);
int r = toku_pma_delete(pma, toku_fill_dbt(&k, key, keylen), rand4fingerprint, sum, 0);
assert(r==BRT_OK);
int r = toku_pma_delete(pma, toku_fill_dbt(&k, key, keylen), 0, rand4fingerprint, sum, 0);
if (r==BRT_OK) {
add_fingerprint_and_check(-rand4fingerprint, *sum, expect_fingerprint, key, keylen, data, datalen); // negative rand4 means subtract.
toku_pma_verify_fingerprint(pma, rand4fingerprint, *sum);
}
return r;
}
static int do_delete_both (PMA pma, const void *key, int keylen, const void *data, int datalen, u_int32_t rand4fingerprint, u_int32_t *sum, u_int32_t *expect_fingerprint) {
DBT k, v;
assert(*sum==*expect_fingerprint);
int r = toku_pma_delete(pma, toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen), rand4fingerprint, sum, 0);
if (r==BRT_OK) {
add_fingerprint_and_check(-rand4fingerprint, *sum, expect_fingerprint, key, keylen, data, datalen); // negative rand4 means subtract.
toku_pma_verify_fingerprint(pma, rand4fingerprint, *sum);
}
return r;
}
static void test_pma_random_pick (void) {
......@@ -319,11 +332,11 @@ static void test_pma_random_pick (void) {
assert(keylen==6); assert(vallen==6);
assert(strcmp(key,"hello")==0);
assert(strcmp(val,"there")==0);
r = toku_pma_delete(pma, toku_fill_dbt(&k, "nothello", 9), rand4fingerprint, &sum, 0);
r = toku_pma_delete(pma, toku_fill_dbt(&k, "nothello", 9), 0, rand4fingerprint, &sum, 0);
assert(r==DB_NOTFOUND);
assert(sum==expect_fingerprint); // didn't change because nothing was deleted.
do_delete(pma, "hello", 6, "there", 6, rand4fingerprint, &sum, &expect_fingerprint);
r = do_delete(pma, "hello", 6, "there", 6, rand4fingerprint, &sum, &expect_fingerprint); assert(r == 0);
r = toku_pma_random_pick(pma, &key, &keylen, &val, &vallen);
assert(r==DB_NOTFOUND);
......@@ -344,14 +357,14 @@ static void test_pma_random_pick (void) {
do_insert(pma, "aaf", 4, "fthere", 7, rand4fingerprint, &sum, &expect_fingerprint);
do_insert(pma, "aag", 4, "gthere", 7, rand4fingerprint, &sum, &expect_fingerprint);
toku_pma_verify_fingerprint(pma, rand4fingerprint, sum);
do_delete(pma, "aaa", 4, "athere", 7, rand4fingerprint, &sum, &expect_fingerprint);
do_delete(pma, "aab", 4, "bthere", 7, rand4fingerprint, &sum, &expect_fingerprint);
do_delete(pma, "aac", 4, "cthere", 7, rand4fingerprint, &sum, &expect_fingerprint);
do_delete(pma, "aad", 4, "dthere", 7, rand4fingerprint, &sum, &expect_fingerprint);
do_delete(pma, "aae", 4, "ethere", 7, rand4fingerprint, &sum, &expect_fingerprint);
r = do_delete(pma, "aaa", 4, "athere", 7, rand4fingerprint, &sum, &expect_fingerprint); assert(r == 0);
r = do_delete(pma, "aab", 4, "bthere", 7, rand4fingerprint, &sum, &expect_fingerprint); assert(r == 0);
r = do_delete(pma, "aac", 4, "cthere", 7, rand4fingerprint, &sum, &expect_fingerprint); assert(r == 0);
r = do_delete(pma, "aad", 4, "dthere", 7, rand4fingerprint, &sum, &expect_fingerprint); assert(r == 0);
r = do_delete(pma, "aae", 4, "ethere", 7, rand4fingerprint, &sum, &expect_fingerprint); assert(r == 0);
/* don't delete aaf */
do_delete(pma, "aag", 4, "gthere", 7, rand4fingerprint, &sum, &expect_fingerprint);
do_delete(pma, "hello", 6, "there", 6, rand4fingerprint, &sum, &expect_fingerprint);
r = do_delete(pma, "aag", 4, "gthere", 7, rand4fingerprint, &sum, &expect_fingerprint); assert(r == 0);
r = do_delete(pma, "hello", 6, "there", 6, rand4fingerprint, &sum, &expect_fingerprint); assert(r == 0);
r = toku_pma_random_pick(pma, &key, &keylen, &val, &vallen);
assert(r==0);
......@@ -729,7 +742,7 @@ static void test_pma_cursor_delete(int n) {
/* delete the first key, which is (int)(0) with value (0) */
k = 0;
do_delete(pma, &k, sizeof k, &k, sizeof k, rand4fingerprint, &sum, &expect_fingerprint);
error = do_delete(pma, &k, sizeof k, &k, sizeof k, rand4fingerprint, &sum, &expect_fingerprint); assert(error == 0);
/* cursor get should fail */
toku_init_dbt(&cursorkey); cursorkey.flags = DB_DBT_MALLOC;
......@@ -1394,7 +1407,7 @@ static void test_pma_delete_shrink(int n) {
int v=i;
snprintf(k, sizeof k, "%.10d", i);
do_delete(pma, k, strlen(k)+1, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
r = do_delete(pma, k, strlen(k)+1, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(r == 0);
}
assert(pma->N == PMA_MIN_ARRAY_SIZE);
......@@ -1442,7 +1455,7 @@ static void test_pma_delete_random(int n) {
int v = keys[i];
snprintf(k, sizeof k, "%.10d", keys[i]);
do_delete(pma, k, strlen(k)+1, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
r = do_delete(pma, k, strlen(k)+1, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(r == 0);
}
assert(pma->N == PMA_MIN_ARRAY_SIZE);
......@@ -1520,7 +1533,7 @@ static void test_pma_delete_cursor(int n) {
int v=i;
snprintf(k, sizeof k, "%.10d", i);
do_delete(pma, k, strlen(k)+1, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
r = do_delete(pma, k, strlen(k)+1, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(r == 0);
if (i == n-1)
assert_cursor_nokey(pmacursor);
else
......@@ -1574,7 +1587,7 @@ static void test_pma_delete_insert() {
assert_cursor_equal(pmacursor, 1);
k = 1; v = 1;
do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
error = do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == 0);
assert_cursor_nokey(pmacursor);
k = 1;
......@@ -1623,12 +1636,12 @@ static void test_pma_double_delete() {
assert_cursor_equal(pmacursor, 1);
k = 1; v = 1;
do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
error = do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == 0);
assert_cursor_nokey(pmacursor);
k = 1;
toku_fill_dbt(&key, &k, sizeof k);
error = toku_pma_delete(pma, &key, rand4fingerprint, &sum, 0);
error = toku_pma_delete(pma, &key, 0, rand4fingerprint, &sum, 0);
assert(error == DB_NOTFOUND);
assert(sum == expect_fingerprint);
......@@ -1672,7 +1685,7 @@ static void test_pma_cursor_first_delete_last() {
k = htonl(1);
v = 1;
do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
error = do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == 0);
assert(toku_pma_n_entries(pma) == 2);
error = toku_pma_cursor_set_position_last(pmacursor);
......@@ -1719,7 +1732,7 @@ static void test_pma_cursor_last_delete_first() {
k = htonl(2);
v = 2;
do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
error = do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == 0);
assert(toku_pma_n_entries(pma) == 2);
error = toku_pma_cursor_set_position_first(pmacursor);
......@@ -1733,6 +1746,82 @@ static void test_pma_cursor_last_delete_first() {
assert(error == 0);
}
void test_pma_delete_both(int n) {
if (verbose) printf("test_pma_delete_both:%d\n", n);
int error;
PMA pma;
u_int32_t rand4fingerprint = random();
u_int32_t sum = 0;
u_int32_t expect_fingerprint = 0;
error = toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, n * 4 * sizeof (int)); assert(error == 0);
toku_pma_set_dup_mode(pma, TOKU_DB_DUP + TOKU_DB_DUPSORT);
toku_pma_set_dup_compare(pma, toku_default_compare_fun);
int i;
for (i=0; i<n; i++) {
int k = htonl(i); int v = htonl(i);
do_insert(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
}
for (i=0; i<n; i++) {
int k = htonl(i); int v = htonl(i+1);
error = do_delete_both(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == DB_NOTFOUND);
}
for (i=0; i<n; i++) {
int k = htonl(i); int v = htonl(i);
error = do_delete_both(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == 0);
}
for (i=0; i<n; i++) {
int k = htonl(i); int v = htonl(i);
error = do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == DB_NOTFOUND);
}
error = toku_pma_free(&pma); assert(error == 0);
}
void test_pma_delete_both_dup(int n) {
if (verbose) printf("test_pma_delete_both:%d\n", n);
int error;
PMA pma;
u_int32_t rand4fingerprint = random();
u_int32_t sum = 0;
u_int32_t expect_fingerprint = 0;
error = toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, n * 4 * sizeof (int)); assert(error == 0);
toku_pma_set_dup_mode(pma, TOKU_DB_DUP + TOKU_DB_DUPSORT);
toku_pma_set_dup_compare(pma, toku_default_compare_fun);
int i;
for (i=0; i<n; i++) {
int k = htonl(1); int v = htonl(i);
do_insert(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
}
{
int k = htonl(1); int v = htonl(n+1);
error = do_delete_both(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == DB_NOTFOUND);
}
for (i=0; i<n; i++) {
int k = htonl(1); int v = htonl(i);
error = do_delete_both(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == 0);
}
for (i=0; i<n; i++) {
int k = htonl(1); int v = htonl(i);
error = do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == DB_NOTFOUND);
}
error = toku_pma_free(&pma); assert(error == 0);
}
static void test_pma_delete() {
test_pma_delete_shrink(256); local_memory_check_all_free();
test_pma_delete_random(256); local_memory_check_all_free();
......@@ -1741,6 +1830,8 @@ static void test_pma_delete() {
test_pma_double_delete(); local_memory_check_all_free();
test_pma_cursor_first_delete_last(); local_memory_check_all_free();
test_pma_cursor_last_delete_first(); local_memory_check_all_free();
test_pma_delete_both(512); local_memory_check_all_free();
test_pma_delete_both_dup(512); local_memory_check_all_free();
}
static void test_pma_already_there() {
......@@ -2275,7 +2366,7 @@ static void test_dup_key_delete(int n, int mode) {
}
k = htonl(2);
r = toku_pma_delete(pma, toku_fill_dbt(&key, &k, sizeof k), rand4fingerprint, &sum, 0);
r = toku_pma_delete(pma, toku_fill_dbt(&key, &k, sizeof k), 0, rand4fingerprint, &sum, 0);
if (r != 0) assert(n == 0);
expect_fingerprint = sum_before_all_the_duplicates;
assert(sum == expect_fingerprint);
......@@ -2506,7 +2597,6 @@ static void test_dup() {
static void pma_tests (void) {
toku_memory_check=1;
toku_test_keycompare(); local_memory_check_all_free();
test_pma_compare_fun(0); local_memory_check_all_free();
test_pma_compare_fun(1); local_memory_check_all_free();
......
......@@ -1061,11 +1061,11 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK
}
}
static int pma_delete_dup (PMA pma, DBT *k, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
static int pma_delete_dup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
/* find the left most matching key in the pma */
int found;
unsigned int lefthere;
lefthere = pma_left_search(pma, k, 0, 0, pma->N, &found);
lefthere = pma_left_search(pma, k, v, 0, pma->N, &found);
int rightfound = found, righthere = lefthere;
while (rightfound) {
struct kv_pair *kv = pma->pairs[righthere];
......@@ -1081,7 +1081,7 @@ static int pma_delete_dup (PMA pma, DBT *k, u_int32_t rand4sem, u_int32_t *finge
}
}
/* find the next matching key in the pma */
righthere = pma_next_key(pma, k, 0, righthere+1, pma->N, &rightfound);
righthere = pma_next_key(pma, k, v, righthere+1, pma->N, &rightfound);
}
if (found) {
/* check the density of the region centered around the deleted pairs */
......@@ -1090,30 +1090,34 @@ static int pma_delete_dup (PMA pma, DBT *k, u_int32_t rand4sem, u_int32_t *finge
return found ? BRT_OK : DB_NOTFOUND;
}
static int pma_delete_nodup (PMA pma, DBT *k, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
int idx = toku_pmainternal_find(pma, k);
struct kv_pair *kv = pma->pairs[idx];
if (!kv_pair_valid(kv)) {
if (0) printf("%s:%d l=%d r=%d\n", __FILE__, __LINE__, idx, DB_NOTFOUND);
static int pma_delete_nodup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
/* find the left most matching key in the pma */
int found;
unsigned int here;
here = pma_left_search(pma, k, v, 0, pma->N, &found);
if (!found)
return DB_NOTFOUND;
struct kv_pair *kv = pma->pairs[here];
if (!kv_pair_valid(kv))
return DB_NOTFOUND;
}
*deleted_size = PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + kv_pair_keylen(kv) + kv_pair_vallen(kv);
*fingerprint -= rand4sem*toku_calccrc32_kvpair (kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
pma->pairs[idx] = kv_pair_set_deleted(kv);
if (__pma_count_cursor_refs(pma, idx) == 0)
__pma_delete_finish(pma, idx);
pma->pairs[here] = kv_pair_set_deleted(kv);
if (__pma_count_cursor_refs(pma, here) == 0)
__pma_delete_finish(pma, here);
return BRT_OK;
}
int toku_pma_delete (PMA pma, DBT *k, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
int toku_pma_delete (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
v = v;
u_int32_t my_deleted_size;
if (!deleted_size)
deleted_size = &my_deleted_size;
*deleted_size = 0;
if (pma->dup_mode & TOKU_DB_DUP)
return pma_delete_dup(pma, k, rand4sem, fingerprint, deleted_size);
return pma_delete_dup(pma, k, v, rand4sem, fingerprint, deleted_size);
else
return pma_delete_nodup(pma, k, rand4sem, fingerprint, deleted_size);
return pma_delete_nodup(pma, k, v, rand4sem, fingerprint, deleted_size);
}
void __pma_delete_resume(PMA pma, int here) {
......
......@@ -17,8 +17,7 @@
typedef struct pma *PMA;
typedef struct pma_cursor *PMA_CURSOR;
/* compare 2 DBT's
return a value < 0, = 0, > 0 if a < b, a == b, a > b respectively */
/* compare 2 DBT's. return a value < 0, = 0, > 0 if a < b, a == b, a > b respectively */
typedef int (*pma_compare_fun_t)(DB *, const DBT *a, const DBT *b);
int toku_pma_create(PMA *, pma_compare_fun_t compare_fun, DB *, FILENUM filenum, int maxsize);
......@@ -46,12 +45,18 @@ int toku_pma_n_entries (PMA);
/* Any cursors should be updated. */
/* Duplicates the key and keylen. */
//enum pma_errors toku_pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
// The DB pointer is there so that the comparison function can be called.
enum pma_errors toku_pma_insert (PMA, DBT*, DBT*, TOKUTXN, FILENUM, DISKOFF, u_int32_t /*random for fingerprint */, u_int32_t */*fingerprint*/);
/* This returns an error if the key is NOT present. */
int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
/* This returns an error if the key is NOT present. */
int toku_pma_delete (PMA, DBT *, u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/, u_int32_t *deleted_size);
/* delete pairs from the pma
if val is 0 then delete all pairs from the pma that match the key
if val is not 0 then only delete the pair that matches both the key and the val */
int toku_pma_delete (PMA, DBT */*key*/, DBT */*val*/, u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/, u_int32_t *deleted_size);
int toku_pma_insert_or_replace (PMA /*pma*/, DBT */*k*/, DBT */*v*/,
int */*replaced_v_size*/, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment