Commit ef0260e5 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

insert_or_replace in one step

git-svn-id: file:///svn/tokudb@111 c7de825b-a66e-492c-adef-691d508d4ae1
parent 360af9e3
...@@ -798,11 +798,23 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE ...@@ -798,11 +798,23 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE
return 0; return 0;
} }
#define INSERT_ALL_ALL_ONCE
static int brt_leaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v, static int brt_leaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, int debug,
DB *db) { DB *db) {
DBT v2; DBT v2;
#ifdef INSERT_ALL_AT_ONCE
int replaced_v_size;
enum pma_errors pma_status = pma_insert_or_replace(node->u.l.buffer, k, init_dbt(&v2), db, &replaced_v_size);
assert(pma_status==BRT_OK);
if (replaced_v_size>=0) {
node->u.l.n_bytes_in_buffer += v.size - replaced_v_size;
} else {
node->u.l.n_bytes_in_buffer += k->size + v->size + KEY_VALUE_OVERHEAD;
}
#else
enum pma_errors pma_status = pma_lookup(node->u.l.buffer, k, init_dbt(&v2), db); enum pma_errors pma_status = pma_lookup(node->u.l.buffer, k, init_dbt(&v2), db);
if (pma_status==BRT_OK) { if (pma_status==BRT_OK) {
pma_status = pma_delete(node->u.l.buffer, k, db); pma_status = pma_delete(node->u.l.buffer, k, db);
...@@ -811,6 +823,7 @@ static int brt_leaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v, ...@@ -811,6 +823,7 @@ static int brt_leaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v,
} }
pma_status = pma_insert(node->u.l.buffer, k, v, db); pma_status = pma_insert(node->u.l.buffer, k, v, db);
node->u.l.n_bytes_in_buffer += k->size + v->size + KEY_VALUE_OVERHEAD; node->u.l.n_bytes_in_buffer += k->size + v->size + KEY_VALUE_OVERHEAD;
#endif
// If it doesn't fit, then split the leaf. // If it doesn't fit, then split the leaf.
if (serialize_brtnode_size(node) > node->nodesize) { if (serialize_brtnode_size(node) > node->nodesize) {
int r = brtleaf_split (t, node, nodea, nodeb, splitk, k->app_private, db); int r = brtleaf_split (t, node, nodea, nodeb, splitk, k->app_private, db);
......
...@@ -23,6 +23,16 @@ static inline struct kv_pair *kv_pair_malloc(void *key, int keylen, void *val, i ...@@ -23,6 +23,16 @@ static inline struct kv_pair *kv_pair_malloc(void *key, int keylen, void *val, i
return pair; return pair;
} }
/* replace the val, keep the same key */
static inline struct kv_pair *kv_pair_realloc_same_key(struct kv_pair *p, void *newval, int newvallen) {
struct kv_pair *pair = toku_realloc(p, sizeof (struct kv_pair) + p->keylen + newvallen);
if (pair) {
pair->vallen = newvallen;
memcpy(pair->key + pair->keylen, newval, newvallen);
}
return pair;
}
static inline void kv_pair_free(struct kv_pair *pair) { static inline void kv_pair_free(struct kv_pair *pair) {
toku_free_n(pair, sizeof(struct kv_pair)+pair->keylen+pair->vallen); toku_free_n(pair, sizeof(struct kv_pair)+pair->keylen+pair->vallen);
} }
......
...@@ -532,7 +532,7 @@ void assert_cursor_val(PMA_CURSOR cursor, int v) { ...@@ -532,7 +532,7 @@ void assert_cursor_val(PMA_CURSOR cursor, int v) {
} }
/* make sure cursors are adjusted when the pma grows */ /* make sure cursors are adjusted when the pma grows */
void test_pma_cursor_4() { void test_pma_cursor_4 (void) {
int error; int error;
PMA pma; PMA pma;
PMA_CURSOR cursora, cursorb, cursorc; PMA_CURSOR cursora, cursorb, cursorc;
...@@ -724,7 +724,7 @@ void test_pma_split_n(int n) { ...@@ -724,7 +724,7 @@ void test_pma_split_n(int n) {
assert(error == 0); assert(error == 0);
} }
void test_pma_split_varkey() { void test_pma_split_varkey(void) {
char *keys[] = { char *keys[] = {
"this", "is", "a", "key", "this is a really really big key", "zz", 0 }; "this", "is", "a", "key", "this is a really really big key", "zz", 0 };
PMA pmaa, pmab, pmac; PMA pmaa, pmab, pmac;
...@@ -835,7 +835,7 @@ void walk_cursor_reverse(const char *str, PMA_CURSOR cursor) { ...@@ -835,7 +835,7 @@ void walk_cursor_reverse(const char *str, PMA_CURSOR cursor) {
printf("\n"); printf("\n");
} }
void test_pma_split_cursor() { void test_pma_split_cursor(void) {
PMA pmaa, pmab, pmac; PMA pmaa, pmab, pmac;
PMA_CURSOR cursora, cursorb, cursorc; PMA_CURSOR cursora, cursorb, cursorc;
int error; int error;
...@@ -934,7 +934,7 @@ void test_pma_split_cursor() { ...@@ -934,7 +934,7 @@ void test_pma_split_cursor() {
assert(error == 0); assert(error == 0);
} }
void test_pma_split() { void test_pma_split(void) {
test_pma_split_n(0); memory_check_all_free(); test_pma_split_n(0); memory_check_all_free();
test_pma_split_n(1); memory_check_all_free(); test_pma_split_n(1); memory_check_all_free();
test_pma_split_n(2); memory_check_all_free(); test_pma_split_n(2); memory_check_all_free();
...@@ -1003,7 +1003,7 @@ void test_pma_bulk_insert_n(int n) { ...@@ -1003,7 +1003,7 @@ void test_pma_bulk_insert_n(int n) {
toku_free(vals); toku_free(vals);
} }
void test_pma_bulk_insert() { void test_pma_bulk_insert(void) {
test_pma_bulk_insert_n(0); memory_check_all_free(); test_pma_bulk_insert_n(0); memory_check_all_free();
test_pma_bulk_insert_n(1); memory_check_all_free(); test_pma_bulk_insert_n(1); memory_check_all_free();
test_pma_bulk_insert_n(2); memory_check_all_free(); test_pma_bulk_insert_n(2); memory_check_all_free();
...@@ -1014,6 +1014,41 @@ void test_pma_bulk_insert() { ...@@ -1014,6 +1014,41 @@ void test_pma_bulk_insert() {
test_pma_bulk_insert_n(32); memory_check_all_free(); test_pma_bulk_insert_n(32); memory_check_all_free();
} }
void test_pma_insert_or_replace(void) {
PMA pma;
int r;
DBT dbtk, dbtv;
int n_diff=-2;
r = pma_create(&pma, default_compare_fun);
assert(r==0);
r = pma_insert_or_replace(pma, fill_dbt(&dbtk, "aaa", 4), fill_dbt(&dbtv, "zzz", 4), 0, &n_diff);
assert(r==0); assert(n_diff==-1);
r = pma_lookup(pma, fill_dbt(&dbtk, "aaa", 4), init_dbt(&dbtv), 0);
assert(r==0); assert(dbtv.size==4); assert(memcmp(dbtv.data, "zzz", 4)==0);
r = pma_insert_or_replace(pma, fill_dbt(&dbtk, "bbbb", 5), fill_dbt(&dbtv, "ww", 3), 0, &n_diff);
assert(r==0); assert(n_diff==-1);
r = pma_lookup(pma, fill_dbt(&dbtk, "aaa", 4), init_dbt(&dbtv), 0);
assert(r==0); assert(dbtv.size==4); assert(memcmp(dbtv.data, "zzz", 4)==0);
r = pma_lookup(pma, fill_dbt(&dbtk, "bbbb", 5), init_dbt(&dbtv), 0);
assert(r==0); assert(dbtv.size==3); assert(memcmp(dbtv.data, "ww", 3)==0);
r = pma_insert_or_replace(pma, fill_dbt(&dbtk, "bbbb", 5), fill_dbt(&dbtv, "xxxx", 5), 0, &n_diff);
assert(r==0); assert(n_diff==3);
r = pma_lookup(pma, fill_dbt(&dbtk, "aaa", 4), init_dbt(&dbtv), 0);
assert(r==0); assert(dbtv.size==4); assert(memcmp(dbtv.data, "zzz", 4)==0);
r = pma_lookup(pma, fill_dbt(&dbtk, "bbbb", 5), init_dbt(&dbtv), 0);
assert(r==0); assert(dbtv.size==5); assert(memcmp(dbtv.data, "xxxx", 3)==0);
r=pma_free(&pma);
assert(r==0);
}
void pma_tests (void) { void pma_tests (void) {
memory_check=1; memory_check=1;
test_pma_compare_fun(0); memory_check_all_free(); test_pma_compare_fun(0); memory_check_all_free();
...@@ -1031,6 +1066,7 @@ void pma_tests (void) { ...@@ -1031,6 +1066,7 @@ void pma_tests (void) {
test_pma_cursor(); memory_check_all_free(); test_pma_cursor(); memory_check_all_free();
test_pma_split(); memory_check_all_free(); test_pma_split(); memory_check_all_free();
test_pma_bulk_insert(); memory_check_all_free(); test_pma_bulk_insert(); memory_check_all_free();
test_pma_insert_or_replace(); memory_check_all_free();
} }
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) { int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
......
...@@ -575,6 +575,30 @@ int pma_delete (PMA pma, DBT *k, DB *db) { ...@@ -575,6 +575,30 @@ int pma_delete (PMA pma, DBT *k, DB *db) {
return BRT_OK; return BRT_OK;
} }
int pma_insert_or_replace (PMA pma, DBT *k, DBT *v, DB *db,
int *replaced_v_size /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
) {
int idx = pmainternal_find(pma, k, db);
struct kv_pair *pair;
if (idx < pma_index_limit(pma) && (pair=pma->pairs[idx])) {
DBT k2;
if (0==pma->compare_fun(db, k, fill_dbt(&k2, pair->key, pair->keylen))) {
*replaced_v_size = pair->vallen;
pma->pairs[idx] = kv_pair_realloc_same_key(pair, v->data, v->size);
return BRT_OK; /* It is already here. Return an error. */
}
}
if (pma->pairs[idx]) {
idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */
}
assert(!pma->pairs[idx]);
pma->pairs[idx] = kv_pair_malloc(k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
pma->n_pairs_present++;
*replaced_v_size = -1;
return BRT_OK;
}
void pma_iterate (PMA pma, void(*f)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), void*v) { void pma_iterate (PMA pma, void(*f)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), void*v) {
int i; int i;
for (i=0; i<pma_index_limit(pma); i++) { for (i=0; i<pma_index_limit(pma); i++) {
......
...@@ -32,6 +32,11 @@ int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen ...@@ -32,6 +32,11 @@ int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen
/* This returns an error if the key is NOT present. */ /* This returns an error if the key is NOT present. */
int pma_delete (PMA, DBT *, DB*); int pma_delete (PMA, DBT *, DB*);
int pma_insert_or_replace (PMA pma, DBT *k, DBT *v, DB *db,
int *replaced_v_size /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
);
/* Exposes internals of the PMA by returning a pointer to the guts. /* Exposes internals of the PMA by returning a pointer to the guts.
* Don't modify the returned data. Don't free it. */ * Don't modify the returned data. Don't free it. */
enum pma_errors pma_lookup (PMA, DBT*, DBT*, DB*); enum pma_errors pma_lookup (PMA, DBT*, DBT*, DB*);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment