Commit 8d1f5625 authored by Rich Prohaska's avatar Rich Prohaska

pma delete under cursor changes

git-svn-id: file:///svn/tokudb@236 c7de825b-a66e-492c-adef-691d508d4ae1
parent e804fe3d
......@@ -53,6 +53,28 @@ static inline int kv_pair_vallen(struct kv_pair *pair) {
return pair->vallen;
}
/* use the low bit to indicate an inuse pair that is deleted */
static inline int kv_pair_inuse(struct kv_pair *pair) {
return pair != 0;
}
static inline int kv_pair_deleted(struct kv_pair *pair) {
return ((long) pair & 1) != 0;
}
static inline int kv_pair_valid(struct kv_pair *pair) {
return kv_pair_inuse(pair) && !kv_pair_deleted(pair);
}
static inline struct kv_pair *kv_pair_set_deleted(struct kv_pair *pair) {
return (struct kv_pair *) ((long) pair | 1);
}
static inline struct kv_pair *kv_pair_ptr(struct kv_pair *pair) {
return (struct kv_pair *) ((long) pair & ~1);
}
struct kv_pair_tag {
struct kv_pair *pair;
int oldtag, newtag;
......
......@@ -60,6 +60,17 @@ void __pma_update_my_cursors(PMA pma, struct kv_pair_tag *tpairs, int n);
*/
void __pma_delete_at(PMA pma, int here);
/*
* finish a deletion from the pma. called when there are no cursor references
* to the kv pair.
*/
void __pma_delete_finish(PMA pma, int here);
/*
* count the number of cursors that reference a pma pair
*/
int __pma_count_cursor_refs(PMA pma, int here);
/* density thresholds */
#define PMA_LDT_HIGH 0.25
#define PMA_LDT_LOW 0.40
......
......@@ -609,12 +609,94 @@ void test_pma_cursor_4 (void) {
assert(error == 0);
}
void test_pma_cursor_delete(int n) {
printf("test_pma_cursor_delete:%d\n", n);
PMA pma;
int error;
error = pma_create(&pma, default_compare_fun);
assert(error == 0);
/* insert 1 -> 42 */
DBT key, val; int k, v;
int i;
for (i=0; i<n; i++) {
k = i; v = -i;
fill_dbt(&key, &k, sizeof k);
fill_dbt(&val, &v, sizeof v);
error = pma_insert(pma, &key, &val, 0);
assert(error == 0);
}
/* point the cursor to the first kv */
PMA_CURSOR cursor;
error = pma_cursor(pma, &cursor);
assert(error == 0);
DBT cursorkey, cursorval;
init_dbt(&cursorkey); cursorkey.flags = DB_DBT_MALLOC;
init_dbt(&cursorval); cursorval.flags = DB_DBT_MALLOC;
error = pma_cget_current(cursor, &cursorkey, &cursorval);
assert(error != 0);
error = pma_cursor_set_position_first(cursor);
assert(error == 0);
int kk;
init_dbt(&cursorkey); cursorkey.flags = DB_DBT_MALLOC;
init_dbt(&cursorval); cursorval.flags = DB_DBT_MALLOC;
error = pma_cget_current(cursor, &cursorkey, &cursorval);
assert(error == 0);
assert(cursorkey.size == sizeof kk);
kk = 0;
assert(0 == memcmp(cursorkey.data, &kk, sizeof kk));
toku_free(cursorkey.data);
toku_free(cursorval.data);
/* delete the first key */
k = 0;
fill_dbt(&key, &k, sizeof k);
error = pma_delete(pma, &key, 0);
assert(error == 0);
/* cursor get should fail */
init_dbt(&cursorkey); cursorkey.flags = DB_DBT_MALLOC;
init_dbt(&cursorval); cursorval.flags = DB_DBT_MALLOC;
error = pma_cget_current(cursor, &cursorkey, &cursorval);
assert(error != 0);
error = pma_cursor_set_position_next(cursor);
if (n <= 1)
assert(error != 0);
else {
assert(error == 0);
init_dbt(&cursorkey); cursorkey.flags = DB_DBT_MALLOC;
init_dbt(&cursorval); cursorval.flags = DB_DBT_MALLOC;
error = pma_cget_current(cursor, &cursorkey, &cursorval);
assert(error == 0);
assert(cursorkey.size == sizeof kk);
kk = 1;
assert(0 == memcmp(cursorkey.data, &kk, sizeof kk));
toku_free(cursorkey.data);
toku_free(cursorval.data);
}
error = pma_cursor_free(&cursor);
assert(error == 0);
error = pma_free(&pma);
assert(error == 0);
}
void test_pma_cursor (void) {
test_pma_cursor_0();
test_pma_cursor_1();
test_pma_cursor_2();
test_pma_cursor_3();
test_pma_cursor_4();
test_pma_cursor_delete(1);
test_pma_cursor_delete(2);
}
int wrong_endian_compare_fun (DB *ignore __attribute__((__unused__)),
......@@ -1243,10 +1325,74 @@ void test_pma_delete_cursor(int n) {
assert(r == 0);
}
/*
* insert k,1
* place cursor at k
* delete k
* cursor get current
* lookup k
* insert k,2
* lookup k
* cursor get current
*/
void test_pma_delete_insert() {
printf("test_pma_delete_insert\n");
PMA pma;
int error;
error = pma_create(&pma, default_compare_fun);
assert(error == 0);
PMA_CURSOR pmacursor;
error = pma_cursor(pma, &pmacursor);
assert(error == 0);
DBT key, val;
int k, v;
k = 1; v = 1;
fill_dbt(&key, &k, sizeof k);
fill_dbt(&val, &v, sizeof v);
error = pma_insert(pma, &key, &val, 0);
assert(error == 0);
error = pma_cursor_set_position_first(pmacursor);
assert(error == 0);
assert_cursor_equal(pmacursor, 1);
k = 1;
fill_dbt(&key, &k, sizeof k);
error = pma_delete(pma, &key, 0);
assert(error == 0);
assert_cursor_nokey(pmacursor);
k = 1;
fill_dbt(&key, &k, sizeof k);
init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = pma_lookup(pma, &key, &val, 0);
assert(error != 0);
k = 1; v = 2;
fill_dbt(&key, &k, sizeof k);
fill_dbt(&val, &v, sizeof v);
error = pma_insert(pma, &key, &val, 0);
assert(error == 0);
assert_cursor_equal(pmacursor, 2);
error = pma_cursor_free(&pmacursor);
assert(error == 0);
error = pma_free(&pma);
assert(error == 0);
}
void test_pma_delete() {
test_pma_delete_shrink(256); memory_check_all_free();
test_pma_delete_random(256); memory_check_all_free();
test_pma_delete_cursor(32); memory_check_all_free();
test_pma_delete_insert(); memory_check_all_free();
}
void pma_tests (void) {
......
......@@ -11,7 +11,6 @@
#include <errno.h>
/* Only needed for testing. */
#include <string.h>
#include "list.h"
#include "kv-pair.h"
#include "pma-internal.h"
......@@ -29,14 +28,14 @@ int pma_index_limit (PMA pma) {
int pmanode_valid (PMA pma, int i) {
assert(0<=i); assert(i<pma_index_limit(pma));
return pma->pairs[i] != 0;
return kv_pair_inuse(pma->pairs[i]);
}
bytevec pmanode_key (PMA pma, int i) {
struct kv_pair *pair;
assert(0<=i); assert(i<pma_index_limit(pma));
pair = pma->pairs[i];
assert(pair);
assert(kv_pair_valid(pair));
return kv_pair_key(pair);
}
......@@ -44,7 +43,7 @@ ITEMLEN pmanode_keylen (PMA pma, int i) {
struct kv_pair *pair;
assert(0<=i); assert(i<pma_index_limit(pma));
pair = pma->pairs[i];
assert(pair);
assert(kv_pair_valid(pair));
return kv_pair_keylen(pair);
}
......@@ -52,7 +51,7 @@ bytevec pmanode_val (PMA pma, int i) {
struct kv_pair *pair;
assert(0<=i); assert(i<pma_index_limit(pma));
pair = pma->pairs[i];
assert(pair);
assert(kv_pair_valid(pair));
return kv_pair_val(pair);
}
......@@ -60,7 +59,7 @@ ITEMLEN pmanode_vallen (PMA pma, int i) {
struct kv_pair *pair;
assert(0<=i); assert(i<pma_index_limit(pma));
pair = pma->pairs[i];
assert(pair);
assert(kv_pair_valid(pair));
return kv_pair_vallen(pair);
}
......@@ -72,7 +71,7 @@ int pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, ITEMLE
/* For now a simple implementation where we simply start at the beginning and look. */
for (i=0; i<pma_index_limit(pma); i++) {
struct kv_pair *pair = pma->pairs[i];
if (pair) {
if (kv_pair_valid(pair)) {
*key = kv_pair_key(pair);
*keylen = kv_pair_keylen(pair);
*val = kv_pair_val(pair);
......@@ -90,7 +89,7 @@ int pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, ITEMLE
for (i=0; i<l; i++) {
int ir=(i+r)%l;
struct kv_pair *pair = pma->pairs[ir];
if (pair) {
if (kv_pair_valid(pair)) {
*key = kv_pair_key(pair);
*keylen = kv_pair_keylen(pair);
*val = kv_pair_val(pair);
......@@ -125,10 +124,12 @@ int pmainternal_find (PMA pma, DBT *k, DB *db) {
int mid;
// Scan forward looking for a non-null value.
for (mid=(lo+hi)/2; mid<hi; mid++) {
if (pma->pairs[mid]!=0) {
struct kv_pair *kv = pma->pairs[mid];
if (kv_pair_inuse(kv)) {
// Found one.
kv = kv_pair_ptr(kv);
DBT k2;
int cmp = pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[mid]->key, pma->pairs[mid]->keylen));
int cmp = pma->compare_fun(db, k, fill_dbt(&k2, kv->key, kv->keylen));
if (cmp==0) return mid;
else if (cmp<0) {
/* key is smaller than the midpoint, so look in the low half. */
......@@ -175,8 +176,8 @@ int pmainternal_printpairs (struct kv_pair *pairs[], int N) {
printf("{");
for (i=0; i<N; i++) {
if (i!=0) printf(" ");
if (pairs[i]) {
printf("%s", (char*)pairs[i]->key);
if (kv_pair_valid(pairs[i])) {
printf("%s", (char*)kv_pair_key(pairs[i]));
count++;
}
else printf("_");
......@@ -224,7 +225,7 @@ int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx, int base
int i;
int n_present=0;
for (i=0; i<n; i++) {
if (pairs[i]) n_present++;
if (kv_pair_inuse(pairs[i])) n_present++;
}
n_present++; // Save one for the blank guy.
{
......@@ -241,7 +242,7 @@ int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx, int base
if (i==idx) {
tmppairs[n_saved++].pair = 0;
}
if (pairs[i]) {
if (kv_pair_inuse(pairs[i])) {
tmppairs[n_saved].oldtag = base + i;
tmppairs[n_saved++].pair = pairs[i];
}
......@@ -295,7 +296,7 @@ void pmainternal_calculate_parameters (PMA pma) {
int pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi) {
int n=0;
while (lo<hi) {
if (pairs[lo]) n++;
if (kv_pair_inuse(pairs[lo])) n++;
lo++;
}
return n;
......@@ -376,7 +377,7 @@ int pma_cursor_get_pma(PMA_CURSOR c, PMA *pmap) {
int pma_cursor_set_position_last (PMA_CURSOR c) {
PMA pma = c->pma;
c->position=pma->N-1;
while (c->pma->pairs[c->position]==0) {
while (!kv_pair_valid(c->pma->pairs[c->position])) {
if (c->position>0) c->position--;
else return DB_NOTFOUND;
}
......@@ -388,8 +389,11 @@ int pma_cursor_set_position_prev (PMA_CURSOR c) {
int old_position = c->position;
c->position--;
while (c->position >= 0) {
if (pma->pairs[c->position] != 0)
if (kv_pair_valid(pma->pairs[c->position])) {
if (old_position >= 0 && kv_pair_deleted(pma->pairs[old_position]) &&__pma_count_cursor_refs(pma, old_position) == 0)
__pma_delete_finish(pma, old_position);
return 0;
}
c->position--;
}
c->position = old_position;
......@@ -399,7 +403,7 @@ int pma_cursor_set_position_prev (PMA_CURSOR c) {
int pma_cursor_set_position_first (PMA_CURSOR c) {
PMA pma = c->pma;
c->position=0;
while (c->pma->pairs[c->position]==0) {
while (!kv_pair_valid(c->pma->pairs[c->position])) {
if (c->position+1<pma->N) c->position++;
else return DB_NOTFOUND;
}
......@@ -411,17 +415,24 @@ int pma_cursor_set_position_next (PMA_CURSOR c) {
int old_position=c->position;
c->position++;
while (c->position<pma->N) {
if (c->pma->pairs[c->position]!=0) return 0;
c->position++;
if (kv_pair_valid(c->pma->pairs[c->position])) {
if (old_position >= 0 && kv_pair_deleted(pma->pairs[old_position]) && __pma_count_cursor_refs(pma, old_position) == 0)
__pma_delete_finish(pma, old_position);
return 0;
}
c->position++;
}
c->position=old_position;
return DB_NOTFOUND;
}
int pma_cget_current (PMA_CURSOR c, DBT *key, DBT *val) {
if (c->position == -1)
return DB_NOTFOUND;
PMA pma = c->pma;
struct kv_pair *pair = pma->pairs[c->position];
if (pair==0) return BRT_KEYEMPTY;
if (!kv_pair_valid(pair))
return BRT_KEYEMPTY;
ybt_set_value(key, pair->key, pair->keylen, &c->skey);
ybt_set_value(val, pair->key + pair->keylen, pair->vallen, &c->sval);
return 0;
......@@ -445,7 +456,12 @@ int pma_cget_first (PMA_CURSOR c, YBT *key, YBT *val) {
int pma_cursor_free (PMA_CURSOR *cursp) {
PMA_CURSOR curs=*cursp;
PMA pma = curs->pma;
list_remove(&curs->next);
if (curs->position >= 0 && kv_pair_deleted(pma->pairs[curs->position]) &&
__pma_count_cursor_refs(pma, curs->position) == 0) {
__pma_delete_finish(pma, curs->position);
}
if (curs->skey) toku_free(curs->skey);
if (curs->sval) toku_free(curs->sval);
toku_free(curs);
......@@ -524,7 +540,7 @@ enum pma_errors pma_lookup (PMA pma, DBT *k, DBT *v, DB *db) {
assert(0<=l ); assert(l<=pma_index_limit(pma));
if (l==pma_index_limit(pma)) return DB_NOTFOUND;
pair = pma->pairs[l];
if (pair!=0 && pma->compare_fun(db, k, fill_dbt(&k2, pair->key, pair->keylen))==0) {
if (kv_pair_valid(pair) && pma->compare_fun(db, k, fill_dbt(&k2, pair->key, pair->keylen))==0) {
return ybt_set_value(v, pair->key + pair->keylen, pair->vallen, &pma->sval);
} else {
return DB_NOTFOUND;
......@@ -541,8 +557,9 @@ int pma_free (PMA *pmap) {
if (pma->n_pairs_present > 0) {
for (i=0; i < pma->N; i++) {
if (pma->pairs[i]) {
kv_pair_free(pma->pairs[i]);
struct kv_pair *kv = pma->pairs[i];
if (kv_pair_inuse(kv)) {
kv_pair_free(kv_pair_ptr(kv));
pma->pairs[i] = 0;
pma->n_pairs_present--;
}
......@@ -562,14 +579,19 @@ int pma_insert (PMA pma, DBT *k, DBT *v, DB* db) {
int idx = pmainternal_find(pma, k, db);
if (idx < pma_index_limit(pma) && pma->pairs[idx]) {
DBT k2;
if (0==pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[idx]->key, pma->pairs[idx]->keylen))) {
return BRT_ALREADY_THERE; /* It is already here. Return an error. */
struct kv_pair *kv = kv_pair_ptr(pma->pairs[idx]);
if (0==pma->compare_fun(db, k, fill_dbt(&k2, kv->key, kv->keylen))) {
if (kv_pair_deleted(pma->pairs[idx])) {
pma->pairs[idx] = kv_pair_realloc_same_key(kv, v->data, v->size);
return BRT_OK;
} else
return BRT_ALREADY_THERE; /* It is already here. Return an error. */
}
}
if (pma->pairs[idx]) {
if (kv_pair_inuse(pma->pairs[idx])) {
idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */
}
assert(!pma->pairs[idx]);
assert(!kv_pair_inuse(pma->pairs[idx]));
pma->pairs[idx] = kv_pair_malloc(k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
pma->n_pairs_present++;
......@@ -580,18 +602,27 @@ int pma_delete (PMA pma, DBT *k, DB *db) {
int l;
l = pmainternal_find(pma, k, db);
struct kv_pair *pair = pma->pairs[l];
if (pair==0) {
struct kv_pair *kv = pma->pairs[l];
if (!kv_pair_inuse(kv)) {
printf("%s:%d l=%d r=%d\n", __FILE__, __LINE__, l, DB_NOTFOUND);
return DB_NOTFOUND;
}
kv_pair_free(pair);
pma->pairs[l] = 0;
pma->n_pairs_present--;
__pma_delete_at(pma, l);
pma->pairs[l] = kv_pair_set_deleted(kv);
if (__pma_count_cursor_refs(pma, l) == 0)
__pma_delete_finish(pma, l);
return BRT_OK;
}
void __pma_delete_finish(PMA pma, int here) {
struct kv_pair *kv = pma->pairs[here];
if (!kv_pair_inuse(kv))
return;
kv_pair_free(kv_pair_ptr(kv));
pma->pairs[here] = 0;
pma->n_pairs_present--;
__pma_delete_at(pma, here);
}
void __pma_delete_at(PMA pma, int here) {
int size;
int count;
......@@ -666,19 +697,21 @@ int pma_insert_or_replace (PMA pma, DBT *k, DBT *v, DB *db,
) {
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
int idx = pmainternal_find(pma, k, db);
struct kv_pair *pair;
if (idx < pma_index_limit(pma) && (pair=pma->pairs[idx])) {
struct kv_pair *kv;
if (idx < pma_index_limit(pma) && (kv = pma->pairs[idx])) {
DBT k2;
if (0==pma->compare_fun(db, k, fill_dbt(&k2, pair->key, pair->keylen))) {
*replaced_v_size = pair->vallen;
pma->pairs[idx] = kv_pair_realloc_same_key(pair, v->data, v->size);
return BRT_OK; /* It is already here. Return an error. */
kv = kv_pair_ptr(kv);
if (0==pma->compare_fun(db, k, fill_dbt(&k2, kv->key, kv->keylen))) {
if (!kv_pair_deleted(pma->pairs[idx]))
*replaced_v_size = kv->vallen;
pma->pairs[idx] = kv_pair_realloc_same_key(kv, v->data, v->size);
return BRT_OK;
}
}
if (pma->pairs[idx]) {
if (kv_pair_inuse(pma->pairs[idx])) {
idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */
}
assert(!pma->pairs[idx]);
assert(!kv_pair_inuse(pma->pairs[idx]));
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
pma->pairs[idx] = kv_pair_malloc(k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
......@@ -698,6 +731,21 @@ void pma_iterate (PMA pma, void(*f)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), voi
}
}
int __pma_count_cursor_refs(PMA pma, int here) {
int refs = 0;
struct list *list;
struct pma_cursor *cursor;
list = list_head(&pma->cursors);
while (list != &pma->cursors) {
cursor = list_struct(list, struct pma_cursor, next);
if (cursor->position == here)
refs += 1;
list = list->next;
}
return refs;
}
void __pma_update_cursors_position(PMA pma, struct list *cursor_set, int oldposition, int newposition) {
struct list *list, *nextlist;
struct pma_cursor *cursor;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment