Commit 593ec83c authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:3644], merge cachetable changes for #3627 from tokudb.3627 to main

git-svn-id: file:///svn/toku/tokudb@32519 c7de825b-a66e-492c-adef-691d508d4ae1
parent a729ed9d
...@@ -312,6 +312,8 @@ struct brtenv { ...@@ -312,6 +312,8 @@ struct brtenv {
extern void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint); extern void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint);
extern int toku_brtnode_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, int*dirty, void*extraargs); extern int toku_brtnode_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, int*dirty, void*extraargs);
extern int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_freed, void *extraargs); extern int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_freed, void *extraargs);
extern BOOL toku_brtnode_pf_req_callback(void* brtnode_pv, void* read_extraargs);
extern int toku_brtnode_pf_callback(void* brtnode_pv, void* read_extraargs, long* sizep);
extern int toku_brt_alloc_init_header(BRT t, TOKUTXN txn); extern int toku_brt_alloc_init_header(BRT t, TOKUTXN txn);
extern int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, LSN max_acceptable_lsn, struct brt_header **header, BOOL* was_open); extern int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, LSN max_acceptable_lsn, struct brt_header **header, BOOL* was_open);
extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt, u_int32_t *root_hash); extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt, u_int32_t *root_hash);
......
...@@ -77,8 +77,19 @@ int toku_testsetup_get_sersize(BRT brt, BLOCKNUM diskoff) // Return the size on ...@@ -77,8 +77,19 @@ int toku_testsetup_get_sersize(BRT brt, BLOCKNUM diskoff) // Return the size on
{ {
assert(testsetup_initialized); assert(testsetup_initialized);
void *node_v; void *node_v;
int r = toku_cachetable_get_and_pin(brt->cf, diskoff, toku_cachetable_hash(brt->cf, diskoff), &node_v, NULL, int r = toku_cachetable_get_and_pin(
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt); brt->cf, diskoff,
toku_cachetable_hash(brt->cf, diskoff),
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h
);
assert(r==0); assert(r==0);
int size = toku_serialize_brtnode_size(node_v); int size = toku_serialize_brtnode_size(node_v);
toku_unpin_brtnode(brt, node_v); toku_unpin_brtnode(brt, node_v);
...@@ -91,8 +102,20 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke ...@@ -91,8 +102,20 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
assert(testsetup_initialized); assert(testsetup_initialized);
r = toku_cachetable_get_and_pin(brt->cf, blocknum, toku_cachetable_hash(brt->cf, blocknum), &node_v, NULL, r = toku_cachetable_get_and_pin(
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt); brt->cf,
blocknum,
toku_cachetable_hash(brt->cf, blocknum),
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h
);
if (r!=0) return r; if (r!=0) return r;
BRTNODE node=node_v; BRTNODE node=node_v;
toku_verify_or_set_counts(node); toku_verify_or_set_counts(node);
...@@ -146,8 +169,20 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t ...@@ -146,8 +169,20 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t
assert(testsetup_initialized); assert(testsetup_initialized);
r = toku_cachetable_get_and_pin(brt->cf, blocknum, toku_cachetable_hash(brt->cf, blocknum), &node_v, NULL, r = toku_cachetable_get_and_pin(
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt); brt->cf,
blocknum,
toku_cachetable_hash(brt->cf, blocknum),
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h
);
if (r!=0) return r; if (r!=0) return r;
BRTNODE node=node_v; BRTNODE node=node_v;
assert(node->height>0); assert(node->height>0);
......
...@@ -113,8 +113,20 @@ toku_verify_brtnode (BRT brt, ...@@ -113,8 +113,20 @@ toku_verify_brtnode (BRT brt,
u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum); u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum);
{ {
int r = toku_cachetable_get_and_pin(brt->cf, blocknum, fullhash, &node_v, NULL, int r = toku_cachetable_get_and_pin(
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt->h); brt->cf,
blocknum,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h
);
assert_zero(r); // this is a bad failure if it happens. assert_zero(r); // this is a bad failure if it happens.
} }
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v); //printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
......
...@@ -260,7 +260,20 @@ int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash, ...@@ -260,7 +260,20 @@ int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
ANCESTORS ancestors, struct pivot_bounds const * const bounds, ANCESTORS ancestors, struct pivot_bounds const * const bounds,
BRTNODE *node_p) { BRTNODE *node_p) {
void *node_v; void *node_v;
int r = toku_cachetable_get_and_pin_nonblocking(brt->cf, blocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt->h, unlockers); int r = toku_cachetable_get_and_pin_nonblocking(
brt->cf,
blocknum,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h,
unlockers);
if (r==0) { if (r==0) {
BRTNODE node = node_v; BRTNODE node = node_v;
maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds); maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds);
...@@ -277,7 +290,20 @@ void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullha ...@@ -277,7 +290,20 @@ void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullha
ANCESTORS ancestors, struct pivot_bounds const * const bounds, ANCESTORS ancestors, struct pivot_bounds const * const bounds,
BRTNODE *node_p) { BRTNODE *node_p) {
void *node_v; void *node_v;
int r = toku_cachetable_get_and_pin(brt->cf, blocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt->h); int r = toku_cachetable_get_and_pin(
brt->cf,
blocknum,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h
);
assert(r==0); assert(r==0);
BRTNODE node = node_v; BRTNODE node = node_v;
maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds); maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds);
...@@ -392,7 +418,20 @@ toku_verify_estimates (BRT t, BRTNODE node) { ...@@ -392,7 +418,20 @@ toku_verify_estimates (BRT t, BRTNODE node) {
BLOCKNUM childblocknum = BP_BLOCKNUM(node, childnum); BLOCKNUM childblocknum = BP_BLOCKNUM(node, childnum);
u_int32_t fullhash = compute_child_fullhash(t->cf, node, childnum); u_int32_t fullhash = compute_child_fullhash(t->cf, node, childnum);
void *childnode_v; void *childnode_v;
int r = toku_cachetable_get_and_pin(t->cf, childblocknum, fullhash, &childnode_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, t->h); int r = toku_cachetable_get_and_pin(
t->cf,
childblocknum,
fullhash,
&childnode_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
t->h,
t->h
);
assert_zero(r); assert_zero(r);
BRTNODE childnode = childnode_v; BRTNODE childnode = childnode_v;
for (int i=0; i<childnode->n_children; i++) { for (int i=0; i<childnode->n_children; i++) {
...@@ -523,6 +562,21 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_ ...@@ -523,6 +562,21 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_
return 0; return 0;
} }
// callback that sates if partially reading a node is necessary
// could have just used toku_brtnode_fetch_callback, but wanted to separate the two cases to separate functions
BOOL toku_brtnode_pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
// placeholder for now
return FALSE;
}
// callback for partially reading a node
// could have just used toku_brtnode_fetch_callback, but wanted to separate the two cases to separate functions
int toku_brtnode_pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static int static int
leafval_heaviside_le (u_int32_t klen, void *kval, leafval_heaviside_le (u_int32_t klen, void *kval,
struct cmd_leafval_heaviside_extra *be) struct cmd_leafval_heaviside_extra *be)
...@@ -778,7 +832,7 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r ...@@ -778,7 +832,7 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
u_int32_t fullhash = toku_cachetable_hash(brt->cf, newroot_diskoff); u_int32_t fullhash = toku_cachetable_hash(brt->cf, newroot_diskoff);
newroot->fullhash = fullhash; newroot->fullhash = fullhash;
toku_cachetable_put(brt->cf, newroot_diskoff, fullhash, newroot, brtnode_memory_size(newroot), toku_cachetable_put(brt->cf, newroot_diskoff, fullhash, newroot, brtnode_memory_size(newroot),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt->h); toku_brtnode_flush_callback, toku_brtnode_pe_callback, brt->h);
*newrootp = newroot; *newrootp = newroot;
} }
...@@ -800,7 +854,7 @@ void toku_create_new_brtnode (BRT t, BRTNODE *result, int height, int n_children ...@@ -800,7 +854,7 @@ void toku_create_new_brtnode (BRT t, BRTNODE *result, int height, int n_children
n->fullhash = fullhash; n->fullhash = fullhash;
r=toku_cachetable_put(t->cf, n->thisnodename, fullhash, r=toku_cachetable_put(t->cf, n->thisnodename, fullhash,
n, brtnode_memory_size(n), n, brtnode_memory_size(n),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, t->h); toku_brtnode_flush_callback, toku_brtnode_pe_callback, t->h);
assert_zero(r); assert_zero(r);
} }
...@@ -1302,7 +1356,12 @@ brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react) ...@@ -1302,7 +1356,12 @@ brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react)
compute_child_fullhash(t->cf, node, childnum), compute_child_fullhash(t->cf, node, childnum),
&childnode_v, &childnode_v,
NULL, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
t->h,
t->h); t->h);
assert(r==0); assert(r==0);
child = childnode_v; child = childnode_v;
...@@ -2212,16 +2271,39 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_react, ...@@ -2212,16 +2271,39 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_react,
{ {
void *childnode_v; void *childnode_v;
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnuma); u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnuma);
int r = toku_cachetable_get_and_pin(t->cf, BP_BLOCKNUM(node, childnuma), childfullhash, &childnode_v, NULL, int r = toku_cachetable_get_and_pin(
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, t->h); t->cf,
BP_BLOCKNUM(node, childnuma),
childfullhash,
&childnode_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
t->h,
t->h
);
assert(r==0); assert(r==0);
childa = childnode_v; childa = childnode_v;
} }
{ {
void *childnode_v; void *childnode_v;
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnumb); u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnumb);
int r = toku_cachetable_get_and_pin(t->cf, BP_BLOCKNUM(node, childnumb), childfullhash, &childnode_v, NULL, int r = toku_cachetable_get_and_pin(
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, t->h); t->cf,
BP_BLOCKNUM(node, childnumb),
childfullhash, &childnode_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
t->h,
t->h
);
assert(r==0); assert(r==0);
childb = childnode_v; childb = childnode_v;
} }
...@@ -3166,7 +3248,7 @@ static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum) { ...@@ -3166,7 +3248,7 @@ static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum) {
node->fullhash = fullhash; node->fullhash = fullhash;
r=toku_cachetable_put(t->cf, blocknum, fullhash, r=toku_cachetable_put(t->cf, blocknum, fullhash,
node, brtnode_memory_size(node), node, brtnode_memory_size(node),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, t->h); toku_brtnode_flush_callback, toku_brtnode_pe_callback, t->h);
if (r!=0) { if (r!=0) {
toku_free(node); toku_free(node);
return r; return r;
...@@ -4864,8 +4946,18 @@ brt_node_maybe_prefetch(BRT brt, BRTNODE node, int childnum, BRT_CURSOR brtcurso ...@@ -4864,8 +4946,18 @@ brt_node_maybe_prefetch(BRT brt, BRTNODE node, int childnum, BRT_CURSOR brtcurso
break; break;
BLOCKNUM nextchildblocknum = BP_BLOCKNUM(node, nextchildnum); BLOCKNUM nextchildblocknum = BP_BLOCKNUM(node, nextchildnum);
u_int32_t nextfullhash = compute_child_fullhash(brt->cf, node, nextchildnum); u_int32_t nextfullhash = compute_child_fullhash(brt->cf, node, nextchildnum);
toku_cachefile_prefetch(brt->cf, nextchildblocknum, nextfullhash, toku_cachefile_prefetch(
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt->h); brt->cf,
nextchildblocknum,
nextfullhash,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h
);
*doprefetch = FALSE; *doprefetch = FALSE;
} }
} }
...@@ -4992,7 +5084,7 @@ brt_search_node( ...@@ -4992,7 +5084,7 @@ brt_search_node(
ANCESTORS ancestors, ANCESTORS ancestors,
struct pivot_bounds const * const bounds struct pivot_bounds const * const bounds
) )
{ int r; { int r = 0;
int child_to_search = brt_search_which_child(brt, node, search); int child_to_search = brt_search_which_child(brt, node, search);
assert(child_to_search >= 0 || child_to_search < node->n_children); assert(child_to_search >= 0 || child_to_search < node->n_children);
// //
...@@ -5566,8 +5658,20 @@ static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename, ...@@ -5566,8 +5658,20 @@ static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename,
{ {
void *node_v; void *node_v;
//assert(fullhash == toku_cachetable_hash(brt->cf, nodename)); //assert(fullhash == toku_cachetable_hash(brt->cf, nodename));
int rr = toku_cachetable_get_and_pin(brt->cf, nodename, fullhash, int rr = toku_cachetable_get_and_pin(
&node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt->h); brt->cf,
nodename,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h
);
assert_zero(rr); assert_zero(rr);
node = node_v; node = node_v;
assert(node->fullhash==fullhash); assert(node->fullhash==fullhash);
...@@ -5640,9 +5744,20 @@ int toku_brt_stat64 (BRT brt, TOKUTXN UU(txn), struct brtstat64_s *s) { ...@@ -5640,9 +5744,20 @@ int toku_brt_stat64 (BRT brt, TOKUTXN UU(txn), struct brtstat64_s *s) {
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash); CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
CACHEKEY root = *rootp; CACHEKEY root = *rootp;
void *node_v; void *node_v;
int r = toku_cachetable_get_and_pin(brt->cf, root, fullhash, int r = toku_cachetable_get_and_pin(
&node_v, NULL, brt->cf,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt->h); root,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h
);
if (r!=0) return r; if (r!=0) return r;
BRTNODE node = node_v; BRTNODE node = node_v;
...@@ -5667,9 +5782,20 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_ ...@@ -5667,9 +5782,20 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_
BRTNODE node; BRTNODE node;
void *node_v; void *node_v;
u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum); u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum);
int r = toku_cachetable_get_and_pin(brt->cf, blocknum, fullhash, int r = toku_cachetable_get_and_pin(
&node_v, NULL, brt->cf,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt->h); blocknum,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h
);
assert_zero(r); assert_zero(r);
node=node_v; node=node_v;
assert(node->fullhash==fullhash); assert(node->fullhash==fullhash);
...@@ -5971,7 +6097,20 @@ static BOOL is_empty_fast_iter (BRT brt, BRTNODE node) { ...@@ -5971,7 +6097,20 @@ static BOOL is_empty_fast_iter (BRT brt, BRTNODE node) {
void *node_v; void *node_v;
BLOCKNUM childblocknum = BP_BLOCKNUM(node,childnum); BLOCKNUM childblocknum = BP_BLOCKNUM(node,childnum);
u_int32_t fullhash = compute_child_fullhash(brt->cf, node, childnum); u_int32_t fullhash = compute_child_fullhash(brt->cf, node, childnum);
int rr = toku_cachetable_get_and_pin(brt->cf, childblocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt->h); int rr = toku_cachetable_get_and_pin(
brt->cf,
childblocknum,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h
);
assert(rr ==0); assert(rr ==0);
childnode = node_v; childnode = node_v;
} }
...@@ -6001,8 +6140,20 @@ BOOL toku_brt_is_empty_fast (BRT brt) ...@@ -6001,8 +6140,20 @@ BOOL toku_brt_is_empty_fast (BRT brt)
//assert(fullhash == toku_cachetable_hash(brt->cf, *rootp)); //assert(fullhash == toku_cachetable_hash(brt->cf, *rootp));
{ {
void *node_v; void *node_v;
int rr = toku_cachetable_get_and_pin(brt->cf, *rootp, fullhash, int rr = toku_cachetable_get_and_pin(
&node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_pe_callback, brt->h); brt->cf,
*rootp,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h
);
assert_zero(rr); assert_zero(rr);
node = node_v; node = node_v;
} }
......
...@@ -83,7 +83,7 @@ struct ctpair { ...@@ -83,7 +83,7 @@ struct ctpair {
CACHETABLE_FLUSH_CALLBACK flush_callback; CACHETABLE_FLUSH_CALLBACK flush_callback;
CACHETABLE_FETCH_CALLBACK fetch_callback; CACHETABLE_FETCH_CALLBACK fetch_callback;
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback; CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback;
void *extraargs; void *write_extraargs;
PAIR next,prev; // In clock. PAIR next,prev; // In clock.
PAIR hash_chain; PAIR hash_chain;
...@@ -1064,13 +1064,13 @@ static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p) { ...@@ -1064,13 +1064,13 @@ static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p) {
CACHEFILE cachefile = p->cachefile; CACHEFILE cachefile = p->cachefile;
CACHEKEY key = p->key; CACHEKEY key = p->key;
void *value = p->value; void *value = p->value;
void *extraargs = p->extraargs; void *write_extraargs = p->write_extraargs;
long size = p->size; long size = p->size;
rwlock_prefer_read_lock(&cachefile->fdlock, ct->mutex); rwlock_prefer_read_lock(&cachefile->fdlock, ct->mutex);
cachetable_unlock(ct); cachetable_unlock(ct);
flush_callback(cachefile, cachefile->fd, key, value, extraargs, size, FALSE, FALSE, TRUE); flush_callback(cachefile, cachefile->fd, key, value, write_extraargs, size, FALSE, FALSE, TRUE);
cachetable_lock(ct); cachetable_lock(ct);
rwlock_read_unlock(&cachefile->fdlock); rwlock_read_unlock(&cachefile->fdlock);
...@@ -1086,12 +1086,17 @@ static void abort_fetch_pair(PAIR p) { ...@@ -1086,12 +1086,17 @@ static void abort_fetch_pair(PAIR p) {
} }
// Read a pair from a cachefile into memory using the pair's fetch callback // Read a pair from a cachefile into memory using the pair's fetch callback
static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) { static int cachetable_fetch_pair(
CACHETABLE ct,
CACHEFILE cf,
PAIR p,
CACHETABLE_FETCH_CALLBACK fetch_callback,
void* read_extraargs
)
{
// helgrind // helgrind
CACHETABLE_FETCH_CALLBACK fetch_callback = p->fetch_callback;
CACHEKEY key = p->key; CACHEKEY key = p->key;
u_int32_t fullhash = p->fullhash; u_int32_t fullhash = p->fullhash;
void *extraargs = p->extraargs;
void *toku_value = 0; void *toku_value = 0;
long size = 0; long size = 0;
...@@ -1105,7 +1110,7 @@ static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) { ...@@ -1105,7 +1110,7 @@ static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) {
int r; int r;
if (toku_cachefile_is_dev_null_unlocked(cf)) r = -1; if (toku_cachefile_is_dev_null_unlocked(cf)) r = -1;
else r = fetch_callback(cf, cf->fd, key, fullhash, &toku_value, &size, &dirty, extraargs); else r = fetch_callback(cf, cf->fd, key, fullhash, &toku_value, &size, &dirty, read_extraargs);
if (dirty) if (dirty)
p->dirty = CACHETABLE_DIRTY; p->dirty = CACHETABLE_DIRTY;
...@@ -1116,6 +1121,7 @@ static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) { ...@@ -1116,6 +1121,7 @@ static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) {
cachetable_remove_pair(ct, p); cachetable_remove_pair(ct, p);
p->state = CTPAIR_INVALID; p->state = CTPAIR_INVALID;
if (p->cq) { if (p->cq) {
workitem_init(&p->asyncwork, NULL, p);
workqueue_enq(p->cq, &p->asyncwork, 1); workqueue_enq(p->cq, &p->asyncwork, 1);
return r; return r;
} }
...@@ -1126,6 +1132,7 @@ static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) { ...@@ -1126,6 +1132,7 @@ static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) {
p->size = size; p->size = size;
ct->size_current += size; ct->size_current += size;
if (p->cq) { if (p->cq) {
workitem_init(&p->asyncwork, NULL, p);
workqueue_enq(p->cq, &p->asyncwork, 1); workqueue_enq(p->cq, &p->asyncwork, 1);
return 0; return 0;
} }
...@@ -1152,7 +1159,7 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p, BOOL remove_me) { ...@@ -1152,7 +1159,7 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p, BOOL remove_me) {
CACHEFILE cachefile = p->cachefile; CACHEFILE cachefile = p->cachefile;
CACHEKEY key = p->key; CACHEKEY key = p->key;
void *value = p->value; void *value = p->value;
void *extraargs = p->extraargs; void *write_extraargs = p->write_extraargs;
long size = p->size; long size = p->size;
BOOL dowrite = (BOOL)(p->dirty); BOOL dowrite = (BOOL)(p->dirty);
BOOL for_checkpoint = p->checkpoint_pending; BOOL for_checkpoint = p->checkpoint_pending;
...@@ -1164,7 +1171,7 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p, BOOL remove_me) { ...@@ -1164,7 +1171,7 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p, BOOL remove_me) {
// write callback // write callback
if (toku_cachefile_is_dev_null_unlocked(cachefile)) dowrite = FALSE; if (toku_cachefile_is_dev_null_unlocked(cachefile)) dowrite = FALSE;
flush_callback(cachefile, cachefile->fd, key, value, extraargs, size, dowrite, TRUE, for_checkpoint); flush_callback(cachefile, cachefile->fd, key, value, write_extraargs, size, dowrite, TRUE, for_checkpoint);
cachetable_lock(ct); cachetable_lock(ct);
rwlock_read_unlock(&cachefile->fdlock); rwlock_read_unlock(&cachefile->fdlock);
...@@ -1269,9 +1276,9 @@ static int maybe_flush_some (CACHETABLE ct, long size) { ...@@ -1269,9 +1276,9 @@ static int maybe_flush_some (CACHETABLE ct, long size) {
rwlock_write_lock(&curr_in_clock->rwlock, ct->mutex); rwlock_write_lock(&curr_in_clock->rwlock, ct->mutex);
long size_remaining = (size + ct->size_current) - (ct->size_limit + unattainable_data); long size_remaining = (size + ct->size_current) - (ct->size_limit + unattainable_data);
void *value = curr_in_clock->value; void *value = curr_in_clock->value;
void *extraargs = curr_in_clock->extraargs; void *write_extraargs = curr_in_clock->write_extraargs;
long bytes_freed; long bytes_freed;
curr_in_clock->pe_callback(value, size_remaining, &bytes_freed, extraargs); curr_in_clock->pe_callback(value, size_remaining, &bytes_freed, write_extraargs);
assert(bytes_freed <= ct->size_current); assert(bytes_freed <= ct->size_current);
assert(bytes_freed <= curr_in_clock->size); assert(bytes_freed <= curr_in_clock->size);
ct->size_current -= bytes_freed; ct->size_current -= bytes_freed;
...@@ -1315,9 +1322,8 @@ static PAIR cachetable_insert_at(CACHETABLE ct, ...@@ -1315,9 +1322,8 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
u_int32_t fullhash, u_int32_t fullhash,
long size, long size,
CACHETABLE_FLUSH_CALLBACK flush_callback, CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback, CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
void *extraargs, void *write_extraargs,
enum cachetable_dirty dirty) { enum cachetable_dirty dirty) {
PAIR MALLOC(p); PAIR MALLOC(p);
assert(p); assert(p);
...@@ -1331,9 +1337,8 @@ static PAIR cachetable_insert_at(CACHETABLE ct, ...@@ -1331,9 +1337,8 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
p->size = size; p->size = size;
p->state = state; p->state = state;
p->flush_callback = flush_callback; p->flush_callback = flush_callback;
p->fetch_callback = fetch_callback;
p->pe_callback = pe_callback; p->pe_callback = pe_callback;
p->extraargs = extraargs; p->write_extraargs = write_extraargs;
p->fullhash = fullhash; p->fullhash = fullhash;
p->next = p->prev = 0; p->next = p->prev = 0;
rwlock_init(&p->rwlock); rwlock_init(&p->rwlock);
...@@ -1370,9 +1375,8 @@ note_hash_count (int count) { ...@@ -1370,9 +1375,8 @@ note_hash_count (int count) {
int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void*value, long size, int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void*value, long size,
CACHETABLE_FLUSH_CALLBACK flush_callback, CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback, CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
void *extraargs) { void *write_extraargs) {
WHEN_TRACE_CT(printf("%s:%d CT cachetable_put(%lld)=%p\n", __FILE__, __LINE__, key, value)); WHEN_TRACE_CT(printf("%s:%d CT cachetable_put(%lld)=%p\n", __FILE__, __LINE__, key, value));
CACHETABLE ct = cachefile->cachetable; CACHETABLE ct = cachefile->cachetable;
int count=0; int count=0;
...@@ -1385,7 +1389,6 @@ int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, v ...@@ -1385,7 +1389,6 @@ int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, v
// Semantically, these two asserts are not strictly right. After all, when are two functions eq? // Semantically, these two asserts are not strictly right. After all, when are two functions eq?
// In practice, the functions better be the same. // In practice, the functions better be the same.
assert(p->flush_callback==flush_callback); assert(p->flush_callback==flush_callback);
assert(p->fetch_callback==fetch_callback);
assert(p->pe_callback==pe_callback); assert(p->pe_callback==pe_callback);
rwlock_read_lock(&p->rwlock, ct->mutex); rwlock_read_lock(&p->rwlock, ct->mutex);
note_hash_count(count); note_hash_count(count);
...@@ -1401,7 +1404,19 @@ int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, v ...@@ -1401,7 +1404,19 @@ int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, v
} }
// flushing could change the table size, but wont' change the fullhash // flushing could change the table size, but wont' change the fullhash
cachetable_puts++; cachetable_puts++;
PAIR p = cachetable_insert_at(ct, cachefile, key, value, CTPAIR_IDLE, fullhash, size, flush_callback, fetch_callback, pe_callback, extraargs, CACHETABLE_DIRTY); PAIR p = cachetable_insert_at(
ct,
cachefile,
key,
value,
CTPAIR_IDLE,
fullhash,
size,
flush_callback,
pe_callback,
write_extraargs,
CACHETABLE_DIRTY
);
assert(p); assert(p);
rwlock_read_lock(&p->rwlock, ct->mutex); rwlock_read_lock(&p->rwlock, ct->mutex);
note_hash_count(count); note_hash_count(count);
...@@ -1456,11 +1471,21 @@ static CACHEKEY get_and_pin_key = {0}; ...@@ -1456,11 +1471,21 @@ static CACHEKEY get_and_pin_key = {0};
static u_int32_t get_and_pin_fullhash = 0; static u_int32_t get_and_pin_fullhash = 0;
int toku_cachetable_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value, long *sizep, int toku_cachetable_get_and_pin (
CACHETABLE_FLUSH_CALLBACK flush_callback, CACHEFILE cachefile,
CACHETABLE_FETCH_CALLBACK fetch_callback, CACHEKEY key,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback, u_int32_t fullhash,
void *extraargs) { void**value,
long *sizep,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
void* read_extraargs,
void* write_extraargs
)
{
CACHETABLE ct = cachefile->cachetable; CACHETABLE ct = cachefile->cachetable;
PAIR p; PAIR p;
int count=0; int count=0;
...@@ -1500,29 +1525,53 @@ int toku_cachetable_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int32_t fu ...@@ -1500,29 +1525,53 @@ int toku_cachetable_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int32_t fu
write_pair_for_checkpoint(ct, p, FALSE); write_pair_for_checkpoint(ct, p, FALSE);
} }
// still have the cachetable lock // still have the cachetable lock
//
// at this point, we know the node is at least partially in memory,
// but we do not know if the user requires a partial fetch (because
// some basement node is missing or some message buffer needs
// to be decompressed. So, we check to see if a partial fetch is required
//
get_and_pin_footprint = 7; get_and_pin_footprint = 7;
BOOL partial_fetch_required = pf_req_callback(p->value,read_extraargs);
//
// in this case, a partial fetch is required so we must grab the PAIR's write lock
// and then call a callback to retrieve what we need
//
if (partial_fetch_required) {
rwlock_write_lock(&p->rwlock, ct->mutex);
if (do_wait_time) {
cachetable_waittime += get_tnow() - t0;
}
t0 = get_tnow();
long size = 0;
int r = pf_callback(p->value, read_extraargs, &size);
lazy_assert_zero(r);
cachetable_waittime += get_tnow() - t0;
rwlock_write_unlock(&p->rwlock);
}
rwlock_read_lock(&p->rwlock, ct->mutex); rwlock_read_lock(&p->rwlock, ct->mutex);
if (do_wait_time) if (do_wait_time)
cachetable_waittime += get_tnow() - t0; cachetable_waittime += get_tnow() - t0;
get_and_pin_footprint = 8; get_and_pin_footprint = 8;
if (p->state == CTPAIR_INVALID) { if (p->state == CTPAIR_INVALID) {
get_and_pin_footprint = 9; get_and_pin_footprint = 9;
rwlock_read_unlock(&p->rwlock); rwlock_read_unlock(&p->rwlock);
if (rwlock_users(&p->rwlock) == 0) if (rwlock_users(&p->rwlock) == 0)
ctpair_destroy(p); ctpair_destroy(p);
cachetable_unlock(ct); cachetable_unlock(ct);
get_and_pin_footprint = 1001; get_and_pin_footprint = 1001;
return ENODEV; return ENODEV;
} }
pair_touch(p); pair_touch(p);
*value = p->value; *value = p->value;
if (sizep) *sizep = p->size; if (sizep) *sizep = p->size;
cachetable_hit++; cachetable_hit++;
note_hash_count(count); note_hash_count(count);
cachetable_unlock(ct); cachetable_unlock(ct);
WHEN_TRACE_CT(printf("%s:%d cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value)); WHEN_TRACE_CT(printf("%s:%d cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
get_and_pin_footprint = 1000; get_and_pin_footprint = 1000;
return 0; return 0;
} }
} }
get_and_pin_footprint = 9; get_and_pin_footprint = 9;
...@@ -1530,13 +1579,25 @@ int toku_cachetable_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int32_t fu ...@@ -1530,13 +1579,25 @@ int toku_cachetable_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int32_t fu
int r; int r;
// Note. hashit(t,key) may have changed as a result of flushing. But fullhash won't have changed. // Note. hashit(t,key) may have changed as a result of flushing. But fullhash won't have changed.
{ {
p = cachetable_insert_at(ct, cachefile, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, fetch_callback, pe_callback, extraargs, CACHETABLE_CLEAN); p = cachetable_insert_at(
ct,
cachefile,
key,
zero_value,
CTPAIR_READING,
fullhash,
zero_size,
flush_callback,
pe_callback,
write_extraargs,
CACHETABLE_CLEAN
);
assert(p); assert(p);
get_and_pin_footprint = 10; get_and_pin_footprint = 10;
rwlock_write_lock(&p->rwlock, ct->mutex); rwlock_write_lock(&p->rwlock, ct->mutex);
uint64_t t0 = get_tnow(); uint64_t t0 = get_tnow();
r = cachetable_fetch_pair(ct, cachefile, p); r = cachetable_fetch_pair(ct, cachefile, p, fetch_callback, read_extraargs);
if (r) { if (r) {
cachetable_unlock(ct); cachetable_unlock(ct);
get_and_pin_footprint = 1002; get_and_pin_footprint = 1002;
...@@ -1691,12 +1752,21 @@ run_unlockers (UNLOCKERS unlockers) { ...@@ -1691,12 +1752,21 @@ run_unlockers (UNLOCKERS unlockers) {
} }
} }
int toku_cachetable_get_and_pin_nonblocking (CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, void**value, long *sizep, int toku_cachetable_get_and_pin_nonblocking (
CACHETABLE_FLUSH_CALLBACK flush_callback, CACHEFILE cf,
CACHETABLE_FETCH_CALLBACK fetch_callback, CACHEKEY key,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback, u_int32_t fullhash,
void *extraargs, void**value,
UNLOCKERS unlockers) long *sizep,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
void *read_extraargs,
void* write_extraargs,
UNLOCKERS unlockers
)
// Effect: If the block is in the cachetable, then pin it and return it. // Effect: If the block is in the cachetable, then pin it and return it.
// Otherwise call the lock_unlock_callback (to unlock), fetch the data (but don't pin it, since we'll just end up pinning it again later), and the call (to lock) // Otherwise call the lock_unlock_callback (to unlock), fetch the data (but don't pin it, since we'll just end up pinning it again later), and the call (to lock)
// and return TOKUDB_TRY_AGAIN. // and return TOKUDB_TRY_AGAIN.
...@@ -1735,13 +1805,32 @@ int toku_cachetable_get_and_pin_nonblocking (CACHEFILE cf, CACHEKEY key, u_int32 ...@@ -1735,13 +1805,32 @@ int toku_cachetable_get_and_pin_nonblocking (CACHEFILE cf, CACHEKEY key, u_int32
if (ct->ydb_lock_callback) ct->ydb_lock_callback(); if (ct->ydb_lock_callback) ct->ydb_lock_callback();
return TOKUDB_TRY_AGAIN; return TOKUDB_TRY_AGAIN;
case CTPAIR_IDLE: case CTPAIR_IDLE:
rwlock_read_lock(&p->rwlock, ct->mutex); {
pair_touch(p); BOOL partial_fetch_required = pf_req_callback(p->value,read_extraargs);
*value = p->value; //
if (sizep) *sizep = p->size; // in this case, a partial fetch is required so we must grab the PAIR's write lock
cachetable_hit++; // and then call a callback to retrieve what we need
cachetable_unlock(ct); //
return 0; if (partial_fetch_required) {
run_unlockers(unlockers); // The contract says the unlockers are run with the ct lock being held.
if (ct->ydb_unlock_callback) ct->ydb_unlock_callback();
// Now wait for the I/O to occur.
rwlock_write_lock(&p->rwlock, ct->mutex);
long size = 0;
int r = pf_callback(p->value, read_extraargs, &size);
lazy_assert_zero(r);
rwlock_write_unlock(&p->rwlock);
cachetable_unlock(ct);
return TOKUDB_TRY_AGAIN;
}
rwlock_read_lock(&p->rwlock, ct->mutex);
pair_touch(p);
*value = p->value;
if (sizep) *sizep = p->size;
cachetable_hit++;
cachetable_unlock(ct);
return 0;
}
} }
assert(0); // cannot get here assert(0); // cannot get here
} }
...@@ -1749,25 +1838,46 @@ int toku_cachetable_get_and_pin_nonblocking (CACHEFILE cf, CACHEKEY key, u_int32 ...@@ -1749,25 +1838,46 @@ int toku_cachetable_get_and_pin_nonblocking (CACHEFILE cf, CACHEKEY key, u_int32
assert(p==0); assert(p==0);
// Not found // Not found
p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, fetch_callback, pe_callback, extraargs, CACHETABLE_CLEAN); p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, pe_callback, write_extraargs, CACHETABLE_CLEAN);
assert(p); assert(p);
rwlock_write_lock(&p->rwlock, ct->mutex); rwlock_write_lock(&p->rwlock, ct->mutex);
run_unlockers(unlockers); // we hold the ct mutex. run_unlockers(unlockers); // we hold the ct mutex.
if (ct->ydb_unlock_callback) ct->ydb_unlock_callback(); if (ct->ydb_unlock_callback) ct->ydb_unlock_callback();
int r = cachetable_fetch_pair(ct, cf, p); int r = cachetable_fetch_pair(ct, cf, p, fetch_callback, read_extraargs);
cachetable_unlock(ct); cachetable_unlock(ct);
if (ct->ydb_lock_callback) ct->ydb_lock_callback(); if (ct->ydb_lock_callback) ct->ydb_lock_callback();
if (r!=0) return r; if (r!=0) return r;
else return TOKUDB_TRY_AGAIN; else return TOKUDB_TRY_AGAIN;
} }
struct cachefile_prefetch_args {
PAIR p;
CACHETABLE_FETCH_CALLBACK fetch_callback;
void* read_extraargs;
};
//
// PREFETCHING DOES NOT WORK IN MAXWELL AS OF NOW!
//
int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
CACHETABLE_FLUSH_CALLBACK flush_callback, CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback, CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback, CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
void *extraargs) CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback __attribute__((unused)),
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback __attribute__((unused)),
void *read_extraargs,
void *write_extraargs)
// Effect: See the documentation for this function in cachetable.h // Effect: See the documentation for this function in cachetable.h
{ {
// TODO: Fix prefetching, as part of ticket 3635
// Here is the cachetable's reason why we are not doing prefetching in Maxwell.
// The fetch_callback requires data that is only valid in the caller's thread,
// namely, a struct that the caller allocates that contains information
// on what pieces of the node will be needed. This data is not necessarily
// valid when the prefetch thread gets around to trying to prefetch the node
// If we pass this data to another thread, we need a mechanism for freeing it.
// It may be another callback. That is way too many callbacks that are being used
// Fixing this in a clean, simple way requires some thought.
if (0) printf("%s:%d %"PRId64"\n", __FUNCTION__, __LINE__, key.b); if (0) printf("%s:%d %"PRId64"\n", __FUNCTION__, __LINE__, key.b);
CACHETABLE ct = cf->cachetable; CACHETABLE ct = cf->cachetable;
cachetable_lock(ct); cachetable_lock(ct);
...@@ -1784,10 +1894,14 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, ...@@ -1784,10 +1894,14 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
// if not found then create a pair in the READING state and fetch it // if not found then create a pair in the READING state and fetch it
if (p == 0) { if (p == 0) {
cachetable_prefetches++; cachetable_prefetches++;
p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, fetch_callback, pe_callback, extraargs, CACHETABLE_CLEAN); p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, pe_callback, write_extraargs, CACHETABLE_CLEAN);
assert(p); assert(p);
rwlock_write_lock(&p->rwlock, ct->mutex); rwlock_write_lock(&p->rwlock, ct->mutex);
workitem_init(&p->asyncwork, cachetable_reader, p); struct cachefile_prefetch_args *cpargs = toku_xmalloc(sizeof(struct cachefile_prefetch_args));
cpargs->p = p;
cpargs->fetch_callback = fetch_callback;
cpargs->read_extraargs = read_extraargs;
workitem_init(&p->asyncwork, cachetable_reader, cpargs);
workqueue_enq(&ct->wq, &p->asyncwork, 0); workqueue_enq(&ct->wq, &p->asyncwork, 0);
} }
cachetable_unlock(ct); cachetable_unlock(ct);
...@@ -2442,11 +2556,21 @@ static void cachetable_writer(WORKITEM wi) { ...@@ -2442,11 +2556,21 @@ static void cachetable_writer(WORKITEM wi) {
// Worker thread function to read a pair from a cachefile to memory // Worker thread function to read a pair from a cachefile to memory
static void cachetable_reader(WORKITEM wi) { static void cachetable_reader(WORKITEM wi) {
PAIR p = workitem_arg(wi); struct cachefile_prefetch_args* cpargs = workitem_arg(wi);
CACHETABLE ct = p->cachefile->cachetable; CACHETABLE ct = cpargs->p->cachefile->cachetable;
cachetable_lock(ct); cachetable_lock(ct);
cachetable_fetch_pair(ct, p->cachefile, p); // TODO: find a way to properly pass some information for read_extraargs
// This is only called in toku_cachefile_prefetch, by putting it on a workqueue
// The problem is described in comments in toku_cachefile_prefetch
cachetable_fetch_pair(
ct,
cpargs->p->cachefile,
cpargs->p,
cpargs->fetch_callback,
cpargs->read_extraargs
);
cachetable_unlock(ct); cachetable_unlock(ct);
toku_free(cpargs);
} }
......
...@@ -114,16 +114,28 @@ int toku_cachefile_fsync(CACHEFILE cf); ...@@ -114,16 +114,28 @@ int toku_cachefile_fsync(CACHEFILE cf);
// When for_checkpoint is true, this was a 'pending' write // When for_checkpoint is true, this was a 'pending' write
// Returns: 0 if success, otherwise an error number. // Returns: 0 if success, otherwise an error number.
// Can access fd (fd is protected by a readlock during call) // Can access fd (fd is protected by a readlock during call)
typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, int fd, CACHEKEY key, void *value, void *extraargs, long size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint); typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, int fd, CACHEKEY key, void *value, void *write_extraargs, long size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint);
// The fetch callback is called when a thread is attempting to get and pin a memory // The fetch callback is called when a thread is attempting to get and pin a memory
// object and it is not in the cachetable. // object and it is not in the cachetable.
// Returns: 0 if success, otherwise an error number. The address and size of the object // Returns: 0 if success, otherwise an error number. The address and size of the object
// associated with the key are returned. // associated with the key are returned.
// Can access fd (fd is protected by a readlock during call) // Can access fd (fd is protected by a readlock during call)
typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, int fd, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, int *dirtyp, void *extraargs); typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, int fd, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, int *dirtyp, void *read_extraargs);
typedef int (*CACHETABLE_PARTIAL_EVICTION_CALLBACK)(void *brtnode_pv, long bytes_to_free, long* bytes_freed, void *extraargs); typedef int (*CACHETABLE_PARTIAL_EVICTION_CALLBACK)(void *brtnode_pv, long bytes_to_free, long* bytes_freed, void *write_extraargs);
// This callback is called by the cachetable to ask if a partial fetch is required of brtnode_pv. If a partial fetch
// is required, then CACHETABLE_PARTIAL_FETCH_CALLBACK is called (possibly with ydb lock released). The reason
// this callback exists instead of just doing the same functionality in CACHETABLE_PARTIAL_FETCH_CALLBACK
// is so that we can call this cheap function with the ydb lock held, in the hopes of avoiding the more expensive sequence
// of releasing the ydb lock, calling the partial_fetch_callback, reading nothing, reacquiring the ydb lock
typedef BOOL (*CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK)(void *brtnode_pv, void *read_extraargs);
// The partial fetch callback is called when a thread needs to read a subset of a PAIR into memory
// Returns: 0 if success, otherwise an error number.
// The number of bytes added is returned in sizep
typedef int (*CACHETABLE_PARTIAL_FETCH_CALLBACK)(void *brtnode_pv, void *read_extraargs, long *sizep);
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
int (*log_fassociate_during_checkpoint)(CACHEFILE, void*), int (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
...@@ -154,9 +166,8 @@ CACHETABLE toku_cachefile_get_cachetable(CACHEFILE cf); ...@@ -154,9 +166,8 @@ CACHETABLE toku_cachefile_get_cachetable(CACHEFILE cf);
int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
void *value, long size, void *value, long size,
CACHETABLE_FLUSH_CALLBACK flush_callback, CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback, CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
void *extraargs void *write_extraargs
); );
// Get and pin a memory object. // Get and pin a memory object.
...@@ -164,12 +175,20 @@ int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, ...@@ -164,12 +175,20 @@ int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
// Otherwise, fetch it from storage by calling the fetch callback. If the fetch // Otherwise, fetch it from storage by calling the fetch callback. If the fetch
// succeeded, add the memory object to the cachetable with a read lock on it. // succeeded, add the memory object to the cachetable with a read lock on it.
// Returns: 0 if the memory object is in memory, otherwise an error number. // Returns: 0 if the memory object is in memory, otherwise an error number.
int toku_cachetable_get_and_pin(CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, int toku_cachetable_get_and_pin (
void **/*value*/, long *sizep, CACHEFILE cachefile,
CACHETABLE_FLUSH_CALLBACK flush_callback, CACHEKEY key,
CACHETABLE_FETCH_CALLBACK fetch_callback, u_int32_t fullhash,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback, void**value,
void *extraargs); long *sizep,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback __attribute__((unused)),
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback __attribute__((unused)),
void* read_extraargs,
void* write_extraargs
);
typedef struct unlockers *UNLOCKERS; typedef struct unlockers *UNLOCKERS;
struct unlockers { struct unlockers {
...@@ -182,12 +201,22 @@ struct unlockers { ...@@ -182,12 +201,22 @@ struct unlockers {
// Effect: If the block is in the cachetable, then return it. // Effect: If the block is in the cachetable, then return it.
// Otherwise call the release_lock_callback, call the functions in unlockers, fetch the data (but don't pin it, since we'll just end up pinning it again later), // Otherwise call the release_lock_callback, call the functions in unlockers, fetch the data (but don't pin it, since we'll just end up pinning it again later),
// and return TOKU_DB_TRYAGAIN. // and return TOKU_DB_TRYAGAIN.
int toku_cachetable_get_and_pin_nonblocking (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value, long *sizep, int toku_cachetable_get_and_pin_nonblocking (
CACHETABLE_FLUSH_CALLBACK flush_callback, CACHEFILE cf,
CACHETABLE_FETCH_CALLBACK fetch_callback, CACHEKEY key,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback, u_int32_t fullhash,
void *extraargs, void**value,
UNLOCKERS unlockers); long *sizep,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback __attribute__((unused)),
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback __attribute__((unused)),
void *read_extraargs,
void* write_extraargs,
UNLOCKERS unlockers
);
#define CAN_RELEASE_LOCK_DURING_IO #define CAN_RELEASE_LOCK_DURING_IO
int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**); int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**);
...@@ -227,8 +256,11 @@ int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY); /* Removing somethin ...@@ -227,8 +256,11 @@ int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY); /* Removing somethin
int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
CACHETABLE_FLUSH_CALLBACK flush_callback, CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback, CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback, CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
void *extraargs); CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback __attribute__((unused)),
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback __attribute__((unused)),
void *read_extraargs,
void *write_extraargs);
// Effect: Prefetch a memory object for a given key into the cachetable // Effect: Prefetch a memory object for a given key into the cachetable
// Precondition: The cachetable mutex is NOT held. // Precondition: The cachetable mutex is NOT held.
// Postcondition: The cachetable mutex is NOT held. // Postcondition: The cachetable mutex is NOT held.
......
...@@ -96,7 +96,7 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn, ...@@ -96,7 +96,7 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
r = toku_maybe_prefetch_older_rollback_log(txn, log); r = toku_maybe_prefetch_older_rollback_log(txn, log);
assert(r==0); assert(r==0);
last_sequence = log->sequence; last_sequence = log->sequence;
if (func) { if (func) {
while ((item=log->newest_logentry)) { while ((item=log->newest_logentry)) {
...@@ -523,6 +523,14 @@ static int toku_rollback_pe_callback ( ...@@ -523,6 +523,14 @@ static int toku_rollback_pe_callback (
*bytes_freed = 0; *bytes_freed = 0;
return 0; return 0;
} }
static BOOL toku_rollback_pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int toku_rollback_pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
// should never be called, given that toku_rollback_pf_req_callback always returns false
assert(FALSE);
}
...@@ -553,7 +561,6 @@ static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t o ...@@ -553,7 +561,6 @@ static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t o
r=toku_cachetable_put(cf, log->thislogname, log->thishash, r=toku_cachetable_put(cf, log->thislogname, log->thishash,
log, rollback_memory_size(log), log, rollback_memory_size(log),
toku_rollback_flush_callback, toku_rollback_flush_callback,
toku_rollback_fetch_callback,
toku_rollback_pe_callback, toku_rollback_pe_callback,
h); h);
assert(r==0); assert(r==0);
...@@ -760,6 +767,7 @@ int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr) { ...@@ -760,6 +767,7 @@ int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr) {
int int
toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) { toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
//Currently processing 'log'. Prefetch the next (older) log node. //Currently processing 'log'. Prefetch the next (older) log node.
BLOCKNUM name = log->older; BLOCKNUM name = log->older;
int r = 0; int r = 0;
if (name.b != ROLLBACK_NONE.b) { if (name.b != ROLLBACK_NONE.b) {
...@@ -770,6 +778,9 @@ toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) { ...@@ -770,6 +778,9 @@ toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
toku_rollback_flush_callback, toku_rollback_flush_callback,
toku_rollback_fetch_callback, toku_rollback_fetch_callback,
toku_rollback_pe_callback, toku_rollback_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
h,
h); h);
assert(r==0); assert(r==0);
} }
...@@ -796,6 +807,9 @@ int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLO ...@@ -796,6 +807,9 @@ int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLO
toku_rollback_flush_callback, toku_rollback_flush_callback,
toku_rollback_fetch_callback, toku_rollback_fetch_callback,
toku_rollback_pe_callback, toku_rollback_pe_callback,
toku_rollback_pf_req_callback,
toku_rollback_pf_callback,
h,
h); h);
assert(r==0); assert(r==0);
log = (ROLLBACK_LOG_NODE)log_v; log = (ROLLBACK_LOG_NODE)log_v;
......
...@@ -47,6 +47,14 @@ pe_callback ( ...@@ -47,6 +47,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void static void
cachetable_test (void) { cachetable_test (void) {
...@@ -62,11 +70,11 @@ cachetable_test (void) { ...@@ -62,11 +70,11 @@ cachetable_test (void) {
void* v1; void* v1;
void* v2; void* v2;
long s1, s2; long s1, s2;
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, NULL); r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, 8); r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, 8);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_callback, NULL); r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
// usleep (2*1024*1024); // usleep (2*1024*1024);
//r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, NULL); //r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 8); r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 8);
......
...@@ -64,6 +64,14 @@ pe_callback ( ...@@ -64,6 +64,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
// placeholder for now
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void* static void*
do_update (void *UU(ignore)) do_update (void *UU(ignore))
...@@ -75,7 +83,7 @@ do_update (void *UU(ignore)) ...@@ -75,7 +83,7 @@ do_update (void *UU(ignore))
u_int32_t hi = toku_cachetable_hash(cf, key); u_int32_t hi = toku_cachetable_hash(cf, key);
void *vv; void *vv;
long size; long size;
int r = toku_cachetable_get_and_pin(cf, key, hi, &vv, &size, flush, fetch, pe_callback, 0); int r = toku_cachetable_get_and_pin(cf, key, hi, &vv, &size, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
//printf("g"); //printf("g");
assert(r==0); assert(r==0);
assert(size==sizeof(int)); assert(size==sizeof(int));
...@@ -124,7 +132,7 @@ static void checkpoint_pending(void) { ...@@ -124,7 +132,7 @@ static void checkpoint_pending(void) {
CACHEKEY key = make_blocknum(i); CACHEKEY key = make_blocknum(i);
u_int32_t hi = toku_cachetable_hash(cf, key); u_int32_t hi = toku_cachetable_hash(cf, key);
values[i] = 42; values[i] = 42;
r = toku_cachetable_put(cf, key, hi, &values[i], sizeof(int), flush, fetch, pe_callback, 0); r = toku_cachetable_put(cf, key, hi, &values[i], sizeof(int), flush, pe_callback, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_unpin(cf, key, hi, CACHETABLE_DIRTY, item_size); r = toku_cachetable_unpin(cf, key, hi, CACHETABLE_DIRTY, item_size);
......
...@@ -19,16 +19,6 @@ static void flush(CACHEFILE cf, int UU(fd), CACHEKEY key, void *value, void *ext ...@@ -19,16 +19,6 @@ static void flush(CACHEFILE cf, int UU(fd), CACHEKEY key, void *value, void *ext
if (keep_me) n_keep_me++; if (keep_me) n_keep_me++;
} }
static int fetch(CACHEFILE cf, int UU(fd), CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, int *dirtyp, void *extraargs) {
cf = cf; key = key; fullhash = fullhash; value = value; sizep = sizep; extraargs = extraargs;
assert(0); // should not be called
n_fetch++;
*value = 0;
*sizep = item_size;
*dirtyp = 0;
return 0;
}
static int static int
pe_callback ( pe_callback (
void *brtnode_pv __attribute__((__unused__)), void *brtnode_pv __attribute__((__unused__)),
...@@ -82,7 +72,7 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) { ...@@ -82,7 +72,7 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) {
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
CACHEKEY key = make_blocknum(i); CACHEKEY key = make_blocknum(i);
u_int32_t hi = toku_cachetable_hash(f1, key); u_int32_t hi = toku_cachetable_hash(f1, key);
r = toku_cachetable_put(f1, key, hi, (void *)(long)i, 1, flush, fetch, pe_callback, 0); r = toku_cachetable_put(f1, key, hi, (void *)(long)i, 1, flush, pe_callback, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_unpin(f1, key, hi, dirty, item_size); r = toku_cachetable_unpin(f1, key, hi, dirty, item_size);
......
...@@ -43,22 +43,6 @@ fetch (CACHEFILE f __attribute__((__unused__)), ...@@ -43,22 +43,6 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return 0; return 0;
} }
static int
big_fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = 4;
return 0;
}
static int static int
pe_callback ( pe_callback (
void *brtnode_pv __attribute__((__unused__)), void *brtnode_pv __attribute__((__unused__)),
...@@ -71,6 +55,14 @@ pe_callback ( ...@@ -71,6 +55,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void static void
cachetable_test (void) { cachetable_test (void) {
...@@ -90,24 +82,24 @@ cachetable_test (void) { ...@@ -90,24 +82,24 @@ cachetable_test (void) {
flush_may_occur = FALSE; flush_may_occur = FALSE;
check_flush = TRUE; check_flush = TRUE;
for (int i = 0; i < 100000; i++) { for (int i = 0; i < 100000; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, NULL); r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, 1); r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, 1);
} }
for (int i = 0; i < 8; i++) { for (int i = 0; i < 8; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_callback, NULL); r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 1); r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 1);
} }
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v2, &s2, flush, fetch, pe_callback, NULL); r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(3), 3, CACHETABLE_CLEAN, 1); r = toku_cachetable_unpin(f1, make_blocknum(3), 3, CACHETABLE_CLEAN, 1);
} }
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v2, &s2, flush, fetch, pe_callback, NULL); r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(4), 4, CACHETABLE_CLEAN, 1); r = toku_cachetable_unpin(f1, make_blocknum(4), 4, CACHETABLE_CLEAN, 1);
} }
flush_may_occur = TRUE; flush_may_occur = TRUE;
expected_flushed_key = 4; expected_flushed_key = 4;
r = toku_cachetable_put(f1, make_blocknum(5), 5, NULL, 4, flush, big_fetch, pe_callback, NULL); r = toku_cachetable_put(f1, make_blocknum(5), 5, NULL, 4, flush, pe_callback, NULL);
flush_may_occur = TRUE; flush_may_occur = TRUE;
expected_flushed_key = 5; expected_flushed_key = 5;
r = toku_cachetable_unpin(f1, make_blocknum(5), 5, CACHETABLE_CLEAN, 4); r = toku_cachetable_unpin(f1, make_blocknum(5), 5, CACHETABLE_CLEAN, 4);
......
...@@ -54,20 +54,6 @@ other_flush (CACHEFILE f __attribute__((__unused__)), ...@@ -54,20 +54,6 @@ other_flush (CACHEFILE f __attribute__((__unused__)),
) { ) {
} }
static int
other_fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
return 0;
}
static int static int
pe_callback ( pe_callback (
void *brtnode_pv, void *brtnode_pv,
...@@ -95,6 +81,13 @@ other_pe_callback ( ...@@ -95,6 +81,13 @@ other_pe_callback (
{ {
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void static void
...@@ -113,24 +106,24 @@ cachetable_test (void) { ...@@ -113,24 +106,24 @@ cachetable_test (void) {
long s1, s2; long s1, s2;
flush_may_occur = FALSE; flush_may_occur = FALSE;
for (int i = 0; i < 100000; i++) { for (int i = 0; i < 100000; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, NULL); r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, 4); r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, 4);
} }
for (int i = 0; i < 8; i++) { for (int i = 0; i < 8; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_callback, NULL); r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 4); r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 4);
} }
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v2, &s2, flush, fetch, pe_callback, NULL); r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(3), 3, CACHETABLE_CLEAN, 4); r = toku_cachetable_unpin(f1, make_blocknum(3), 3, CACHETABLE_CLEAN, 4);
} }
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v2, &s2, flush, fetch, pe_callback, NULL); r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(4), 4, CACHETABLE_CLEAN, 4); r = toku_cachetable_unpin(f1, make_blocknum(4), 4, CACHETABLE_CLEAN, 4);
} }
flush_may_occur = FALSE; flush_may_occur = FALSE;
expected_bytes_to_free = 4; expected_bytes_to_free = 4;
r = toku_cachetable_put(f1, make_blocknum(5), 5, NULL, 4, other_flush, other_fetch, other_pe_callback, NULL); r = toku_cachetable_put(f1, make_blocknum(5), 5, NULL, 4, other_flush, other_pe_callback, NULL);
flush_may_occur = TRUE; flush_may_occur = TRUE;
r = toku_cachetable_unpin(f1, make_blocknum(5), 5, CACHETABLE_CLEAN, 4); r = toku_cachetable_unpin(f1, make_blocknum(5), 5, CACHETABLE_CLEAN, 4);
......
...@@ -15,19 +15,6 @@ flush (CACHEFILE f __attribute__((__unused__)), ...@@ -15,19 +15,6 @@ flush (CACHEFILE f __attribute__((__unused__)),
/* Do nothing */ /* Do nothing */
} }
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
int *dirtyp __attribute__((__unused__)),
void *extraargs __attribute__((__unused__))
) {
return 0;
}
static int static int
pe_callback ( pe_callback (
void *brtnode_pv __attribute__((__unused__)), void *brtnode_pv __attribute__((__unused__)),
...@@ -56,7 +43,7 @@ cachetable_count_pinned_test (int n) { ...@@ -56,7 +43,7 @@ cachetable_count_pinned_test (int n) {
for (i=1; i<=n; i++) { for (i=1; i<=n; i++) {
u_int32_t hi; u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i)); hi = toku_cachetable_hash(f1, make_blocknum(i));
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, fetch, pe_callback, 0); r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_callback, 0);
assert(r == 0); assert(r == 0);
assert(toku_cachefile_count_pinned(f1, 0) == i); assert(toku_cachefile_count_pinned(f1, 0) == i);
......
...@@ -15,19 +15,6 @@ flush (CACHEFILE f __attribute__((__unused__)), ...@@ -15,19 +15,6 @@ flush (CACHEFILE f __attribute__((__unused__)),
/* Do nothing */ /* Do nothing */
} }
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
int *dirtyp __attribute__((__unused__)),
void *extraargs __attribute__((__unused__))
) {
return 0;
}
static int static int
pe_callback ( pe_callback (
void *brtnode_pv __attribute__((__unused__)), void *brtnode_pv __attribute__((__unused__)),
...@@ -64,7 +51,7 @@ cachetable_debug_test (int n) { ...@@ -64,7 +51,7 @@ cachetable_debug_test (int n) {
const int item_size = 1; const int item_size = 1;
u_int32_t hi; u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i)); hi = toku_cachetable_hash(f1, make_blocknum(i));
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, item_size, flush, fetch, pe_callback, 0); r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, item_size, flush, pe_callback, 0);
assert(r == 0); assert(r == 0);
void *v; int dirty; long long pinned; long pair_size; void *v; int dirty; long long pinned; long pair_size;
......
...@@ -15,19 +15,6 @@ flush (CACHEFILE f __attribute__((__unused__)), ...@@ -15,19 +15,6 @@ flush (CACHEFILE f __attribute__((__unused__)),
/* Do nothing */ /* Do nothing */
} }
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
int *dirtyp __attribute__((__unused__)),
void *extraargs __attribute__((__unused__))
) {
return 0;
}
static int static int
pe_callback ( pe_callback (
void *brtnode_pv __attribute__((__unused__)), void *brtnode_pv __attribute__((__unused__)),
...@@ -40,7 +27,6 @@ pe_callback ( ...@@ -40,7 +27,6 @@ pe_callback (
return 0; return 0;
} }
static void static void
test_cachetable_flush (int n) { test_cachetable_flush (int n) {
const int test_limit = 2*n; const int test_limit = 2*n;
...@@ -62,12 +48,12 @@ test_cachetable_flush (int n) { ...@@ -62,12 +48,12 @@ test_cachetable_flush (int n) {
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
u_int32_t hi; u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i)); hi = toku_cachetable_hash(f1, make_blocknum(i));
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, fetch, pe_callback, 0); r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_callback, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_unpin(f1, make_blocknum(i), hi, CACHETABLE_CLEAN, 1); r = toku_cachetable_unpin(f1, make_blocknum(i), hi, CACHETABLE_CLEAN, 1);
assert(r == 0); assert(r == 0);
hi = toku_cachetable_hash(f2, make_blocknum(i)); hi = toku_cachetable_hash(f2, make_blocknum(i));
r = toku_cachetable_put(f2, make_blocknum(i), hi, (void *)(long)i, 1, flush, fetch, pe_callback, 0); r = toku_cachetable_put(f2, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_callback, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_unpin(f2, make_blocknum(i), hi, CACHETABLE_CLEAN, 1); r = toku_cachetable_unpin(f2, make_blocknum(i), hi, CACHETABLE_CLEAN, 1);
assert(r == 0); assert(r == 0);
......
...@@ -49,6 +49,13 @@ pe_callback ( ...@@ -49,6 +49,13 @@ pe_callback (
*bytes_freed = 0; *bytes_freed = 0;
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void static void
...@@ -69,7 +76,7 @@ cachetable_getandpin_test (int n) { ...@@ -69,7 +76,7 @@ cachetable_getandpin_test (int n) {
u_int32_t hi; u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i)); hi = toku_cachetable_hash(f1, make_blocknum(i));
void *v; long size; void *v; long size;
r = toku_cachetable_get_and_pin(f1, make_blocknum(i), hi, &v, &size, flush, fetch_error, pe_callback, 0); r = toku_cachetable_get_and_pin(f1, make_blocknum(i), hi, &v, &size, flush, fetch_error, pe_callback, pf_req_callback, pf_callback, 0, 0);
assert(r == -1); assert(r == -1);
} }
...@@ -78,7 +85,7 @@ cachetable_getandpin_test (int n) { ...@@ -78,7 +85,7 @@ cachetable_getandpin_test (int n) {
u_int32_t hi; u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i)); hi = toku_cachetable_hash(f1, make_blocknum(i));
void *v; long size; void *v; long size;
r = toku_cachetable_get_and_pin(f1, make_blocknum(i), hi, &v, &size, flush, fetch, pe_callback, 0); r = toku_cachetable_get_and_pin(f1, make_blocknum(i), hi, &v, &size, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
assert(r == 0); assert(r == 0);
assert(size == i); assert(size == i);
......
...@@ -44,6 +44,14 @@ pe_callback ( ...@@ -44,6 +44,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static int dummy_pin_unpin(CACHEFILE UU(cfu), void* UU(v)) { static int dummy_pin_unpin(CACHEFILE UU(cfu), void* UU(v)) {
return 0; return 0;
} }
...@@ -66,7 +74,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir ...@@ -66,7 +74,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir
{ {
CACHEKEY key = make_blocknum(n+1); CACHEKEY key = make_blocknum(n+1);
u_int32_t fullhash = toku_cachetable_hash(f1, key); u_int32_t fullhash = toku_cachetable_hash(f1, key);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, 0); r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
} }
...@@ -75,7 +83,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir ...@@ -75,7 +83,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
CACHEKEY key = make_blocknum(i); CACHEKEY key = make_blocknum(i);
u_int32_t hi = toku_cachetable_hash(f1, key); u_int32_t hi = toku_cachetable_hash(f1, key);
r = toku_cachetable_put(f1, key, hi, (void *)(long)i, 1, flush, fetch, pe_callback, 0); r = toku_cachetable_put(f1, key, hi, (void *)(long)i, 1, flush, pe_callback, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_unpin(f1, key, hi, dirty, item_size); r = toku_cachetable_unpin(f1, key, hi, dirty, item_size);
......
...@@ -53,6 +53,14 @@ pe_callback ( ...@@ -53,6 +53,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void cachetable_prefetch_maybegetandpin_test (void) { static void cachetable_prefetch_maybegetandpin_test (void) {
const int test_limit = 1; const int test_limit = 1;
int r; int r;
...@@ -66,7 +74,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) { ...@@ -66,7 +74,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds. // prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0); CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0)); u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, 0); r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
// close with the prefetch in progress. the close should block until // close with the prefetch in progress. the close should block until
......
...@@ -54,6 +54,14 @@ pe_callback ( ...@@ -54,6 +54,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void cachetable_prefetch_close_leak_test (void) { static void cachetable_prefetch_close_leak_test (void) {
const int test_limit = 1; const int test_limit = 1;
int r; int r;
...@@ -67,7 +75,7 @@ static void cachetable_prefetch_close_leak_test (void) { ...@@ -67,7 +75,7 @@ static void cachetable_prefetch_close_leak_test (void) {
// prefetch block 0. this will take 10 seconds. // prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0); CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0)); u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, 0); r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
// close with the prefetch in progress. the close should block until // close with the prefetch in progress. the close should block until
......
...@@ -53,6 +53,14 @@ pe_callback ( ...@@ -53,6 +53,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void cachetable_prefetch_maybegetandpin_test (void) { static void cachetable_prefetch_maybegetandpin_test (void) {
const int test_limit = 1; const int test_limit = 1;
...@@ -67,7 +75,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) { ...@@ -67,7 +75,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds. // prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0); CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0)); u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, 0); r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
// close with the prefetch in progress. the close should block until // close with the prefetch in progress. the close should block until
......
...@@ -65,6 +65,14 @@ pe_callback ( ...@@ -65,6 +65,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
// Note: cachetable_size_limit must be a power of 2 // Note: cachetable_size_limit must be a power of 2
static void cachetable_prefetch_flowcontrol_test (int cachetable_size_limit) { static void cachetable_prefetch_flowcontrol_test (int cachetable_size_limit) {
...@@ -82,7 +90,7 @@ static void cachetable_prefetch_flowcontrol_test (int cachetable_size_limit) { ...@@ -82,7 +90,7 @@ static void cachetable_prefetch_flowcontrol_test (int cachetable_size_limit) {
for (i=0; i<cachetable_size_limit; i++) { for (i=0; i<cachetable_size_limit; i++) {
CACHEKEY key = make_blocknum(i); CACHEKEY key = make_blocknum(i);
u_int32_t fullhash = toku_cachetable_hash(f1, key); u_int32_t fullhash = toku_cachetable_hash(f1, key);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, 0); r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
} }
...@@ -93,7 +101,7 @@ static void cachetable_prefetch_flowcontrol_test (int cachetable_size_limit) { ...@@ -93,7 +101,7 @@ static void cachetable_prefetch_flowcontrol_test (int cachetable_size_limit) {
for (i=i; i<2*cachetable_size_limit; i++) { for (i=i; i<2*cachetable_size_limit; i++) {
CACHEKEY key = make_blocknum(i); CACHEKEY key = make_blocknum(i);
u_int32_t fullhash = toku_cachetable_hash(f1, key); u_int32_t fullhash = toku_cachetable_hash(f1, key);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, 0); r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
// sleep(1); // sleep(1);
} }
......
...@@ -49,6 +49,13 @@ pe_callback ( ...@@ -49,6 +49,13 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static uint64_t tdelta_usec(struct timeval *tend, struct timeval *tstart) { static uint64_t tdelta_usec(struct timeval *tend, struct timeval *tstart) {
uint64_t t = tend->tv_sec * 1000000 + tend->tv_usec; uint64_t t = tend->tv_sec * 1000000 + tend->tv_usec;
...@@ -72,13 +79,13 @@ static void cachetable_prefetch_maybegetandpin_test (void) { ...@@ -72,13 +79,13 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds. // prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0); CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0)); u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, 0); r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
// verify that get_and_pin waits while the prefetch is in progress // verify that get_and_pin waits while the prefetch is in progress
void *v = 0; void *v = 0;
long size = 0; long size = 0;
r = toku_cachetable_get_and_pin(f1, key, fullhash, &v, &size, flush, fetch, pe_callback, NULL); r = toku_cachetable_get_and_pin(f1, key, fullhash, &v, &size, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
assert(r != 0); assert(r != 0);
struct timeval tend; struct timeval tend;
......
...@@ -50,6 +50,14 @@ pe_callback ( ...@@ -50,6 +50,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static uint64_t tdelta_usec(struct timeval *tend, struct timeval *tstart) { static uint64_t tdelta_usec(struct timeval *tend, struct timeval *tstart) {
uint64_t t = tend->tv_sec * 1000000 + tend->tv_usec; uint64_t t = tend->tv_sec * 1000000 + tend->tv_usec;
t -= tstart->tv_sec * 1000000 + tstart->tv_usec; t -= tstart->tv_sec * 1000000 + tstart->tv_usec;
...@@ -72,13 +80,13 @@ static void cachetable_prefetch_maybegetandpin_test (void) { ...@@ -72,13 +80,13 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds. // prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0); CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0)); u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, 0); r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
// verify that get_and_pin waits while the prefetch is in progress // verify that get_and_pin waits while the prefetch is in progress
void *v = 0; void *v = 0;
long size = 0; long size = 0;
r = toku_cachetable_get_and_pin(f1, key, fullhash, &v, &size, flush, fetch, pe_callback, NULL); r = toku_cachetable_get_and_pin(f1, key, fullhash, &v, &size, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
assert(r == 0 && v == 0 && size == 1); assert(r == 0 && v == 0 && size == 1);
struct timeval tend; struct timeval tend;
......
...@@ -49,6 +49,14 @@ pe_callback ( ...@@ -49,6 +49,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void cachetable_prefetch_maybegetandpin_test (void) { static void cachetable_prefetch_maybegetandpin_test (void) {
const int test_limit = 1; const int test_limit = 1;
...@@ -63,7 +71,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) { ...@@ -63,7 +71,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds. // prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0); CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0)); u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, 0); r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
// verify that maybe_get_and_pin returns an error while the prefetch is in progress // verify that maybe_get_and_pin returns an error while the prefetch is in progress
......
...@@ -53,6 +53,14 @@ pe_callback ( ...@@ -53,6 +53,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void cachetable_prefetch_maybegetandpin_test (void) { static void cachetable_prefetch_maybegetandpin_test (void) {
const int test_limit = 1; const int test_limit = 1;
...@@ -67,11 +75,11 @@ static void cachetable_prefetch_maybegetandpin_test (void) { ...@@ -67,11 +75,11 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds. // prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0); CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0)); u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, 0); r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
// prefetch again. this should do nothing. // prefetch again. this should do nothing.
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, 0); r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
// verify that maybe_get_and_pin returns an error while the prefetch is in progress // verify that maybe_get_and_pin returns an error while the prefetch is in progress
......
...@@ -15,20 +15,6 @@ flush (CACHEFILE f __attribute__((__unused__)), ...@@ -15,20 +15,6 @@ flush (CACHEFILE f __attribute__((__unused__)),
/* Do nothing */ /* Do nothing */
} }
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
int *dirtyp __attribute__((__unused__)),
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
return 0;
}
static int static int
pe_callback ( pe_callback (
void *brtnode_pv __attribute__((__unused__)), void *brtnode_pv __attribute__((__unused__)),
...@@ -57,11 +43,11 @@ cachetable_put_test (int n) { ...@@ -57,11 +43,11 @@ cachetable_put_test (int n) {
for (i=1; i<=n; i++) { for (i=1; i<=n; i++) {
u_int32_t hi; u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i)); hi = toku_cachetable_hash(f1, make_blocknum(i));
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, fetch, pe_callback, 0); r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_callback, 0);
assert(r == 0); assert(r == 0);
assert(toku_cachefile_count_pinned(f1, 0) == i); assert(toku_cachefile_count_pinned(f1, 0) == i);
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, fetch, pe_callback, 0); r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_callback, 0);
assert(r == -1); assert(r == -1);
assert(toku_cachefile_count_pinned(f1, 0) == i); assert(toku_cachefile_count_pinned(f1, 0) == i);
......
...@@ -91,6 +91,14 @@ pe_callback ( ...@@ -91,6 +91,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void test_rename (void) { static void test_rename (void) {
CACHETABLE t; CACHETABLE t;
...@@ -113,7 +121,7 @@ static void test_rename (void) { ...@@ -113,7 +121,7 @@ static void test_rename (void) {
u_int32_t hnkey = toku_cachetable_hash(f, nkey); u_int32_t hnkey = toku_cachetable_hash(f, nkey);
r = toku_cachetable_put(f, nkey, hnkey, r = toku_cachetable_put(f, nkey, hnkey,
(void*)nval, 1, (void*)nval, 1,
r_flush, r_fetch, pe_callback, 0); r_flush, pe_callback, 0);
assert(r==0); assert(r==0);
test_mutex_lock(); test_mutex_lock();
while (n_keys >= KEYLIMIT) { while (n_keys >= KEYLIMIT) {
...@@ -138,7 +146,7 @@ static void test_rename (void) { ...@@ -138,7 +146,7 @@ static void test_rename (void) {
void *current_value; void *current_value;
long current_size; long current_size;
if (verbose) printf("Rename %" PRIx64 " to %" PRIx64 "\n", okey.b, nkey.b); if (verbose) printf("Rename %" PRIx64 " to %" PRIx64 "\n", okey.b, nkey.b);
r = toku_cachetable_get_and_pin(f, okey, toku_cachetable_hash(f, okey), &current_value, &current_size, r_flush, r_fetch, pe_callback, 0); r = toku_cachetable_get_and_pin(f, okey, toku_cachetable_hash(f, okey), &current_value, &current_size, r_flush, r_fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
if (r == -42) continue; if (r == -42) continue;
assert(r==0); assert(r==0);
r = toku_cachetable_rename(f, okey, nkey); r = toku_cachetable_rename(f, okey, nkey);
......
...@@ -56,6 +56,14 @@ pe_callback ( ...@@ -56,6 +56,14 @@ pe_callback (
*bytes_freed = 0; *bytes_freed = 0;
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
const char fname[] = __FILE__ ".dat"; const char fname[] = __FILE__ ".dat";
...@@ -79,7 +87,7 @@ static void writeit (void) { ...@@ -79,7 +87,7 @@ static void writeit (void) {
u_int32_t fullhash = toku_cachetable_hash(f, key); u_int32_t fullhash = toku_cachetable_hash(f, key);
int j; int j;
for (j=0; j<BLOCKSIZE; j++) ((char*)buf)[j]=(char)((i+j)%256); for (j=0; j<BLOCKSIZE; j++) ((char*)buf)[j]=(char)((i+j)%256);
r = toku_cachetable_put(f, key, fullhash, buf, BLOCKSIZE, f_flush, f_fetch, pe_callback, 0); assert(r==0); r = toku_cachetable_put(f, key, fullhash, buf, BLOCKSIZE, f_flush, pe_callback, 0); assert(r==0);
r = toku_cachetable_unpin(f, key, fullhash, CACHETABLE_CLEAN, BLOCKSIZE); assert(r==0); r = toku_cachetable_unpin(f, key, fullhash, CACHETABLE_CLEAN, BLOCKSIZE); assert(r==0);
} }
gettimeofday(&end, 0); gettimeofday(&end, 0);
...@@ -100,7 +108,7 @@ static void readit (void) { ...@@ -100,7 +108,7 @@ static void readit (void) {
for (i=0; i<N; i++) { for (i=0; i<N; i++) {
CACHEKEY key = make_blocknum(i*BLOCKSIZE); CACHEKEY key = make_blocknum(i*BLOCKSIZE);
u_int32_t fullhash = toku_cachetable_hash(f, key); u_int32_t fullhash = toku_cachetable_hash(f, key);
r=toku_cachetable_get_and_pin(f, key, fullhash, &block, &current_size, f_flush, f_fetch, pe_callback, 0); assert(r==0); r=toku_cachetable_get_and_pin(f, key, fullhash, &block, &current_size, f_flush, f_fetch, pe_callback, pf_req_callback, pf_callback, 0, 0); assert(r==0);
r=toku_cachetable_unpin(f, key, fullhash, CACHETABLE_CLEAN, BLOCKSIZE); assert(r==0); r=toku_cachetable_unpin(f, key, fullhash, CACHETABLE_CLEAN, BLOCKSIZE); assert(r==0);
} }
r = toku_cachefile_close(&f, 0, FALSE, ZERO_LSN); assert(r == 0); r = toku_cachefile_close(&f, 0, FALSE, ZERO_LSN); assert(r == 0);
......
...@@ -45,6 +45,14 @@ pe_callback ( ...@@ -45,6 +45,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void static void
cachetable_test (void) { cachetable_test (void) {
...@@ -61,7 +69,7 @@ cachetable_test (void) { ...@@ -61,7 +69,7 @@ cachetable_test (void) {
//void* v2; //void* v2;
long s1; long s1;
//long s2; //long s2;
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, NULL); r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, 8); r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, 8);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0); r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
......
...@@ -167,6 +167,14 @@ pe_callback ( ...@@ -167,6 +167,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void maybe_flush(CACHETABLE t) { static void maybe_flush(CACHETABLE t) {
toku_cachetable_maybe_flush_some(t); toku_cachetable_maybe_flush_some(t);
...@@ -201,28 +209,28 @@ static void test0 (void) { ...@@ -201,28 +209,28 @@ static void test0 (void) {
u_int32_t h5 = toku_cachetable_hash(f, make_blocknum(5)); u_int32_t h5 = toku_cachetable_hash(f, make_blocknum(5));
u_int32_t h6 = toku_cachetable_hash(f, make_blocknum(6)); u_int32_t h6 = toku_cachetable_hash(f, make_blocknum(6));
u_int32_t h7 = toku_cachetable_hash(f, make_blocknum(7)); u_int32_t h7 = toku_cachetable_hash(f, make_blocknum(7));
r=toku_cachetable_put(f, make_blocknum(1), h1, make_item(1), test_object_size, flush, fetch, pe_callback, t3); /* 1P */ /* this is the lru list. 1 is pinned. */ r=toku_cachetable_put(f, make_blocknum(1), h1, make_item(1), test_object_size, flush, pe_callback, t3); /* 1P */ /* this is the lru list. 1 is pinned. */
assert(r==0); assert(r==0);
assert(expect_n_flushes==0); assert(expect_n_flushes==0);
expect_init(); expect_init();
r=toku_cachetable_put(f, make_blocknum(2), h2, make_item(2), test_object_size, flush, fetch, pe_callback, t3); r=toku_cachetable_put(f, make_blocknum(2), h2, make_item(2), test_object_size, flush, pe_callback, t3);
assert(r==0); assert(r==0);
r=toku_cachetable_unpin(f, make_blocknum(2), h2, CACHETABLE_DIRTY, 1); /* 2U 1P */ r=toku_cachetable_unpin(f, make_blocknum(2), h2, CACHETABLE_DIRTY, 1); /* 2U 1P */
assert(expect_n_flushes==0); assert(expect_n_flushes==0);
expect_init(); expect_init();
r=toku_cachetable_put(f, make_blocknum(3), h3, make_item(3), test_object_size, flush, fetch, pe_callback, t3); r=toku_cachetable_put(f, make_blocknum(3), h3, make_item(3), test_object_size, flush, pe_callback, t3);
assert(r==0); assert(r==0);
assert(expect_n_flushes==0); /* 3P 2U 1P */ /* 3 is most recently used (pinned), 2 is next (unpinned), 1 is least recent (pinned) */ assert(expect_n_flushes==0); /* 3P 2U 1P */ /* 3 is most recently used (pinned), 2 is next (unpinned), 1 is least recent (pinned) */
expect_init(); expect_init();
r=toku_cachetable_put(f, make_blocknum(4), h4, make_item(4), test_object_size, flush, fetch, pe_callback, t3); r=toku_cachetable_put(f, make_blocknum(4), h4, make_item(4), test_object_size, flush, pe_callback, t3);
assert(r==0); assert(r==0);
assert(expect_n_flushes==0); /* 4P 3P 2U 1P */ assert(expect_n_flushes==0); /* 4P 3P 2U 1P */
expect_init(); expect_init();
r=toku_cachetable_put(f, make_blocknum(5), h5, make_item(5), test_object_size, flush, fetch, pe_callback, t3); r=toku_cachetable_put(f, make_blocknum(5), h5, make_item(5), test_object_size, flush, pe_callback, t3);
assert(r==0); assert(r==0);
r=toku_cachetable_unpin(f, make_blocknum(5), h5, CACHETABLE_DIRTY, test_object_size); r=toku_cachetable_unpin(f, make_blocknum(5), h5, CACHETABLE_DIRTY, test_object_size);
assert(r==0); assert(r==0);
...@@ -231,7 +239,7 @@ static void test0 (void) { ...@@ -231,7 +239,7 @@ static void test0 (void) {
assert(expect_n_flushes==0); /* 5U 4P 3U 2U 1P */ assert(expect_n_flushes==0); /* 5U 4P 3U 2U 1P */
expect1(2); /* 2 is the oldest unpinned item. */ expect1(2); /* 2 is the oldest unpinned item. */
r=toku_cachetable_put(f, make_blocknum(6), h6, make_item(6), test_object_size, flush, fetch, pe_callback, t3); /* 6P 5U 4P 3U 1P */ r=toku_cachetable_put(f, make_blocknum(6), h6, make_item(6), test_object_size, flush, pe_callback, t3); /* 6P 5U 4P 3U 1P */
assert(r==0); assert(r==0);
test_mutex_lock(); test_mutex_lock();
while (expect_n_flushes != 0) { while (expect_n_flushes != 0) {
...@@ -241,7 +249,7 @@ static void test0 (void) { ...@@ -241,7 +249,7 @@ static void test0 (void) {
test_mutex_unlock(); test_mutex_unlock();
expect1(3); expect1(3);
r=toku_cachetable_put(f, make_blocknum(7), h7, make_item(7), test_object_size, flush, fetch, pe_callback, t3); r=toku_cachetable_put(f, make_blocknum(7), h7, make_item(7), test_object_size, flush, pe_callback, t3);
assert(r==0); assert(r==0);
test_mutex_lock(); test_mutex_lock();
while (expect_n_flushes != 0) { while (expect_n_flushes != 0) {
...@@ -255,7 +263,7 @@ static void test0 (void) { ...@@ -255,7 +263,7 @@ static void test0 (void) {
{ {
void *item_v=0; void *item_v=0;
expect_init(); expect_init();
r=toku_cachetable_get_and_pin(f, make_blocknum(5), toku_cachetable_hash(f, make_blocknum(5)), &item_v, NULL, flush, fetch, pe_callback, t3); /* 5P 7U 6P 4P 1P */ r=toku_cachetable_get_and_pin(f, make_blocknum(5), toku_cachetable_hash(f, make_blocknum(5)), &item_v, NULL, flush, fetch, pe_callback, pf_req_callback, pf_callback, t3, t3); /* 5P 7U 6P 4P 1P */
assert(r==0); assert(r==0);
assert(((struct item *)item_v)->key.b==5); assert(((struct item *)item_v)->key.b==5);
assert(strcmp(((struct item *)item_v)->something,"something")==0); assert(strcmp(((struct item *)item_v)->something,"something")==0);
...@@ -270,7 +278,7 @@ static void test0 (void) { ...@@ -270,7 +278,7 @@ static void test0 (void) {
assert(r==0); assert(r==0);
expect1(4); expect1(4);
did_fetch=make_blocknum(-1); did_fetch=make_blocknum(-1);
r=toku_cachetable_get_and_pin(f, make_blocknum(2), toku_cachetable_hash(f, make_blocknum(2)), &item_v, NULL, flush, fetch, pe_callback, t3); /* 2p 5P 7U 6P 1P */ r=toku_cachetable_get_and_pin(f, make_blocknum(2), toku_cachetable_hash(f, make_blocknum(2)), &item_v, NULL, flush, fetch, pe_callback, pf_req_callback, pf_callback, t3, t3); /* 2p 5P 7U 6P 1P */
assert(r==0); assert(r==0);
assert(did_fetch.b==2); /* Expect that 2 is fetched in. */ assert(did_fetch.b==2); /* Expect that 2 is fetched in. */
assert(((struct item *)item_v)->key.b==2); assert(((struct item *)item_v)->key.b==2);
...@@ -346,9 +354,9 @@ static void test_nested_pin (void) { ...@@ -346,9 +354,9 @@ static void test_nested_pin (void) {
i0=0; i1=0; i0=0; i1=0;
u_int32_t f1hash = toku_cachetable_hash(f, make_blocknum(1)); u_int32_t f1hash = toku_cachetable_hash(f, make_blocknum(1));
r = toku_cachetable_put(f, make_blocknum(1), f1hash, &i0, 1, flush_n, fetch_n, pe_callback, f2); r = toku_cachetable_put(f, make_blocknum(1), f1hash, &i0, 1, flush_n, pe_callback, f2);
assert(r==0); assert(r==0);
r = toku_cachetable_get_and_pin(f, make_blocknum(1), f1hash, &vv, NULL, flush_n, fetch_n, pe_callback, f2); r = toku_cachetable_get_and_pin(f, make_blocknum(1), f1hash, &vv, NULL, flush_n, fetch_n, pe_callback, pf_req_callback, pf_callback, f2, f2);
assert(r==0); assert(r==0);
assert(vv==&i0); assert(vv==&i0);
assert(i0==0); assert(i0==0);
...@@ -360,7 +368,7 @@ static void test_nested_pin (void) { ...@@ -360,7 +368,7 @@ static void test_nested_pin (void) {
r = toku_cachetable_unpin(f, make_blocknum(1), f1hash, CACHETABLE_CLEAN, test_object_size); r = toku_cachetable_unpin(f, make_blocknum(1), f1hash, CACHETABLE_CLEAN, test_object_size);
assert(r==0); assert(r==0);
u_int32_t f2hash = toku_cachetable_hash(f, make_blocknum(2)); u_int32_t f2hash = toku_cachetable_hash(f, make_blocknum(2));
r = toku_cachetable_put(f, make_blocknum(2), f2hash, &i1, test_object_size, flush_n, fetch_n, pe_callback, f2); r = toku_cachetable_put(f, make_blocknum(2), f2hash, &i1, test_object_size, flush_n, pe_callback, f2);
assert(r==0); // The other one is pinned, but now the cachetable fails gracefully: It allows the pin to happen assert(r==0); // The other one is pinned, but now the cachetable fails gracefully: It allows the pin to happen
r = toku_cachetable_unpin(f, make_blocknum(1), f1hash, CACHETABLE_CLEAN, test_object_size); r = toku_cachetable_unpin(f, make_blocknum(1), f1hash, CACHETABLE_CLEAN, test_object_size);
assert(r==0); assert(r==0);
...@@ -421,12 +429,12 @@ static void test_multi_filehandles (void) { ...@@ -421,12 +429,12 @@ static void test_multi_filehandles (void) {
assert(f1==f2); assert(f1==f2);
assert(f1!=f3); assert(f1!=f3);
r = toku_cachetable_put(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), (void*)124, test_object_size, null_flush, add123_fetch, pe_callback, (void*)123); assert(r==0); r = toku_cachetable_put(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), (void*)124, test_object_size, null_flush, pe_callback, (void*)123); assert(r==0);
r = toku_cachetable_get_and_pin(f2, make_blocknum(1), toku_cachetable_hash(f2, make_blocknum(1)), &v, NULL, null_flush, add123_fetch, pe_callback, (void*)123); assert(r==0); r = toku_cachetable_get_and_pin(f2, make_blocknum(1), toku_cachetable_hash(f2, make_blocknum(1)), &v, NULL, null_flush, add123_fetch, pe_callback, pf_req_callback, pf_callback, (void*)123, (void*)123); assert(r==0);
assert((unsigned long)v==124); assert((unsigned long)v==124);
r = toku_cachetable_get_and_pin(f2, make_blocknum(2), toku_cachetable_hash(f2, make_blocknum(2)), &v, NULL, null_flush, add123_fetch, pe_callback, (void*)123); assert(r==0); r = toku_cachetable_get_and_pin(f2, make_blocknum(2), toku_cachetable_hash(f2, make_blocknum(2)), &v, NULL, null_flush, add123_fetch, pe_callback, pf_req_callback, pf_callback, (void*)123, (void*)123); assert(r==0);
assert((unsigned long)v==125); assert((unsigned long)v==125);
r = toku_cachetable_get_and_pin(f3, make_blocknum(2), toku_cachetable_hash(f3, make_blocknum(2)), &v, NULL, null_flush, add222_fetch, pe_callback, (void*)222); assert(r==0); r = toku_cachetable_get_and_pin(f3, make_blocknum(2), toku_cachetable_hash(f3, make_blocknum(2)), &v, NULL, null_flush, add222_fetch, pe_callback, pf_req_callback, pf_callback, (void*)222, (void*)222); assert(r==0);
assert((unsigned long)v==224); assert((unsigned long)v==224);
r = toku_cachetable_unpin(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), CACHETABLE_CLEAN, 0); assert(r==0); r = toku_cachetable_unpin(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), CACHETABLE_CLEAN, 0); assert(r==0);
...@@ -484,7 +492,7 @@ static void test_dirty(void) { ...@@ -484,7 +492,7 @@ static void test_dirty(void) {
key = make_blocknum(1); value = (void*)1; key = make_blocknum(1); value = (void*)1;
u_int32_t hkey = toku_cachetable_hash(f, key); u_int32_t hkey = toku_cachetable_hash(f, key);
r = toku_cachetable_put(f, key, hkey, value, test_object_size, test_dirty_flush, 0, pe_callback, 0); r = toku_cachetable_put(f, key, hkey, value, test_object_size, test_dirty_flush, pe_callback, 0);
assert(r == 0); assert(r == 0);
// cachetable_print_state(t); // cachetable_print_state(t);
...@@ -501,7 +509,7 @@ static void test_dirty(void) { ...@@ -501,7 +509,7 @@ static void test_dirty(void) {
assert(pinned == 0); assert(pinned == 0);
r = toku_cachetable_get_and_pin(f, key, hkey, &value, NULL, test_dirty_flush, r = toku_cachetable_get_and_pin(f, key, hkey, &value, NULL, test_dirty_flush,
test_dirty_fetch, pe_callback, 0); test_dirty_fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
assert(r == 0); assert(r == 0);
// cachetable_print_state(t); // cachetable_print_state(t);
...@@ -523,7 +531,7 @@ static void test_dirty(void) { ...@@ -523,7 +531,7 @@ static void test_dirty(void) {
hkey = toku_cachetable_hash(f, key); hkey = toku_cachetable_hash(f, key);
r = toku_cachetable_get_and_pin(f, key, hkey, r = toku_cachetable_get_and_pin(f, key, hkey,
&value, NULL, test_dirty_flush, &value, NULL, test_dirty_flush,
test_dirty_fetch, pe_callback, 0); test_dirty_fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
assert(r == 0); assert(r == 0);
// cachetable_print_state(t); // cachetable_print_state(t);
...@@ -543,7 +551,7 @@ static void test_dirty(void) { ...@@ -543,7 +551,7 @@ static void test_dirty(void) {
r = toku_cachetable_get_and_pin(f, key, hkey, r = toku_cachetable_get_and_pin(f, key, hkey,
&value, NULL, test_dirty_flush, &value, NULL, test_dirty_flush,
test_dirty_fetch, pe_callback, 0); test_dirty_fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
assert(r == 0); assert(r == 0);
// cachetable_print_state(t); // cachetable_print_state(t);
...@@ -614,7 +622,7 @@ static void test_size_resize(void) { ...@@ -614,7 +622,7 @@ static void test_size_resize(void) {
u_int32_t hkey = toku_cachetable_hash(f, key); u_int32_t hkey = toku_cachetable_hash(f, key);
r = toku_cachetable_put(f, key, hkey, value, size, test_size_flush_callback, 0, pe_callback, 0); r = toku_cachetable_put(f, key, hkey, value, size, test_size_flush_callback, pe_callback, 0);
assert(r == 0); assert(r == 0);
void *entry_value; int dirty; long long pinned; long entry_size; void *entry_value; int dirty; long long pinned; long entry_size;
...@@ -631,7 +639,7 @@ static void test_size_resize(void) { ...@@ -631,7 +639,7 @@ static void test_size_resize(void) {
void *current_value; void *current_value;
long current_size; long current_size;
r = toku_cachetable_get_and_pin(f, key, hkey, &current_value, &current_size, test_size_flush_callback, 0, pe_callback, 0); r = toku_cachetable_get_and_pin(f, key, hkey, &current_value, &current_size, test_size_flush_callback, 0, pe_callback, pf_req_callback, pf_callback, 0, 0);
assert(r == 0); assert(r == 0);
assert(current_value == value); assert(current_value == value);
assert(current_size == new_size); assert(current_size == new_size);
...@@ -677,7 +685,7 @@ static void test_size_flush(void) { ...@@ -677,7 +685,7 @@ static void test_size_flush(void) {
void *value = (void *)(long)-i; void *value = (void *)(long)-i;
// printf("test_size put %lld %p %lld\n", key, value, size); // printf("test_size put %lld %p %lld\n", key, value, size);
u_int32_t hkey = toku_cachetable_hash(f, key); u_int32_t hkey = toku_cachetable_hash(f, key);
r = toku_cachetable_put(f, key, hkey, value, size, test_size_flush_callback, 0, pe_callback, 0); r = toku_cachetable_put(f, key, hkey, value, size, test_size_flush_callback, pe_callback, 0);
assert(r == 0); assert(r == 0);
int n_entries, hash_size; long size_current, size_limit; int n_entries, hash_size; long size_current, size_limit;
......
...@@ -130,6 +130,14 @@ pe_callback ( ...@@ -130,6 +130,14 @@ pe_callback (
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
static void verify_cachetable_against_present (void) { static void verify_cachetable_against_present (void) {
int i; int i;
...@@ -174,7 +182,7 @@ static void test_chaining (void) { ...@@ -174,7 +182,7 @@ static void test_chaining (void) {
int fnum = i%N_FILES; int fnum = i%N_FILES;
//printf("%s:%d Add %d\n", __FILE__, __LINE__, i); //printf("%s:%d Add %d\n", __FILE__, __LINE__, i);
u_int32_t fhash = toku_cachetable_hash(f[fnum], make_blocknum(i)); u_int32_t fhash = toku_cachetable_hash(f[fnum], make_blocknum(i));
r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, fetch_forchain, pe_callback, (void*)i); r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, pe_callback, (void*)i);
assert(r==0); assert(r==0);
item_becomes_present(ct, f[fnum], make_blocknum(i)); item_becomes_present(ct, f[fnum], make_blocknum(i));
r = toku_cachetable_unpin(f[fnum], make_blocknum(i), fhash, CACHETABLE_CLEAN, test_object_size); r = toku_cachetable_unpin(f[fnum], make_blocknum(i), fhash, CACHETABLE_CLEAN, test_object_size);
...@@ -203,7 +211,10 @@ static void test_chaining (void) { ...@@ -203,7 +211,10 @@ static void test_chaining (void) {
flush_forchain, flush_forchain,
fetch_forchain, fetch_forchain,
pe_callback, pe_callback,
(void*)(long)whichkey.b pf_req_callback,
pf_callback,
(void*)(long)whichkey.b,
(void*)(long)whichkey.b
); );
assert(r==0); assert(r==0);
r = toku_cachetable_unpin(whichcf, r = toku_cachetable_unpin(whichcf,
...@@ -219,7 +230,7 @@ static void test_chaining (void) { ...@@ -219,7 +230,7 @@ static void test_chaining (void) {
// if i is a duplicate, cachetable_put will return -1 // if i is a duplicate, cachetable_put will return -1
// printf("%s:%d Add {%ld,%p}\n", __FILE__, __LINE__, i, f[fnum]); // printf("%s:%d Add {%ld,%p}\n", __FILE__, __LINE__, i, f[fnum]);
u_int32_t fhash = toku_cachetable_hash(f[fnum], make_blocknum(i)); u_int32_t fhash = toku_cachetable_hash(f[fnum], make_blocknum(i));
r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, fetch_forchain, pe_callback, (void*)i); r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, pe_callback, (void*)i);
assert(r==0 || r==-1); assert(r==0 || r==-1);
if (r==0) { if (r==0) {
item_becomes_present(ct, f[fnum], make_blocknum(i)); item_becomes_present(ct, f[fnum], make_blocknum(i));
......
...@@ -40,6 +40,14 @@ pe_callback ( ...@@ -40,6 +40,14 @@ pe_callback (
*bytes_freed = 0; *bytes_freed = 0;
return 0; return 0;
} }
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
}
// test simple unpin and remove // test simple unpin and remove
...@@ -66,7 +74,7 @@ cachetable_unpin_and_remove_test (int n) { ...@@ -66,7 +74,7 @@ cachetable_unpin_and_remove_test (int n) {
// put the keys into the cachetable // put the keys into the cachetable
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
u_int32_t hi = toku_cachetable_hash(f1, make_blocknum(keys[i].b)); u_int32_t hi = toku_cachetable_hash(f1, make_blocknum(keys[i].b));
r = toku_cachetable_put(f1, make_blocknum(keys[i].b), hi, (void *)(long) keys[i].b, 1, flush, fetch, pe_callback, 0); r = toku_cachetable_put(f1, make_blocknum(keys[i].b), hi, (void *)(long) keys[i].b, 1, flush, pe_callback, 0);
assert(r == 0); assert(r == 0);
} }
...@@ -127,7 +135,7 @@ cachetable_put_evict_remove_test (int n) { ...@@ -127,7 +135,7 @@ cachetable_put_evict_remove_test (int n) {
// put 0, 1, 2, ... should evict 0 // put 0, 1, 2, ... should evict 0
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
r = toku_cachetable_put(f1, make_blocknum(i), hi[i], (void *)(long)i, 1, flush, fetch, pe_callback, 0); r = toku_cachetable_put(f1, make_blocknum(i), hi[i], (void *)(long)i, 1, flush, pe_callback, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_unpin(f1, make_blocknum(i), hi[i], CACHETABLE_CLEAN, 1); r = toku_cachetable_unpin(f1, make_blocknum(i), hi[i], CACHETABLE_CLEAN, 1);
assert(r == 0); assert(r == 0);
...@@ -135,7 +143,7 @@ cachetable_put_evict_remove_test (int n) { ...@@ -135,7 +143,7 @@ cachetable_put_evict_remove_test (int n) {
// get 0 // get 0
void *v; long s; void *v; long s;
r = toku_cachetable_get_and_pin(f1, make_blocknum(0), hi[0], &v, &s, flush, fetch, pe_callback, 0); r = toku_cachetable_get_and_pin(f1, make_blocknum(0), hi[0], &v, &s, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
assert(r == 0); assert(r == 0);
// remove 0 // remove 0
......
...@@ -15,20 +15,6 @@ flush (CACHEFILE f __attribute__((__unused__)), ...@@ -15,20 +15,6 @@ flush (CACHEFILE f __attribute__((__unused__)),
/* Do nothing */ /* Do nothing */
} }
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
return 0;
}
static int static int
pe_callback ( pe_callback (
void *brtnode_pv __attribute__((__unused__)), void *brtnode_pv __attribute__((__unused__)),
...@@ -57,7 +43,7 @@ cachetable_unpin_test (int n) { ...@@ -57,7 +43,7 @@ cachetable_unpin_test (int n) {
for (i=1; i<=n; i++) { for (i=1; i<=n; i++) {
u_int32_t hi; u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i)); hi = toku_cachetable_hash(f1, make_blocknum(i));
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, fetch, pe_callback, 0); r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_callback, 0);
assert(r == 0); assert(r == 0);
assert(toku_cachefile_count_pinned(f1, 0) == i); assert(toku_cachefile_count_pinned(f1, 0) == i);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment