Commit 862072b7 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3047 fix le cursor per code review refs[t:3047]

git-svn-id: file:///svn/toku/tokudb@25741 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3bcfd0f9
......@@ -4436,7 +4436,9 @@ brt_search_leaf_node(BRTNODE node, brt_search_t *search, BRT_GET_CALLBACK_FUNCTI
if (r!=0) return r;
LEAFENTRY le = datav;
if (!toku_brt_cursor_is_leaf_mode(brtcursor) && is_le_val_del(le,brtcursor)) {
if (toku_brt_cursor_is_leaf_mode(brtcursor))
goto got_a_good_value; // leaf mode cursors see all leaf entries
if (is_le_val_del(le,brtcursor)) {
// Provisionally deleted stuff is gone.
// So we need to scan in the direction to see if we can find something
while (1) {
......@@ -4846,6 +4848,7 @@ brt_cursor_shortcut (BRT_CURSOR cursor, int direction, u_int32_t limit, BRT_GET_
lazy_assert_zero(r);
if (toku_brt_cursor_is_leaf_mode(cursor) || !is_le_val_del(le, cursor)) {
maybe_do_implicit_promotion_on_query(cursor, le);
u_int32_t keylen;
void *key;
......
......@@ -10,10 +10,10 @@
struct le_cursor {
BRT_CURSOR brt_cursor;
BRT brt;
DBT key;
BOOL neg_infinity;
BOOL pos_infinity;
DBT key; // the key that the le cursor is positioned at
// TODO a better implementation would fetch the key from the brt cursor
BOOL neg_infinity; // TRUE when the le cursor is positioned at -infinity (initial setting)
BOOL pos_infinity; // TRUE when the le cursor is positioned at +infinity (when _next returns DB_NOTFOUND)
};
int
......@@ -25,8 +25,8 @@ le_cursor_create(LE_CURSOR *le_cursor_result, BRT brt, TOKUTXN txn) {
else {
result = toku_brt_cursor(brt, &le_cursor->brt_cursor, txn, FALSE);
if (result == 0) {
// TODO move the leaf mode to the brt cursor constructor
toku_brt_cursor_set_leaf_mode(le_cursor->brt_cursor);
le_cursor->brt = brt;
toku_init_dbt(&le_cursor->key);
le_cursor->key.flags = DB_DBT_REALLOC;
le_cursor->neg_infinity = TRUE;
......@@ -50,10 +50,16 @@ le_cursor_close(LE_CURSOR le_cursor) {
return result;
}
// this implementation copies the key and leafentry into the supplied DBTs.
// this may be too slow. an alternative implementation could avoid copying the
// key by fetching the key from the brt cursor, and could avoid copying the leaf entry
// by processing the leaf entry in the brt cursor callback.
struct le_cursor_callback_arg {
DBT *key, *val;
};
// copy the key and the leaf entry to the given DBTs
static int
le_cursor_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *v) {
struct le_cursor_callback_arg *arg = (struct le_cursor_callback_arg *) v;
......@@ -63,31 +69,35 @@ le_cursor_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, voi
}
int
le_cursor_next(LE_CURSOR le_cursor, DBT *key, DBT *val) {
le_cursor->neg_infinity = FALSE;
struct le_cursor_callback_arg arg = { &le_cursor->key, val };
int result = toku_brt_cursor_get(le_cursor->brt_cursor, NULL, le_cursor_callback, &arg, DB_NEXT);
if (result == 0 && key != NULL)
toku_dbt_set(le_cursor->key.size, le_cursor->key.data, key, NULL);
else if (result == DB_NOTFOUND)
le_cursor->pos_infinity = TRUE;
le_cursor_next(LE_CURSOR le_cursor, DBT *le) {
int result;
if (le_cursor->pos_infinity)
result = DB_NOTFOUND;
else {
le_cursor->neg_infinity = FALSE;
struct le_cursor_callback_arg arg = { &le_cursor->key, le };
// TODO replace this with a non deprecated function
result = toku_brt_cursor_get(le_cursor->brt_cursor, NULL, le_cursor_callback, &arg, DB_NEXT);
if (result == DB_NOTFOUND)
le_cursor->pos_infinity = TRUE;
}
return result;
}
int
BOOL
is_key_right_of_le_cursor(LE_CURSOR le_cursor, const DBT *key, DB *keycompare_db) {
int result;
BOOL result;
if (le_cursor->neg_infinity)
result = TRUE;
result = TRUE; // all keys are right of -infinity
else if (le_cursor->pos_infinity)
result = FALSE;
result = FALSE; // all keys are left of +infinity
else {
brt_compare_func keycompare = toku_brt_get_bt_compare(le_cursor->brt);
brt_compare_func keycompare = toku_brt_get_bt_compare(le_cursor->brt_cursor->brt);
int r = keycompare(keycompare_db, &le_cursor->key, key);
if (r < 0)
result = TRUE;
result = TRUE; // key is right of the cursor key
else
result = FALSE;
result = FALSE; // key is at or left of the cursor key
}
return result;
}
......
......@@ -28,13 +28,13 @@ int le_cursor_close(LE_CURSOR le_cursor);
// Retrieve the next leaf entry under the LE_CURSOR
// Success: returns zero, stores the leaf entry key into the key dbt, and the leaf entry into the val dbt
// Failure: returns a non-zero error number
int le_cursor_next(LE_CURSOR le_cursor, DBT *key, DBT *val);
int le_cursor_next(LE_CURSOR le_cursor, DBT *le);
// Return TRUE if the key is to the right of the LE_CURSOR position
// Otherwise returns FALSE
// Otherwise returns FALSE when the key is at or to the left of the LE_CURSOR position
// The LE_CURSOR position is intialized to -infinity. Any key comparision with -infinity returns TRUE.
// When the cursor runs off the right edge of the tree, the LE_CURSOR position is set to +infinity. Any key comparision with +infinity
// returns FALSE.
int is_key_right_of_le_cursor(LE_CURSOR le_cursor, const DBT *key, DB *keycompare_db);
BOOL is_key_right_of_le_cursor(LE_CURSOR le_cursor, const DBT *key, DB *keycompare_db);
#endif
......@@ -146,16 +146,16 @@ test_provdel(const char *logdir, const char *fname, int n) {
int i;
for (i=0; ; i++) {
error = le_cursor_next(cursor, &key, &val);
error = le_cursor_next(cursor, &val);
if (error != 0)
break;
assert(key.size == sizeof (int));
int ii;
memcpy(&ii, key.data, key.size);
assert((int) toku_htonl(i) == ii);
LEAFENTRY le = (LEAFENTRY) val.data;
assert(le->type == LE_MVCC);
assert(le->keylen == sizeof (int));
int ii;
memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
assert((int) toku_htonl(i) == ii);
}
assert(i == n);
......
......@@ -152,16 +152,17 @@ test_pos_infinity(const char *fname, int n) {
int i;
for (i = 0; ; i++) {
error = le_cursor_next(cursor, &key, &val);
error = le_cursor_next(cursor, &val);
if (error != 0)
break;
assert(key.size == sizeof (int));
int ii;
memcpy(&ii, key.data, key.size);
assert((int) toku_htonl(i) == ii);
LEAFENTRY le = (LEAFENTRY) val.data;
assert(le->type == LE_MVCC);
assert(le->keylen == sizeof (int));
int ii;
memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
assert((int) toku_htonl(i) == ii);
}
assert(i == n);
......@@ -213,16 +214,16 @@ test_between(const char *fname, int n) {
int i;
for (i = 0; ; i++) {
// move the LE_CURSOR forward
error = le_cursor_next(cursor, &key, &val);
error = le_cursor_next(cursor, &val);
if (error != 0)
break;
assert(key.size == sizeof (int));
int ii;
memcpy(&ii, key.data, key.size);
assert((int) toku_htonl(i) == ii);
LEAFENTRY le = (LEAFENTRY) val.data;
assert(le->type == LE_MVCC);
assert(le->keylen == sizeof (int));
int ii;
memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
assert((int) toku_htonl(i) == ii);
// test that 0 .. i is not right of the cursor
for (int j = 0; j <= i; j++) {
......
......@@ -112,16 +112,16 @@ walk_tree(const char *fname, int n) {
int i;
for (i = 0; ; i++) {
error = le_cursor_next(cursor, &key, &val);
error = le_cursor_next(cursor, &val);
if (error != 0)
break;
assert(key.size == sizeof (int));
int ii;
memcpy(&ii, key.data, key.size);
assert((int) toku_htonl(i) == ii);
LEAFENTRY le = (LEAFENTRY) val.data;
assert(le->type == LE_MVCC);
assert(le->keylen == sizeof (int));
int ii;
memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
assert((int) toku_htonl(i) == ii);
}
assert(i == n);
......@@ -151,19 +151,28 @@ init_logdir(const char *logdir) {
assert(error == 0);
}
int
test_main (int argc , const char *argv[]) {
default_parse_args(argc, argv);
const char *logdir = "dir." __FILE__;
static void
run_test(const char *logdir, const char *brtfile, int n) {
init_logdir(logdir);
int error = chdir(logdir);
assert(error == 0);
const int n = 1000;
const char *brtfile = __FILE__ ".brt";
create_populate_tree(".", brtfile, n);
walk_tree(brtfile, n);
error = chdir("..");
assert(error == 0);
}
int
test_main (int argc , const char *argv[]) {
default_parse_args(argc, argv);
const char *logdir = "dir." __FILE__;
const char *brtfile = __FILE__ ".brt";
run_test(logdir, brtfile, 0);
run_test(logdir, brtfile, 1000);
return 0;
}
......@@ -231,7 +231,7 @@ build_index(DB_INDEXER *indexer) {
toku_ydb_lock();
result = le_cursor_next(indexer->i->lec, &key, &le);
result = le_cursor_next(indexer->i->lec, &le);
if (result != 0) {
done = TRUE;
if (result == DB_NOTFOUND)
......@@ -243,6 +243,7 @@ build_index(DB_INDEXER *indexer) {
DB *db = indexer->i->dest_dbs[which_db];
result = indexer_undo_do(indexer, db, ule);
if ( (result != 0) && (indexer->i->error_callback != NULL)) {
toku_dbt_set(ule_get_keylen(ule), ule_get_key(ule), &key, NULL);
indexer->i->error_callback(db, which_db, result, &key, NULL, indexer->i->error_extra);
}
}
......
......@@ -204,6 +204,7 @@ BDB_DONTRUN_TESTS = \
upgrade-test-2 \
upgrade-test-3 \
upgrade-test-4 \
upgrade-test-5 \
zombie_db \
#\ ends prev line
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment