Commit d1a42e49 authored by Rich Prohaska's avatar Rich Prohaska

fix DB_NEXT_DUP when the cursor points to a deleted kv pair. closes #121

git-svn-id: file:///svn/tokudb@950 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2a86cadd
......@@ -2842,7 +2842,7 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
r=unpin_cursor(cursor); if (r!=0) goto died0;
assert(cursor->pmacurs == 0);
r=brtcurs_set_position_last(cursor, *rootp, kbt, txn, null_brtnode); if (r!=0) goto died0;
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt);
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0);
if (r == 0) assert_cursor_path(cursor);
break;
case DB_FIRST:
......@@ -2850,14 +2850,14 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
r=unpin_cursor(cursor); if (r!=0) goto died0;
assert(cursor->pmacurs == 0);
r=brtcurs_set_position_first(cursor, *rootp, kbt, txn, null_brtnode); if (r!=0) goto died0;
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt);
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0);
if (r == 0) assert_cursor_path(cursor);
break;
case DB_NEXT:
if (cursor->path_len<=0)
goto do_db_first;
r=brtcurs_set_position_next(cursor, kbt, txn); if (r!=0) goto died0;
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt); if (r!=0) goto died0;
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0); if (r!=0) goto died0;
if (r == 0) assert_cursor_path(cursor);
break;
case DB_NEXT_DUP:
......@@ -2868,15 +2868,15 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
if the keys are the same then return the duplicate key and data */
DBT k1; memset(&k1, 0, sizeof k1);
r = toku_pma_cursor_get_current(cursor->pmacurs, &k1, 0); if (r != 0) goto died0;
r = toku_pma_cursor_get_current(cursor->pmacurs, &k1, 0, 1); if (r != 0) goto died0;
r = toku_pma_cursor_set_position_next(cursor->pmacurs);
if (r == 0) {
DBT k2; memset(&k2, 0, sizeof k2); k2.flags = DB_DBT_MALLOC;
r = toku_pma_cursor_get_current(cursor->pmacurs, &k2, 0); if (r != 0) goto died0;
r = toku_pma_cursor_get_current(cursor->pmacurs, &k2, 0, 0); if (r != 0) goto died0;
int cmp = cursor->brt->compare_fun(cursor->brt->db, &k1, &k2);
toku_free(k2.data);
if (cmp == 0) {
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt);
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0);
if (r != 0) {
toku_pma_cursor_set_position_prev(cursor->pmacurs); goto died0;
}
......@@ -2892,14 +2892,14 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
goto died0;
}
DBT k2; memset(&k2, 0, sizeof k2); k2.flags = DB_DBT_MALLOC;
r = toku_pma_cursor_get_current(cursor->pmacurs, &k2, 0); assert(r == 0);
r = toku_pma_cursor_get_current(cursor->pmacurs, &k2, 0, 0); assert(r == 0);
int cmp = cursor->brt->compare_fun(cursor->brt->db, &k1, &k2);
toku_free(k2.data);
if (cmp != 0) {
brtcurs_set_position_prev(cursor, 0, txn);
r = DB_NOTFOUND;
} else
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt);
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0);
if (r != 0) goto died0;
} else
goto died0;
......@@ -2908,14 +2908,14 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
if (cursor->path_len<= 0)
goto do_db_last;
r = brtcurs_set_position_prev(cursor, kbt, txn); if (r!=0) goto died0;
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt); if (r!=0) goto died0;
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0); if (r!=0) goto died0;
if (r == 0) assert_cursor_path(cursor);
break;
case DB_CURRENT:
if (cursor->path_len<=0) {
r = EINVAL; goto died0;
}
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt); if (r!=0) goto died0;
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0); if (r!=0) goto died0;
if (r == 0) assert_cursor_path(cursor);
break;
case DB_SET:
......@@ -2923,7 +2923,7 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
assert(r == 0);
r = brtcurs_set_key(cursor, *rootp, kbt, vbt, DB_SET, txn, null_brtnode);
if (r != 0) goto died0;
r = toku_pma_cursor_get_current(cursor->pmacurs, 0, vbt);
r = toku_pma_cursor_get_current(cursor->pmacurs, 0, vbt, 0);
if (r != 0) goto died0;
break;
case DB_GET_BOTH:
......@@ -2937,7 +2937,7 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
assert(r == 0);
r = brtcurs_set_range(cursor, *rootp, kbt, txn, null_brtnode);
if (r != 0) goto died0;
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt);
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0);
if (r != 0) goto died0;
break;
default:
......
......@@ -561,24 +561,24 @@ static void test_pma_cursor_3 (void) {
r=toku_pma_cursor(pma, &c, &skey, &sval); assert(r==0); assert(c!=0);
r=toku_pma_cursor_set_position_first(c); assert(r==0);
r=toku_pma_cursor_get_current(c, &key, &val); assert(r==0);
r=toku_pma_cursor_get_current(c, &key, &val, 0); assert(r==0);
assert(key.size=3); assert(memcmp(key.data,"aa",3)==0);
assert(val.size=2); assert(memcmp(val.data,"a",2)==0);
r=toku_pma_cursor_set_position_next(c); assert(r==0);
r=toku_pma_cursor_get_current(c, &key, &val); assert(r==0);
r=toku_pma_cursor_get_current(c, &key, &val, 0); assert(r==0);
assert(key.size=2); assert(memcmp(key.data,"m",2)==0);
assert(val.size=3); assert(memcmp(val.data,"mm",3)==0);
r=toku_pma_cursor_set_position_next(c); assert(r==0);
r=toku_pma_cursor_get_current(c, &key, &val); assert(r==0);
r=toku_pma_cursor_get_current(c, &key, &val, 0); assert(r==0);
assert(key.size=2); assert(memcmp(key.data,"x",2)==0);
assert(val.size=3); assert(memcmp(val.data,"xx",3)==0);
r=toku_pma_cursor_set_position_next(c); assert(r==DB_NOTFOUND);
/* After an error, the cursor should still point at the same thing. */
r=toku_pma_cursor_get_current(c, &key, &val); assert(r==0);
r=toku_pma_cursor_get_current(c, &key, &val, 0); assert(r==0);
assert(key.size=2); assert(memcmp(key.data,"x",2)==0);
assert(val.size=3); assert(memcmp(val.data,"xx",3)==0);
......@@ -599,7 +599,7 @@ static void assert_cursor_val(PMA_CURSOR cursor, int v) {
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = toku_pma_cursor_get_current(cursor, &key, &val);
error = toku_pma_cursor_get_current(cursor, &key, &val, 0);
assert(error == 0);
assert( v == *(int *)val.data);
toku_free(key.data);
......@@ -706,7 +706,7 @@ static void test_pma_cursor_delete(int n) {
DBT cursorkey, cursorval;
toku_init_dbt(&cursorkey); cursorkey.flags = DB_DBT_MALLOC;
toku_init_dbt(&cursorval); cursorval.flags = DB_DBT_MALLOC;
error = toku_pma_cursor_get_current(cursor, &cursorkey, &cursorval);
error = toku_pma_cursor_get_current(cursor, &cursorkey, &cursorval, 0);
assert(error != 0);
error = toku_pma_cursor_set_position_first(cursor);
......@@ -715,7 +715,7 @@ static void test_pma_cursor_delete(int n) {
int kk;
toku_init_dbt(&cursorkey); cursorkey.flags = DB_DBT_MALLOC;
toku_init_dbt(&cursorval); cursorval.flags = DB_DBT_MALLOC;
error = toku_pma_cursor_get_current(cursor, &cursorkey, &cursorval);
error = toku_pma_cursor_get_current(cursor, &cursorkey, &cursorval, 0);
assert(error == 0);
assert(cursorkey.size == sizeof kk);
kk = 0;
......@@ -730,7 +730,7 @@ static void test_pma_cursor_delete(int n) {
/* cursor get should fail */
toku_init_dbt(&cursorkey); cursorkey.flags = DB_DBT_MALLOC;
toku_init_dbt(&cursorval); cursorval.flags = DB_DBT_MALLOC;
error = toku_pma_cursor_get_current(cursor, &cursorkey, &cursorval);
error = toku_pma_cursor_get_current(cursor, &cursorkey, &cursorval, 0);
assert(error != 0);
error = toku_pma_cursor_set_position_next(cursor);
......@@ -740,7 +740,7 @@ static void test_pma_cursor_delete(int n) {
assert(error == 0);
toku_init_dbt(&cursorkey); cursorkey.flags = DB_DBT_MALLOC;
toku_init_dbt(&cursorval); cursorval.flags = DB_DBT_MALLOC;
error = toku_pma_cursor_get_current(cursor, &cursorkey, &cursorval);
error = toku_pma_cursor_get_current(cursor, &cursorkey, &cursorval, 0);
assert(error == 0);
assert(cursorkey.size == sizeof kk);
kk = 1;
......@@ -812,7 +812,7 @@ static void test_pma_compare_fun (int wrong_endian_p) {
} else {
r=toku_pma_cursor_set_position_next(c); assert(r==0);
}
r=toku_pma_cursor_get_current(c, &key, &val); assert(r==0);
r=toku_pma_cursor_get_current(c, &key, &val, 0); assert(r==0);
//printf("Got %s, expect %s\n", (char*)key.data, expected_keys[i]);
assert(key.size=3); assert(memcmp(key.data,expected_keys[i],3)==0);
assert(val.size=4); assert(memcmp(val.data,expected_keys[i],2)==0);
......@@ -1046,7 +1046,7 @@ static void print_cursor(const char *str, PMA_CURSOR cursor) {
printf("cursor %s: ", str);
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = toku_pma_cursor_get_current(cursor, &key, &val);
error = toku_pma_cursor_get_current(cursor, &key, &val, 0);
assert(error == 0);
printf("%s ", (char*)key.data);
toku_free(key.data);
......@@ -1063,7 +1063,7 @@ static void walk_cursor(const char *str, PMA_CURSOR cursor) {
for (;;) {
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = toku_pma_cursor_get_current(cursor, &key, &val);
error = toku_pma_cursor_get_current(cursor, &key, &val, 0);
assert(error == 0);
printf("%s ", (char*)key.data);
toku_free(key.data);
......@@ -1084,7 +1084,7 @@ static void walk_cursor_reverse(const char *str, PMA_CURSOR cursor) {
for (;;) {
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = toku_pma_cursor_get_current(cursor, &key, &val);
error = toku_pma_cursor_get_current(cursor, &key, &val, 0);
assert(error == 0);
printf("%s ", (char*)key.data);
toku_free(key.data);
......@@ -1451,7 +1451,7 @@ static void assert_cursor_equal(PMA_CURSOR pmacursor, int v) {
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
int r;
r = toku_pma_cursor_get_current(pmacursor, &key, &val);
r = toku_pma_cursor_get_current(pmacursor, &key, &val, 0);
assert(r == 0);
if (0) printf("key %s\n", (char*) key.data);
int thev;
......@@ -1467,7 +1467,7 @@ static void assert_cursor_nokey(PMA_CURSOR pmacursor) {
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
int r;
r = toku_pma_cursor_get_current(pmacursor, &key, &val);
r = toku_pma_cursor_get_current(pmacursor, &key, &val, 0);
assert(r != 0);
}
......@@ -1803,7 +1803,7 @@ static void test_pma_cursor_set_key() {
if (i % 10 == 0) {
assert(error == 0);
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = toku_pma_cursor_get_current(cursor, &key, &val);
error = toku_pma_cursor_get_current(cursor, &key, &val, 0);
assert(error == 0);
int vv;
assert(val.size == sizeof vv);
......@@ -1862,7 +1862,7 @@ static void test_pma_cursor_set_range() {
assert(i > largest_key);
} else {
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = toku_pma_cursor_get_current(cursor, &key, &val);
error = toku_pma_cursor_get_current(cursor, &key, &val, 0);
assert(error == 0);
int vv;
assert(val.size == sizeof vv);
......@@ -1927,7 +1927,7 @@ static void test_pma_cursor_delete_under() {
}
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = toku_pma_cursor_get_current(cursor, &key, &val);
error = toku_pma_cursor_get_current(cursor, &key, &val, 0);
assert(error == 0);
int vv;
assert(val.size == sizeof vv);
......@@ -2013,7 +2013,7 @@ static void test_pma_cursor_set_both() {
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = toku_pma_cursor_get_current(cursor, &key, &val);
error = toku_pma_cursor_get_current(cursor, &key, &val, 0);
assert(error == 0);
int vv;
assert(val.size == sizeof vv);
......@@ -2121,7 +2121,7 @@ static void test_dup_key_insert(int n) {
while (1) {
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
r = toku_pma_cursor_get_current(cursor, &key, &val);
r = toku_pma_cursor_get_current(cursor, &key, &val, 0);
assert(r == 0);
int kk;
assert(key.size == sizeof kk);
......@@ -2219,7 +2219,7 @@ static void test_dup_key_delete(int n, int mode) {
k = htonl(1); v = 1;
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
r = toku_pma_cursor_get_current(cursor, &key, &val);
r = toku_pma_cursor_get_current(cursor, &key, &val, 0);
assert(r == 0);
assert(key.size == sizeof kk);
memcpy(&kk, key.data, key.size);
......@@ -2236,7 +2236,7 @@ static void test_dup_key_delete(int n, int mode) {
k = htonl(3); v = 3;
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
r = toku_pma_cursor_get_current(cursor, &key, &val);
r = toku_pma_cursor_get_current(cursor, &key, &val, 0);
assert(r == 0);
assert(key.size == sizeof kk);
memcpy(&kk, key.data, key.size);
......@@ -2317,7 +2317,7 @@ static void test_dupsort_key_insert(int n, int dup_data) {
while (1) {
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
r = toku_pma_cursor_get_current(cursor, &key, &val);
r = toku_pma_cursor_get_current(cursor, &key, &val, 0);
assert(r == 0);
int kk;
assert(key.size == sizeof kk);
......
......@@ -785,13 +785,14 @@ int toku_pma_cursor_set_position_next (PMA_CURSOR c) {
return DB_NOTFOUND;
}
int toku_pma_cursor_get_current(PMA_CURSOR c, DBT *key, DBT *val) {
int toku_pma_cursor_get_current(PMA_CURSOR c, DBT *key, DBT *val, int even_deleted) {
if (c->position == -1)
return DB_NOTFOUND;
PMA pma = c->pma;
struct kv_pair *pair = pma->pairs[c->position];
if (!kv_pair_valid(pair))
if (!kv_pair_inuse(pair) || (kv_pair_deleted(pair) && !even_deleted))
return BRT_KEYEMPTY;
pair = kv_pair_ptr(pair);
if (key) toku_dbt_set_value(key, kv_pair_key(pair), kv_pair_keylen(pair), c->sskey);
if (val) toku_dbt_set_value(val, kv_pair_val(pair), kv_pair_vallen(pair), c->ssval);
return 0;
......
......@@ -102,11 +102,10 @@ int toku_pma_cursor_set_position_first (PMA_CURSOR c);
int toku_pma_cursor_set_position_next (PMA_CURSOR c); /* Requires the cursor is init'd. Returns DB_NOTFOUND if we fall off the end. */
int toku_pma_cursor_set_position_prev (PMA_CURSOR c);
/* get the key and data under the cursor */
int toku_pma_cursor_get_current(PMA_CURSOR c, DBT *key, DBT *val);
int toku_pma_cursor_get_current_key(PMA_CURSOR c, DBT *key);
int toku_pma_cursor_get_current_data(PMA_CURSOR c, DBT *val);
/* get the key and data under the cursor
if even_deleted is 1 then the key and val under the cursor are returned even if
it has been deleted previously */
int toku_pma_cursor_get_current(PMA_CURSOR c, DBT *key, DBT *val, int even_deleted);
/* set the cursor to the matching key and value pair */
int toku_pma_cursor_set_both(PMA_CURSOR c, DBT *key, DBT *val);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment