Commit f1efa7b9 authored by Rich Prohaska's avatar Rich Prohaska

simplify the pma (remove the pma cursors and the deleted bit). closes #250

git-svn-id: file:///svn/tokudb@1891 c7de825b-a66e-492c-adef-691d508d4ae1
parent 87ccce98
...@@ -3,13 +3,6 @@ ...@@ -3,13 +3,6 @@
#include "pma.h" #include "pma.h"
#include "mempool.h" #include "mempool.h"
struct pma_cursor {
PMA pma;
int position; /* -1 if the position is undefined. */
struct list next;
void **sskey, *ssval; /* Used in dbts. When a cursor is created, you must provide a void** to return results in. */
};
struct pma { struct pma {
enum typ_tag tag; enum typ_tag tag;
int dup_mode; int dup_mode;
...@@ -25,7 +18,6 @@ struct pma { ...@@ -25,7 +18,6 @@ struct pma {
* Regions of size 128 are 60% full. Regions of size 256 are 50% full. * Regions of size 128 are 60% full. Regions of size 256 are 50% full.
* The density step is 0.10. */ * The density step is 0.10. */
double ldt_step; /* lower density threshold step */ double ldt_step; /* lower density threshold step */
struct list cursors;
pma_compare_fun_t compare_fun; pma_compare_fun_t compare_fun;
pma_compare_fun_t dup_compare_fun; pma_compare_fun_t dup_compare_fun;
DB *db; /* Passed to the compare functions. */ DB *db; /* Passed to the compare functions. */
......
...@@ -498,6 +498,167 @@ static void test_pma_iterate2 (void) { ...@@ -498,6 +498,167 @@ static void test_pma_iterate2 (void) {
r=toku_pma_free(&pma1); assert(r==0); assert(pma1==0); r=toku_pma_free(&pma1); assert(r==0); assert(pma1==0);
} }
typedef struct pma_cursor {
PMA pma;
DBT key;
DBT val;
void *sskey;
void *ssval;
} *PMA_CURSOR;
int toku_pma_cursor(PMA pma, PMA_CURSOR *cursorptr, void **sskey, void **ssval) {
PMA_CURSOR cursor = toku_malloc(sizeof *cursor);
if (cursor == 0) return ENOMEM;
cursor->pma = pma;
toku_init_dbt(&cursor->key);
toku_init_dbt(&cursor->val);
cursor->sskey = sskey;
cursor->ssval = ssval;
*cursorptr = cursor;
return 0;
}
static inline void toku_destroy_dbt(DBT *dbt) {
if (dbt->data && (dbt->flags & DB_DBT_MALLOC)) {
toku_free(dbt->data);
dbt->data = 0;
}
}
int toku_pma_cursor_free(PMA_CURSOR *cursorptr) {
PMA_CURSOR cursor = *cursorptr; *cursorptr = 0;
toku_destroy_dbt(&cursor->key);
toku_destroy_dbt(&cursor->val);
toku_free_n(cursor, sizeof *cursor);
return 0;
}
static void pma_cursor_set_key_val(PMA_CURSOR cursor, DBT *newkey, DBT *newval) {
toku_destroy_dbt(&cursor->key);
toku_destroy_dbt(&cursor->val);
cursor->key = *newkey; toku_init_dbt(newkey);
cursor->val = *newval; toku_init_dbt(newval);
}
static int cursor_compare_one(brt_search_t *so, DBT *x, DBT *y) {
so = so; x = x; y = y;
return 1;
}
int toku_pma_cursor_set_position_first (PMA_CURSOR cursor) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
brt_search_t so; brt_search_init(&so, cursor_compare_one, BRT_SEARCH_LEFT, 0, 0, 0);
int r = toku_pma_search(cursor->pma, &so, &newkey, &newval);
if (r == 0)
pma_cursor_set_key_val(cursor, &newkey, &newval);
toku_destroy_dbt(&newkey);
toku_destroy_dbt(&newval);
return r;
}
int toku_pma_cursor_set_position_last (PMA_CURSOR cursor) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
brt_search_t so; brt_search_init(&so, cursor_compare_one, BRT_SEARCH_RIGHT, 0, 0, 0);
int r = toku_pma_search(cursor->pma, &so, &newkey, &newval);
if (r == 0)
pma_cursor_set_key_val(cursor, &newkey, &newval);
toku_destroy_dbt(&newkey);
toku_destroy_dbt(&newval);
return r;
}
static int compare_kv_xy(PMA pma, DBT *k, DBT *v, DBT *x, DBT *y) {
int cmp = pma->compare_fun(pma->db, k, x);
if (cmp == 0 && v && y)
cmp = pma->compare_fun(pma->db, v, y);
return cmp;
}
static int cursor_compare_next(brt_search_t *so, DBT *x, DBT *y) {
PMA pma = so->context;
return compare_kv_xy(pma, so->k, so->v, x, y) < 0;
}
int toku_pma_cursor_set_position_next (PMA_CURSOR cursor) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
brt_search_t so; brt_search_init(&so, cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->pma);
int r = toku_pma_search(cursor->pma, &so, &newkey, &newval);
if (r == 0)
pma_cursor_set_key_val(cursor, &newkey, &newval);
toku_destroy_dbt(&newkey);
toku_destroy_dbt(&newval);
return r;
}
int toku_pma_cursor_set_position_prev (PMA_CURSOR cursor) {
cursor = cursor; assert(0);
return 0;
}
static int cursor_compare_both(brt_search_t *so, DBT *x, DBT *y) {
PMA pma = so->context;
return compare_kv_xy(pma, so->k, so->v, x, y) <= 0;
}
int toku_pma_cursor_set_both(PMA_CURSOR cursor, DBT *key, DBT *val) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
brt_search_t so; brt_search_init(&so, cursor_compare_both, BRT_SEARCH_LEFT, key, val, cursor->pma);
int r = toku_pma_search(cursor->pma, &so, &newkey, &newval);
if (r != 0 || compare_kv_xy(cursor->pma, key, val, &newkey, &newval) != 0) {
r = DB_NOTFOUND;
} else
pma_cursor_set_key_val(cursor, &newkey, &newval);
toku_destroy_dbt(&newkey);
toku_destroy_dbt(&newval);
return r;
}
int toku_pma_cursor_get_current(PMA_CURSOR cursor, DBT *key, DBT *val, int even_deleted) {
assert(even_deleted == 0);
if (cursor->key.data == 0 || cursor->val.data == 0)
return EINVAL;
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
brt_search_t so; brt_search_init(&so, cursor_compare_both, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->pma);
int r = toku_pma_search(cursor->pma, &so, &newkey, &newval);
if (r != 0 || compare_kv_xy(cursor->pma, &cursor->key, &cursor->val, &newkey, &newval) != 0) {
r = DB_KEYEMPTY;
}
toku_destroy_dbt(&newkey);
toku_destroy_dbt(&newval);
if (r != 0)
return r;
if (key)
r = toku_dbt_set_value(key, cursor->key.data, cursor->key.size, cursor->sskey);
if (val && r == 0)
r = toku_dbt_set_value(val, cursor->val.data, cursor->val.size, cursor->ssval);
return r;
}
int toku_pma_cursor_set_range_both(PMA_CURSOR cursor, DBT *key, DBT *val) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
brt_search_t so; brt_search_init(&so, cursor_compare_both, BRT_SEARCH_LEFT, key, val, cursor->pma);
int r = toku_pma_search(cursor->pma, &so, &newkey, &newval);
if (r == 0)
pma_cursor_set_key_val(cursor, &newkey, &newval);
toku_destroy_dbt(&newkey);
toku_destroy_dbt(&newval);
return r;
}
int toku_pma_cursor_delete_under(PMA_CURSOR cursor, int *kvsize, u_int32_t rand4sem, u_int32_t *fingerprint) {
cursor = cursor; kvsize = kvsize; rand4sem = rand4sem; fingerprint = fingerprint;
assert(0);
return 0;
}
/* Check to see if we can create and kill a cursor. */ /* Check to see if we can create and kill a cursor. */
static void test_pma_cursor_0 (void) { static void test_pma_cursor_0 (void) {
PMA pma; PMA pma;
...@@ -506,7 +667,9 @@ static void test_pma_cursor_0 (void) { ...@@ -506,7 +667,9 @@ static void test_pma_cursor_0 (void) {
r=toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, 0); assert(r==0); r=toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, 0); assert(r==0);
r=toku_pma_cursor(pma, &c, &skey, &sval); assert(r==0); assert(c!=0); r=toku_pma_cursor(pma, &c, &skey, &sval); assert(r==0); assert(c!=0);
if (verbose) printf("%s:%d\n", __FILE__, __LINE__); if (verbose) printf("%s:%d\n", __FILE__, __LINE__);
#if OLDCURSORS
r=toku_pma_free(&pma); assert(r!=0); /* didn't deallocate the cursor. */ r=toku_pma_free(&pma); assert(r!=0); /* didn't deallocate the cursor. */
#endif
if (verbose) printf("%s:%d\n", __FILE__, __LINE__); if (verbose) printf("%s:%d\n", __FILE__, __LINE__);
r=toku_pma_cursor_free(&c); assert(r==0); r=toku_pma_cursor_free(&c); assert(r==0);
if (verbose) printf("%s:%d\n", __FILE__, __LINE__); if (verbose) printf("%s:%d\n", __FILE__, __LINE__);
...@@ -516,6 +679,7 @@ static void test_pma_cursor_0 (void) { ...@@ -516,6 +679,7 @@ static void test_pma_cursor_0 (void) {
/* Make sure we can free the cursors in any order. There is a doubly linked list of cursors /* Make sure we can free the cursors in any order. There is a doubly linked list of cursors
* and if we free them in a different order, then different unlinking code is invoked. */ * and if we free them in a different order, then different unlinking code is invoked. */
static void test_pma_cursor_1 (void) { static void test_pma_cursor_1 (void) {
#if OLDCURSORS
PMA pma; PMA pma;
PMA_CURSOR c0=0,c1=0,c2=0; PMA_CURSOR c0=0,c1=0,c2=0;
int r; int r;
...@@ -543,6 +707,7 @@ static void test_pma_cursor_1 (void) { ...@@ -543,6 +707,7 @@ static void test_pma_cursor_1 (void) {
r=toku_pma_free(&pma); assert(r==0); r=toku_pma_free(&pma); assert(r==0);
} }
#endif
} }
static void test_pma_cursor_2 (void) { static void test_pma_cursor_2 (void) {
...@@ -1045,6 +1210,7 @@ static void print_cursor(const char *str, PMA_CURSOR cursor) { ...@@ -1045,6 +1210,7 @@ static void print_cursor(const char *str, PMA_CURSOR cursor) {
} }
#endif #endif
#if OLDCURSORS
static void walk_cursor(const char *str, PMA_CURSOR cursor) { static void walk_cursor(const char *str, PMA_CURSOR cursor) {
DBT key, val; DBT key, val;
int error; int error;
...@@ -1086,8 +1252,10 @@ static void walk_cursor_reverse(const char *str, PMA_CURSOR cursor) { ...@@ -1086,8 +1252,10 @@ static void walk_cursor_reverse(const char *str, PMA_CURSOR cursor) {
} }
if (verbose) printf("\n"); if (verbose) printf("\n");
} }
#endif
static void test_pma_split_cursor(void) { static void test_pma_split_cursor(void) {
#if OLDCURSORS
PMA pmaa, pmac; PMA pmaa, pmac;
PMA_CURSOR cursora, cursorb, cursorc; PMA_CURSOR cursora, cursorb, cursorc;
int error; int error;
...@@ -1187,6 +1355,7 @@ static void test_pma_split_cursor(void) { ...@@ -1187,6 +1355,7 @@ static void test_pma_split_cursor(void) {
assert(error == 0); assert(error == 0);
error = toku_pma_free(&pmac); error = toku_pma_free(&pmac);
assert(error == 0); assert(error == 0);
#endif
} }
static void test_pma_split(void) { static void test_pma_split(void) {
...@@ -1563,7 +1732,11 @@ static void test_pma_delete_insert() { ...@@ -1563,7 +1732,11 @@ static void test_pma_delete_insert() {
k = 1; v = 2; k = 1; v = 2;
do_insert(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); do_insert(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
#if OLDCURSORS
assert_cursor_equal(pmacursor, 2); assert_cursor_equal(pmacursor, 2);
#else
assert_cursor_nokey(pmacursor);
#endif
error = toku_pma_cursor_free(&pmacursor); error = toku_pma_cursor_free(&pmacursor);
assert(error == 0); assert(error == 0);
...@@ -1651,8 +1824,9 @@ static void test_pma_cursor_first_delete_last() { ...@@ -1651,8 +1824,9 @@ static void test_pma_cursor_first_delete_last() {
k = htonl(1); k = htonl(1);
v = 1; v = 1;
error = do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == 0); error = do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == 0);
#if OLDCURSORS
assert(toku_pma_n_entries(pma) == 2); assert(toku_pma_n_entries(pma) == 2);
#endif
error = toku_pma_cursor_set_position_last(pmacursor); error = toku_pma_cursor_set_position_last(pmacursor);
assert(error == 0); assert(error == 0);
assert(toku_pma_n_entries(pma) == 1); assert(toku_pma_n_entries(pma) == 1);
...@@ -1698,7 +1872,9 @@ static void test_pma_cursor_last_delete_first() { ...@@ -1698,7 +1872,9 @@ static void test_pma_cursor_last_delete_first() {
k = htonl(2); k = htonl(2);
v = 2; v = 2;
error = do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == 0); error = do_delete(pma, &k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint); assert(error == 0);
#if OLDCURSORS
assert(toku_pma_n_entries(pma) == 2); assert(toku_pma_n_entries(pma) == 2);
#endif
error = toku_pma_cursor_set_position_first(pmacursor); error = toku_pma_cursor_set_position_first(pmacursor);
assert(error == 0); assert(error == 0);
...@@ -1976,6 +2152,7 @@ static void test_pma_cursor_set_range() { ...@@ -1976,6 +2152,7 @@ static void test_pma_cursor_set_range() {
} }
static void test_pma_cursor_delete_under() { static void test_pma_cursor_delete_under() {
#if OLDCURSORS
if (verbose) printf("test_pma_cursor_delete_under\n"); if (verbose) printf("test_pma_cursor_delete_under\n");
int error; int error;
...@@ -2041,9 +2218,11 @@ static void test_pma_cursor_delete_under() { ...@@ -2041,9 +2218,11 @@ static void test_pma_cursor_delete_under() {
assert(toku_pma_n_entries(pma) == 0); assert(toku_pma_n_entries(pma) == 0);
error = toku_pma_free(&pma); assert(error == 0); error = toku_pma_free(&pma); assert(error == 0);
#endif
} }
static void test_pma_cursor_delete_under_mode(int n, int dup_mode) { static void test_pma_cursor_delete_under_mode(int n, int dup_mode) {
#if OLDCURSORS
if (verbose) printf("test_pma_cursor_delete_under_mode:%d %d\n", n, dup_mode); if (verbose) printf("test_pma_cursor_delete_under_mode:%d %d\n", n, dup_mode);
int error; int error;
...@@ -2115,6 +2294,9 @@ static void test_pma_cursor_delete_under_mode(int n, int dup_mode) { ...@@ -2115,6 +2294,9 @@ static void test_pma_cursor_delete_under_mode(int n, int dup_mode) {
assert(toku_pma_n_entries(pma) == 0); assert(toku_pma_n_entries(pma) == 0);
error = toku_pma_free(&pma); assert(error == 0); error = toku_pma_free(&pma); assert(error == 0);
#else
n = n; dup_mode = dup_mode;
#endif
} }
static void test_pma_cursor_set_both() { static void test_pma_cursor_set_both() {
......
...@@ -24,77 +24,29 @@ ...@@ -24,77 +24,29 @@
#include "brt-internal.h" #include "brt-internal.h"
/**************************** static functions forward declarations. *********************/ /**************************** static functions forward declarations. *********************/
/*
* finish a deletion from the pma. called when there are no cursor references
* to the kv pair.
*/
static void __pma_delete_finish(PMA pma, int here);
/* /* resize the pma array to asksize. zero all array entries starting from startx.*/
* resize the pma array to asksize. zero all array entries starting from startx.
*/
static int pma_resize_array(TOKUTXN, FILENUM, DISKOFF, PMA pma, int asksize, int startx, LSN *node_lsn); static int pma_resize_array(TOKUTXN, FILENUM, DISKOFF, PMA pma, int asksize, int startx, LSN *node_lsn);
static int old_pma_resize_array(PMA pma, int asksize, int startx) { static int old_pma_resize_array(PMA pma, int asksize, int startx) {
return pma_resize_array((TOKUTXN)0, (FILENUM){0}, (DISKOFF)0, pma, asksize, startx, (LSN*)0); return pma_resize_array((TOKUTXN)0, (FILENUM){0}, (DISKOFF)0, pma, asksize, startx, (LSN*)0);
} }
/* /* extract pairs from the pma in the window delimited by lo and hi.*/
* extract pairs from the pma in the window delimited by lo and hi.
*/
static struct kv_pair_tag *pma_extract_pairs(PMA pma, int count, unsigned int lo, unsigned int hi); static struct kv_pair_tag *pma_extract_pairs(PMA pma, int count, unsigned int lo, unsigned int hi);
/*
* update the cursors in a cursor set given a set of tagged pairs.
*/
static void __pma_update_cursors(PMA pma, struct list *cursorset, struct kv_pair_tag *tpairs, int n);
/*
* update this pma's cursors given a set of tagged pairs.
*/
static void __pma_update_my_cursors(PMA pma, struct kv_pair_tag *tpairs, int n);
/* /*
* a deletion occured at index "here" in the pma. rebalance the windows around "here". if * a deletion occured at index "here" in the pma. rebalance the windows around "here". if
* necessary, shrink the pma. * necessary, shrink the pma.
*/ */
static void __pma_delete_at(PMA pma, int here); static void pma_delete_at(PMA pma, int here);
/*
* if the pma entry at here is deleted and there are no more references to it
* then finish the deletion
*/
static void __pma_delete_resume(PMA pma, int here);
/*
* count the number of cursors that reference a pma pair
*/
static int __pma_count_cursor_refs(PMA pma, int here);
/**************************** end of static functions forward declarations. *********************/ /**************************** end of static functions forward declarations. *********************/
/* use the low bit in the kv_pair pointer to indicate an inuse pair that is deleted */
static inline int kv_pair_inuse(struct kv_pair *pair) { static inline int kv_pair_inuse(struct kv_pair *pair) {
return pair != 0; return pair != 0;
} }
static inline int kv_pair_deleted(struct kv_pair *pair) {
return ((long) pair & 1) != 0;
}
static inline int kv_pair_valid(struct kv_pair *pair) {
return kv_pair_inuse(pair) && !kv_pair_deleted(pair);
}
static inline struct kv_pair *kv_pair_set_deleted(struct kv_pair *pair) {
return (struct kv_pair *) ((long) pair | 1);
}
static inline struct kv_pair *kv_pair_ptr(struct kv_pair *pair) {
return (struct kv_pair *) ((long) pair & ~1);
}
struct kv_pair_tag { struct kv_pair_tag {
struct kv_pair *pair; struct kv_pair *pair;
int oldtag, newtag; int oldtag, newtag;
...@@ -116,6 +68,7 @@ static struct kv_pair *kv_pair_malloc_mempool(const void *key, int keylen, const ...@@ -116,6 +68,7 @@ static struct kv_pair *kv_pair_malloc_mempool(const void *key, int keylen, const
/* compress all of the kv pairs to the left edge of the memory pool and /* compress all of the kv pairs to the left edge of the memory pool and
update the pma index with the new kv pair locations */ update the pma index with the new kv pair locations */
static int pma_compress_kvspace(PMA pma) { static int pma_compress_kvspace(PMA pma) {
if (toku_mempool_get_frag_size(&pma->kvspace) == 0) if (toku_mempool_get_frag_size(&pma->kvspace) == 0)
return -1; return -1;
...@@ -128,12 +81,9 @@ static int pma_compress_kvspace(PMA pma) { ...@@ -128,12 +81,9 @@ static int pma_compress_kvspace(PMA pma) {
for (i=0; i<pma->N; i++) { for (i=0; i<pma->N; i++) {
struct kv_pair *kv = pma->pairs[i]; struct kv_pair *kv = pma->pairs[i];
if (kv_pair_inuse(kv)) { if (kv_pair_inuse(kv)) {
kv = kv_pair_ptr(kv);
struct kv_pair *newkv = toku_mempool_malloc(&new_kvspace, kv_pair_size(kv), 4); struct kv_pair *newkv = toku_mempool_malloc(&new_kvspace, kv_pair_size(kv), 4);
assert(newkv); assert(newkv);
memcpy(newkv, kv, kv_pair_size(kv)); memcpy(newkv, kv, kv_pair_size(kv));
if (kv_pair_deleted(pma->pairs[i]))
kv_pair_set_deleted(newkv);
pma->pairs[i] = newkv; pma->pairs[i] = newkv;
} }
} }
...@@ -146,6 +96,7 @@ static int pma_compress_kvspace(PMA pma) { ...@@ -146,6 +96,7 @@ static int pma_compress_kvspace(PMA pma) {
/* malloc space for a kv pair from the pma memory pool and initialize it. /* malloc space for a kv pair from the pma memory pool and initialize it.
if the allocation fails, try to compress the memory pool and try again. */ if the allocation fails, try to compress the memory pool and try again. */
static struct kv_pair *pma_malloc_kv_pair(PMA pma __attribute__((unused)), const void *k, int ksize, const void *v, int vsize) { static struct kv_pair *pma_malloc_kv_pair(PMA pma __attribute__((unused)), const void *k, int ksize, const void *v, int vsize) {
#if PMA_USE_MEMPOOL #if PMA_USE_MEMPOOL
struct kv_pair *kv = kv_pair_malloc_mempool(k, ksize, v, vsize, &pma->kvspace); struct kv_pair *kv = kv_pair_malloc_mempool(k, ksize, v, vsize, &pma->kvspace);
...@@ -160,7 +111,6 @@ static struct kv_pair *pma_malloc_kv_pair(PMA pma __attribute__((unused)), const ...@@ -160,7 +111,6 @@ static struct kv_pair *pma_malloc_kv_pair(PMA pma __attribute__((unused)), const
} }
static void pma_mfree_kv_pair(PMA pma __attribute__((unused)), struct kv_pair *kv) { static void pma_mfree_kv_pair(PMA pma __attribute__((unused)), struct kv_pair *kv) {
kv = kv_pair_ptr(kv);
#if PMA_USE_MEMPOOL #if PMA_USE_MEMPOOL
toku_mempool_mfree(&pma->kvspace, kv, kv_pair_size(kv)); toku_mempool_mfree(&pma->kvspace, kv, kv_pair_size(kv));
#else #else
...@@ -178,14 +128,14 @@ unsigned int toku_pma_index_limit (PMA pma) { ...@@ -178,14 +128,14 @@ unsigned int toku_pma_index_limit (PMA pma) {
int toku_pmanode_valid (PMA pma, unsigned int i) { int toku_pmanode_valid (PMA pma, unsigned int i) {
assert(i<toku_pma_index_limit(pma)); assert(i<toku_pma_index_limit(pma));
return kv_pair_valid(pma->pairs[i]); return kv_pair_inuse(pma->pairs[i]);
} }
bytevec toku_pmanode_key (PMA pma, unsigned int i) { bytevec toku_pmanode_key (PMA pma, unsigned int i) {
struct kv_pair *pair; struct kv_pair *pair;
assert(i<toku_pma_index_limit(pma)); assert(i<toku_pma_index_limit(pma));
pair = pma->pairs[i]; pair = pma->pairs[i];
assert(kv_pair_valid(pair)); assert(kv_pair_inuse(pair));
return kv_pair_key(pair); return kv_pair_key(pair);
} }
...@@ -193,7 +143,7 @@ ITEMLEN toku_pmanode_keylen (PMA pma, unsigned int i) { ...@@ -193,7 +143,7 @@ ITEMLEN toku_pmanode_keylen (PMA pma, unsigned int i) {
struct kv_pair *pair; struct kv_pair *pair;
assert(i<toku_pma_index_limit(pma)); assert(i<toku_pma_index_limit(pma));
pair = pma->pairs[i]; pair = pma->pairs[i];
assert(kv_pair_valid(pair)); assert(kv_pair_inuse(pair));
return kv_pair_keylen(pair); return kv_pair_keylen(pair);
} }
...@@ -201,7 +151,7 @@ bytevec toku_pmanode_val (PMA pma, unsigned int i) { ...@@ -201,7 +151,7 @@ bytevec toku_pmanode_val (PMA pma, unsigned int i) {
struct kv_pair *pair; struct kv_pair *pair;
assert(i<toku_pma_index_limit(pma)); assert(i<toku_pma_index_limit(pma));
pair = pma->pairs[i]; pair = pma->pairs[i];
assert(kv_pair_valid(pair)); assert(kv_pair_inuse(pair));
return kv_pair_val(pair); return kv_pair_val(pair);
} }
...@@ -209,7 +159,7 @@ ITEMLEN toku_pmanode_vallen (PMA pma, unsigned int i) { ...@@ -209,7 +159,7 @@ ITEMLEN toku_pmanode_vallen (PMA pma, unsigned int i) {
struct kv_pair *pair; struct kv_pair *pair;
assert(i<toku_pma_index_limit(pma)); assert(i<toku_pma_index_limit(pma));
pair = pma->pairs[i]; pair = pma->pairs[i];
assert(kv_pair_valid(pair)); assert(kv_pair_inuse(pair));
return kv_pair_vallen(pair); return kv_pair_vallen(pair);
} }
...@@ -221,7 +171,7 @@ int toku_pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, I ...@@ -221,7 +171,7 @@ int toku_pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, I
/* For now a simple implementation where we simply start at the beginning and look. */ /* For now a simple implementation where we simply start at the beginning and look. */
for (i=0; i<toku_pma_index_limit(pma); i++) { for (i=0; i<toku_pma_index_limit(pma); i++) {
struct kv_pair *pair = pma->pairs[i]; struct kv_pair *pair = pma->pairs[i];
if (kv_pair_valid(pair)) { if (kv_pair_inuse(pair)) {
*key = kv_pair_key(pair); *key = kv_pair_key(pair);
*keylen = kv_pair_keylen(pair); *keylen = kv_pair_keylen(pair);
*val = kv_pair_val(pair); *val = kv_pair_val(pair);
...@@ -239,7 +189,7 @@ int toku_pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, I ...@@ -239,7 +189,7 @@ int toku_pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, I
for (i=0; i<l; i++) { for (i=0; i<l; i++) {
int ir=(i+r)%l; int ir=(i+r)%l;
struct kv_pair *pair = pma->pairs[ir]; struct kv_pair *pair = pma->pairs[ir];
if (kv_pair_valid(pair)) { if (kv_pair_inuse(pair)) {
*key = kv_pair_key(pair); *key = kv_pair_key(pair);
*keylen = kv_pair_keylen(pair); *keylen = kv_pair_keylen(pair);
*val = kv_pair_val(pair); *val = kv_pair_val(pair);
...@@ -282,7 +232,7 @@ static unsigned int pma_search(PMA pma, DBT *k, DBT *v, int lo, int hi, int *fou ...@@ -282,7 +232,7 @@ static unsigned int pma_search(PMA pma, DBT *k, DBT *v, int lo, int hi, int *fou
mi++; mi++;
if (mi >= hi) if (mi >= hi)
return pma_search(pma, k, v, lo, omi, found); return pma_search(pma, k, v, lo, omi, found);
int cmp = pma_compare_dbt_kv(pma, k, v, kv_pair_ptr(pma->pairs[mi])); int cmp = pma_compare_dbt_kv(pma, k, v, pma->pairs[mi]);
if (cmp > 0) if (cmp > 0)
return pma_search(pma, k, v, mi+1, hi, found); return pma_search(pma, k, v, mi+1, hi, found);
if (cmp < 0) if (cmp < 0)
...@@ -310,9 +260,10 @@ static unsigned int pma_search_func(PMA pma, brt_search_t *search, int lo, int h ...@@ -310,9 +260,10 @@ static unsigned int pma_search_func(PMA pma, brt_search_t *search, int lo, int h
mi++; mi++;
if (mi >= hi) if (mi >= hi)
return pma_search_func(pma, search, lo, omi, found); return pma_search_func(pma, search, lo, omi, found);
struct kv_pair *kv = kv_pair_ptr(pma->pairs[mi]); struct kv_pair *kv = pma->pairs[mi];
DBT x, y; DBT x, y;
int cmp = search->compare(search, search->k ? toku_fill_dbt(&x, kv_pair_key(kv), kv_pair_keylen(kv)) : 0, search->v ? toku_fill_dbt(&y, kv_pair_val(kv), kv_pair_vallen(kv)) : 0); int cmp = search->compare(search, search->k ? toku_fill_dbt(&x, kv_pair_key(kv), kv_pair_keylen(kv)) : 0,
search->v ? toku_fill_dbt(&y, kv_pair_val(kv), kv_pair_vallen(kv)) : 0);
if (cmp == 0) { if (cmp == 0) {
if (search->direction == BRT_SEARCH_LEFT) if (search->direction == BRT_SEARCH_LEFT)
return pma_search_func(pma, search, mi+1, hi, found); return pma_search_func(pma, search, mi+1, hi, found);
...@@ -355,7 +306,7 @@ int toku_pmainternal_printpairs (struct kv_pair *pairs[], int N) { ...@@ -355,7 +306,7 @@ int toku_pmainternal_printpairs (struct kv_pair *pairs[], int N) {
printf("{"); printf("{");
for (i=0; i<N; i++) { for (i=0; i<N; i++) {
if (i!=0) printf(" "); if (i!=0) printf(" ");
if (kv_pair_valid(pairs[i])) { if (kv_pair_inuse(pairs[i])) {
printf("%s", (char*)kv_pair_key(pairs[i])); printf("%s", (char*)kv_pair_key(pairs[i]));
count++; count++;
} }
...@@ -467,8 +418,6 @@ int toku_pmainternal_smooth_region (TOKUTXN txn, FILENUM filenum, DISKOFF diskof ...@@ -467,8 +418,6 @@ int toku_pmainternal_smooth_region (TOKUTXN txn, FILENUM filenum, DISKOFF diskof
tmppairs, tmppairs,
node_lsn, node_lsn); node_lsn, node_lsn);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
if (pma && !list_empty(&pma->cursors))
__pma_update_my_cursors(pma, tmppairs, n_present);
*new_idx = newidx; *new_idx = newidx;
cleanup: cleanup:
...@@ -530,7 +479,6 @@ int toku_pma_create(PMA *pma, pma_compare_fun_t compare_fun, DB *db, FILENUM fil ...@@ -530,7 +479,6 @@ int toku_pma_create(PMA *pma, pma_compare_fun_t compare_fun, DB *db, FILENUM fil
result->dup_mode = 0; result->dup_mode = 0;
result->n_pairs_present = 0; result->n_pairs_present = 0;
result->pairs = 0; result->pairs = 0;
list_init(&result->cursors);
result->compare_fun = compare_fun; result->compare_fun = compare_fun;
result->db = db; result->db = db;
result->filenum = filenum; result->filenum = filenum;
...@@ -558,6 +506,7 @@ int toku_pma_create(PMA *pma, pma_compare_fun_t compare_fun, DB *db, FILENUM fil ...@@ -558,6 +506,7 @@ int toku_pma_create(PMA *pma, pma_compare_fun_t compare_fun, DB *db, FILENUM fil
assert((unsigned long)result->pairs[result->N]==0xdeadbeefL); assert((unsigned long)result->pairs[result->N]==0xdeadbeefL);
return 0; return 0;
} }
int toku_resize_pma_exactly (PMA pma, int oldsize, int newsize) { int toku_resize_pma_exactly (PMA pma, int oldsize, int newsize) {
pma->N = newsize; pma->N = newsize;
...@@ -604,111 +553,6 @@ int toku_pma_set_dup_compare(PMA pma, pma_compare_fun_t dup_compare_fun) { ...@@ -604,111 +553,6 @@ int toku_pma_set_dup_compare(PMA pma, pma_compare_fun_t dup_compare_fun) {
return 0; return 0;
} }
int toku_pma_cursor (PMA pma, PMA_CURSOR *cursp, void **sskey, void **ssval) {
PMA_CURSOR MALLOC(curs);
assert(curs!=0);
if (errno!=0) return errno;
curs->position=-1; /* undefined */
curs->pma = pma;
curs->sskey = sskey;
curs->ssval = ssval;
list_push(&pma->cursors, &curs->next);
*cursp=curs;
return 0;
}
int toku_pma_cursor_get_pma(PMA_CURSOR c, PMA *pmap) {
*pmap = c->pma;
return 0;
}
int toku_pma_cursor_set_position_last (PMA_CURSOR c) {
PMA pma = c->pma;
int result = 0;
int old_position = c->position;
c->position=pma->N-1;
while (!kv_pair_valid(c->pma->pairs[c->position])) {
if (c->position>0)
c->position--;
else {
c->position = -1;
result = DB_NOTFOUND;
break;
}
}
if (old_position != c->position)
__pma_delete_resume(pma, old_position);
return result;
}
int toku_pma_cursor_set_position_prev (PMA_CURSOR c) {
PMA pma = c->pma;
int old_position = c->position;
c->position--;
while (c->position >= 0) {
if (kv_pair_valid(pma->pairs[c->position])) {
__pma_delete_resume(pma, old_position);
return 0;
}
c->position--;
}
c->position = old_position;
return DB_NOTFOUND;
}
int toku_pma_cursor_set_position_first (PMA_CURSOR c) {
PMA pma = c->pma;
int result = 0;
int old_position = c->position;
c->position=0;
while (!kv_pair_valid(c->pma->pairs[c->position])) {
if (c->position+1<(signed)pma->N)
c->position++;
else {
c->position = -1;
result =DB_NOTFOUND;
break;
}
}
if (old_position != c->position)
__pma_delete_resume(pma, old_position);
return result;
}
int toku_pma_cursor_set_position_next (PMA_CURSOR c) {
PMA pma = c->pma;
int old_position=c->position;
c->position++;
while (c->position<(signed)pma->N) {
if (kv_pair_valid(c->pma->pairs[c->position])) {
__pma_delete_resume(pma, old_position);
return 0;
}
c->position++;
}
c->position=old_position;
return DB_NOTFOUND;
}
int toku_pma_cursor_get_current(PMA_CURSOR c, DBT *key, DBT *val, int even_deleted) {
if (c->position == -1)
return DB_NOTFOUND;
PMA pma = c->pma;
struct kv_pair *pair = pma->pairs[c->position];
if (!kv_pair_inuse(pair) || (kv_pair_deleted(pair) && !even_deleted))
return DB_KEYEMPTY;
pair = kv_pair_ptr(pair);
if (key) {
int r = toku_dbt_set_value(key, kv_pair_key(pair), kv_pair_keylen(pair), c->sskey);
if (r != 0) return r;
}
if (val) {
int r = toku_dbt_set_value(val, kv_pair_val(pair), kv_pair_vallen(pair), c->ssval);
if (r != 0) return r;
}
return 0;
}
/* find the next matching key in the pma starting from index here */ /* find the next matching key in the pma starting from index here */
static int pma_next_key(PMA pma, DBT *k, DBT *v, int here, int n, int *found) { static int pma_next_key(PMA pma, DBT *k, DBT *v, int here, int n, int *found) {
assert(0 <= here); assert(0 <= here);
...@@ -716,97 +560,13 @@ static int pma_next_key(PMA pma, DBT *k, DBT *v, int here, int n, int *found) { ...@@ -716,97 +560,13 @@ static int pma_next_key(PMA pma, DBT *k, DBT *v, int here, int n, int *found) {
while (here < n && !kv_pair_inuse(pma->pairs[here])) while (here < n && !kv_pair_inuse(pma->pairs[here]))
here += 1; here += 1;
if (here < n) { if (here < n) {
int cmp = pma_compare_dbt_kv(pma, k, v, kv_pair_ptr(pma->pairs[here])); int cmp = pma_compare_dbt_kv(pma, k, v, pma->pairs[here]);
if (cmp == 0) if (cmp == 0)
*found = 1; *found = 1;
} }
return here; return here;
} }
#if 0 /* not used yet */
/* find the previous matching key in the pma starting from index here */
static int pma_prev_key(PMA pma, DBT *k, DBT *v, int here, int n, int *found) {
assert(here < n);
*found = 0;
while (0 <= here && !kv_pair_inuse(pma->pairs[here]))
here -= 1;
if (0 <= here) {
int cmp = pma_compare_dbt_kv(pma, k, v, kv_pair_ptr(pma->pairs[here]));
if (cmp == 0)
*found = 1;
}
return here;
}
#endif
int toku_pma_cursor_set_both(PMA_CURSOR c, DBT *key, DBT *val) {
PMA pma = c->pma;
unsigned int here; int found;
here = pma_search(pma, key, val, 0, pma->N, &found);
assert(here<=toku_pma_index_limit(pma));
int r = DB_NOTFOUND;
/* skip any deleted pairs that match */
while (found && !kv_pair_valid(pma->pairs[here]))
here = pma_next_key(pma, key, val, here+1, pma->N, &found);
if (found) {
__pma_delete_resume(c->pma, c->position);
c->position = here;
r = 0;
}
return r;
}
int toku_pma_cursor_set_range_both(PMA_CURSOR c, DBT *key, DBT *val) {
PMA pma = c->pma;
unsigned int here; int found;
here = pma_search(pma, key, val, 0, pma->N, &found);
assert(here<=toku_pma_index_limit(pma));
/* find the first valid pair where key[here] >= key */
int r = DB_NOTFOUND;
while (here < pma->N) {
if (kv_pair_valid(pma->pairs[here])) {
__pma_delete_resume(c->pma, c->position);
c->position = here;
r = 0;
break;
}
here += 1;
}
return r;
}
int toku_pma_cursor_delete_under(PMA_CURSOR c, int *kvsize, u_int32_t rand4sem, u_int32_t *fingerprint) {
int r = DB_NOTFOUND;
if (c->position >= 0) {
PMA pma = c->pma;
assert((unsigned)c->position < pma->N);
struct kv_pair *kv = pma->pairs[c->position];
if (kv_pair_valid(kv)) {
if (kvsize)
*kvsize = kv_pair_keylen(kv) + kv_pair_vallen(kv);
*fingerprint -= rand4sem*toku_calccrc32_kvpair (kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
pma->pairs[c->position] = kv_pair_set_deleted(kv);
r = 0;
}
}
return r;
}
int toku_pma_cursor_free (PMA_CURSOR *cursp) {
PMA_CURSOR curs=*cursp;
PMA pma = curs->pma;
list_remove(&curs->next);
if (curs->position >= 0 && kv_pair_deleted(pma->pairs[curs->position]) &&
__pma_count_cursor_refs(pma, curs->position) == 0) {
__pma_delete_finish(pma, curs->position);
}
// It's not our job to free the sskey and ssval blocks.
toku_free(curs);
*cursp=0;
return 0;
}
/* Make some space for a key to go at idx (the thing currently at idx should end up at to the right.) */ /* Make some space for a key to go at idx (the thing currently at idx should end up at to the right.) */
/* (Making space may involve moving things around, including the hole at index.) */ /* (Making space may involve moving things around, including the hole at index.) */
int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int idx, unsigned int *new_index, LSN *node_lsn) { int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int idx, unsigned int *new_index, LSN *node_lsn) {
...@@ -866,11 +626,10 @@ int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset ...@@ -866,11 +626,10 @@ int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset
} }
enum pma_errors toku_pma_lookup (PMA pma, DBT *k, DBT *v) { enum pma_errors toku_pma_lookup (PMA pma, DBT *k, DBT *v) {
unsigned int here;
int found; int found;
here = pma_search(pma, k, 0, 0, pma->N, &found); unsigned int here = pma_search(pma, k, 0, 0, pma->N, &found);
struct kv_pair *kv = pma->pairs[here]; struct kv_pair *kv = pma->pairs[here];
if (found && kv_pair_valid(kv)) if (found && kv_pair_inuse(kv))
return toku_dbt_set_value(v, kv->key + kv->keylen, kv->vallen, &pma->sval); return toku_dbt_set_value(v, kv->key + kv->keylen, kv->vallen, &pma->sval);
else else
return DB_NOTFOUND; return DB_NOTFOUND;
...@@ -880,7 +639,7 @@ int toku_pma_search(PMA pma, brt_search_t *search, DBT *foundk, DBT *foundv) { ...@@ -880,7 +639,7 @@ int toku_pma_search(PMA pma, brt_search_t *search, DBT *foundk, DBT *foundv) {
int found; int found;
unsigned int here = pma_search_func(pma, search, 0, pma->N, &found); unsigned int here = pma_search_func(pma, search, 0, pma->N, &found);
struct kv_pair *kv = pma->pairs[here]; struct kv_pair *kv = pma->pairs[here];
if (found && kv_pair_valid(kv)) { if (found && kv_pair_inuse(kv)) {
int r = 0; int r = 0;
if (foundk) if (foundk)
r = toku_dbt_set_value(foundk, kv_pair_key(kv), kv_pair_keylen(kv), &pma->skey); r = toku_dbt_set_value(foundk, kv_pair_key(kv), kv_pair_keylen(kv), &pma->skey);
...@@ -895,8 +654,6 @@ int toku_pma_search(PMA pma, brt_search_t *search, DBT *foundk, DBT *foundv) { ...@@ -895,8 +654,6 @@ int toku_pma_search(PMA pma, brt_search_t *search, DBT *foundk, DBT *foundv) {
* You must have freed all the cursors, otherwise returns nonzero and does nothing. */ * You must have freed all the cursors, otherwise returns nonzero and does nothing. */
int toku_pma_free (PMA *pmap) { int toku_pma_free (PMA *pmap) {
PMA pma=*pmap; PMA pma=*pmap;
if (!list_empty(&pma->cursors))
return -1;
if (pma->n_pairs_present > 0) { if (pma->n_pairs_present > 0) {
unsigned int i; unsigned int i;
...@@ -927,24 +684,10 @@ int toku_pma_free (PMA *pmap) { ...@@ -927,24 +684,10 @@ int toku_pma_free (PMA *pmap) {
/* returns an error if the key is already present. */ /* returns an error if the key is already present. */
int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, u_int32_t rand4fingerprint, u_int32_t *fingerprint, LSN *node_lsn) { int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, u_int32_t rand4fingerprint, u_int32_t *fingerprint, LSN *node_lsn) {
int found; int found;
unsigned int idx; unsigned int idx = pma_search(pma, k, pma->dup_mode & TOKU_DB_DUPSORT ? v : 0, 0, pma->N, &found);
if (found)
if (pma->dup_mode & TOKU_DB_DUPSORT) return BRT_ALREADY_THERE; /* It is already here. Return an error. */
idx = pma_search(pma, k, v, 0, pma->N, &found);
else
idx = pma_search(pma, k, 0, 0, pma->N, &found);
if (found) {
if (kv_pair_deleted(pma->pairs[idx])) {
pma_mfree_kv_pair(pma, pma->pairs[idx]);
pma->pairs[idx] = pma_malloc_kv_pair(pma, k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
int r = toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, pma->pairs[idx]);
if (txn && node_lsn && r==0) *node_lsn = toku_txn_get_last_lsn(txn);
return r;
} else
return BRT_ALREADY_THERE; /* It is already here. Return an error. */
}
if (kv_pair_inuse(pma->pairs[idx])) { if (kv_pair_inuse(pma->pairs[idx])) {
unsigned int newidx; unsigned int newidx;
int r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx, (LSN*)0); /* returns the new idx. */ int r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx, (LSN*)0); /* returns the new idx. */
...@@ -970,28 +713,23 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK ...@@ -970,28 +713,23 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK
static int pma_delete_dup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) { static int pma_delete_dup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
/* find the left most matching key in the pma */ /* find the left most matching key in the pma */
int found; int found;
unsigned int lefthere; unsigned int lefthere = pma_search(pma, k, v, 0, pma->N, &found);
lefthere = pma_search(pma, k, v, 0, pma->N, &found);
int rightfound = found, righthere = lefthere; int rightfound = found, righthere = lefthere;
while (rightfound) { while (rightfound) {
struct kv_pair *kv = pma->pairs[righthere]; struct kv_pair *kv = pma->pairs[righthere];
if (kv_pair_valid(kv)) { if (kv_pair_inuse(kv)) {
/* mark the pair as deleted */
*deleted_size += PMA_ITEM_OVERHEAD+ KEY_VALUE_OVERHEAD + kv_pair_keylen(kv) + kv_pair_vallen(kv); *deleted_size += PMA_ITEM_OVERHEAD+ KEY_VALUE_OVERHEAD + kv_pair_keylen(kv) + kv_pair_vallen(kv);
*fingerprint -= rand4sem*toku_calccrc32_kvpair (kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv)); *fingerprint -= rand4sem*toku_calccrc32_kvpair (kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
pma->pairs[righthere] = kv_pair_set_deleted(kv); pma_mfree_kv_pair(pma, kv);
if (__pma_count_cursor_refs(pma, righthere) == 0) { pma->pairs[righthere] = 0;
pma_mfree_kv_pair(pma, kv); pma->n_pairs_present--;
pma->pairs[righthere] = 0;
pma->n_pairs_present--;
}
} }
/* find the next matching key in the pma */ /* find the next matching key in the pma */
righthere = pma_next_key(pma, k, v, righthere+1, pma->N, &rightfound); righthere = pma_next_key(pma, k, v, righthere+1, pma->N, &rightfound);
} }
if (found) { if (found) {
/* check the density of the region centered around the deleted pairs */ /* check the density of the region centered around the deleted pairs */
__pma_delete_at(pma, (lefthere + righthere) / 2); pma_delete_at(pma, (lefthere + righthere) / 2);
} }
return found ? BRT_OK : DB_NOTFOUND; return found ? BRT_OK : DB_NOTFOUND;
} }
...@@ -1002,13 +740,14 @@ static int pma_delete_nodup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int3 ...@@ -1002,13 +740,14 @@ static int pma_delete_nodup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int3
unsigned int here; unsigned int here;
here = pma_search(pma, k, v, 0, pma->N, &found); here = pma_search(pma, k, v, 0, pma->N, &found);
struct kv_pair *kv = pma->pairs[here]; struct kv_pair *kv = pma->pairs[here];
if (!found || !kv_pair_valid(kv)) if (!found || !kv_pair_inuse(kv))
return DB_NOTFOUND; return DB_NOTFOUND;
*deleted_size = PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + kv_pair_keylen(kv) + kv_pair_vallen(kv); *deleted_size = PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + kv_pair_keylen(kv) + kv_pair_vallen(kv);
*fingerprint -= rand4sem*toku_calccrc32_kvpair (kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv)); *fingerprint -= rand4sem*toku_calccrc32_kvpair (kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
pma->pairs[here] = kv_pair_set_deleted(kv); pma_mfree_kv_pair(pma, kv);
if (__pma_count_cursor_refs(pma, here) == 0) pma->pairs[here] = 0;
__pma_delete_finish(pma, here); pma->n_pairs_present--;
pma_delete_at(pma, here);
return BRT_OK; return BRT_OK;
} }
...@@ -1024,22 +763,7 @@ int toku_pma_delete (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fin ...@@ -1024,22 +763,7 @@ int toku_pma_delete (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fin
return pma_delete_nodup(pma, k, v, rand4sem, fingerprint, deleted_size); return pma_delete_nodup(pma, k, v, rand4sem, fingerprint, deleted_size);
} }
void __pma_delete_resume(PMA pma, int here) { static void pma_delete_at(PMA pma, int here) {
if (here >= 0 && kv_pair_deleted(pma->pairs[here]) &&__pma_count_cursor_refs(pma, here) == 0)
__pma_delete_finish(pma, here);
}
static void __pma_delete_finish(PMA pma, int here) {
struct kv_pair *kv = pma->pairs[here];
if (!kv_pair_inuse(kv))
return;
pma_mfree_kv_pair(pma, kv);
pma->pairs[here] = 0;
pma->n_pairs_present--;
__pma_delete_at(pma, here);
}
static void __pma_delete_at(PMA pma, int here) {
int count; int count;
struct kv_pair_tag *newpairs; struct kv_pair_tag *newpairs;
...@@ -1081,7 +805,6 @@ static void __pma_delete_at(PMA pma, int here) { ...@@ -1081,7 +805,6 @@ static void __pma_delete_at(PMA pma, int here) {
if (0) printf("delete_at_rebalance %d over %d %d\n", count, lo, hi); if (0) printf("delete_at_rebalance %d over %d %d\n", count, lo, hi);
newpairs = pma_extract_pairs(pma, count, lo, hi); newpairs = pma_extract_pairs(pma, count, lo, hi);
distribute_data(pma->pairs + lo, hi - lo, newpairs, count, pma); distribute_data(pma->pairs + lo, hi - lo, newpairs, count, pma);
__pma_update_my_cursors(pma, newpairs, count);
toku_free(newpairs); toku_free(newpairs);
return; return;
} }
...@@ -1100,8 +823,6 @@ static void __pma_delete_at(PMA pma, int here) { ...@@ -1100,8 +823,6 @@ static void __pma_delete_at(PMA pma, int here) {
assert(newpairs); assert(newpairs);
old_pma_resize_array(pma, size, 0); old_pma_resize_array(pma, size, 0);
distribute_data(pma->pairs, pma->N, newpairs, count, pma); distribute_data(pma->pairs, pma->N, newpairs, count, pma);
/* update the cursors */
__pma_update_my_cursors(pma, newpairs, count);
toku_free(newpairs); toku_free(newpairs);
} }
...@@ -1112,24 +833,15 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v, ...@@ -1112,24 +833,15 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
LSN *node_lsn) { LSN *node_lsn) {
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size); //printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
int r; int r;
unsigned int idx;
int found; int found;
if (pma->dup_mode & TOKU_DB_DUPSORT) unsigned int idx = pma_search(pma, k, pma->dup_mode & TOKU_DB_DUPSORT ? v : 0, 0, pma->N, &found);
idx = pma_search(pma, k, v, 0, pma->N, &found);
else
idx = pma_search(pma, k, 0, 0, pma->N, &found);
if (found) { if (found) {
struct kv_pair *kv = pma->pairs[idx]; struct kv_pair *kv = pma->pairs[idx];
if (kv_pair_deleted(kv)) { *replaced_v_size = kv->vallen;
*replaced_v_size = -1; *fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
pma->pairs[idx] = kv_pair_ptr(kv); /* mark as not deleted */ r=toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, kv);
} else { if (r!=0) return r;
*replaced_v_size = kv->vallen; if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
*fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
r=toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, kv);
if (r!=0) return r;
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
}
if (v->size == (unsigned int) kv_pair_vallen(kv)) { if (v->size == (unsigned int) kv_pair_vallen(kv)) {
memcpy(kv_pair_val(kv), v->data, v->size); memcpy(kv_pair_val(kv), v->data, v->size);
} else { } else {
...@@ -1177,68 +889,6 @@ void toku_pma_iterate (PMA pma, void(*f)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*) ...@@ -1177,68 +889,6 @@ void toku_pma_iterate (PMA pma, void(*f)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*)
} }
} }
int __pma_count_cursor_refs(PMA pma, int here) {
int refs = 0;
struct list *list;
struct pma_cursor *cursor;
list = list_head(&pma->cursors);
while (list != &pma->cursors) {
cursor = list_struct(list, struct pma_cursor, next);
if (cursor->position == here)
refs += 1;
list = list->next;
}
return refs;
}
static void __pma_update_cursors_position(PMA pma, struct list *cursor_set, int oldposition, int newposition) {
struct list *list, *nextlist;
struct pma_cursor *cursor;
list = list_head(cursor_set);
while (list != cursor_set) {
nextlist = list->next; /* may be removed later */
cursor = list_struct(list, struct pma_cursor, next);
if (cursor->position == oldposition) {
if (0) printf("cursor %p %d -> %d\n", cursor, oldposition, newposition);
cursor->position = newposition;
cursor->pma = pma;
list_remove(list);
list_push(&pma->cursors, list);
}
list = nextlist;
}
}
void __pma_update_cursors(PMA pma, struct list *cursor_set, struct kv_pair_tag *tpairs, int n) {
/* short cut */
if (list_empty(cursor_set))
return;
/* update all cursors to their new positions */
int i;
for (i=0; i<n; i++) {
if (tpairs[i].pair && tpairs[i].oldtag >= 0)
__pma_update_cursors_position(pma, cursor_set, tpairs[i].oldtag, tpairs[i].newtag);
}
}
static void __pma_update_my_cursors(PMA pma, struct kv_pair_tag *tpairs, int n) {
if (list_empty(&pma->cursors))
return;
struct list cursors;
list_move(&cursors, &pma->cursors);
__pma_update_cursors(pma, &cursors, tpairs, n);
while (!list_empty(&cursors)) {
struct list *list = list_head(&cursors);
list_remove(list);
list_push(&pma->cursors, list);
}
}
static struct kv_pair_tag *pma_extract_pairs(PMA pma, int npairs, unsigned int lo, unsigned int hi) { static struct kv_pair_tag *pma_extract_pairs(PMA pma, int npairs, unsigned int lo, unsigned int hi) {
struct kv_pair_tag *pairs; struct kv_pair_tag *pairs;
unsigned int i; unsigned int i;
...@@ -1323,12 +973,6 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum, ...@@ -1323,12 +973,6 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
unsigned int revised_leftpmasize = runlen; unsigned int revised_leftpmasize = runlen;
unsigned int revised_rightpmasize = sumlen-runlen; unsigned int revised_rightpmasize = sumlen-runlen;
/* Get all of the cursors from the original pma */
struct list cursors;
list_init(&cursors);
if (!list_empty(&pma->cursors))
list_move(&cursors, &pma->cursors);
u_int32_t revised_left_fingerprint; u_int32_t revised_left_fingerprint;
u_int32_t revised_right_fingerprint; u_int32_t revised_right_fingerprint;
{ {
...@@ -1370,7 +1014,6 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum, ...@@ -1370,7 +1014,6 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
pma_mfree_kv_pair(pma, pairs[i].pair); pma_mfree_kv_pair(pma, pairs[i].pair);
} }
#endif #endif
__pma_update_cursors(newpma, &cursors, &pairs[spliti], n);
newpma->n_pairs_present = n; newpma->n_pairs_present = n;
/* put the first half of pairs into the left pma */ /* put the first half of pairs into the left pma */
...@@ -1383,7 +1026,6 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum, ...@@ -1383,7 +1026,6 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
if (r!=0) { toku_free(pairs); return r; } if (r!=0) { toku_free(pairs); return r; }
} }
// Don't have to relocate kvpairs, because these ones are still there. // Don't have to relocate kvpairs, because these ones are still there.
__pma_update_cursors(pma, &cursors, &pairs[0], n);
pma->n_pairs_present = spliti; pma->n_pairs_present = spliti;
toku_free(pairs); toku_free(pairs);
...@@ -1414,8 +1056,6 @@ int toku_pma_bulk_insert(TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, PMA pma, ...@@ -1414,8 +1056,6 @@ int toku_pma_bulk_insert(TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, PMA pma,
if (n_newpairs == 0) if (n_newpairs == 0)
return 0; return 0;
if (!list_empty(&pma->cursors))
return -1;
if (toku_pma_n_entries(pma) > 0) if (toku_pma_n_entries(pma) > 0)
return -2; return -2;
...@@ -1467,7 +1107,6 @@ void toku_pma_verify(PMA pma) { ...@@ -1467,7 +1107,6 @@ void toku_pma_verify(PMA pma) {
for (i=0; i<pma->N; i++) { for (i=0; i<pma->N; i++) {
kv = pma->pairs[i]; kv = pma->pairs[i];
if (kv_pair_inuse(kv)) { if (kv_pair_inuse(kv)) {
kv = kv_pair_ptr(kv);
i += 1; i += 1;
break; break;
} }
...@@ -1478,7 +1117,6 @@ void toku_pma_verify(PMA pma) { ...@@ -1478,7 +1117,6 @@ void toku_pma_verify(PMA pma) {
for (; i<pma->N; i++) { for (; i<pma->N; i++) {
nextkv = pma->pairs[i]; nextkv = pma->pairs[i];
if (kv_pair_inuse(nextkv)) { if (kv_pair_inuse(nextkv)) {
nextkv = kv_pair_ptr(nextkv);
DBT kv_dbt, nextkv_dbt; DBT kv_dbt, nextkv_dbt;
toku_fill_dbt(&kv_dbt, kv_pair_key(kv), kv_pair_keylen(kv)); toku_fill_dbt(&kv_dbt, kv_pair_key(kv), kv_pair_keylen(kv));
toku_fill_dbt(&nextkv_dbt, kv_pair_key(nextkv), kv_pair_keylen(nextkv)); toku_fill_dbt(&nextkv_dbt, kv_pair_key(nextkv), kv_pair_keylen(nextkv));
...@@ -1502,7 +1140,6 @@ void toku_pma_verify(PMA pma) { ...@@ -1502,7 +1140,6 @@ void toku_pma_verify(PMA pma) {
for (i=0; i<pma->N; i++) { for (i=0; i<pma->N; i++) {
kv = pma->pairs[i]; kv = pma->pairs[i];
if (kv_pair_inuse(kv)) { if (kv_pair_inuse(kv)) {
kv = kv_pair_ptr(kv);
assert(toku_mempool_inrange(&pma->kvspace, kv, kv_pair_size(kv))); assert(toku_mempool_inrange(&pma->kvspace, kv, kv_pair_size(kv)));
} }
} }
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
/* All functions return 0 on success. */ /* All functions return 0 on success. */
typedef struct pma *PMA; typedef struct pma *PMA;
typedef struct pma_cursor *PMA_CURSOR; // typedef struct pma_cursor *PMA_CURSOR;
/* compare 2 DBT's. return a value < 0, = 0, > 0 if a < b, a == b, a > b respectively */ /* compare 2 DBT's. return a value < 0, = 0, > 0 if a < b, a == b, a > b respectively */
typedef int (*pma_compare_fun_t)(DB *, const DBT *a, const DBT *b); typedef int (*pma_compare_fun_t)(DB *, const DBT *a, const DBT *b);
...@@ -108,31 +108,6 @@ int toku_pma_split(TOKUTXN, FILENUM, ...@@ -108,31 +108,6 @@ int toku_pma_split(TOKUTXN, FILENUM,
*/ */
int toku_pma_bulk_insert(TOKUTXN, FILENUM, DISKOFF, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4sem, u_int32_t *fingerprint, LSN */*node_lsn*/); int toku_pma_bulk_insert(TOKUTXN, FILENUM, DISKOFF, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4sem, u_int32_t *fingerprint, LSN */*node_lsn*/);
/* Move the cursor to the beginning or the end or to a key */
int toku_pma_cursor (PMA, PMA_CURSOR *, void** /*sskey*/, void ** /*ssval*/); // the sskey and ssval point to variables that hold blocks that can be used to return values for zero'd DBTS.
int toku_pma_cursor_free (PMA_CURSOR*);
/* get the pma that a pma cursor is bound to */
int toku_pma_cursor_get_pma(PMA_CURSOR c, PMA *pma);
int toku_pma_cursor_set_position_last (PMA_CURSOR c);
int toku_pma_cursor_set_position_first (PMA_CURSOR c);
int toku_pma_cursor_set_position_next (PMA_CURSOR c); /* Requires the cursor is init'd. Returns DB_NOTFOUND if we fall off the end. */
int toku_pma_cursor_set_position_prev (PMA_CURSOR c);
/* get the key and data under the cursor
if even_deleted is 1 then the key and val under the cursor are returned even if
it has been deleted previously */
int toku_pma_cursor_get_current(PMA_CURSOR c, DBT *key, DBT *val, int even_deleted);
/* move the cursor to the kv pair matching the key and the val if nonzero*/
int toku_pma_cursor_set_both(PMA_CURSOR c, DBT *key, DBT *val);
/* set the cursor to the smallest key in the pma >= key and the val if nonzero*/
int toku_pma_cursor_set_range_both(PMA_CURSOR c, DBT *key, DBT *val);
/* delete the key value pair under the cursor, return the size of the pair */
int toku_pma_cursor_delete_under(PMA_CURSOR /*c*/, int */*kvsize*/, u_int32_t /*rand4sem*/, u_int32_t */*fingerprint*/);
int toku_pma_random_pick(PMA, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen); int toku_pma_random_pick(PMA, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen);
unsigned int toku_pma_index_limit(PMA); // How many slots are in the PMA right now? unsigned int toku_pma_index_limit(PMA); // How many slots are in the PMA right now?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment