Commit be63bc38 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3047 fix le cursor per code review refs[t:3047]

git-svn-id: file:///svn/toku/tokudb@25741 c7de825b-a66e-492c-adef-691d508d4ae1
parent f3906cab
...@@ -4436,7 +4436,9 @@ brt_search_leaf_node(BRTNODE node, brt_search_t *search, BRT_GET_CALLBACK_FUNCTI ...@@ -4436,7 +4436,9 @@ brt_search_leaf_node(BRTNODE node, brt_search_t *search, BRT_GET_CALLBACK_FUNCTI
if (r!=0) return r; if (r!=0) return r;
LEAFENTRY le = datav; LEAFENTRY le = datav;
if (!toku_brt_cursor_is_leaf_mode(brtcursor) && is_le_val_del(le,brtcursor)) { if (toku_brt_cursor_is_leaf_mode(brtcursor))
goto got_a_good_value; // leaf mode cursors see all leaf entries
if (is_le_val_del(le,brtcursor)) {
// Provisionally deleted stuff is gone. // Provisionally deleted stuff is gone.
// So we need to scan in the direction to see if we can find something // So we need to scan in the direction to see if we can find something
while (1) { while (1) {
...@@ -4846,6 +4848,7 @@ brt_cursor_shortcut (BRT_CURSOR cursor, int direction, u_int32_t limit, BRT_GET_ ...@@ -4846,6 +4848,7 @@ brt_cursor_shortcut (BRT_CURSOR cursor, int direction, u_int32_t limit, BRT_GET_
lazy_assert_zero(r); lazy_assert_zero(r);
if (toku_brt_cursor_is_leaf_mode(cursor) || !is_le_val_del(le, cursor)) { if (toku_brt_cursor_is_leaf_mode(cursor) || !is_le_val_del(le, cursor)) {
maybe_do_implicit_promotion_on_query(cursor, le); maybe_do_implicit_promotion_on_query(cursor, le);
u_int32_t keylen; u_int32_t keylen;
void *key; void *key;
......
...@@ -10,10 +10,10 @@ ...@@ -10,10 +10,10 @@
struct le_cursor { struct le_cursor {
BRT_CURSOR brt_cursor; BRT_CURSOR brt_cursor;
BRT brt; DBT key; // the key that the le cursor is positioned at
DBT key; // TODO a better implementation would fetch the key from the brt cursor
BOOL neg_infinity; BOOL neg_infinity; // TRUE when the le cursor is positioned at -infinity (initial setting)
BOOL pos_infinity; BOOL pos_infinity; // TRUE when the le cursor is positioned at +infinity (when _next returns DB_NOTFOUND)
}; };
int int
...@@ -25,8 +25,8 @@ le_cursor_create(LE_CURSOR *le_cursor_result, BRT brt, TOKUTXN txn) { ...@@ -25,8 +25,8 @@ le_cursor_create(LE_CURSOR *le_cursor_result, BRT brt, TOKUTXN txn) {
else { else {
result = toku_brt_cursor(brt, &le_cursor->brt_cursor, txn, FALSE); result = toku_brt_cursor(brt, &le_cursor->brt_cursor, txn, FALSE);
if (result == 0) { if (result == 0) {
// TODO move the leaf mode to the brt cursor constructor
toku_brt_cursor_set_leaf_mode(le_cursor->brt_cursor); toku_brt_cursor_set_leaf_mode(le_cursor->brt_cursor);
le_cursor->brt = brt;
toku_init_dbt(&le_cursor->key); toku_init_dbt(&le_cursor->key);
le_cursor->key.flags = DB_DBT_REALLOC; le_cursor->key.flags = DB_DBT_REALLOC;
le_cursor->neg_infinity = TRUE; le_cursor->neg_infinity = TRUE;
...@@ -50,10 +50,16 @@ le_cursor_close(LE_CURSOR le_cursor) { ...@@ -50,10 +50,16 @@ le_cursor_close(LE_CURSOR le_cursor) {
return result; return result;
} }
// this implementation copies the key and leafentry into the supplied DBTs.
// this may be too slow. an alternative implementation could avoid copying the
// key by fetching the key from the brt cursor, and could avoid copying the leaf entry
// by processing the leaf entry in the brt cursor callback.
struct le_cursor_callback_arg { struct le_cursor_callback_arg {
DBT *key, *val; DBT *key, *val;
}; };
// copy the key and the leaf entry to the given DBTs
static int static int
le_cursor_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *v) { le_cursor_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *v) {
struct le_cursor_callback_arg *arg = (struct le_cursor_callback_arg *) v; struct le_cursor_callback_arg *arg = (struct le_cursor_callback_arg *) v;
...@@ -63,31 +69,35 @@ le_cursor_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, voi ...@@ -63,31 +69,35 @@ le_cursor_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, voi
} }
int int
le_cursor_next(LE_CURSOR le_cursor, DBT *key, DBT *val) { le_cursor_next(LE_CURSOR le_cursor, DBT *le) {
int result;
if (le_cursor->pos_infinity)
result = DB_NOTFOUND;
else {
le_cursor->neg_infinity = FALSE; le_cursor->neg_infinity = FALSE;
struct le_cursor_callback_arg arg = { &le_cursor->key, val }; struct le_cursor_callback_arg arg = { &le_cursor->key, le };
int result = toku_brt_cursor_get(le_cursor->brt_cursor, NULL, le_cursor_callback, &arg, DB_NEXT); // TODO replace this with a non deprecated function
if (result == 0 && key != NULL) result = toku_brt_cursor_get(le_cursor->brt_cursor, NULL, le_cursor_callback, &arg, DB_NEXT);
toku_dbt_set(le_cursor->key.size, le_cursor->key.data, key, NULL); if (result == DB_NOTFOUND)
else if (result == DB_NOTFOUND)
le_cursor->pos_infinity = TRUE; le_cursor->pos_infinity = TRUE;
}
return result; return result;
} }
int BOOL
is_key_right_of_le_cursor(LE_CURSOR le_cursor, const DBT *key, DB *keycompare_db) { is_key_right_of_le_cursor(LE_CURSOR le_cursor, const DBT *key, DB *keycompare_db) {
int result; BOOL result;
if (le_cursor->neg_infinity) if (le_cursor->neg_infinity)
result = TRUE; result = TRUE; // all keys are right of -infinity
else if (le_cursor->pos_infinity) else if (le_cursor->pos_infinity)
result = FALSE; result = FALSE; // all keys are left of +infinity
else { else {
brt_compare_func keycompare = toku_brt_get_bt_compare(le_cursor->brt); brt_compare_func keycompare = toku_brt_get_bt_compare(le_cursor->brt_cursor->brt);
int r = keycompare(keycompare_db, &le_cursor->key, key); int r = keycompare(keycompare_db, &le_cursor->key, key);
if (r < 0) if (r < 0)
result = TRUE; result = TRUE; // key is right of the cursor key
else else
result = FALSE; result = FALSE; // key is at or left of the cursor key
} }
return result; return result;
} }
......
...@@ -28,13 +28,13 @@ int le_cursor_close(LE_CURSOR le_cursor); ...@@ -28,13 +28,13 @@ int le_cursor_close(LE_CURSOR le_cursor);
// Retrieve the next leaf entry under the LE_CURSOR // Retrieve the next leaf entry under the LE_CURSOR
// Success: returns zero, stores the leaf entry key into the key dbt, and the leaf entry into the val dbt // Success: returns zero, stores the leaf entry key into the key dbt, and the leaf entry into the val dbt
// Failure: returns a non-zero error number // Failure: returns a non-zero error number
int le_cursor_next(LE_CURSOR le_cursor, DBT *key, DBT *val); int le_cursor_next(LE_CURSOR le_cursor, DBT *le);
// Return TRUE if the key is to the right of the LE_CURSOR position // Return TRUE if the key is to the right of the LE_CURSOR position
// Otherwise returns FALSE // Otherwise returns FALSE when the key is at or to the left of the LE_CURSOR position
// The LE_CURSOR position is intialized to -infinity. Any key comparision with -infinity returns TRUE. // The LE_CURSOR position is intialized to -infinity. Any key comparision with -infinity returns TRUE.
// When the cursor runs off the right edge of the tree, the LE_CURSOR position is set to +infinity. Any key comparision with +infinity // When the cursor runs off the right edge of the tree, the LE_CURSOR position is set to +infinity. Any key comparision with +infinity
// returns FALSE. // returns FALSE.
int is_key_right_of_le_cursor(LE_CURSOR le_cursor, const DBT *key, DB *keycompare_db); BOOL is_key_right_of_le_cursor(LE_CURSOR le_cursor, const DBT *key, DB *keycompare_db);
#endif #endif
...@@ -146,16 +146,16 @@ test_provdel(const char *logdir, const char *fname, int n) { ...@@ -146,16 +146,16 @@ test_provdel(const char *logdir, const char *fname, int n) {
int i; int i;
for (i=0; ; i++) { for (i=0; ; i++) {
error = le_cursor_next(cursor, &key, &val); error = le_cursor_next(cursor, &val);
if (error != 0) if (error != 0)
break; break;
assert(key.size == sizeof (int));
int ii;
memcpy(&ii, key.data, key.size);
assert((int) toku_htonl(i) == ii);
LEAFENTRY le = (LEAFENTRY) val.data; LEAFENTRY le = (LEAFENTRY) val.data;
assert(le->type == LE_MVCC); assert(le->type == LE_MVCC);
assert(le->keylen == sizeof (int));
int ii;
memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
assert((int) toku_htonl(i) == ii);
} }
assert(i == n); assert(i == n);
......
...@@ -152,16 +152,17 @@ test_pos_infinity(const char *fname, int n) { ...@@ -152,16 +152,17 @@ test_pos_infinity(const char *fname, int n) {
int i; int i;
for (i = 0; ; i++) { for (i = 0; ; i++) {
error = le_cursor_next(cursor, &key, &val); error = le_cursor_next(cursor, &val);
if (error != 0) if (error != 0)
break; break;
assert(key.size == sizeof (int));
int ii;
memcpy(&ii, key.data, key.size);
assert((int) toku_htonl(i) == ii);
LEAFENTRY le = (LEAFENTRY) val.data; LEAFENTRY le = (LEAFENTRY) val.data;
assert(le->type == LE_MVCC); assert(le->type == LE_MVCC);
assert(le->keylen == sizeof (int));
int ii;
memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
assert((int) toku_htonl(i) == ii);
} }
assert(i == n); assert(i == n);
...@@ -213,16 +214,16 @@ test_between(const char *fname, int n) { ...@@ -213,16 +214,16 @@ test_between(const char *fname, int n) {
int i; int i;
for (i = 0; ; i++) { for (i = 0; ; i++) {
// move the LE_CURSOR forward // move the LE_CURSOR forward
error = le_cursor_next(cursor, &key, &val); error = le_cursor_next(cursor, &val);
if (error != 0) if (error != 0)
break; break;
assert(key.size == sizeof (int));
int ii;
memcpy(&ii, key.data, key.size);
assert((int) toku_htonl(i) == ii);
LEAFENTRY le = (LEAFENTRY) val.data; LEAFENTRY le = (LEAFENTRY) val.data;
assert(le->type == LE_MVCC); assert(le->type == LE_MVCC);
assert(le->keylen == sizeof (int));
int ii;
memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
assert((int) toku_htonl(i) == ii);
// test that 0 .. i is not right of the cursor // test that 0 .. i is not right of the cursor
for (int j = 0; j <= i; j++) { for (int j = 0; j <= i; j++) {
......
...@@ -112,16 +112,16 @@ walk_tree(const char *fname, int n) { ...@@ -112,16 +112,16 @@ walk_tree(const char *fname, int n) {
int i; int i;
for (i = 0; ; i++) { for (i = 0; ; i++) {
error = le_cursor_next(cursor, &key, &val); error = le_cursor_next(cursor, &val);
if (error != 0) if (error != 0)
break; break;
assert(key.size == sizeof (int));
int ii;
memcpy(&ii, key.data, key.size);
assert((int) toku_htonl(i) == ii);
LEAFENTRY le = (LEAFENTRY) val.data; LEAFENTRY le = (LEAFENTRY) val.data;
assert(le->type == LE_MVCC); assert(le->type == LE_MVCC);
assert(le->keylen == sizeof (int));
int ii;
memcpy(&ii, le->u.mvcc.key_xrs, le->keylen);
assert((int) toku_htonl(i) == ii);
} }
assert(i == n); assert(i == n);
...@@ -151,19 +151,28 @@ init_logdir(const char *logdir) { ...@@ -151,19 +151,28 @@ init_logdir(const char *logdir) {
assert(error == 0); assert(error == 0);
} }
int static void
test_main (int argc , const char *argv[]) { run_test(const char *logdir, const char *brtfile, int n) {
default_parse_args(argc, argv);
const char *logdir = "dir." __FILE__;
init_logdir(logdir); init_logdir(logdir);
int error = chdir(logdir); int error = chdir(logdir);
assert(error == 0); assert(error == 0);
const int n = 1000;
const char *brtfile = __FILE__ ".brt";
create_populate_tree(".", brtfile, n); create_populate_tree(".", brtfile, n);
walk_tree(brtfile, n); walk_tree(brtfile, n);
error = chdir("..");
assert(error == 0);
}
int
test_main (int argc , const char *argv[]) {
default_parse_args(argc, argv);
const char *logdir = "dir." __FILE__;
const char *brtfile = __FILE__ ".brt";
run_test(logdir, brtfile, 0);
run_test(logdir, brtfile, 1000);
return 0; return 0;
} }
...@@ -231,7 +231,7 @@ build_index(DB_INDEXER *indexer) { ...@@ -231,7 +231,7 @@ build_index(DB_INDEXER *indexer) {
toku_ydb_lock(); toku_ydb_lock();
result = le_cursor_next(indexer->i->lec, &key, &le); result = le_cursor_next(indexer->i->lec, &le);
if (result != 0) { if (result != 0) {
done = TRUE; done = TRUE;
if (result == DB_NOTFOUND) if (result == DB_NOTFOUND)
...@@ -243,6 +243,7 @@ build_index(DB_INDEXER *indexer) { ...@@ -243,6 +243,7 @@ build_index(DB_INDEXER *indexer) {
DB *db = indexer->i->dest_dbs[which_db]; DB *db = indexer->i->dest_dbs[which_db];
result = indexer_undo_do(indexer, db, ule); result = indexer_undo_do(indexer, db, ule);
if ( (result != 0) && (indexer->i->error_callback != NULL)) { if ( (result != 0) && (indexer->i->error_callback != NULL)) {
toku_dbt_set(ule_get_keylen(ule), ule_get_key(ule), &key, NULL);
indexer->i->error_callback(db, which_db, result, &key, NULL, indexer->i->error_extra); indexer->i->error_callback(db, which_db, result, &key, NULL, indexer->i->error_extra);
} }
} }
......
...@@ -204,6 +204,7 @@ BDB_DONTRUN_TESTS = \ ...@@ -204,6 +204,7 @@ BDB_DONTRUN_TESTS = \
upgrade-test-2 \ upgrade-test-2 \
upgrade-test-3 \ upgrade-test-3 \
upgrade-test-4 \ upgrade-test-4 \
upgrade-test-5 \
zombie_db \ zombie_db \
#\ ends prev line #\ ends prev line
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment