Commit 9de46d5f authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:3436], [t:3782], merge ydb level bulk fetch to main line

git-svn-id: file:///svn/toku/tokudb@33770 c7de825b-a66e-492c-adef-691d508d4ae1
parent 288258f4
......@@ -290,6 +290,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */
#define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);
......
......@@ -292,6 +292,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */
#define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);
......
......@@ -292,6 +292,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */
#define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);
......
......@@ -292,6 +292,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */
#define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);
......
......@@ -293,6 +293,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */
#define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);
......
......@@ -75,6 +75,7 @@ enum {
TOKUDB_UPGRADE_FAILURE = -100011,
TOKUDB_TRY_AGAIN = -100012,
TOKUDB_NEEDS_REPAIR = -100013,
TOKUDB_CURSOR_CONTINUE = -100014,
};
static void print_defines (void) {
......@@ -226,6 +227,7 @@ static void print_defines (void) {
dodefine(TOKUDB_UPGRADE_FAILURE);
dodefine(TOKUDB_TRY_AGAIN);
dodefine(TOKUDB_NEEDS_REPAIR);
dodefine(TOKUDB_CURSOR_CONTINUE);
/* LOADER flags */
printf("/* LOADER flags */\n");
......
......@@ -293,6 +293,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */
#define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);
......
......@@ -283,7 +283,8 @@ static int counttotalbytes (DBT const *key, DBT const *data, void *extrav) {
printf("%s:%d %"PRIu64" %"PRIu64"\n", __FUNCTION__, __LINE__, k, expect_key);
expect_key = k + 1;
}
return 0;
return TOKUDB_CURSOR_CONTINUE;
//return 0;
}
static void scanscan_lwc (void) {
......@@ -306,7 +307,7 @@ static void scanscan_lwc (void) {
rowcounter++;
if (limitcount>0 && rowcounter>=limitcount) break;
}
assert(r==DB_NOTFOUND);
//assert(r==DB_NOTFOUND);
r = dbc->c_close(dbc); assert(r==0);
double thistime = gettime();
double tdiff = thistime-prevtime;
......
......@@ -293,6 +293,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */
#define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);
......
......@@ -429,7 +429,6 @@ struct brt_header {
unsigned int flags;
DESCRIPTOR_S descriptor;
u_int64_t root_put_counter; // the generation number of the brt
BLOCK_TABLE blocktable;
// If a transaction created this BRT, which one?
......@@ -549,8 +548,6 @@ struct brt_cursor_leaf_info_to_be {
// Values to be used to pin a leaf for shortcut searches
struct brt_cursor_leaf_info {
BLOCKNUM blocknumber;
u_int32_t fullhash;
struct brt_cursor_leaf_info_to_be to_be;
};
......@@ -558,13 +555,10 @@ struct brt_cursor_leaf_info {
struct brt_cursor {
struct toku_list cursors_link;
BRT brt;
BOOL current_in_omt;
BOOL prefetching;
DBT key, val; // The key-value pair that the cursor currently points to
DBT range_lock_left_key, range_lock_right_key;
BOOL left_is_neg_infty, right_is_pos_infty;
OMTCURSOR omtcursor;
u_int64_t root_put_counter; // what was the count on the BRT when we validated the cursor?
TXNID oldest_living_xid;// what was the oldest live txnid when we created the cursor?
BOOL is_snapshot_read; // true if query is read_committed, false otherwise
BOOL is_leaf_mode;
......
......@@ -142,7 +142,7 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
struct cmd_leafval_heaviside_extra be = {brt, &keydbt};
r = toku_omt_find_zero(BLB_BUFFER(node, 0), toku_cmd_leafval_heaviside, &be, &storeddatav, &idx, NULL);
r = toku_omt_find_zero(BLB_BUFFER(node, 0), toku_cmd_leafval_heaviside, &be, &storeddatav, &idx);
if (r==0) {
......
......@@ -83,7 +83,7 @@ verify_msg_in_child_buffer(BRT brt, int type, MSN msn, bytevec key, ITEMLEN keyl
static LEAFENTRY
get_ith_leafentry (BASEMENTNODE bn, int i) {
OMTVALUE le_v;
int r = toku_omt_fetch(bn->buffer, i, &le_v, NULL);
int r = toku_omt_fetch(bn->buffer, i, &le_v);
invariant(r == 0); // this is a bad failure if it happens.
return (LEAFENTRY)le_v;
}
......
......@@ -143,14 +143,6 @@ toku_brt_header_suppress_rollbacks(struct brt_header *h, TOKUTXN txn) {
h->root_that_created_or_locked_when_empty = rootid;
}
static void brt_cursor_invalidate(BRT_CURSOR brtcursor);
// We invalidate all the OMTCURSORS any time we push into the root of the BRT for that OMT.
// We keep a counter on each brt header, but if the brt header is evicted from the cachetable
// then we lose that counter. So we also keep a global counter.
// An alternative would be to keep only the global counter. But that would invalidate all OMTCURSORS
// even from unrelated BRTs. This way we only invalidate an OMTCURSOR if
static u_int64_t global_root_put_counter = 0;
enum reactivity { RE_STABLE, RE_FUSIBLE, RE_FISSIBLE };
......@@ -509,7 +501,7 @@ toku_verify_estimates (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bo
static LEAFENTRY
fetch_from_buf (OMT omt, u_int32_t idx) {
OMTVALUE v = 0;
int r = toku_omt_fetch(omt, idx, &v, NULL);
int r = toku_omt_fetch(omt, idx, &v);
assert_zero(r);
return (LEAFENTRY)v;
}
......@@ -1167,7 +1159,7 @@ brtleaf_disk_size(BRTNODE node)
for (j=0; j < n_leafentries; j++) {
OMTVALUE v;
LEAFENTRY curr_le = NULL;
int r = toku_omt_fetch(curr_buffer, j, &v, NULL);
int r = toku_omt_fetch(curr_buffer, j, &v);
curr_le = v;
assert_zero(r);
retval += leafentry_disksize(curr_le);
......@@ -1195,7 +1187,7 @@ brtleaf_get_split_loc(
for (j=0; j < n_leafentries; j++) {
LEAFENTRY curr_le = NULL;
OMTVALUE v;
int r = toku_omt_fetch(curr_buffer, j, &v, NULL);
int r = toku_omt_fetch(curr_buffer, j, &v);
curr_le = v;
assert_zero(r);
size_so_far += leafentry_disksize(curr_le);
......@@ -1376,7 +1368,7 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
if (splitk) {
memset(splitk, 0, sizeof *splitk);
OMTVALUE lev = 0;
int r=toku_omt_fetch(BLB_BUFFER(node, split_node), toku_omt_size(BLB_BUFFER(node, split_node))-1, &lev, NULL);
int r=toku_omt_fetch(BLB_BUFFER(node, split_node), toku_omt_size(BLB_BUFFER(node, split_node))-1, &lev);
assert_zero(r); // that fetch should have worked.
LEAFENTRY le=lev;
splitk->size = le_keylen(le);
......@@ -1903,7 +1895,7 @@ brt_leaf_put_cmd (
*made_change = 1;
if (doing_seqinsert) {
idx = toku_omt_size(bn->buffer);
r = toku_omt_fetch(bn->buffer, idx-1, &storeddatav, NULL);
r = toku_omt_fetch(bn->buffer, idx-1, &storeddatav);
if (r != 0) goto fz;
storeddata = storeddatav;
int cmp = toku_cmd_leafval_heaviside(storeddata, &be);
......@@ -1912,7 +1904,7 @@ brt_leaf_put_cmd (
} else {
fz:
r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx, NULL);
&storeddatav, &idx);
}
if (r==DB_NOTFOUND) {
storeddata = 0;
......@@ -1944,7 +1936,7 @@ brt_leaf_put_cmd (
// Apply to all the matches
r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx, NULL);
&storeddatav, &idx);
if (r == DB_NOTFOUND) break;
assert(r==0);
storeddata=storeddatav;
......@@ -1969,7 +1961,7 @@ brt_leaf_put_cmd (
assert(idx <= num_leafentries_after);
if (idx == num_leafentries_after) break; //Reached the end of the leaf
r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL);
r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
assert_zero(r);
}
storeddata=storeddatav;
......@@ -1995,7 +1987,7 @@ brt_leaf_put_cmd (
// Apply to all leafentries
omt_size = toku_omt_size(bn->buffer);
for (u_int32_t idx = 0; idx < omt_size; ) {
r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL);
r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
assert_zero(r);
storeddata=storeddatav;
int deleted = 0;
......@@ -2022,7 +2014,7 @@ brt_leaf_put_cmd (
// Apply to all leafentries if txn is represented
omt_size = toku_omt_size(bn->buffer);
for (u_int32_t idx = 0; idx < omt_size; ) {
r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL);
r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
assert_zero(r);
storeddata=storeddatav;
int deleted = 0;
......@@ -2047,7 +2039,7 @@ brt_leaf_put_cmd (
case BRT_UPDATE: {
u_int32_t idx;
r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx, NULL);
&storeddatav, &idx);
if (r==DB_NOTFOUND) {
r = do_update(t, bn, se, cmd, idx, NULL, logger, made_change, workdone);
} else if (r==0) {
......@@ -2061,7 +2053,7 @@ brt_leaf_put_cmd (
u_int32_t idx = 0;
u_int32_t num_leafentries_before;
while (idx < (num_leafentries_before = toku_omt_size(bn->buffer))) {
r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL);
r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
assert(r==0);
storeddata=storeddatav;
r = do_update(t, bn, se, cmd, idx, storeddata, logger, made_change, workdone);
......@@ -3156,7 +3148,6 @@ toku_brt_root_put_cmd (BRT brt, BRT_MSG_S * cmd)
//assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable));
assert(brt->h);
brt->h->root_put_counter = global_root_put_counter++;
u_int32_t fullhash;
rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
......@@ -3721,8 +3712,6 @@ brt_init_header_partial (BRT t, TOKUTXN txn) {
compute_and_fill_remembered_hash(t);
t->h->root_put_counter = global_root_put_counter++;
BLOCKNUM root = t->h->root;
if ((r=setup_initial_brt_root_node(t, root))!=0) { return r; }
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
......@@ -3815,7 +3804,6 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, LSN max_acceptabl
}
if (r!=0) return r;
h->cf = cf;
h->root_put_counter = global_root_put_counter++;
toku_cachefile_set_userdata(cf,
(void*)h,
brtheader_log_fassociate_during_checkpoint,
......@@ -4154,7 +4142,6 @@ brt_redirect_cursors (BRT brt_to, BRT brt_from) {
while (!toku_list_empty(&brt_from->cursors)) {
struct toku_list * c_list = toku_list_head(&brt_from->cursors);
BRT_CURSOR c = toku_list_struct(c_list, struct brt_cursor, cursors_link);
brt_cursor_invalidate(c);
toku_list_remove(&c->cursors_link);
......@@ -4799,12 +4786,10 @@ int toku_brt_flush (BRT brt) {
static inline void
brt_cursor_cleanup_dbts(BRT_CURSOR c) {
if (!c->current_in_omt) {
if (c->key.data) toku_free(c->key.data);
if (c->val.data) toku_free(c->val.data);
memset(&c->key, 0, sizeof(c->key));
memset(&c->val, 0, sizeof(c->val));
}
}
//
......@@ -4834,14 +4819,13 @@ int does_txn_read_entry(TXNID id, TOKUTXN context) {
return rval;
}
static inline int brt_cursor_extract_key_and_val(
static inline void brt_cursor_extract_key_and_val(
LEAFENTRY le,
BRT_CURSOR cursor,
u_int32_t *keylen,
void **key,
u_int32_t *vallen,
void **val) {
int r = 0;
if (toku_brt_cursor_is_leaf_mode(cursor)) {
*key = le_key_and_len(le, keylen);
*val = le;
......@@ -4859,53 +4843,6 @@ static inline int brt_cursor_extract_key_and_val(
*key = le_key_and_len(le, keylen);
*val = le_latest_val_and_len(le, vallen);
}
return r;
}
static inline void load_dbts_from_omt(BRT_CURSOR c, DBT *key, DBT *val) {
OMTVALUE le = 0;
int r = toku_omt_cursor_current(c->omtcursor, &le);
assert_zero(r);
r = brt_cursor_extract_key_and_val(le,
c,
&key->size,
&key->data,
&val->size,
&val->data);
assert_zero(r);
}
// When an omt cursor is invalidated, this is the brt-level function
// that is called. This function is only called by the omt logic.
// This callback is called when either (a) the brt logic invalidates one
// cursor (see brt_cursor_invalidate()) or (b) when the omt logic invalidates
// all the cursors for an omt.
static void
brt_cursor_invalidate_callback(OMTCURSOR UU(omt_c), void *extra) {
BRT_CURSOR cursor = extra;
//TODO: #1378 assert that this thread owns omt lock in brtcursor
if (cursor->current_in_omt) {
DBT key,val;
load_dbts_from_omt(cursor, toku_init_dbt(&key), toku_init_dbt(&val));
cursor->key.data = toku_memdup(key.data, key.size);
cursor->val.data = toku_memdup(val.data, val.size);
cursor->key.size = key.size;
cursor->val.size = val.size;
//TODO: Find some way to deal with ENOMEM here.
//Until then, just assert that the memdups worked.
assert(cursor->key.data && cursor->val.data);
cursor->current_in_omt = FALSE;
}
}
// Called at start of every slow query, and only from slow queries.
// When all cursors are invalidated (from writer thread, or insert/delete),
// this function is not used.
static void
brt_cursor_invalidate(BRT_CURSOR brtcursor) {
toku_omt_cursor_invalidate(brtcursor->omtcursor); // will call brt_cursor_invalidate_callback()
}
int toku_brt_cursor (
......@@ -4929,7 +4866,6 @@ int toku_brt_cursor (
return ENOMEM;
memset(cursor, 0, sizeof(*cursor));
cursor->brt = brt;
cursor->current_in_omt = FALSE;
cursor->prefetching = FALSE;
toku_init_dbt(&cursor->range_lock_left_key);
toku_init_dbt(&cursor->range_lock_right_key);
......@@ -4940,11 +4876,6 @@ int toku_brt_cursor (
cursor->is_leaf_mode = FALSE;
cursor->ttxn = ttxn;
toku_list_push(&brt->cursors, &cursor->cursors_link);
int r = toku_omt_cursor_create(&cursor->omtcursor);
assert_zero(r);
toku_omt_cursor_set_invalidate_callback(cursor->omtcursor,
brt_cursor_invalidate_callback, cursor);
cursor->root_put_counter=0;
*cursorptr = cursor;
return 0;
}
......@@ -4986,19 +4917,9 @@ toku_brt_cursor_set_range_lock(BRT_CURSOR cursor, const DBT *left, const DBT *ri
}
}
// Called during cursor destruction
// It is the same as brt_cursor_invalidate, except that
// we make sure the callback function is never called.
static void
brt_cursor_invalidate_no_callback(BRT_CURSOR brtcursor) {
toku_omt_cursor_set_invalidate_callback(brtcursor->omtcursor, NULL, NULL);
toku_omt_cursor_invalidate(brtcursor->omtcursor); // will NOT call brt_cursor_invalidate_callback()
}
//TODO: #1378 When we split the ydb lock, touching cursor->cursors_link
//is not thread safe.
int toku_brt_cursor_close(BRT_CURSOR cursor) {
brt_cursor_invalidate_no_callback(cursor);
brt_cursor_cleanup_dbts(cursor);
if (cursor->range_lock_left_key.data) {
toku_free(cursor->range_lock_left_key.data);
......@@ -5009,7 +4930,6 @@ int toku_brt_cursor_close(BRT_CURSOR cursor) {
toku_destroy_dbt(&cursor->range_lock_right_key);
}
toku_list_remove(&cursor->cursors_link);
toku_omt_cursor_destroy(&cursor->omtcursor);
toku_free_n(cursor, sizeof *cursor);
return 0;
}
......@@ -5026,7 +4946,7 @@ static inline BOOL brt_cursor_prefetching(BRT_CURSOR cursor) {
static BOOL
brt_cursor_not_set(BRT_CURSOR cursor) {
assert((cursor->key.data==NULL) == (cursor->val.data==NULL));
return (BOOL)(!cursor->current_in_omt && cursor->key.data == NULL);
return (BOOL)(cursor->key.data == NULL);
}
static int
......@@ -5056,30 +4976,6 @@ heaviside_from_search_t (OMTVALUE lev, void *extra) {
}
// This is the only function that associates a brt cursor (and its contained
// omt cursor) with a brt node (and its associated omt). This is different
// from older code because the old code associated the omt cursor with the
// omt when the search found a match. In this new design, the omt cursor
// will not be associated with the omt until after the application-level
// callback accepts the search result.
// The lock is necessary because we don't want two threads modifying
// the omt's list of cursors simultaneously.
// Note, this is only place in brt code that calls toku_omt_cursor_set_index().
// Requires: cursor->omtcursor is valid
static inline void
brt_cursor_update(BRT_CURSOR brtcursor) {
//Free old version if it is using local memory.
OMTCURSOR omtcursor = brtcursor->omtcursor;
if (!brtcursor->current_in_omt) {
brt_cursor_cleanup_dbts(brtcursor);
brtcursor->current_in_omt = TRUE;
toku_omt_cursor_associate(brtcursor->leaf_info.to_be.omt, omtcursor);
//no longer touching linked list, and
//only one thread can touch cursor at a time, protected by ydb lock
}
toku_omt_cursor_set_index(omtcursor, brtcursor->leaf_info.to_be.index);
}
//
// Returns true if the value that is to be read is empty.
//
......@@ -5402,6 +5298,20 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors
VERIFY_NODE(t, node);
}
static int
brt_cursor_shortcut (
BRT_CURSOR cursor,
int direction,
BRT_GET_CALLBACK_FUNCTION getf,
void *getf_v,
u_int32_t *keylen,
void **key,
u_int32_t *vallen,
void **val
);
// This is a bottom layer of the search functions.
static int
brt_search_basement_node(
......@@ -5410,9 +5320,8 @@ brt_search_basement_node(
BRT_GET_CALLBACK_FUNCTION getf,
void *getf_v,
BOOL *doprefetch,
BLOCKNUM thisnodename,
u_int32_t fullhash,
BRT_CURSOR brtcursor
BRT_CURSOR brtcursor,
BOOL can_bulk_fetch
)
{
assert(bn->max_dsn_applied.dsn >= MIN_DSN.dsn);
......@@ -5432,7 +5341,7 @@ brt_search_basement_node(
heaviside_from_search_t,
search,
direction,
&datav, &idx, NULL);
&datav, &idx);
if (r!=0) return r;
LEAFENTRY le = datav;
......@@ -5454,7 +5363,7 @@ brt_search_basement_node(
default:
assert(FALSE);
}
r = toku_omt_fetch(bn->buffer, idx, &datav, NULL);
r = toku_omt_fetch(bn->buffer, idx, &datav);
assert_zero(r); // we just validated the index
le = datav;
if (!is_le_val_del(le,brtcursor)) goto got_a_good_value;
......@@ -5467,35 +5376,42 @@ brt_search_basement_node(
u_int32_t vallen;
void *val;
r = brt_cursor_extract_key_and_val(le,
brt_cursor_extract_key_and_val(le,
brtcursor,
&keylen,
&key,
&vallen,
&val);
&val
);
assert(brtcursor->current_in_omt == FALSE);
if (r==0) {
r = getf(keylen, key, vallen, val, getf_v);
}
if (r==0) {
// Leave the omtcursor alone above (pass NULL to omt_find/fetch)
// This prevents the omt from calling associate(), which would
// require a lock to keep the list of cursors safe when the omt
// is used by the brt. (We don't want to impose the locking requirement
// on the omt for non-brt uses.)
//
// Instead, all associating of omtcursors with omts (for leaf nodes)
// is done in brt_cursor_update.
if (r==0 || r == TOKUDB_CURSOR_CONTINUE) {
brtcursor->leaf_info.to_be.omt = bn->buffer;
brtcursor->leaf_info.to_be.index = idx;
brtcursor->leaf_info.fullhash = fullhash;
brtcursor->leaf_info.blocknumber = thisnodename;
brt_cursor_update(brtcursor);
if (r == TOKUDB_CURSOR_CONTINUE && can_bulk_fetch) {
r = brt_cursor_shortcut(
brtcursor,
direction,
getf,
getf_v,
&keylen,
&key,
&vallen,
&val
);
}
brt_cursor_cleanup_dbts(brtcursor);
brtcursor->key.data = toku_memdup(key, keylen);
brtcursor->val.data = toku_memdup(val, vallen);
brtcursor->key.size = keylen;
brtcursor->val.size = vallen;
//The search was successful. Prefetching can continue.
*doprefetch = TRUE;
}
}
if (r == TOKUDB_CURSOR_CONTINUE) r = 0;
return r;
}
......@@ -5511,7 +5427,8 @@ brt_search_node (
BRT_CURSOR brtcursor,
UNLOCKERS unlockers,
ANCESTORS,
struct pivot_bounds const * const bounds
struct pivot_bounds const * const bounds,
BOOL can_bulk_fetch
);
// the number of nodes to prefetch
......@@ -5591,7 +5508,7 @@ unlock_brtnode_fun (void *v) {
/* search in a node's child */
static int
brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BOOL *doprefetch, BRT_CURSOR brtcursor, UNLOCKERS unlockers,
ANCESTORS ancestors, struct pivot_bounds const * const bounds)
ANCESTORS ancestors, struct pivot_bounds const * const bounds, BOOL can_bulk_fetch)
// Effect: Search in a node's child. Searches are read-only now (at least as far as the hardcopy is concerned).
{
struct ancestors next_ancestors = {node, childnum, ancestors};
......@@ -5624,7 +5541,7 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
struct unlock_brtnode_extra unlock_extra = {brt,childnode};
struct unlockers next_unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, unlockers};
int r = brt_search_node(brt, childnode, search, bfe.child_to_read, getf, getf_v, doprefetch, brtcursor, &next_unlockers, &next_ancestors, bounds);
int r = brt_search_node(brt, childnode, search, bfe.child_to_read, getf, getf_v, doprefetch, brtcursor, &next_unlockers, &next_ancestors, bounds, can_bulk_fetch);
if (r!=TOKUDB_TRY_AGAIN) {
// Even if r is reactive, we want to handle the maybe reactive child.
......@@ -5714,7 +5631,8 @@ brt_search_node(
BRT_CURSOR brtcursor,
UNLOCKERS unlockers,
ANCESTORS ancestors,
struct pivot_bounds const * const bounds
struct pivot_bounds const * const bounds,
BOOL can_bulk_fetch
)
{ int r = 0;
// assert that we got a valid child_to_search
......@@ -5749,7 +5667,8 @@ brt_search_node(
brtcursor,
unlockers,
ancestors,
&next_bounds
&next_bounds,
can_bulk_fetch
);
}
else {
......@@ -5759,9 +5678,8 @@ brt_search_node(
getf,
getf_v,
doprefetch,
node->thisnodename,
node->fullhash,
brtcursor
brtcursor,
can_bulk_fetch
);
}
if (r == 0) return r; //Success
......@@ -5799,7 +5717,7 @@ brt_search_node(
}
static int
toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BRT_CURSOR brtcursor, u_int64_t *root_put_counter)
toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BRT_CURSOR brtcursor, BOOL can_bulk_fetch)
// Effect: Perform a search. Associate cursor with a leaf if possible.
// All searches are performed through this function.
{
......@@ -5809,8 +5727,6 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
assert(brt->h);
*root_put_counter = brt->h->root_put_counter;
u_int32_t fullhash;
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
......@@ -5861,7 +5777,7 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
{
BOOL doprefetch = FALSE;
//static int counter = 0; counter++;
r = brt_search_node(brt, node, search, bfe.child_to_read, getf, getf_v, &doprefetch, brtcursor, &unlockers, (ANCESTORS)NULL, &infinite_bounds);
r = brt_search_node(brt, node, search, bfe.child_to_read, getf, getf_v, &doprefetch, brtcursor, &unlockers, (ANCESTORS)NULL, &infinite_bounds, can_bulk_fetch);
if (r==TOKUDB_TRY_AGAIN) {
// there are two cases where we get TOKUDB_TRY_AGAIN
// case 1 is when some later call to toku_pin_brtnode returned
......@@ -5913,10 +5829,9 @@ struct brt_cursor_search_struct {
/* search for the first kv pair that matches the search object */
static int
brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BOOL can_bulk_fetch)
{
brt_cursor_invalidate(cursor);
int r = toku_brt_search(cursor->brt, search, getf, getf_v, cursor, &cursor->root_put_counter);
int r = toku_brt_search(cursor->brt, search, getf, getf_v, cursor, can_bulk_fetch);
return r;
}
......@@ -5946,7 +5861,6 @@ brt_cursor_current_getf(ITEMLEN keylen, bytevec key,
} else {
BRT_CURSOR cursor = bcss->cursor;
DBT newkey = {.size=keylen, .data=(void*)key}; // initializes other fields to zero
//Safe to access cursor->key/val because current_in_omt is FALSE
if (compare_k_x(cursor->brt, &cursor->key, &newkey) != 0) {
r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v); // This was once DB_KEYEMPTY
if (r==0) r = TOKUDB_FOUND_BUT_REJECTED;
......@@ -5963,14 +5877,12 @@ toku_brt_cursor_current(BRT_CURSOR cursor, int op, BRT_GET_CALLBACK_FUNCTION get
if (brt_cursor_not_set(cursor))
return EINVAL;
if (op == DB_CURRENT) {
brt_cursor_invalidate(cursor);
struct brt_cursor_search_struct bcss = {getf, getf_v, cursor, 0};
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, cursor->brt);
int r = toku_brt_search(cursor->brt, &search, brt_cursor_current_getf, &bcss, cursor, &cursor->root_put_counter);
int r = toku_brt_search(cursor->brt, &search, brt_cursor_current_getf, &bcss, cursor, FALSE);
brt_search_finish(&search);
return r;
}
brt_cursor_invalidate(cursor);
return getf(cursor->key.size, cursor->key.data, cursor->val.size, cursor->val.data, getf_v); // brt_cursor_copyout(cursor, outkey, outval);
}
......@@ -5988,7 +5900,7 @@ toku_brt_flatten(BRT brt, TOKUTXN ttxn)
int r = toku_brt_cursor(brt, &tmp_cursor, ttxn, FALSE);
if (r!=0) return r;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, tmp_cursor->brt);
r = brt_cursor_search(tmp_cursor, &search, brt_flatten_getf, NULL);
r = brt_cursor_search(tmp_cursor, &search, brt_flatten_getf, NULL, FALSE);
brt_search_finish(&search);
if (r==DB_NOTFOUND) r = 0;
{
......@@ -6003,7 +5915,7 @@ int
toku_brt_cursor_first(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, cursor->brt);
int r = brt_cursor_search(cursor, &search, getf, getf_v);
int r = brt_cursor_search(cursor, &search, getf, getf_v, FALSE);
brt_search_finish(&search);
return r;
}
......@@ -6012,8 +5924,8 @@ int
toku_brt_cursor_last(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_RIGHT, 0, cursor->brt);
int r = brt_cursor_search(cursor, &search, getf, getf_v);
brt_search_finish(&search);;
int r = brt_cursor_search(cursor, &search, getf, getf_v, FALSE);
brt_search_finish(&search);
return r;
}
......@@ -6022,113 +5934,66 @@ static int brt_cursor_compare_next(brt_search_t *search, DBT *x) {
return compare_k_x(brt, search->k, x) < 0; /* return min xy: kv < xy */
}
static int
brt_cursor_shortcut (BRT_CURSOR cursor, int direction, u_int32_t limit, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) {
int r;
OMTCURSOR omtcursor = cursor->omtcursor;
OMT omt = toku_omt_cursor_get_omt(omtcursor);
u_int64_t h_put_counter = cursor->brt->h->root_put_counter;
u_int64_t c_put_counter = cursor->root_put_counter;
BOOL found = FALSE;
//Verify that no messages have been inserted
//since the last time the cursor's pointer was set.
//Also verify the omt cursor is still valid.
//(Necessary to recheck after the maybe_get_and_pin)
if (c_put_counter==h_put_counter && toku_omt_cursor_is_valid(cursor->omtcursor)) {
u_int32_t index = 0;
r = toku_omt_cursor_current_index(omtcursor, &index);
assert_zero(r);
brt_cursor_shortcut (
BRT_CURSOR cursor,
int direction,
BRT_GET_CALLBACK_FUNCTION getf,
void *getf_v,
u_int32_t *keylen,
void **key,
u_int32_t *vallen,
void **val
)
{
int r = 0;
u_int32_t index = cursor->leaf_info.to_be.index;
OMT omt = cursor->leaf_info.to_be.omt;
// if we are searching towards the end, limit is last element
// if we are searching towards the beginning, limit is the first element
u_int32_t limit = (direction > 0) ? (toku_omt_size(omt) - 1) : 0;
//Starting with the prev, find the first real (non-provdel) leafentry.
while (index != limit) {
OMTVALUE le = NULL;
index += direction;
r = toku_omt_fetch(omt, index, &le, NULL);
r = toku_omt_fetch(omt, index, &le);
assert_zero(r);
if (toku_brt_cursor_is_leaf_mode(cursor) || !is_le_val_del(le, cursor)) {
u_int32_t keylen;
void *key;
u_int32_t vallen;
void *val;
r = brt_cursor_extract_key_and_val(le,
brt_cursor_extract_key_and_val(
le,
cursor,
&keylen,
&key,
&vallen,
&val);
keylen,
key,
vallen,
val
);
if (r==0) {
r = getf(keylen, key, vallen, val, getf_v);
}
if (r==0) {
r = getf(*keylen, *key, *vallen, *val, getf_v);
if (r==0 || r == TOKUDB_CURSOR_CONTINUE) {
//Update cursor.
cursor->leaf_info.to_be.index = index;
brt_cursor_update(cursor);
found = TRUE;
}
break;
}
}
if (r==0 && !found) r = DB_NOTFOUND;
if (r== TOKUDB_CURSOR_CONTINUE) {
continue;
}
else r = EINVAL;
return r;
}
static int
brt_cursor_maybe_get_and_pin_leaf(BRT_CURSOR brtcursor, BRTNODE* leafp) {
void *leafv;
int r = toku_cachetable_maybe_get_and_pin_clean(brtcursor->brt->cf,
brtcursor->leaf_info.blocknumber,
brtcursor->leaf_info.fullhash,
&leafv);
if (r == 0) {
*leafp = leafv;
else {
break;
}
return r;
}
static void
brt_cursor_unpin_leaf(BRT_CURSOR brtcursor, BRTNODE leaf) {
toku_unpin_brtnode(brtcursor->brt, leaf);
}
static int
brt_cursor_next_shortcut (BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
// Effect: If possible, increment the cursor and return the key-value pair
// (i.e., the next one from what the cursor pointed to before.)
// That is, do DB_NEXT on DUP databases, and do DB_NEXT_NODUP on NODUP databases.
{
int r;
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
BRTNODE leaf;
r = brt_cursor_maybe_get_and_pin_leaf(cursor, &leaf);
if (r == 0) {
u_int32_t limit = toku_omt_size(toku_omt_cursor_get_omt(cursor->omtcursor)) - 1;
r = brt_cursor_shortcut(cursor, 1, limit, getf, getf_v);
brt_cursor_unpin_leaf(cursor, leaf);
}
}
else r = EINVAL;
return r;
}
int
toku_brt_cursor_next(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
int r;
if (brt_cursor_next_shortcut(cursor, getf, getf_v)==0) {
r = 0;
} else {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, cursor->brt);
r = brt_cursor_search(cursor, &search, getf, getf_v);
int r = brt_cursor_search(cursor, &search, getf, getf_v, TRUE);
brt_search_finish(&search);
}
if (r == 0) brt_cursor_set_prefetching(cursor);
return r;
}
......@@ -6158,33 +6023,11 @@ brt_cursor_search_eq_k_x_getf(ITEMLEN keylen, bytevec key,
static int
brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
brt_cursor_invalidate(cursor);
struct brt_cursor_search_struct bcss = {getf, getf_v, cursor, search};
int r = toku_brt_search(cursor->brt, search, brt_cursor_search_eq_k_x_getf, &bcss, cursor, &cursor->root_put_counter);
return r;
}
static int
brt_cursor_prev_shortcut (BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
// Effect: If possible, decrement the cursor and return the key-value pair
// (i.e., the previous one from what the cursor pointed to before.)
// That is, do DB_PREV on DUP databases, and do DB_PREV_NODUP on NODUP databases.
{
int r;
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
BRTNODE leaf;
r = brt_cursor_maybe_get_and_pin_leaf(cursor, &leaf);
if (r == 0) {
r = brt_cursor_shortcut(cursor, -1, 0, getf, getf_v);
brt_cursor_unpin_leaf(cursor, leaf);
}
}
else r = EINVAL;
int r = toku_brt_search(cursor->brt, search, brt_cursor_search_eq_k_x_getf, &bcss, cursor, FALSE);
return r;
}
static int brt_cursor_compare_prev(brt_search_t *search, DBT *x) {
BRT brt = search->context;
return compare_k_x(brt, search->k, x) > 0; /* return max xy: kv > xy */
......@@ -6193,10 +6036,8 @@ static int brt_cursor_compare_prev(brt_search_t *search, DBT *x) {
int
toku_brt_cursor_prev(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
if (brt_cursor_prev_shortcut(cursor, getf, getf_v)==0)
return 0;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, &cursor->key, cursor->brt);
int r = brt_cursor_search(cursor, &search, getf, getf_v);
int r = brt_cursor_search(cursor, &search, getf, getf_v, TRUE);
brt_search_finish(&search);
return r;
}
......@@ -6219,7 +6060,7 @@ int
toku_brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, cursor->brt);
int r = brt_cursor_search(cursor, &search, getf, getf_v);
int r = brt_cursor_search(cursor, &search, getf, getf_v, FALSE);
brt_search_finish(&search);
return r;
}
......@@ -6233,7 +6074,7 @@ int
toku_brt_cursor_set_range_reverse(BRT_CURSOR cursor, DBT *key, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range_reverse, BRT_SEARCH_RIGHT, key, cursor->brt);
int r = brt_cursor_search(cursor, &search, getf, getf_v);
int r = brt_cursor_search(cursor, &search, getf, getf_v, FALSE);
brt_search_finish(&search);
return r;
}
......@@ -6284,7 +6125,6 @@ toku_brt_cursor_peek(BRT_CURSOR cursor, const DBT **pkey, const DBT **pval)
// Requires: The caller must be in the context of a
// BRT_GET_(STRADDLE_)CALLBACK_FUNCTION
{
if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, &cursor->val);
*pkey = &cursor->key;
*pval = &cursor->val;
}
......@@ -6355,12 +6195,6 @@ toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) {
r = toku_brt_cursor_current(cursor, DB_CURRENT, getf_nothing, NULL);
}
if (r == 0) {
//We need to have access to the (key,val) that the cursor points to.
//By invalidating the cursor we guarantee we have a local copy.
//
//If we try to use the omtcursor, there exists a race condition
//(node could be evicted), but maybe_get_and_pin() prevents delete.
brt_cursor_invalidate(cursor);
r = toku_brt_delete(cursor->brt, &cursor->key, txn);
}
}
......@@ -6417,7 +6251,7 @@ static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename,
if (BP_STATE(node,i) == PT_AVAIL) {
struct cmd_leafval_heaviside_extra be = {brt, key};
u_int32_t idx;
int r = toku_omt_find_zero(BLB_BUFFER(node, i), toku_cmd_leafval_heaviside, &be, 0, &idx, NULL);
int r = toku_omt_find_zero(BLB_BUFFER(node, i), toku_cmd_leafval_heaviside, &be, 0, &idx);
*less += idx;
*greater += toku_omt_size(BLB_BUFFER(node, i))-idx;
if (r==0) {
......@@ -6543,7 +6377,7 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_
if (0)
for (int j=0; j<size; j++) {
OMTVALUE v = 0;
r = toku_omt_fetch(BLB_BUFFER(node, i), j, &v, 0);
r = toku_omt_fetch(BLB_BUFFER(node, i), j, &v);
assert_zero(r);
fprintf(file, " [%d]=", j);
print_leafentry(file, v);
......
......@@ -410,7 +410,7 @@ is_filenum_reserved(CACHETABLE ct, FILENUM filenum) {
int r;
BOOL rval;
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, &v, NULL, NULL);
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, &v, NULL);
if (r==0) {
FILENUM* found = v;
assert(found->fileid == filenum.fileid);
......@@ -429,7 +429,7 @@ reserve_filenum(CACHETABLE ct, FILENUM filenum) {
assert(filenum.fileid != FILENUM_NONE.fileid);
uint32_t index;
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, NULL, &index, NULL);
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, NULL, &index);
assert(r==DB_NOTFOUND);
FILENUM *XMALLOC(entry);
*entry = filenum;
......@@ -443,7 +443,7 @@ unreserve_filenum(CACHETABLE ct, FILENUM filenum) {
int r;
uint32_t index;
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, &v, &index, NULL);
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, &v, &index);
assert(r==0);
FILENUM* found = v;
assert(found->fileid == filenum.fileid);
......
......@@ -1192,7 +1192,7 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) {
OMTVALUE txnfound;
int rval;
int r = toku_omt_find_zero(logger->live_txns, find_by_xid, &txnid, &txnfound, NULL, NULL);
int r = toku_omt_find_zero(logger->live_txns, find_by_xid, &txnid, &txnfound, NULL);
if (r==0) {
TOKUTXN txn = txnfound;
assert(txn->txnid64==txnid);
......
......@@ -48,15 +48,6 @@ struct omt {
struct omt_array a;
struct omt_tree t;
} i;
OMTCURSOR associated; // the OMTs associated with this.
};
struct omt_cursor {
OMT omt; // The omt this cursor is associated with. NULL if not present.
void (*invalidate)(OMTCURSOR, void*);
void *invalidate_extra;
u_int32_t index; // This is the state for the initial implementation
OMTCURSOR next,prev; // circular linked list of all OMTCURSORs associated with omt.
};
static inline int
......@@ -66,7 +57,6 @@ omt_create_no_array(OMT *omtp) {
result->is_array = TRUE;
result->i.a.num_values = 0;
result->i.a.start_idx = 0;
result->associated = NULL;
*omtp = result;
return 0;
}
......@@ -99,82 +89,6 @@ toku_omt_create_steal_sorted_array(OMT *omtp, OMTVALUE **valuesp, u_int32_t numv
return 0;
}
int toku_omt_cursor_create (OMTCURSOR *omtcp) {
OMTCURSOR MALLOC(c);
if (c==NULL) return errno;
c->omt = NULL;
c->next = c->prev = NULL;
c->invalidate = NULL;
c->invalidate_extra = NULL;
*omtcp = c;
return 0;
}
OMT toku_omt_cursor_get_omt(OMTCURSOR c) {
return c->omt;
}
void toku_omt_cursor_set_invalidate_callback(OMTCURSOR c, void (*f)(OMTCURSOR,void*), void* extra) {
c->invalidate = f;
c->invalidate_extra = extra;
}
void toku_omt_cursor_invalidate (OMTCURSOR c) {
//If already invalid, do nothing.
if (c==NULL || c->omt==NULL) return;
if (c->invalidate) c->invalidate(c, c->invalidate_extra);
if (c->next == c) {
// It's the last one.
c->omt->associated = NULL;
} else {
OMTCURSOR next = c->next;
OMTCURSOR prev = c->prev;
if (c->omt->associated == c) {
c->omt->associated = next;
}
next->prev = prev;
prev->next = next;
}
c->next = c->prev = NULL;
c->omt = NULL;
}
void toku_omt_cursor_destroy (OMTCURSOR *p) {
toku_omt_cursor_invalidate(*p);
toku_free(*p);
*p = NULL;
}
static void invalidate_cursors (OMT omt) {
OMTCURSOR assoced;
while ((assoced = omt->associated)) {
toku_omt_cursor_invalidate(assoced);
}
}
static void associate (OMT omt, OMTCURSOR c)
{
if (c->omt==omt) return;
toku_omt_cursor_invalidate(c);
if (omt->associated==NULL) {
c->prev = c;
c->next = c;
omt->associated = c;
} else {
c->prev = omt->associated->prev;
c->next = omt->associated;
omt->associated->prev->next = c;
omt->associated->prev = c;
}
c->omt = omt;
}
void
toku_omt_cursor_associate(OMT omt, OMTCURSOR c) {
assert(c->omt==NULL||c->omt==omt);
associate(omt, c);
}
static inline u_int32_t nweight(OMT omt, node_idx idx) {
if (idx==NODE_NULL) return 0;
else return (omt->i.t.nodes+idx)->weight;
......@@ -639,50 +553,6 @@ static inline int find_internal_plus(OMT omt, node_idx n_idx, int (*h)(OMTVALUE,
}
}
int toku_omt_cursor_is_valid (OMTCURSOR c) {
return c->omt!=NULL;
}
void toku_omt_cursor_set_index(OMTCURSOR c, u_int32_t index) {
assert(c->omt);
c->index = index;
}
int toku_omt_cursor_next (OMTCURSOR c, OMTVALUE *v) {
if (c->omt == NULL) return EINVAL;
int r = toku_omt_fetch(c->omt, c->index+1, v, NULL);
if (r==0) c->index++;
else toku_omt_cursor_invalidate(c);
return r;
}
int toku_omt_cursor_prev (OMTCURSOR c, OMTVALUE *v) {
if (c->omt == NULL) return EINVAL;
if (c->index==0) {
toku_omt_cursor_invalidate(c);
return EINVAL;
}
c->index--;
int r = toku_omt_fetch(c->omt, c->index, v, NULL);
assert(r==0);
return r;
}
int toku_omt_cursor_current (OMTCURSOR c, OMTVALUE *v) {
if (c->omt == NULL) return EINVAL;
int r = toku_omt_fetch(c->omt, c->index, v, NULL);
assert(r==0);
return r;
}
int toku_omt_cursor_current_index(OMTCURSOR c, u_int32_t *index) {
if (c->omt == NULL) return EINVAL;
*index = c->index;
return 0;
}
//TODO: Put all omt API functions here.
int toku_omt_create (OMT *omtp) {
return omt_create_internal(omtp, 2);
......@@ -690,7 +560,6 @@ int toku_omt_create (OMT *omtp) {
void toku_omt_destroy(OMT *omtp) {
OMT omt=*omtp;
invalidate_cursors(omt);
if (omt->is_array) toku_free(omt->i.a.values);
else toku_free(omt->i.t.nodes);
toku_free(omt);
......@@ -713,7 +582,6 @@ int toku_omt_create_from_sorted_array(OMT *omtp, OMTVALUE *values, u_int32_t num
int toku_omt_insert_at(OMT omt, OMTVALUE value, u_int32_t index) {
int r;
invalidate_cursors(omt);
if (index>omt_size(omt)) return EINVAL;
if ((r=maybe_resize_or_convert(omt, 1+omt_size(omt)))) return r;
if (omt->is_array && index!=omt->i.a.num_values &&
......@@ -739,7 +607,6 @@ int toku_omt_insert_at(OMT omt, OMTVALUE value, u_int32_t index) {
int toku_omt_set_at (OMT omt, OMTVALUE value, u_int32_t index) {
if (index>=omt_size(omt)) return EINVAL;
invalidate_cursors(omt);
if (omt->is_array) {
set_at_internal_array(omt, value, index);
}
......@@ -752,7 +619,6 @@ int toku_omt_set_at (OMT omt, OMTVALUE value, u_int32_t index) {
int toku_omt_delete_at(OMT omt, u_int32_t index) {
OMTVALUE v;
int r;
invalidate_cursors(omt);
if (index>=omt_size(omt)) return EINVAL;
if ((r=maybe_resize_or_convert(omt, -1+omt_size(omt)))) return r;
if (omt->is_array && index!=0 && index!=omt->i.a.num_values-1) {
......@@ -772,7 +638,7 @@ int toku_omt_delete_at(OMT omt, u_int32_t index) {
return 0;
}
int toku_omt_fetch(OMT V, u_int32_t i, OMTVALUE *v, OMTCURSOR c) {
int toku_omt_fetch(OMT V, u_int32_t i, OMTVALUE *v) {
if (i>=omt_size(V)) return EINVAL;
if (V->is_array) {
fetch_internal_array(V, i, v);
......@@ -780,10 +646,6 @@ int toku_omt_fetch(OMT V, u_int32_t i, OMTVALUE *v, OMTCURSOR c) {
else {
fetch_internal(V, V->i.t.root, i, v);
}
if (c) {
associate(V,c);
c->index = i;
}
return 0;
}
......@@ -796,7 +658,6 @@ free_item (OMTVALUE lev, u_int32_t UU(idx), void *vsi) {
void toku_omt_free_items(OMT omt) {
invalidate_cursors(omt);
int r = toku_omt_iterate(omt, free_item, NULL);
lazy_assert_zero(r);
}
......@@ -820,9 +681,7 @@ int toku_omt_insert(OMT omt, OMTVALUE value, int(*h)(OMTVALUE, void*v), void *v,
int r;
u_int32_t idx;
invalidate_cursors(omt);
r = toku_omt_find_zero(omt, h, v, NULL, &idx, NULL);
r = toku_omt_find_zero(omt, h, v, NULL, &idx);
if (r==0) {
if (index) *index = idx;
return DB_KEYEXIST;
......@@ -835,7 +694,7 @@ int toku_omt_insert(OMT omt, OMTVALUE value, int(*h)(OMTVALUE, void*v), void *v,
return 0;
}
int toku_omt_find_zero(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *index, OMTCURSOR c) {
int toku_omt_find_zero(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *index) {
u_int32_t tmp_index;
if (index==NULL) index=&tmp_index;
int r;
......@@ -845,16 +704,10 @@ int toku_omt_find_zero(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, OMTVAL
else {
r = find_internal_zero(V, V->i.t.root, h, extra, value, index);
}
if (c && r==0) {
associate(V,c);
c->index = *index;
} else {
toku_omt_cursor_invalidate(c);
}
return r;
}
int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int direction, OMTVALUE *value, u_int32_t *index, OMTCURSOR c) {
int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int direction, OMTVALUE *value, u_int32_t *index) {
u_int32_t tmp_index;
int r;
if (index==NULL) index=&tmp_index;
......@@ -875,19 +728,12 @@ int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int directi
r = find_internal_plus( V, V->i.t.root, h, extra, value, index);
}
}
if (c && r==0) {
associate(V,c);
c->index=*index;
} else {
toku_omt_cursor_invalidate(c);
}
return r;
}
int toku_omt_split_at(OMT omt, OMT *newomtp, u_int32_t index) {
int r;
OMT newomt;
invalidate_cursors(omt);
if (index>omt_size(omt)) return EINVAL;
if ((r=omt_convert_to_array(omt))) return r;
......@@ -909,8 +755,6 @@ int toku_omt_split_at(OMT omt, OMT *newomtp, u_int32_t index) {
int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomtp) {
int r;
OMT newomt = 0;
invalidate_cursors(leftomt);
invalidate_cursors(rightomt);
u_int32_t newsize = omt_size(leftomt)+omt_size(rightomt);
if ((r = omt_create_internal(&newomt, newsize))) return r;
......@@ -938,7 +782,6 @@ int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomtp) {
}
void toku_omt_clear(OMT omt) {
invalidate_cursors(omt);
if (omt->is_array) {
omt->i.a.start_idx = 0;
omt->i.a.num_values = 0;
......
......@@ -153,7 +153,6 @@ extern "C" {
//typedef struct value *OMTVALUE; // A slight improvement over using void*.
typedef void *OMTVALUE;
typedef struct omt *OMT;
typedef struct omt_cursor *OMTCURSOR;
int toku_omt_create (OMT *omtp);
......@@ -302,13 +301,7 @@ int toku_omt_delete_at(OMT omt, u_int32_t idx);
// Rationale: To delete an item, first find its index using toku_omt_find, then delete it.
// Performance: time=O(\log N) amortized.
void toku_omt_cursor_set_index(OMTCURSOR c, u_int32_t idx);
// Effect:
// Set the abstract index.
// Requires:
// The cursor is not invalid.
int toku_omt_fetch (OMT V, u_int32_t i, OMTVALUE *v, OMTCURSOR c);
int toku_omt_fetch (OMT V, u_int32_t i, OMTVALUE *v);
// Effect: Set *v=V_i
// If c!=NULL then set c's abstract offset to i.
// Requires: v != NULL
......@@ -323,14 +316,14 @@ int toku_omt_fetch (OMT V, u_int32_t i, OMTVALUE *v, OMTCURSOR c);
// function, the function must remove c's association with the old
// OMT, and associate it with the new OMT.
int toku_omt_find_zero(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *idx, OMTCURSOR c);
int toku_omt_find_zero(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *idx);
// Effect: Find the smallest i such that h(V_i, extra)>=0
// If there is such an i and h(V_i,extra)==0 then set *index=i and return 0.
// If there is such an i and h(V_i,extra)>0 then set *index=i and return DB_NOTFOUND.
// If there is no such i then set *index=toku_omt_size(V) and return DB_NOTFOUND.
// Requires: index!=NULL
int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int direction, OMTVALUE *value, u_int32_t *idx, OMTCURSOR c);
int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int direction, OMTVALUE *value, u_int32_t *idx);
// Effect:
// If direction >0 then find the smallest i such that h(V_i,extra)>0.
// If direction <0 then find the largest i such that h(V_i,extra)<0.
......@@ -422,98 +415,6 @@ void toku_omt_clear(OMT omt);
unsigned long toku_omt_memory_size (OMT omt);
// Effect: Return the size (in bytes) of the omt, as it resides in main memory. Don't include any of the OMTVALUES.
int toku_omt_cursor_create (OMTCURSOR *p);
// Effect: Create an OMTCURSOR. Stores it in *p. The OMTCURSOR is
// initially invalid.
// Requires: p != NULL
// Returns:
// 0 success
// ENOMEM out of memory (and doesn't modify *omtp)
// Performance: constant time.
void toku_omt_cursor_destroy (OMTCURSOR *p);
// Effect: Invalidates *p (if it is valid) and frees any memory
// associated with *p.
// Also sets *p=NULL.
// Rationale: The usage is to do something like
// toku_omt_cursor_destroy(&c);
// and now c will have a NULL pointer instead of a dangling freed pointer.
// Rationale: Returns no values since free() cannot fail.
int toku_omt_cursor_is_valid (OMTCURSOR c);
// Effect: returns 0 iff c is invalid.
// Performance: time=O(1)
int toku_omt_cursor_next (OMTCURSOR c, OMTVALUE *v);
// Effect: Increment c's offset, and find and store the value in v.
// Requires: v != NULL
// Returns
// 0 success
// EINVAL if the offset goes out of range or c is invalid.
// On nonzero return, *v is unchanged and c is invalidated.
// Performance: time=O(log N) worst case, expected time=O(1) for a randomly
// chosen initial position.
int toku_omt_cursor_current_index(OMTCURSOR c, u_int32_t *idx);
// Effect: Stores c's offset in *index.
// Requires: index != NULL
// Returns
// 0 success
// EINVAL c is invalid
// On nonzero return, *index is unchanged and c is unchanged.
// Performance: time=O(1)
OMT toku_omt_cursor_get_omt(OMTCURSOR c);
// Effect: returns the associated omt or NULL if not associated.
// Performance: time=O(1)
int toku_omt_cursor_current (OMTCURSOR c, OMTVALUE *v);
// Effect: Store in v the value pointed by c's abstract offset
// Requires: v != NULL
// Returns
// 0 success
// EINVAL if c is invalid
// On non-zero return, *v is unchanged
// Performance: O(1) time
int toku_omt_cursor_prev (OMTCURSOR c, OMTVALUE *v);
// Effect: Decrement c's offset, and find and store the value in v.
// Requires: v != NULL
// Returns
// 0 success
// EINVAL if the offset goes out of range or c is invalid.
// On nonzero return, *v is unchanged and c is invalidated.
// Performance: time=O(log N) worst case, expected time=O(1) for a randomly
// chosen initial position.
void toku_omt_cursor_invalidate (OMTCURSOR c);
// Effect: Invalidate c. (This does not mean that c is destroyed or
// that its memory is freed.)
// If c is valid, the invalidate callback function (if any) will be called
// before invalidating c.
void toku_omt_cursor_set_invalidate_callback(OMTCURSOR c, void (*f)(OMTCURSOR,void*), void* extra);
// Effect:
// Saves function 'f' to be called whenever the cursor is invalidated.
// 'extra' is passed as an additional parameter to f.
// Requires:
// The lifetime of the 'extra' parameter must continue at least till the cursor
// is destroyed.
void toku_omt_cursor_associate(OMT omt, OMTCURSOR c);
// Effect:
// Associates an omtcursor with an omt.
// Requires:
// The omtcursor is not associated with any other omt.
// Requires:
// toku_omt_associate must be called when the omt-lock is held
// Rationale:
// This is used by brt_cursors for omts representing leaf nodes.
// These omts are touched by multiple threads, and therefore require locks
// for modifying the list of omtcursors.
// We do not want to grab two locks (one for omt, and one for the old
// associated omt).
#if defined(__cplusplus) || defined(__cilkplusplus)
};
......
......@@ -99,7 +99,7 @@ static void file_map_close_dictionaries(struct file_map *fmap, BOOL recovery_suc
if (n == 0)
break;
OMTVALUE v;
r = toku_omt_fetch(fmap->filenums, n-1, &v, NULL);
r = toku_omt_fetch(fmap->filenums, n-1, &v);
assert(r == 0);
r = toku_omt_delete_at(fmap->filenums, n-1);
assert(r == 0);
......@@ -150,7 +150,7 @@ static int file_map_insert (struct file_map *fmap, FILENUM fnum, BRT brt, char *
static void file_map_remove(struct file_map *fmap, FILENUM fnum) {
OMTVALUE v; u_int32_t idx;
int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx, NULL);
int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx);
if (r == 0) {
struct file_map_tuple *tuple = v;
r = toku_omt_delete_at(fmap->filenums, idx);
......@@ -162,7 +162,7 @@ static void file_map_remove(struct file_map *fmap, FILENUM fnum) {
// Look up file info: given FILENUM, return file_map_tuple (or DB_NOTFOUND)
static int file_map_find(struct file_map *fmap, FILENUM fnum, struct file_map_tuple **file_map_tuple) {
OMTVALUE v; u_int32_t idx;
int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx, NULL);
int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx);
if (r == 0) {
struct file_map_tuple *tuple = v;
assert(tuple->filenum.fileid == fnum.fileid);
......@@ -1142,7 +1142,7 @@ static void recover_abort_live_txns(RECOVER_ENV renv) {
if (n_live_txns == 0)
break;
OMTVALUE v;
r = toku_omt_fetch(renv->logger->live_txns, n_live_txns-1, &v, NULL);
r = toku_omt_fetch(renv->logger->live_txns, n_live_txns-1, &v);
if (r != 0)
break;
TOKUTXN txn = (TOKUTXN) v;
......
......@@ -152,7 +152,7 @@ static int do_insertion (enum brt_msg_type type, FILENUM filenum, BYTESTRING key
(void)toku_cachefile_get_and_pin_fd(cf);
if (!toku_cachefile_is_dev_null_unlocked(cf)) {
OMTVALUE brtv=NULL;
r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL);
r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL);
assert(r==0);
BRT brt = brtv;
......@@ -561,7 +561,7 @@ toku_rollback_change_fdescriptor(FILENUM filenum,
fd = toku_cachefile_get_and_pin_fd(cf);
if (!toku_cachefile_is_dev_null_unlocked(cf)) {
OMTVALUE brtv=NULL;
r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL);
r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL);
assert(r==0);
BRT brt = brtv;
DESCRIPTOR_S d;
......
......@@ -176,7 +176,7 @@ live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), voi
int r;
OMT reverse = txn->logger->live_list_reverse;
r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx, NULL);
r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx);
invariant(r==0);
pair = pairv;
invariant(pair->xid1 == *live_xid); //sanity check
......@@ -188,7 +188,7 @@ live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), voi
OMT snapshot = txn->logger->snapshot_txnids;
BOOL should_delete = TRUE;
// find the youngest txn in snapshot that is older than xid
r = toku_omt_find(snapshot, toku_find_xid_by_xid, &xid, -1, &olderv, &olderidx, NULL);
r = toku_omt_find(snapshot, toku_find_xid_by_xid, &xid, -1, &olderv, &olderidx);
if (r==0) {
//There is an older txn
olderxid = olderv;
......@@ -237,7 +237,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
//Remove txn from list (omt) of live transactions
OMTVALUE txnagain;
u_int32_t idx;
r = toku_omt_find_zero(txn->logger->live_txns, find_xid, txn, &txnagain, &idx, NULL);
r = toku_omt_find_zero(txn->logger->live_txns, find_xid, txn, &txnagain, &idx);
assert(r==0);
assert(txn==txnagain);
r = toku_omt_delete_at(txn->logger->live_txns, idx);
......@@ -248,7 +248,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
OMTVALUE txnagain;
u_int32_t idx;
//Remove txn from list of live root txns
r = toku_omt_find_zero(txn->logger->live_root_txns, find_xid, txn, &txnagain, &idx, NULL);
r = toku_omt_find_zero(txn->logger->live_root_txns, find_xid, txn, &txnagain, &idx);
assert(r==0);
assert(txn==txnagain);
r = toku_omt_delete_at(txn->logger->live_root_txns, idx);
......@@ -266,7 +266,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
u_int32_t idx;
OMTVALUE v;
//Free memory used for snapshot_txnids
r = toku_omt_find_zero(txn->logger->snapshot_txnids, toku_find_xid_by_xid, &txn->txnid64, &v, &idx, NULL);
r = toku_omt_find_zero(txn->logger->snapshot_txnids, toku_find_xid_by_xid, &txn->txnid64, &v, &idx);
invariant(r==0);
TXNID *xid = v;
invariant(*xid == txn->txnid64);
......@@ -280,7 +280,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
invariant(toku_omt_size(txn->live_root_txn_list) > 0);
OMTVALUE v;
//store a single array of txnids
r = toku_omt_fetch(txn->live_root_txn_list, 0, &v, NULL);
r = toku_omt_fetch(txn->live_root_txn_list, 0, &v);
invariant(r==0);
toku_free(v);
toku_omt_destroy(&txn->live_root_txn_list);
......@@ -293,7 +293,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
TOKULOGGER logger = txn->logger;
OMTVALUE oldest_txnv;
r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv, NULL);
r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv);
if (r==0) {
TOKUTXN oldest_txn = oldest_txnv;
assert(oldest_txn != txn); // We just removed it
......@@ -638,7 +638,7 @@ int toku_txn_note_brt (TOKUTXN txn, BRT brt) {
OMTVALUE txnv;
u_int32_t index;
// Does brt already know about transaction txn?
int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv, &index, NULL);
int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv, &index);
if (r==0) {
// It's already there.
assert((TOKUTXN)txnv==txn);
......@@ -669,7 +669,7 @@ static int swap_brt (OMTVALUE txnv, u_int32_t UU(idx), void *extra) {
int r;
r = toku_txn_note_brt(txn, info->live); //Add new brt.
assert(r==0);
r = toku_omt_find_zero(txn->open_brts, find_filenum, info->zombie, &zombie_again, &index, NULL);
r = toku_omt_find_zero(txn->open_brts, find_filenum, info->zombie, &zombie_again, &index);
assert(r==0);
assert((void*)zombie_again==info->zombie);
r = toku_omt_delete_at(txn->open_brts, index); //Delete old brt.
......@@ -703,7 +703,7 @@ static int remove_brt (OMTVALUE txnv, u_int32_t UU(idx), void *brtv) {
BRT brt = brtv;
OMTVALUE brtv_again=NULL;
u_int32_t index;
int r = toku_omt_find_zero(txn->open_brts, find_filenum, brt, &brtv_again, &index, NULL);
int r = toku_omt_find_zero(txn->open_brts, find_filenum, brt, &brtv_again, &index);
assert(r==0);
assert((void*)brtv_again==brtv);
r = toku_omt_delete_at(txn->open_brts, index);
......@@ -723,7 +723,7 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) {
TOKUTXN txn = txnv;
OMTVALUE txnv_again=NULL;
u_int32_t index;
int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index, NULL);
int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index);
assert(r==0);
assert((void*)txnv_again==txnv);
r = toku_omt_delete_at(brt->txns, index);
......@@ -760,7 +760,7 @@ int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr) {
struct tokutxn fake_txn; fake_txn.txnid64 = xid;
u_int32_t index;
OMTVALUE txnv;
int r = toku_omt_find_zero(brt->txns, find_xid, &fake_txn, &txnv, &index, NULL);
int r = toku_omt_find_zero(brt->txns, find_xid, &fake_txn, &txnv, &index);
if (r == 0) *txnptr = txnv;
return r;
}
......
#ident "$Id$"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include <toku_portability.h>
#include <errno.h>
#include "test.h"
#include <stdio.h>
#include "omt.h"
#include "memory.h"
#include "brttypes.h"
#include <stdlib.h>
#include <string.h>
enum { N=10 };
struct value { int x; } vs[N];
OMTVALUE ps[N];
#define V(x) ((struct value *)(x))
static void callback (OMTCURSOR c, void *extra) {
if (verbose) printf("%s:%d %p %p\n", __FUNCTION__, __LINE__, c, extra);
OMTVALUE v = NULL;
int r = toku_omt_cursor_current(c, &v);
if (verbose) printf("%s:%d %d\n", __FUNCTION__, __LINE__, r);
}
static void test (void) {
OMT o;
OMTCURSOR curs, curs2, curs3;
int i, r;
OMTVALUE v;
for (i=0; i<N; i++) {
vs[i].x=i;
ps[i]=&vs[i];
}
// destroy the omt first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
toku_omt_destroy(&o);
toku_omt_cursor_destroy(&curs);
// destroy the cursor first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
assert(V(v)->x==5);
r = toku_omt_cursor_next(curs, &v);
assert(r==0 && V(v)->x==6);
r = toku_omt_cursor_prev(curs, &v);
assert(r==0 && V(v)->x==5);
toku_omt_cursor_destroy(&curs);
toku_omt_destroy(&o);
// Create two cursors, destroy omt first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
r = toku_omt_cursor_create(&curs2); assert(r==0);
r = toku_omt_fetch(o, 4, &v, curs2); assert(r==0);
r = toku_omt_cursor_next(curs, &v); assert(r==0 && V(v)->x==6);
toku_omt_destroy(&o);
toku_omt_cursor_destroy(&curs);
toku_omt_cursor_destroy(&curs2);
// Create two cursors, destroy them first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
r = toku_omt_cursor_create(&curs2); assert(r==0);
r = toku_omt_fetch(o, 4, &v, curs2); assert(r==0);
r = toku_omt_cursor_next(curs, &v); assert(r==0 && V(v)->x==6);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==3);
toku_omt_cursor_destroy(&curs);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==2);
toku_omt_cursor_destroy(&curs2);
toku_omt_destroy(&o);
// Create three cursors, destroy them first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
r = toku_omt_cursor_create(&curs2); assert(r==0);
r = toku_omt_fetch(o, 4, &v, curs2); assert(r==0);
r = toku_omt_cursor_create(&curs3); assert(r==0);
r = toku_omt_fetch(o, 9, &v, curs3); assert(r==0);
r = toku_omt_cursor_next(curs, &v); assert(r==0 && V(v)->x==6);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==3);
r = toku_omt_cursor_next(curs3, &v); assert(r!=0 && !toku_omt_cursor_is_valid(curs3));
toku_omt_cursor_destroy(&curs);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==2);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==1);
r = toku_omt_fetch(o, 1, &v, curs3); assert(r==0);
r = toku_omt_cursor_prev(curs3, &v); assert(r==0 && V(v)->x==0);
r = toku_omt_cursor_prev(curs3, &v); assert(r!=0 && !toku_omt_cursor_is_valid(curs3));
toku_omt_cursor_destroy(&curs2);
toku_omt_destroy(&o);
toku_omt_cursor_destroy(&curs3);
// ticket 1622, invalidate recursion
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
toku_omt_cursor_set_invalidate_callback(curs, callback, 0);
r = toku_omt_fetch(o, 9, &v, curs); assert(r==0);
r = toku_omt_cursor_next(curs, &v);
toku_omt_destroy(&o);
toku_omt_cursor_destroy(&curs);
}
int
test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute__((__unused__))) {
test();
return 0;
}
......@@ -264,94 +264,30 @@ test_create_from_sorted_array_size (enum create_type create_choice, enum close_w
static void
test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) {
u_int32_t i;
int j;
int r;
TESTVALUE v = (TESTVALUE)&i;
TESTVALUE oldv = v;
OMTCURSOR c;
r = toku_omt_cursor_create(&c);
CKERR(r);
assert(len == toku_omt_size(omtree));
for (i = 0; i < len; i++) {
assert(oldv!=val[i]);
v = NULL;
r = toku_omt_fetch(omtree, i, &v, NULL);
CKERR(r);
assert(v != NULL);
assert(v != oldv);
assert(v == val[i]);
assert(V(v)->number == V(val[i])->number);
v = oldv;
r = toku_omt_fetch(omtree, i, &v, c);
CKERR(r);
assert(v != NULL);
assert(v != oldv);
assert(v == val[i]);
assert(V(v)->number == V(val[i])->number);
assert(toku_omt_cursor_is_valid(c));
v = oldv;
r = toku_omt_cursor_current(c, &v);
CKERR(r);
assert(v != NULL);
assert(v != oldv);
assert(v == val[i]);
assert(V(v)->number == V(val[i])->number);
assert(toku_omt_cursor_is_valid(c));
v = oldv;
j = i + 1;
while ((r = toku_omt_cursor_next(c, &v)) == 0) {
assert(toku_omt_cursor_is_valid(c));
assert(v != NULL);
assert(v != oldv);
assert(v == val[j]);
assert(V(v)->number == V(val[j])->number);
j++;
v = oldv;
}
CKERR2(r, EINVAL);
assert(j == (int) len);
assert(oldv!=val[i]);
v = NULL;
r = toku_omt_fetch(omtree, i, &v, c);
r = toku_omt_fetch(omtree, i, &v);
CKERR(r);
assert(v != NULL);
assert(v != oldv);
assert(v == val[i]);
assert(V(v)->number == V(val[i])->number);
v = oldv;
j = i - 1;
while ((r = toku_omt_cursor_prev(c, &v)) == 0) {
assert(toku_omt_cursor_is_valid(c));
assert(v != NULL);
assert(v != oldv);
assert(v == val[j]);
assert(V(v)->number == V(val[j])->number);
j--;
v = oldv;
}
CKERR2(r, EINVAL);
assert(j == -1);
}
for (i = len; i < len*2; i++) {
v = oldv;
r = toku_omt_fetch(omtree, i, &v, NULL);
r = toku_omt_fetch(omtree, i, &v);
CKERR2(r, EINVAL);
assert(v == oldv);
v = NULL;
r = toku_omt_fetch(omtree, i, &v, c);
CKERR2(r, EINVAL);
assert(v == NULL);
}
toku_omt_cursor_destroy(&c);
}
static void
......@@ -700,69 +636,22 @@ heavy_extra (h_extra* extra, u_int32_t first_zero, u_int32_t first_pos) {
static void
test_find_dir (int dir, void* extra, int (*h)(OMTVALUE, void*),
int r_expect, BOOL idx_will_change, u_int32_t idx_expect,
u_int32_t number_expect, BOOL cursor_valid) {
u_int32_t number_expect, BOOL UU(cursor_valid)) {
u_int32_t idx = UINT32_MAX;
u_int32_t old_idx = idx;
TESTVALUE omt_val, omt_val_curs;
OMTCURSOR c;
TESTVALUE omt_val;
int r;
BOOL found;
r = toku_omt_cursor_create(&c);
CKERR(r);
omt_val = NULL;
if (dir == 0) {
r = toku_omt_find_zero(omt, h, extra, &omt_val, &idx, c);
}
else {
r = toku_omt_find( omt, h, extra, dir, &omt_val, &idx, c);
}
CKERR2(r, r_expect);
if (idx_will_change) {
assert(idx == idx_expect);
}
else {
assert(idx == old_idx);
}
if (r == DB_NOTFOUND) {
assert(omt_val == NULL);
found = FALSE;
}
else {
assert(V(omt_val)->number == number_expect);
found = TRUE;
}
assert(!cursor_valid == !toku_omt_cursor_is_valid(c));
if (cursor_valid) {
TESTVALUE tmp;
assert(idx_will_change);
omt_val_curs = NULL;
r = toku_omt_cursor_current(c, &omt_val_curs);
CKERR(r);
assert(toku_omt_cursor_is_valid(c));
r = toku_omt_fetch(omt, idx, &tmp, NULL);
CKERR(r);
if (found) assert(tmp==omt_val);
assert(omt_val_curs != NULL);
assert(omt_val_curs == tmp);
assert(V(omt_val_curs)->number == V(tmp)->number);
if (found) assert(V(omt_val_curs)->number==number_expect);
}
toku_omt_cursor_invalidate(c);
assert(!toku_omt_cursor_is_valid(c));
toku_omt_cursor_destroy(&c);
/* Verify we can pass NULL value. */
omt_val = NULL;
idx = old_idx;
if (dir == 0) {
r = toku_omt_find_zero(omt, h, extra, NULL, &idx, NULL);
r = toku_omt_find_zero(omt, h, extra, NULL, &idx);
}
else {
r = toku_omt_find( omt, h, extra, dir, NULL, &idx, NULL);
r = toku_omt_find( omt, h, extra, dir, NULL, &idx);
}
CKERR2(r, r_expect);
if (idx_will_change) {
......@@ -777,10 +666,10 @@ test_find_dir (int dir, void* extra, int (*h)(OMTVALUE, void*),
omt_val = NULL;
idx = old_idx;
if (dir == 0) {
r = toku_omt_find_zero(omt, h, extra, &omt_val, 0, NULL);
r = toku_omt_find_zero(omt, h, extra, &omt_val, 0);
}
else {
r = toku_omt_find( omt, h, extra, dir, &omt_val, 0, NULL);
r = toku_omt_find( omt, h, extra, dir, &omt_val, 0);
}
CKERR2(r, r_expect);
assert(idx == old_idx);
......@@ -795,10 +684,10 @@ test_find_dir (int dir, void* extra, int (*h)(OMTVALUE, void*),
omt_val = NULL;
idx = old_idx;
if (dir == 0) {
r = toku_omt_find_zero(omt, h, extra, NULL, 0, NULL);
r = toku_omt_find_zero(omt, h, extra, NULL, 0);
}
else {
r = toku_omt_find( omt, h, extra, dir, NULL, 0, NULL);
r = toku_omt_find( omt, h, extra, dir, NULL, 0);
}
CKERR2(r, r_expect);
assert(idx == old_idx);
......@@ -879,100 +768,12 @@ test_find (enum create_type create_choice, enum close_when_done do_close) {
test_close(do_close);
}
static void
invalidate_callback_null (OMTCURSOR c, void *extra) {
assert(c && !extra);
}
static void
invalidate_callback_inc (OMTCURSOR c, void *extra) {
assert(c);
int *num = extra;
(*num)++;
}
static void
test_invalidate (enum create_type create_choice, BOOL set_callback, BOOL invalidate_callback) {
init_identity_values(random_seed, 100);
test_create_from_sorted_array(create_choice, KEEP_WHEN_DONE);
OMTCURSOR c;
int invalidate_count = 0;
int r = toku_omt_cursor_create(&c);
if (set_callback || invalidate_callback) {
toku_omt_cursor_set_invalidate_callback(c, invalidate_callback_inc, &invalidate_count);
}
if (invalidate_callback) {
toku_omt_cursor_set_invalidate_callback(c, invalidate_callback_null, NULL);
}
OMTVALUE val;
r = toku_omt_fetch(omt, 0, &val, c); CKERR(r);
assert(toku_omt_cursor_is_valid(c));
assert(invalidate_count==0);
r = toku_omt_cursor_prev(c, &val); CKERR2(r, EINVAL);
assert(!toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==1);
else assert(invalidate_count==0);
r = toku_omt_cursor_prev(c, &val); CKERR2(r, EINVAL);
assert(!toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==1);
else assert(invalidate_count==0);
r = toku_omt_fetch(omt, toku_omt_size(omt)-1, &val, c); CKERR(r);
assert(toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==1);
else assert(invalidate_count==0);
r = toku_omt_cursor_prev(c, &val); CKERR(r);
assert(toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==1);
else assert(invalidate_count==0);
r = toku_omt_cursor_next(c, &val); CKERR(r);
assert(toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==1);
else assert(invalidate_count==0);
r = toku_omt_cursor_next(c, &val); CKERR2(r, EINVAL);
assert(!toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==2);
else assert(invalidate_count==0);
r = toku_omt_fetch(omt, toku_omt_size(omt)-1, &val, c); CKERR(r);
assert(toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==2);
else assert(invalidate_count==0);
test_close(CLOSE_WHEN_DONE);
assert(!toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==3);
else assert(invalidate_count==0);
init_identity_values(random_seed, 100);
test_create_from_sorted_array(create_choice, KEEP_WHEN_DONE);
r = toku_omt_fetch(omt, toku_omt_size(omt)-1, &val, c); CKERR(r);
assert(toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==3);
else assert(invalidate_count==0);
toku_omt_cursor_destroy(&c);
if (set_callback && !invalidate_callback) assert(invalidate_count==4);
else assert(invalidate_count==0);
test_close(CLOSE_WHEN_DONE);
if (set_callback && !invalidate_callback) assert(invalidate_count==4);
else assert(invalidate_count==0);
}
static void
runtests_create_choice (enum create_type create_choice) {
test_create_array(create_choice, TEST_SORTED);
test_create_array(create_choice, TEST_RANDOM);
test_create_array(create_choice, TEST_IDENTITY);
test_find( create_choice, CLOSE_WHEN_DONE);
test_invalidate( create_choice, FALSE, FALSE);
test_invalidate( create_choice, FALSE, TRUE);
test_invalidate( create_choice, TRUE, FALSE);
test_invalidate( create_choice, TRUE, TRUE);
}
int
......
......@@ -114,7 +114,7 @@ live_list_reverse_note_txn_start_iter(OMTVALUE live_xidv, u_int32_t UU(index), v
int r;
OMT reverse = txn->logger->live_list_reverse;
r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx, NULL);
r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx);
if (r==0) {
pair = pairv;
invariant(pair->xid1 == *live_xid); //sanity check
......@@ -488,7 +488,7 @@ TXNID toku_get_oldest_in_live_root_txn_list(TOKUTXN txn) {
invariant(toku_omt_size(omt)>0);
OMTVALUE v;
int r;
r = toku_omt_fetch(omt, 0, &v, NULL);
r = toku_omt_fetch(omt, 0, &v);
assert_zero(r);
TXNID *xidp = v;
return *xidp;
......@@ -509,7 +509,7 @@ BOOL toku_is_txn_in_live_root_txn_list(TOKUTXN txn, TXNID xid) {
OMTVALUE txnidpv;
uint32_t index;
BOOL retval = FALSE;
int r = toku_omt_find_zero(omt, find_xidp, &xid, &txnidpv, &index, NULL);
int r = toku_omt_find_zero(omt, find_xidp, &xid, &txnidpv, &index);
if (r==0) {
TXNID *txnidp = txnidpv;
invariant(*txnidp == xid);
......@@ -536,19 +536,19 @@ verify_snapshot_system(TOKULOGGER logger) {
//set up arrays for easier access
for (i = 0; i < num_snapshot_txnids; i++) {
OMTVALUE v;
r = toku_omt_fetch(logger->snapshot_txnids, i, &v, NULL);
r = toku_omt_fetch(logger->snapshot_txnids, i, &v);
assert_zero(r);
snapshot_txnids[i] = *(TXNID*)v;
}
for (i = 0; i < num_live_txns; i++) {
OMTVALUE v;
r = toku_omt_fetch(logger->live_txns, i, &v, NULL);
r = toku_omt_fetch(logger->live_txns, i, &v);
assert_zero(r);
live_txns[i] = v;
}
for (i = 0; i < num_live_list_reverse; i++) {
OMTVALUE v;
r = toku_omt_fetch(logger->live_list_reverse, i, &v, NULL);
r = toku_omt_fetch(logger->live_list_reverse, i, &v);
assert_zero(r);
live_list_reverse[i] = v;
}
......@@ -566,7 +566,7 @@ verify_snapshot_system(TOKULOGGER logger) {
{
for (j = 0; j < num_live_root_txn_list; j++) {
OMTVALUE v;
r = toku_omt_fetch(snapshot_txn->live_root_txn_list, j, &v, NULL);
r = toku_omt_fetch(snapshot_txn->live_root_txn_list, j, &v);
assert_zero(r);
live_root_txn_list[j] = *(TXNID*)v;
}
......@@ -595,7 +595,7 @@ verify_snapshot_system(TOKULOGGER logger) {
OMTVALUE v2;
r = toku_omt_find_zero(logger->snapshot_txnids,
toku_find_xid_by_xid,
&pair->xid2, &v2, &index, NULL);
&pair->xid2, &v2, &index);
assert_zero(r);
}
for (j = 0; j < num_live_txns; j++) {
......@@ -621,7 +621,7 @@ verify_snapshot_system(TOKULOGGER logger) {
OMTVALUE v2;
r = toku_omt_find_zero(logger->snapshot_txnids,
toku_find_xid_by_xid,
&txn->txnid64, &v2, &index, NULL);
&txn->txnid64, &v2, &index);
invariant(r==0 || r==DB_NOTFOUND);
invariant((r==0) == (expect!=0));
}
......
......@@ -137,7 +137,7 @@ get_next_older_txnid(TXNID xc, OMT omt) {
OMTVALUE v;
uint32_t idx;
TXNID rval;
r = toku_omt_find(omt, toku_find_pair_by_xid, &xc, -1, &v, &idx, NULL);
r = toku_omt_find(omt, toku_find_pair_by_xid, &xc, -1, &v, &idx);
if (r==0) {
xid = v;
invariant(*xid < xc); //sanity check
......@@ -157,7 +157,7 @@ toku_get_youngest_live_list_txnid_for(TXNID xc, OMT live_list_reverse) {
uint32_t idx;
TXNID rval;
int r;
r = toku_omt_find_zero(live_list_reverse, toku_find_pair_by_xid, &xc, &pairv, &idx, NULL);
r = toku_omt_find_zero(live_list_reverse, toku_find_pair_by_xid, &xc, &pairv, &idx);
if (r==0) {
pair = pairv;
invariant(pair->xid1 == xc); //sanity check
......@@ -226,7 +226,7 @@ garbage_collection(ULE ule, OMT snapshot_xids, OMT live_list_reverse) {
{
u_int32_t idx;
OMTVALUE txnagain;
int r = toku_omt_find_zero(snapshot_xids, toku_find_xid_by_xid, &tl1, &txnagain, &idx, NULL);
int r = toku_omt_find_zero(snapshot_xids, toku_find_xid_by_xid, &tl1, &txnagain, &idx);
invariant(r==0); //make sure that the txn you are claiming is live is actually live
}
//
......
......@@ -1710,7 +1710,7 @@ static int lt_do_escalation(toku_lock_tree* lt) {
OMTVALUE dbv;
assert(toku_omt_size(lt->dbs) > 0); // there is at least one db associated with this locktree
r = toku_omt_fetch(lt->dbs, 0, &dbv, NULL);
r = toku_omt_fetch(lt->dbs, 0, &dbv);
assert(r == 0);
db = dbv;
lt_set_comparison_functions(lt, db);
......@@ -2143,7 +2143,7 @@ static void lt_add_db(toku_lock_tree* tree, DB *db) {
int r;
OMTVALUE get_dbv = NULL;
uint32_t index;
r = toku_omt_find_zero(tree->dbs, find_db, db, &get_dbv, &index, NULL);
r = toku_omt_find_zero(tree->dbs, find_db, db, &get_dbv, &index);
assert(r == DB_NOTFOUND);
r = toku_omt_insert_at(tree->dbs, db, index);
assert_zero(r);
......@@ -2155,7 +2155,7 @@ static void lt_remove_db(toku_lock_tree* tree, DB *db) {
int r;
OMTVALUE get_dbv = NULL;
uint32_t index;
r = toku_omt_find_zero(tree->dbs, find_db, db, &get_dbv, &index, NULL);
r = toku_omt_find_zero(tree->dbs, find_db, db, &get_dbv, &index);
assert_zero(r);
assert(db == get_dbv);
r = toku_omt_delete_at(tree->dbs, index);
......
......@@ -157,7 +157,7 @@ toku_rt_find(toku_range_tree* tree, toku_interval* query, u_int32_t k,
extra.end_cmp = tree->end_cmp;
extra.query = *query;
r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, NULL, &leftmost, NULL);
r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, NULL, &leftmost);
if (r == DB_NOTFOUND) {
/* Nothing overlaps. */
*numfound = 0;
......@@ -199,7 +199,7 @@ toku_rt_insert(toku_range_tree* tree, toku_range* range) {
extra.end_cmp = tree->end_cmp;
extra.query = range->ends;
r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, NULL, &index, NULL);
r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, NULL, &index);
if (r == 0) {
r = EDOM; goto cleanup;
}
......@@ -233,7 +233,7 @@ toku_rt_delete(toku_range_tree* tree, toku_range* range) {
extra.end_cmp = tree->end_cmp;
extra.query = range->ends;
r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, &value, &index, NULL);
r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, &value, &index);
if (r != 0) {
r = EDOM; goto cleanup;
}
......@@ -270,7 +270,7 @@ rt_neightbor(toku_range_tree* tree, toku_point* point,
extra.query.right = point;
assert(direction==1 || direction==-1);
r = toku_omt_find(tree->i.omt, rt_heaviside, &extra, direction, &value, &index, NULL);
r = toku_omt_find(tree->i.omt, rt_heaviside, &extra, direction, &value, &index);
if (r == DB_NOTFOUND) {
*wasfound = FALSE;
r = 0;
......@@ -349,14 +349,14 @@ toku_rt_verify(toku_range_tree *tree) {
for (u_int32_t i = 0; i < tree->numelements; i++) {
// assert left <= right
OMTVALUE omtv;
r = toku_omt_fetch(tree->i.omt, i, &omtv, NULL);
r = toku_omt_fetch(tree->i.omt, i, &omtv);
assert_zero(r);
toku_range *v = (toku_range *) omtv;
assert(tree->end_cmp(v->ends.left, v->ends.right) <= 0);
// assert ranges are sorted
if (i < tree->numelements-1) {
OMTVALUE omtvnext;
r = toku_omt_fetch(tree->i.omt, i+1, &omtvnext, NULL);
r = toku_omt_fetch(tree->i.omt, i+1, &omtvnext);
assert_zero(r);
toku_range *vnext = (toku_range *) omtvnext;
assert(tree->end_cmp(v->ends.right, vnext->ends.left) < 0);
......@@ -365,11 +365,11 @@ toku_rt_verify(toku_range_tree *tree) {
// verify no overlaps
for (u_int32_t i = 1; i < tree->numelements; i++) {
OMTVALUE omtvprev;
r = toku_omt_fetch(tree->i.omt, i-1, &omtvprev, NULL);
r = toku_omt_fetch(tree->i.omt, i-1, &omtvprev);
assert_zero(r);
toku_range *vprev = (toku_range *) omtvprev;
OMTVALUE omtv;
r = toku_omt_fetch(tree->i.omt, i, &omtv, NULL);
r = toku_omt_fetch(tree->i.omt, i, &omtv);
assert_zero(r);
toku_range *v = (toku_range *) omtv;
assert(!toku__rt_overlap(tree, &vprev->ends, &v->ends));
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
static void
verify_val(DBT const *a, DBT const *b, void *c) {
assert(a->size == sizeof(int));
assert(b->size == sizeof(int));
int* expected = (int *)c;
assert(*expected == *(int *)a->data);
assert(*expected == *(int *)b->data);
}
static int
verify_fwd_fast(DBT const *a, DBT const *b, void *c) {
verify_val(a,b,c);
int* expected = (int *)c;
*expected = *expected + 1;
return TOKUDB_CURSOR_CONTINUE;
}
static int
verify_fwd_slow(DBT const *a, DBT const *b, void *c) {
verify_val(a,b,c);
int* expected = (int *)c;
*expected = *expected + 1;
return 0;
}
static int
verify_bwd_fast(DBT const *a, DBT const *b, void *c) {
verify_val(a,b,c);
int* expected = (int *)c;
*expected = *expected - 1;
return TOKUDB_CURSOR_CONTINUE;
}
static int
verify_bwd_slow(DBT const *a, DBT const *b, void *c) {
verify_val(a,b,c);
int* expected = (int *)c;
*expected = *expected - 1;
return 0;
}
static void
test_bulk_fetch (int n) {
if (verbose) printf("test_rand_insert:%d \n", n);
DB_TXN * const null_txn = 0;
const char * const fname = "test.bulk_fetch.brt";
int r;
r = system("rm -rf " ENVDIR);
CKERR(r);
r=toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
/* create the dup database file */
DB_ENV *env;
r = db_env_create(&env, 0); assert(r == 0);
r=env->set_default_bt_compare(env, int_dbt_cmp); CKERR(r);
r = env->open(env, ENVDIR, DB_CREATE+DB_PRIVATE+DB_INIT_MPOOL, 0); assert(r == 0);
DB *db;
r = db_create(&db, env, 0);
assert(r == 0);
r = db->set_flags(db, 0);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
int keys[n];
int i;
for (i=0; i<n; i++) {
keys[i] = i;
}
for (i=0; i<n; i++) {
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &keys[i], sizeof keys[i]), dbt_init(&val, &i, sizeof i), 0);
assert(r == 0);
}
//
// data inserted, now verify that using TOKUDB_CURSOR_CONTINUE in the callback works
//
DBC* cursor;
// verify fast
r = db->cursor(db, NULL, &cursor, 0);
CKERR(r);
int expected = 0;
while (r != DB_NOTFOUND) {
r = cursor->c_getf_next(cursor, 0, verify_fwd_fast, &expected);
assert(r==0 || r==DB_NOTFOUND);
}
r = cursor->c_close(cursor); CKERR(r);
// verify slow
r = db->cursor(db, NULL, &cursor, 0);
CKERR(r);
expected = 0;
while (r != DB_NOTFOUND) {
r = cursor->c_getf_next(cursor, 0, verify_fwd_slow, &expected);
assert(r==0 || r==DB_NOTFOUND);
}
r = cursor->c_close(cursor); CKERR(r);
// now do backwards
r = db->cursor(db, NULL, &cursor, 0);
CKERR(r);
expected = n-1;
while (r != DB_NOTFOUND) {
r = cursor->c_getf_prev(cursor, 0, verify_bwd_fast, &expected);
assert(r==0 || r==DB_NOTFOUND);
}
r = cursor->c_close(cursor); CKERR(r);
// verify slow
r = db->cursor(db, NULL, &cursor, 0);
CKERR(r);
expected = n-1;
while (r != DB_NOTFOUND) {
r = cursor->c_getf_prev(cursor, 0, verify_bwd_slow, &expected);
assert(r==0 || r==DB_NOTFOUND);
}
r = cursor->c_close(cursor); CKERR(r);
r = db->close(db, 0); CKERR(r);
r = env->close(env, 0); CKERR(r);
}
int
test_main(int argc, char *const argv[]) {
parse_args(argc, argv);
test_bulk_fetch(10000);
return 0;
}
......@@ -2819,7 +2819,7 @@ env_note_db_opened(DB_ENV *env, DB *db) {
OMTVALUE dbv;
uint32_t idx;
env->i->num_open_dbs++;
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL);
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx);
assert(r==DB_NOTFOUND); //Must not already be there.
r = toku_omt_insert_at(env->i->open_dbs, db, idx);
assert(r==0);
......@@ -2834,7 +2834,7 @@ env_note_db_closed(DB_ENV *env, DB *db) {
OMTVALUE dbv;
uint32_t idx;
env->i->num_open_dbs--;
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL);
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx);
assert(r==0); //Must already be there.
assert((DB*)dbv == db);
r = toku_omt_delete_at(env->i->open_dbs, idx);
......@@ -2850,7 +2850,7 @@ env_note_zombie_db(DB_ENV *env, DB *db) {
OMTVALUE dbv;
uint32_t idx;
env->i->num_zombie_dbs++;
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL);
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx);
assert(r==DB_NOTFOUND); //Must not already be there.
r = toku_omt_insert_at(env->i->open_dbs, db, idx);
assert(r==0);
......@@ -2865,7 +2865,7 @@ env_note_zombie_db_closed(DB_ENV *env, DB *db) {
OMTVALUE dbv;
uint32_t idx;
env->i->num_zombie_dbs--;
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL);
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx);
assert(r==0); //Must already be there.
assert((DB*)dbv == db);
r = toku_omt_delete_at(env->i->open_dbs, idx);
......@@ -2907,7 +2907,7 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
BOOL rval;
OMTVALUE dbv;
uint32_t idx;
r = toku_omt_find_zero(env->i->open_dbs, find_open_db_by_dname, (void*)dname, &dbv, &idx, NULL);
r = toku_omt_find_zero(env->i->open_dbs, find_open_db_by_dname, (void*)dname, &dbv, &idx);
if (r==0) {
DB *db = dbv;
assert(strcmp(dname, db->i->dname) == 0);
......@@ -2928,7 +2928,7 @@ env_get_zombie_db_with_dname(DB_ENV *env, const char *dname) {
DB* rval;
OMTVALUE dbv;
uint32_t idx;
r = toku_omt_find_zero(env->i->open_dbs, find_zombie_db_by_dname, (void*)dname, &dbv, &idx, NULL);
r = toku_omt_find_zero(env->i->open_dbs, find_zombie_db_by_dname, (void*)dname, &dbv, &idx);
if (r==0) {
DB *db = dbv;
assert(db);
......@@ -3461,7 +3461,7 @@ c_getf_first_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val,
//Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR;
r = context->r_user_callback;
}
//Give brt-layer an error (if any) to return from toku_brt_cursor_first
......@@ -3512,7 +3512,7 @@ c_getf_last_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v
//Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR;
r = context->r_user_callback;
}
//Give brt-layer an error (if any) to return from toku_brt_cursor_last
......@@ -3567,7 +3567,7 @@ c_getf_next_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v
//Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR;
r = context->r_user_callback;
}
//Give brt-layer an error (if any) to return from toku_brt_cursor_next
......@@ -3622,7 +3622,7 @@ c_getf_prev_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v
//Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR;
r = context->r_user_callback;
}
//Give brt-layer an error (if any) to return from toku_brt_cursor_prev
......@@ -3660,7 +3660,7 @@ c_getf_current_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val
//Call application-layer callback if found.
if (key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR;
r = context->r_user_callback;
}
//Give brt-layer an error (if any) to return from toku_brt_cursor_current
......@@ -3724,7 +3724,7 @@ c_getf_set_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, vo
//Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR;
r = context->r_user_callback;
}
//Give brt-layer an error (if any) to return from toku_brt_cursor_set
......@@ -3779,7 +3779,7 @@ c_getf_set_range_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec v
//Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR;
r = context->r_user_callback;
}
//Give brt-layer an error (if any) to return from toku_brt_cursor_set_range
......@@ -3835,7 +3835,7 @@ c_getf_set_range_reverse_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, b
//Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR;
r = context->r_user_callback;
}
//Give brt-layer an error (if any) to return from toku_brt_cursor_set_range_reverse
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment