Commit 620c0f36 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:3645], merge to main

git-svn-id: file:///svn/toku/tokudb@34156 c7de825b-a66e-492c-adef-691d508d4ae1
parent 63310820
......@@ -507,6 +507,12 @@ struct brt {
};
/* serialization code */
void
toku_create_compressed_partition_from_available(
BRTNODE node,
int childnum,
SUB_BLOCK sb
);
int toku_serialize_brtnode_to_memory (BRTNODE node,
unsigned int basementnodesize,
/*out*/ size_t *n_bytes_to_write,
......@@ -559,6 +565,7 @@ struct brtenv {
extern void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint);
extern int toku_brtnode_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, int*dirty, void*extraargs);
extern void toku_brtnode_pe_est_callback(void* brtnode_pv, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* write_extraargs);
extern int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_freed, void *extraargs);
extern BOOL toku_brtnode_pf_req_callback(void* brtnode_pv, void* read_extraargs);
int toku_brtnode_pf_callback(void* brtnode_pv, void* read_extraargs, int fd, long* sizep);
......
......@@ -615,6 +615,44 @@ rebalance_brtnode_leaf(BRTNODE node, unsigned int basementnodesize)
static void
serialize_and_compress(BRTNODE node, int npartitions, struct sub_block sb[]);
// Serialize partition 'childnum' of 'node' into sub-block 'sb', then compress
// the serialized bytes in place.  Shared helper for the serial, cilk_for, and
// worker-thread serialize_and_compress paths, and for
// toku_create_compressed_partition_from_available.
static void
serialize_and_compress_partition(BRTNODE node, int childnum, SUB_BLOCK sb)
{
serialize_brtnode_partition(node, childnum, sb);
compress_brtnode_sub_block(sb);
}
// Compress partition 'childnum' of 'node' in place so it can be held in the
// cachetable (PT_COMPRESSED) for a potentially long time.
//
// serialize_and_compress_partition() produces a buffer laid out for immediate
// disk I/O: the first 8 bytes are metadata, and the allocation is an upper
// bound on the compressed length computed before compression ran.  Since we
// are caching rather than writing out, shrink the buffer to exactly the
// compressed payload and drop the uncompressed image.
void
toku_create_compressed_partition_from_available(
    BRTNODE node,
    int childnum,
    SUB_BLOCK sb
    )
{
    serialize_and_compress_partition(node, childnum, sb);
    // The compressed length is stored (disk byte order) in the first 4 bytes
    // of the oversized I/O buffer; the payload starts after the 8-byte header.
    u_int32_t payload_size = toku_dtoh32(*(u_int32_t *)sb->compressed_ptr);
    void *payload = toku_xmalloc(payload_size);
    memcpy(payload, (char *)sb->compressed_ptr + 8, payload_size);
    toku_free(sb->compressed_ptr);
    sb->compressed_ptr = payload;
    sb->compressed_size = payload_size;
    // the uncompressed image is useless while the partition sits in cache
    if (sb->uncompressed_ptr) {
        toku_free(sb->uncompressed_ptr);
        sb->uncompressed_ptr = NULL;
    }
}
// tests are showing that serial insertions are slightly faster
// using the pthreads than using CILK. Disabling CILK until we have
......@@ -626,8 +664,7 @@ static void
serialize_and_compress(BRTNODE node, int npartitions, struct sub_block sb[]) {
#pragma cilk grainsize = 1
cilk_for (int i = 0; i < npartitions; i++) {
serialize_brtnode_partition(node, i, &sb[i]);
compress_brtnode_sub_block(&sb[i]);
serialize_and_compress_partition(node, i, &sb[i]);
}
}
......@@ -648,8 +685,7 @@ serialize_and_compress_worker(void *arg) {
if (w == NULL)
break;
int i = w->i;
serialize_brtnode_partition(w->node, i, &w->sb[i]);
compress_brtnode_sub_block(&w->sb[i]);
serialize_and_compress_partition(w->node, i, &w->sb[i]);
}
workset_release_ref(ws);
return arg;
......@@ -658,8 +694,7 @@ serialize_and_compress_worker(void *arg) {
static void
serialize_and_compress(BRTNODE node, int npartitions, struct sub_block sb[]) {
if (npartitions == 1) {
serialize_brtnode_partition(node, 0, &sb[0]);
compress_brtnode_sub_block(&sb[0]);
serialize_and_compress_partition(node, 0, &sb[0]);
} else {
int T = num_cores;
if (T > npartitions)
......
......@@ -86,6 +86,7 @@ int toku_testsetup_get_sersize(BRT brt, BLOCKNUM diskoff) // Return the size on
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
......@@ -114,6 +115,7 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
......@@ -185,6 +187,7 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
......
......@@ -123,6 +123,7 @@ toku_verify_brtnode (BRT brt,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
......
......@@ -284,6 +284,7 @@ int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
......@@ -316,6 +317,7 @@ void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullha
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
......@@ -491,6 +493,19 @@ fetch_from_buf (OMT omt, u_int32_t idx) {
return (LEAFENTRY)v;
}
// Memory footprint of one PT_AVAIL partition of an internal (height > 0)
// node: the child-info struct itself plus its FIFO message buffer and the
// two message OMTs.
static long
get_avail_internal_node_partition_size(BRTNODE node, int i)
{
    assert(node->height > 0);
    NONLEAF_CHILDINFO childinfo = BNC(node, i);
    return sizeof(*childinfo)
        + toku_fifo_memory_size(BNC_BUFFER(node, i))
        + toku_omt_memory_size(BNC_BROADCAST_BUFFER(node, i))
        + toku_omt_memory_size(BNC_MESSAGE_TREE(node, i));
}
static long
brtnode_memory_size (BRTNODE node)
// Effect: Estimate how much main memory a node requires.
......@@ -513,11 +528,7 @@ brtnode_memory_size (BRTNODE node)
}
else if (BP_STATE(node,i) == PT_AVAIL) {
if (node->height > 0) {
NONLEAF_CHILDINFO childinfo = BNC(node, i);
retval += sizeof(*childinfo);
retval += toku_fifo_memory_size(BNC_BUFFER(node, i));
retval += toku_omt_memory_size(BNC_BROADCAST_BUFFER(node, i));
retval += toku_omt_memory_size(BNC_MESSAGE_TREE(node, i));
retval += get_avail_internal_node_partition_size(node, i);
}
else {
BASEMENTNODE bn = BLB(node, i);
......@@ -659,17 +670,96 @@ int toku_brtnode_fetch_callback (CACHEFILE UU(cachefile), int fd, BLOCKNUM noden
return r;
}
// Partial-eviction cost estimate for brt nodes
// (CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK).  Dirty nodes and leaf nodes are
// reported as PE_CHEAP with a zero byte estimate.  For a clean internal node
// we report PE_EXPENSIVE together with an estimate of the bytes that
// compressing the evictable partitions would free.
void toku_brtnode_pe_est_callback(
void* brtnode_pv,
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
assert(brtnode_pv != NULL);
long bytes_to_free = 0;
BRTNODE node = (BRTNODE)brtnode_pv;
if (node->dirty || node->height == 0) {
// dirty nodes and leaves: nothing gets freed here, and deciding so is cheap
*bytes_freed_estimate = 0;
*cost = PE_CHEAP;
goto exit;
}
//
// we are dealing with a clean internal node
//
*cost = PE_EXPENSIVE;
// now lets get an estimate for how much data we can free up
// we estimate the compressed size of data to be how large
// the compressed data is on disk
for (int i = 0; i < node->n_children; i++) {
if (BP_SHOULD_EVICT(node,i)) {
// calculate how much data would be freed if
// we compress this node and add it to
// bytes_to_free
// first get an estimate for how much space will be taken
// after compression, it is simply the size of compressed
// data on disk plus the size of the struct that holds it
// (partition i's on-disk bytes span BP_OFFSET(i-1)..BP_OFFSET(i);
// partition 0 starts at offset 0)
u_int32_t compressed_data_size =
((i==0) ?
BP_OFFSET(node,i) :
(BP_OFFSET(node,i) - BP_OFFSET(node,i-1)));
compressed_data_size += sizeof(struct sub_block);
// now get the space taken now
u_int32_t decompressed_data_size = get_avail_internal_node_partition_size(node,i);
// NOTE(review): both sizes are u_int32_t; if compressed_data_size ever
// exceeded decompressed_data_size this subtraction would wrap and
// inflate the estimate -- presumably compression always shrinks the
// partition; confirm.
bytes_to_free += (decompressed_data_size - compressed_data_size);
}
}
*bytes_freed_estimate = bytes_to_free;
exit:
return;
}
// callback for partially evicting a node
int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_freed, void* UU(extraargs)) {
int toku_brtnode_pe_callback (void *brtnode_pv, long UU(bytes_to_free), long* bytes_freed, void* UU(extraargs)) {
BRTNODE node = (BRTNODE)brtnode_pv;
long orig_size = brtnode_memory_size(node);
assert(bytes_to_free > 0);
//
// nothing on internal nodes for now
//
if (node->dirty || node->height > 0) {
if (node->dirty) {
*bytes_freed = 0;
goto exit;
}
//
// partial eviction for internal nodes
//
if (node->height > 0) {
for (int i = 0; i < node->n_children; i++) {
if (BP_STATE(node,i) == PT_AVAIL) {
if (BP_SHOULD_EVICT(node,i)) {
// if we should evict, compress the
// message buffer into a sub_block
SUB_BLOCK sb = NULL;
sb = toku_xmalloc(sizeof(struct sub_block));
sub_block_init(sb);
toku_create_compressed_partition_from_available(node, i, sb);
// now free the old partition and replace it with this
destroy_nonleaf_childinfo(BNC(node,i));
set_BSB(node, i, sb);
BP_STATE(node,i) = PT_COMPRESSED;
}
else {
BP_SWEEP_CLOCK(node,i);
}
}
else {
continue;
}
}
}
//
// partial eviction strategy for basement nodes:
......@@ -683,7 +773,7 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_
SUB_BLOCK sb = BSB(node, i);
toku_free(sb->compressed_ptr);
toku_free(sb);
set_BNULL(node, i);
set_BNULL(node, i);
BP_STATE(node,i) = PT_ON_DISK;
}
else if (BP_STATE(node,i) == PT_AVAIL) {
......@@ -693,7 +783,7 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_
OMT curr_omt = BLB_BUFFER(node, i);
toku_omt_free_items(curr_omt);
destroy_basement_node(bn);
set_BNULL(node,i);
set_BNULL(node,i);
BP_STATE(node,i) = PT_ON_DISK;
}
else {
......@@ -709,11 +799,14 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_
}
}
*bytes_freed = orig_size - brtnode_memory_size(node);
invariant(*bytes_freed >= 0);
if (node->height == 0)
brt_status.bytes_leaf -= *bytes_freed;
else
brt_status.bytes_nonleaf -= *bytes_freed;
exit:
if (node->height == 0) {
brt_status.bytes_leaf -= *bytes_freed;
}
else {
brt_status.bytes_nonleaf -= *bytes_freed;
}
return 0;
}
......@@ -1085,7 +1178,7 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
u_int32_t fullhash = toku_cachetable_hash(brt->cf, newroot_diskoff);
newroot->fullhash = fullhash;
toku_cachetable_put(brt->cf, newroot_diskoff, fullhash, newroot, brtnode_memory_size(newroot),
toku_brtnode_flush_callback, toku_brtnode_pe_callback, brt->h);
toku_brtnode_flush_callback, toku_brtnode_pe_est_callback, toku_brtnode_pe_callback, brt->h);
*newrootp = newroot;
}
......@@ -1107,7 +1200,7 @@ toku_create_new_brtnode (BRT t, BRTNODE *result, int height, int n_children) {
n->fullhash = fullhash;
int r = toku_cachetable_put(t->cf, n->thisnodename, fullhash,
n, brtnode_memory_size(n),
toku_brtnode_flush_callback, toku_brtnode_pe_callback, t->h);
toku_brtnode_flush_callback, toku_brtnode_pe_est_callback, toku_brtnode_pe_callback, t->h);
assert_zero(r);
*result = n;
......@@ -3514,7 +3607,7 @@ static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum) {
node->fullhash = fullhash;
int r = toku_cachetable_put(t->cf, blocknum, fullhash,
node, brtnode_memory_size(node),
toku_brtnode_flush_callback, toku_brtnode_pe_callback, t->h);
toku_brtnode_flush_callback, toku_brtnode_pe_est_callback, toku_brtnode_pe_callback, t->h);
if (r != 0)
toku_free(node);
else
......@@ -5493,6 +5586,7 @@ brt_node_maybe_prefetch(BRT brt, BRTNODE node, int childnum, BRT_CURSOR brtcurso
nextfullhash,
toku_brtnode_flush_callback,
brtnode_fetch_callback_and_free_bfe,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
brtnode_pf_callback_and_free_bfe,
......@@ -6358,6 +6452,7 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
......@@ -6675,6 +6770,7 @@ static BOOL is_empty_fast_iter (BRT brt, BRTNODE node) {
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
......@@ -6720,6 +6816,7 @@ BOOL toku_brt_is_empty_fast (BRT brt)
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
......
This diff is collapsed.
......@@ -105,6 +105,11 @@ void toku_cachefile_get_workqueue_load (CACHEFILE, int *n_in_queue, int *n_threa
// Handles the case where cf points to /dev/null
int toku_cachefile_fsync(CACHEFILE cf);
enum partial_eviction_cost {
PE_CHEAP=0, // running partial eviction is cheap, and can be done on the client thread
PE_EXPENSIVE=1, // running partial eviction is expensive, and should not be done on the client thread
};
// The flush callback is called when a key value pair is being written to storage and possibly removed from the cachetable.
// When write_me is true, the value should be written to storage.
// When keep_me is false, the value should be freed.
......@@ -120,6 +125,13 @@ typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, int fd, CACHEKEY key, void
// Can access fd (fd is protected by a readlock during call)
typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, int fd, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, int *dirtyp, void *read_extraargs);
// The partial-eviction-estimate callback is a cheap operation called by the cachetable on the client thread
// to determine whether partial eviction is cheap (and can therefore run on the client thread) or
// expensive (and should be done on a background writer thread). If the callback reports that
// partial eviction is expensive, it also returns an estimate of the number of bytes eviction would free,
// so that the cachetable can track how much data is being evicted on background threads.
typedef void (*CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK)(void *brtnode_pv, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void *write_extraargs);
// The partial eviction callback is called by the cachetable to possibly try and partially evict pieces
// of the PAIR. The strategy for what to evict is left to the callback. The callback may choose to free
// nothing, may choose to free as much as possible.
......@@ -172,6 +184,7 @@ CACHETABLE toku_cachefile_get_cachetable(CACHEFILE cf);
int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
void *value, long size,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
void *write_extraargs
);
......@@ -189,6 +202,7 @@ int toku_cachetable_get_and_pin (
long *sizep,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback __attribute__((unused)),
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback __attribute__((unused)),
......@@ -215,6 +229,7 @@ int toku_cachetable_get_and_pin_nonblocking (
long *sizep,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback __attribute__((unused)),
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback __attribute__((unused)),
......@@ -258,7 +273,8 @@ int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY); /* Removing somethin
int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
......
......@@ -510,15 +510,26 @@ static int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM l
return r;
}
// Partial-eviction cost estimate for rollback log nodes: rollback logs do not
// support partial eviction, so the answer is always "cheap, frees nothing".
static void toku_rollback_pe_est_callback(
    void* rollback_v,
    long* bytes_freed_estimate,
    enum partial_eviction_cost *cost,
    void* UU(write_extraargs)
    )
{
    assert(rollback_v != NULL);
    *cost = PE_CHEAP;
    *bytes_freed_estimate = 0;
}
// callback for partially evicting a cachetable entry
static int toku_rollback_pe_callback (
void *rollback_v,
long bytes_to_free,
long UU(bytes_to_free),
long* bytes_freed,
void* UU(extraargs)
)
{
assert(bytes_to_free > 0);
assert(rollback_v != NULL);
*bytes_freed = 0;
return 0;
......@@ -562,6 +573,7 @@ static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t o
r=toku_cachetable_put(cf, log->thislogname, log->thishash,
log, rollback_memory_size(log),
toku_rollback_flush_callback,
toku_rollback_pe_est_callback,
toku_rollback_pe_callback,
h);
assert(r==0);
......@@ -779,6 +791,7 @@ toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
r = toku_cachefile_prefetch(cf, name, hash,
toku_rollback_flush_callback,
toku_rollback_fetch_callback,
toku_rollback_pe_est_callback,
toku_rollback_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
......@@ -809,6 +822,7 @@ int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLO
&log_v, NULL,
toku_rollback_flush_callback,
toku_rollback_fetch_callback,
toku_rollback_pe_est_callback,
toku_rollback_pe_callback,
toku_rollback_pf_req_callback,
toku_rollback_pf_callback,
......
......@@ -111,15 +111,43 @@ setup_dn(enum brtnode_verify_type bft, int fd, struct brt_header *brt_h, BRTNODE
}
// if read_none, get rid of the compressed bp's
if (bft == read_none) {
long bytes_freed = 0;
toku_brtnode_pe_callback(*dn, 0xffffffff, &bytes_freed, NULL);
// assert all bp's are on disk
for (int i = 0; i < (*dn)->n_children; i++) {
if ((*dn)->height == 0) {
assert(BP_STATE(*dn,i) == PT_ON_DISK);
assert(is_BNULL(*dn, i));
if ((*dn)->height == 0) {
long bytes_freed = 0;
toku_brtnode_pe_callback(*dn, 0xffffffff, &bytes_freed, NULL);
// assert all bp's are on disk
for (int i = 0; i < (*dn)->n_children; i++) {
if ((*dn)->height == 0) {
assert(BP_STATE(*dn,i) == PT_ON_DISK);
assert(is_BNULL(*dn, i));
}
else {
assert(BP_STATE(*dn,i) == PT_COMPRESSED);
}
}
else {
}
else {
// first decompress everything, and make sure
// that it is available
// then run partial eviction to get it compressed
fill_bfe_for_full_read(&bfe, brt_h, NULL, string_key_cmp);
assert(toku_brtnode_pf_req_callback(*dn, &bfe));
long size;
r = toku_brtnode_pf_callback(*dn, &bfe, fd, &size);
assert(r==0);
// assert all bp's are available
for (int i = 0; i < (*dn)->n_children; i++) {
assert(BP_STATE(*dn,i) == PT_AVAIL);
}
long bytes_freed = 0;
toku_brtnode_pe_callback(*dn, 0xffffffff, &bytes_freed, NULL);
for (int i = 0; i < (*dn)->n_children; i++) {
// assert all bp's are still available, because we touched the clock
assert(BP_STATE(*dn,i) == PT_AVAIL);
// now assert all should be evicted
assert(BP_SHOULD_EVICT(*dn, i));
}
toku_brtnode_pe_callback(*dn, 0xffffffff, &bytes_freed, NULL);
for (int i = 0; i < (*dn)->n_children; i++) {
assert(BP_STATE(*dn,i) == PT_COMPRESSED);
}
}
......@@ -1126,6 +1154,85 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
assert(dn->totalchildkeylens==6);
assert(BP_BLOCKNUM(dn,0).b==30);
assert(BP_BLOCKNUM(dn,1).b==35);
FIFO src_fifo_1 = BNC_BUFFER(&sn, 0);
FIFO src_fifo_2 = BNC_BUFFER(&sn, 1);
FIFO dest_fifo_1 = BNC_BUFFER(dn, 0);
FIFO dest_fifo_2 = BNC_BUFFER(dn, 1);
bytevec src_key,src_val, dest_key, dest_val;
ITEMLEN src_keylen, src_vallen;
u_int32_t src_type;
MSN src_msn;
XIDS src_xids;
ITEMLEN dest_keylen, dest_vallen;
u_int32_t dest_type;
MSN dest_msn;
XIDS dest_xids;
r = toku_fifo_peek(src_fifo_1, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
assert(r==0);
r = toku_fifo_peek(dest_fifo_1, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
assert(r==0);
assert(src_keylen == dest_keylen);
assert(src_keylen == 2);
assert(src_vallen == dest_vallen);
assert(src_vallen == 5);
assert(src_type == dest_type);
assert(src_msn.msn == dest_msn.msn);
assert(strcmp(src_key, "a") == 0);
assert(strcmp(dest_key, "a") == 0);
assert(strcmp(src_val, "aval") == 0);
assert(strcmp(dest_val, "aval") == 0);
r = toku_fifo_deq(src_fifo_1);
assert(r==0);
r = toku_fifo_deq(dest_fifo_1);
assert(r==0);
r = toku_fifo_peek(src_fifo_1, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
assert(r==0);
r = toku_fifo_peek(dest_fifo_1, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
assert(r==0);
assert(src_keylen == dest_keylen);
assert(src_keylen == 2);
assert(src_vallen == dest_vallen);
assert(src_vallen == 5);
assert(src_type == dest_type);
assert(src_msn.msn == dest_msn.msn);
assert(strcmp(src_key, "b") == 0);
assert(strcmp(dest_key, "b") == 0);
assert(strcmp(src_val, "bval") == 0);
assert(strcmp(dest_val, "bval") == 0);
r = toku_fifo_deq(src_fifo_1);
assert(r==0);
r = toku_fifo_deq(dest_fifo_1);
assert(r==0);
r = toku_fifo_peek(src_fifo_1, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
assert(r!=0);
r = toku_fifo_peek(dest_fifo_1, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
assert(r!=0);
r = toku_fifo_peek(src_fifo_2, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
assert(r==0);
r = toku_fifo_peek(dest_fifo_2, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
assert(r==0);
assert(src_keylen == dest_keylen);
assert(src_keylen == 2);
assert(src_vallen == dest_vallen);
assert(src_vallen == 5);
assert(src_type == dest_type);
assert(src_msn.msn == dest_msn.msn);
assert(strcmp(src_key, "x") == 0);
assert(strcmp(dest_key, "x") == 0);
assert(strcmp(src_val, "xval") == 0);
assert(strcmp(dest_val, "xval") == 0);
r = toku_fifo_deq(src_fifo_2);
assert(r==0);
r = toku_fifo_deq(dest_fifo_2);
assert(r==0);
r = toku_fifo_peek(src_fifo_2, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
assert(r!=0);
r = toku_fifo_peek(dest_fifo_2, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
assert(r!=0);
toku_brtnode_free(&dn);
kv_pair_free(sn.childkeys[0]);
......
......@@ -37,6 +37,19 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return 0;
}
// Partial-eviction estimate stub: this test never partially evicts, so the
// operation is always cheap and frees nothing.
static void
pe_est_callback(void* UU(brtnode_pv), long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* UU(write_extraargs))
{
    *cost = PE_CHEAP;
    *bytes_freed_estimate = 0;
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -72,11 +85,11 @@ cachetable_test (void) {
void* v1;
void* v2;
long s1, s2;
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, 8);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
// usleep (2*1024*1024);
//r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL);
//r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 8);
......
......@@ -52,6 +52,18 @@ fetch (CACHEFILE UU(thiscf), int UU(fd), CACHEKEY UU(key), u_int32_t UU(fullhash
return 0;
}
// Stub partial-eviction estimate callback: always cheap, zero bytes freed —
// this test exercises checkpointing, not eviction.
static void
pe_est_callback(void* UU(brtnode_pv), long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* UU(write_extraargs))
{
    *cost = PE_CHEAP;
    *bytes_freed_estimate = 0;
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -83,7 +95,7 @@ do_update (void *UU(ignore))
u_int32_t hi = toku_cachetable_hash(cf, key);
void *vv;
long size;
int r = toku_cachetable_get_and_pin(cf, key, hi, &vv, &size, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
int r = toku_cachetable_get_and_pin(cf, key, hi, &vv, &size, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0);
//printf("g");
assert(r==0);
assert(size==sizeof(int));
......@@ -132,7 +144,7 @@ static void checkpoint_pending(void) {
CACHEKEY key = make_blocknum(i);
u_int32_t hi = toku_cachetable_hash(cf, key);
values[i] = 42;
r = toku_cachetable_put(cf, key, hi, &values[i], sizeof(int), flush, pe_callback, 0);
r = toku_cachetable_put(cf, key, hi, &values[i], sizeof(int), flush, pe_est_callback, pe_callback, 0);
assert(r == 0);
r = toku_cachetable_unpin(cf, key, hi, CACHETABLE_DIRTY, item_size);
......
......@@ -21,6 +21,18 @@ static void flush(CACHEFILE cf, int UU(fd), CACHEKEY key, void *value, void *ext
if (keep_me) n_keep_me++;
}
// Stub partial-eviction estimate callback: report "cheap" and a zero byte
// estimate; this test never partially evicts.
static void
pe_est_callback(void* UU(brtnode_pv), long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* UU(write_extraargs))
{
    *cost = PE_CHEAP;
    *bytes_freed_estimate = 0;
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -74,7 +86,7 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) {
for (i=0; i<n; i++) {
CACHEKEY key = make_blocknum(i);
u_int32_t hi = toku_cachetable_hash(f1, key);
r = toku_cachetable_put(f1, key, hi, (void *)(long)i, 1, flush, pe_callback, 0);
r = toku_cachetable_put(f1, key, hi, (void *)(long)i, 1, flush, pe_est_callback, pe_callback, 0);
assert(r == 0);
r = toku_cachetable_unpin(f1, key, hi, dirty, item_size);
......
#ident "$Id: cachetable-clock-eviction.c 32940 2011-07-11 18:24:15Z leifwalsh $"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)),
long s __attribute__((__unused__)),
BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__))
) {
}
// Partial-eviction estimate stub for this clock-eviction test: nothing is
// ever partially evicted, so it is cheap and frees zero bytes.
static void
pe_est_callback(void* UU(brtnode_pv), long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* UU(write_extraargs))
{
    *cost = PE_CHEAP;
    *bytes_freed_estimate = 0;
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
long bytes_to_free __attribute__((__unused__)),
long* bytes_freed,
void* extraargs __attribute__((__unused__))
)
{
*bytes_freed = 0;
return 0;
}
// Dead code: fetch/partial-fetch callbacks compiled out with #if 0 -- this
// test only ever puts entries, so nothing is fetched.  Kept for reference.
// NOTE(review): pf_callback falls off the end of a non-void function after
// assert(FALSE); harmless while disabled, but add a return if re-enabled.
#if 0
static int
fetch (CACHEFILE f __attribute__((__unused__)),
       int UU(fd),
       CACHEKEY k __attribute__((__unused__)),
       u_int32_t fullhash __attribute__((__unused__)),
       void **value __attribute__((__unused__)),
       long *sizep __attribute__((__unused__)),
       int *dirtyp,
       void *extraargs __attribute__((__unused__))
       ) {
    *dirtyp = 1;
    *value = NULL;
    *sizep = 1;
    return 0;
}

static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
    return FALSE;
}

static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), int UU(fd), long* UU(sizep)) {
    assert(FALSE);
}
#endif
// Exercise clock-eviction bookkeeping: first put one entry larger than the
// whole cachetable, then fill the table past its limit with pinned entries
// and unpin them all.
static void
cachetable_test (void) {
    const int n_entries = 100;
    const int limit = 6;
    int rc;

    CACHETABLE ct;
    rc = toku_create_cachetable(&ct, limit, ZERO_LSN, NULL_LOGGER);
    assert(rc == 0);

    char fname1[] = __FILE__ "test1.dat";
    unlink(fname1);
    CACHEFILE f1;
    rc = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO);
    assert(rc == 0);

    // test that putting something too big in the cachetable works fine
    rc = toku_cachetable_put(f1, make_blocknum(n_entries+1), n_entries+1, NULL, limit*2, flush, pe_est_callback, pe_callback, NULL);
    assert(rc==0);
    rc = toku_cachetable_unpin(f1, make_blocknum(n_entries+1), n_entries+1, CACHETABLE_DIRTY, limit*2);
    assert(rc==0);

    // each put leaves its entry pinned
    for (int64_t i = 0; i < n_entries; i++) {
        rc = toku_cachetable_put(f1, make_blocknum(i), i, NULL, 1, flush, pe_est_callback, pe_callback, NULL);
        assert(toku_cachefile_count_pinned(f1, 0) == (i+1));
    }
    for (int64_t i = 0; i < n_entries; i++) {
        rc = toku_cachetable_unpin(f1, make_blocknum(i), i, CACHETABLE_DIRTY, 1);
    }

    rc = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN);
    assert(rc == 0 && f1 == 0);
    rc = toku_cachetable_close(&ct);
    assert(rc == 0 && ct == 0);
}
// Test entry point (called by the harness in test.h): parse the standard
// test arguments, run the cachetable test, and report success.
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
cachetable_test();
return 0;
}
......@@ -45,6 +45,18 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return 0;
}
// Stub partial-eviction estimate callback for this clock test: cheap, frees
// zero bytes.
static void
pe_est_callback(void* UU(brtnode_pv), long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* UU(write_extraargs))
{
    *cost = PE_CHEAP;
    *bytes_freed_estimate = 0;
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -84,24 +96,24 @@ cachetable_test (void) {
flush_may_occur = FALSE;
check_flush = TRUE;
for (int i = 0; i < 100000; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, 1);
}
for (int i = 0; i < 8; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 1);
}
for (int i = 0; i < 4; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(3), 3, CACHETABLE_CLEAN, 1);
}
for (int i = 0; i < 2; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(4), 4, CACHETABLE_CLEAN, 1);
}
flush_may_occur = TRUE;
expected_flushed_key = 4;
r = toku_cachetable_put(f1, make_blocknum(5), 5, NULL, 4, flush, pe_callback, NULL);
r = toku_cachetable_put(f1, make_blocknum(5), 5, NULL, 4, flush, pe_est_callback, pe_callback, NULL);
flush_may_occur = TRUE;
expected_flushed_key = 5;
r = toku_cachetable_unpin(f1, make_blocknum(5), 5, CACHETABLE_CLEAN, 4);
......
......@@ -55,15 +55,26 @@ other_flush (CACHEFILE f __attribute__((__unused__)),
) {
}
// Partial-eviction estimate stub: always cheap with a zero estimate, so the
// cachetable runs pe_callback on the client thread in this test.
static void
pe_est_callback(void* UU(brtnode_pv), long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* UU(write_extraargs))
{
    *cost = PE_CHEAP;
    *bytes_freed_estimate = 0;
}
static int
pe_callback (
void *brtnode_pv,
long bytes_to_free,
long UU(bytes_to_free),
long* bytes_freed,
void* extraargs __attribute__((__unused__))
)
{
assert(expected_bytes_to_free == bytes_to_free);
*bytes_freed = 1;
expected_bytes_to_free--;
int* foo = brtnode_pv;
......@@ -107,26 +118,28 @@ cachetable_test (void) {
long s1, s2;
flush_may_occur = FALSE;
for (int i = 0; i < 100000; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, 4);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, 4);
}
for (int i = 0; i < 8; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 4);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 4);
}
for (int i = 0; i < 4; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(3), 3, CACHETABLE_CLEAN, 4);
r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(3), 3, CACHETABLE_CLEAN, 4);
}
for (int i = 0; i < 2; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(4), 4, CACHETABLE_CLEAN, 4);
r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(4), 4, CACHETABLE_CLEAN, 4);
}
flush_may_occur = FALSE;
expected_bytes_to_free = 4;
r = toku_cachetable_put(f1, make_blocknum(5), 5, NULL, 4, other_flush, other_pe_callback, NULL);
r = toku_cachetable_put(f1, make_blocknum(5), 5, NULL, 4, other_flush, pe_est_callback, other_pe_callback, NULL);
flush_may_occur = TRUE;
r = toku_cachetable_unpin(f1, make_blocknum(5), 5, CACHETABLE_CLEAN, 4);
assert(expected_bytes_to_free == 0);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
......
#ident "$Id: cachetable-clock-eviction2.c 34104 2011-08-22 01:17:04Z zardosht $"
#include "includes.h"
#include "test.h"
BOOL flush_may_occur;
long expected_bytes_to_free;
// Flush callback for fetched pairs. Eviction-driven flushes are only legal
// while the test has flush_may_occur set; when the pair is not kept in the
// cachetable, the integer buffer allocated by fetch() must be released.
static void
flush (CACHEFILE f __attribute__((__unused__)),
       int UU(fd),
       CACHEKEY k __attribute__((__unused__)),
       void* UU(v),
       void *e __attribute__((__unused__)),
       long s __attribute__((__unused__)),
       BOOL w __attribute__((__unused__)),
       BOOL keep,
       BOOL c __attribute__((__unused__))
       ) {
    assert(flush_may_occur);
    if (keep)
        return;
    // Pair is leaving the cachetable for good: free its value buffer.
    toku_free(v);
}
// Fetch callback: materialize a pair as a freshly allocated int holding 4,
// reported clean with size 4.
static int
fetch (CACHEFILE f __attribute__((__unused__)),
       int UU(fd),
       CACHEKEY k __attribute__((__unused__)),
       u_int32_t fullhash __attribute__((__unused__)),
       void **value __attribute__((__unused__)),
       long *sizep __attribute__((__unused__)),
       int *dirtyp,
       void *extraargs __attribute__((__unused__))
       ) {
    int *p = toku_malloc(sizeof *p);
    *p = 4;
    *value = p;
    *sizep = 4;
    *dirtyp = 0;   // freshly fetched pairs start clean
    return 0;
}
// No-op flush callback for the pair inserted via toku_cachetable_put with a
// NULL value: that pair owns no memory, so nothing needs cleanup on eviction.
static void
other_flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)),
long s __attribute__((__unused__)),
BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__))
) {
}
// Partial-eviction estimate callback with a deliberately wild over-estimate:
// claim 1000 reclaimable bytes at PE_EXPENSIVE cost. The test verifies that
// an estimate far from what pe_callback actually frees is tolerated.
static void
pe_est_callback(
    void* UU(brtnode_pv),
    long* bytes_freed_estimate,
    enum partial_eviction_cost *cost,
    void* UU(write_extraargs)
    )
{
    *cost = PE_EXPENSIVE;
    *bytes_freed_estimate = 1000;
}
// Partial-eviction callback: report that exactly one byte was reclaimed,
// record the call in the test counter, and shrink the cached integer.
static int
pe_callback (
    void *brtnode_pv,
    long UU(bytes_to_free),
    long* bytes_freed,
    void* extraargs __attribute__((__unused__))
    )
{
    printf("calling pe_callback\n");
    *bytes_freed = 1;
    expected_bytes_to_free--;
    // Decrement the cached value in place to model freeing part of it.
    int *value = brtnode_pv;
    *value = *value - 1;
    return 0;
}
// Partial-eviction callback for the NULL-valued pair: nothing can actually
// be reclaimed from it.
// FIX: previously *bytes_freed was never written, so a caller reading it
// after this callback would consume an indeterminate value. Report 0 freed.
static int
other_pe_callback (
    void *brtnode_pv __attribute__((__unused__)),
    long bytes_to_free __attribute__((__unused__)),
    long* bytes_freed,
    void* extraargs __attribute__((__unused__))
    )
{
    *bytes_freed = 0;
    return 0;
}
// Partial-fetch requirement callback: pairs in this test are always fully in
// memory, so a partial fetch is never needed.
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
// Partial-fetch callback: must never run, because pf_req_callback always
// returns FALSE. Reaching this function is a test failure.
// FIX: the function previously fell off the end of a non-void function after
// assert(FALSE) — undefined behavior if assertions are compiled out (NDEBUG).
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), int UU(fd), long* UU(sizep)) {
    assert(FALSE);
    return 0; // unreachable when assertions are enabled
}
// Drives the clock-eviction test: make block 1 very hot and blocks 2-4
// progressively colder, then insert block 5 to push the cachetable over its
// limit so partial eviction runs against the cold pairs. Verifies that a
// wildly wrong pe_est_callback estimate (1000 bytes) is tolerated even though
// pe_callback only frees 1 byte per call.
static void
cachetable_test (void) {
const int test_limit = 20;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
void* v2;
long s1, s2;
flush_may_occur = FALSE;
// Pin/unpin block 1 many times so the clock algorithm sees it as hot.
for (int i = 0; i < 100000; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, 4);
}
// Blocks 2, 3 and 4 are touched 8, 4 and 2 times: progressively colder.
for (int i = 0; i < 8; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 4);
}
for (int i = 0; i < 4; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(3), 3, CACHETABLE_CLEAN, 4);
}
for (int i = 0; i < 2; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(4), 4, CACHETABLE_CLEAN, 4);
}
flush_may_occur = FALSE;
expected_bytes_to_free = 4;
// Insert block 5 (NULL value, size 4); unpinning it with size 8 overfills
// the cachetable and triggers (expensive, background) partial eviction.
r = toku_cachetable_put(f1, make_blocknum(5), 5, NULL, 4, other_flush, pe_est_callback, other_pe_callback, NULL);
flush_may_occur = TRUE;
r = toku_cachetable_unpin(f1, make_blocknum(5), 5, CACHETABLE_CLEAN, 8);
// we are testing that having a wildly different estimate than
// what actually gets freed is ok
// in the callbacks, we estimate that 1000 bytes gets freed
// whereas in reality, only 1 byte will be freed
// we measure that only 1 byte gets freed (which leaves cachetable
// oversubscribed)
usleep(2*1024*1024);
assert(expected_bytes_to_free == 3);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
// Test harness entry point: parse the standard test arguments, then run the
// cachetable eviction scenario.
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
cachetable_test();
return 0;
}
#ident "$Id: cachetable-clock-eviction.c 34099 2011-08-21 19:35:34Z zardosht $"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
int num_entries;
BOOL flush_may_occur;
int expected_flushed_key;
BOOL check_flush;
//
// This test verifies that if partial eviction is expensive and
// does not estimate number of freed bytes to be greater than 0,
// then partial eviction is not called, and normal eviction
// is used. The verification is done via an assert(FALSE) in
// pe_callback.
//
// Flush callback that verifies eviction order. Only evictions (keep ==
// FALSE) are checked, and only while check_flush is on; evicted keys must
// arrive in descending order starting at expected_flushed_key, clean (!w),
// and only when the test permits flushes.
static void
flush (CACHEFILE f __attribute__((__unused__)),
       int UU(fd),
       CACHEKEY k __attribute__((__unused__)),
       void *v __attribute__((__unused__)),
       void *e __attribute__((__unused__)),
       long s __attribute__((__unused__)),
       BOOL w __attribute__((__unused__)),
       BOOL keep __attribute__((__unused__)),
       BOOL c __attribute__((__unused__))
       ) {
    if (!check_flush || keep)
        return;
    printf("FLUSH: %d write_me %d\n", (int)k.b, w);
    assert(flush_may_occur);
    assert(!w);
    assert(expected_flushed_key == (int)k.b);
    expected_flushed_key--;
}
// Fetch callback: hand back an empty (NULL-valued), clean pair of size 1.
static int
fetch (CACHEFILE f __attribute__((__unused__)),
       int UU(fd),
       CACHEKEY k __attribute__((__unused__)),
       u_int32_t fullhash __attribute__((__unused__)),
       void **value __attribute__((__unused__)),
       long *sizep __attribute__((__unused__)),
       int *dirtyp,
       void *extraargs __attribute__((__unused__))
       ) {
    *value = NULL;
    *sizep = 1;
    *dirtyp = 0;
    return 0;
}
// Partial-eviction estimate: zero reclaimable bytes at PE_EXPENSIVE cost.
// With nothing to gain and a high price, the cachetable should skip partial
// eviction entirely and fall back to normal (full) eviction.
static void
pe_est_callback(
    void* UU(brtnode_pv),
    long* bytes_freed_estimate,
    enum partial_eviction_cost *cost,
    void* UU(write_extraargs)
    )
{
    *cost = PE_EXPENSIVE;
    *bytes_freed_estimate = 0;
}
// Partial-eviction callback: must never run in this test (the estimate above
// reports 0 bytes at PE_EXPENSIVE cost), so invocation aborts the test.
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
long bytes_to_free __attribute__((__unused__)),
long* bytes_freed,
void* extraargs __attribute__((__unused__))
)
{
assert(FALSE);
*bytes_freed = 0; // unreachable with assertions enabled; keeps output defined
return 0;
}
// Partial-fetch requirement callback: pairs here are always complete in
// memory, so a partial fetch is never requested.
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
// Partial-fetch callback: must never run, because pf_req_callback always
// returns FALSE. Reaching this function is a test failure.
// FIX: the function previously fell off the end of a non-void function after
// assert(FALSE) — undefined behavior if assertions are compiled out (NDEBUG).
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), int UU(fd), long* UU(sizep)) {
    assert(FALSE);
    return 0; // unreachable when assertions are enabled
}
// Drives the eviction test: with a tiny cachetable (limit 4), make block 1
// hot and blocks 2-4 progressively colder, then overfill with block 5.
// Because pe_est_callback reports 0 reclaimable bytes at PE_EXPENSIVE cost,
// partial eviction must be skipped in favor of normal eviction; the flush
// callback checks that blocks are flushed in the expected order and
// pe_callback asserts it is never reached.
static void
cachetable_test (void) {
const int test_limit = 4;
num_entries = 0;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
void* v2;
long s1, s2;
flush_may_occur = FALSE;
check_flush = TRUE;
// Pin/unpin block 1 many times so the clock algorithm sees it as hot.
for (int i = 0; i < 100000; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, 1);
}
// Blocks 2, 3 and 4 are touched 8, 4 and 2 times: progressively colder.
for (int i = 0; i < 8; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 1);
}
for (int i = 0; i < 4; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(3), 3, CACHETABLE_CLEAN, 1);
}
for (int i = 0; i < 2; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(4), 4, CACHETABLE_CLEAN, 1);
}
// Inserting block 5 overfills the table: coldest block (4) must be flushed.
flush_may_occur = TRUE;
expected_flushed_key = 4;
r = toku_cachetable_put(f1, make_blocknum(5), 5, NULL, 4, flush, pe_est_callback, pe_callback, NULL);
// Unpinning block 5 at size 4 keeps the table oversubscribed: 5 goes next.
flush_may_occur = TRUE;
expected_flushed_key = 5;
r = toku_cachetable_unpin(f1, make_blocknum(5), 5, CACHETABLE_CLEAN, 4);
// Stop verifying flush order before teardown flushes the remaining pairs.
check_flush = FALSE;
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
// Test harness entry point: parse the standard test arguments, then run the
// cachetable eviction scenario.
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
cachetable_test();
return 0;
}
......@@ -16,6 +16,18 @@ flush (CACHEFILE f __attribute__((__unused__)),
/* Do nothing */
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -44,7 +56,7 @@ cachetable_count_pinned_test (int n) {
for (i=1; i<=n; i++) {
u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i));
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_callback, 0);
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_est_callback, pe_callback, 0);
assert(r == 0);
assert(toku_cachefile_count_pinned(f1, 0) == i);
......
......@@ -16,6 +16,18 @@ flush (CACHEFILE f __attribute__((__unused__)),
/* Do nothing */
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -52,7 +64,7 @@ cachetable_debug_test (int n) {
const int item_size = 1;
u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i));
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, item_size, flush, pe_callback, 0);
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, item_size, flush, pe_est_callback, pe_callback, 0);
assert(r == 0);
void *v; int dirty; long long pinned; long pair_size;
......
......@@ -16,6 +16,18 @@ flush (CACHEFILE f __attribute__((__unused__)),
/* Do nothing */
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -49,12 +61,12 @@ test_cachetable_flush (int n) {
for (i=0; i<n; i++) {
u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i));
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_callback, 0);
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_est_callback, pe_callback, 0);
assert(r == 0);
r = toku_cachetable_unpin(f1, make_blocknum(i), hi, CACHETABLE_CLEAN, 1);
assert(r == 0);
hi = toku_cachetable_hash(f2, make_blocknum(i));
r = toku_cachetable_put(f2, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_callback, 0);
r = toku_cachetable_put(f2, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_est_callback, pe_callback, 0);
assert(r == 0);
r = toku_cachetable_unpin(f2, make_blocknum(i), hi, CACHETABLE_CLEAN, 1);
assert(r == 0);
......
......@@ -39,6 +39,18 @@ fetch_error (CACHEFILE cf __attribute__((__unused__)),
return -1;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -77,7 +89,7 @@ cachetable_getandpin_test (int n) {
u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i));
void *v; long size;
r = toku_cachetable_get_and_pin(f1, make_blocknum(i), hi, &v, &size, flush, fetch_error, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachetable_get_and_pin(f1, make_blocknum(i), hi, &v, &size, flush, fetch_error, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0);
assert(r == -1);
}
......@@ -86,7 +98,7 @@ cachetable_getandpin_test (int n) {
u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i));
void *v; long size;
r = toku_cachetable_get_and_pin(f1, make_blocknum(i), hi, &v, &size, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachetable_get_and_pin(f1, make_blocknum(i), hi, &v, &size, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0);
assert(r == 0);
assert(size == i);
......
......@@ -34,6 +34,18 @@ static int fetch(CACHEFILE cf, int UU(fd), CACHEKEY key, u_int32_t fullhash, voi
return 0;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -76,7 +88,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir
{
CACHEKEY key = make_blocknum(n+1);
u_int32_t fullhash = toku_cachetable_hash(f1, key);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
}
......@@ -85,7 +97,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir
for (i=0; i<n; i++) {
CACHEKEY key = make_blocknum(i);
u_int32_t hi = toku_cachetable_hash(f1, key);
r = toku_cachetable_put(f1, key, hi, (void *)(long)i, 1, flush, pe_callback, 0);
r = toku_cachetable_put(f1, key, hi, (void *)(long)i, 1, flush, pe_est_callback, pe_callback, 0);
assert(r == 0);
r = toku_cachetable_unpin(f1, key, hi, dirty, item_size);
......
......@@ -43,6 +43,18 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return -42;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -76,7 +88,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// close with the prefetch in progress. the close should block until
......
......@@ -44,6 +44,18 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return 0;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -77,7 +89,7 @@ static void cachetable_prefetch_close_leak_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// close with the prefetch in progress. the close should block until
......
......@@ -43,6 +43,18 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return 0;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -77,7 +89,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// close with the prefetch in progress. the close should block until
......
......@@ -55,6 +55,18 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return 0;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -92,7 +104,7 @@ static void cachetable_prefetch_flowcontrol_test (int cachetable_size_limit) {
for (i=0; i<cachetable_size_limit; i++) {
CACHEKEY key = make_blocknum(i);
u_int32_t fullhash = toku_cachetable_hash(f1, key);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
}
......@@ -103,7 +115,7 @@ static void cachetable_prefetch_flowcontrol_test (int cachetable_size_limit) {
for (i=i; i<2*cachetable_size_limit; i++) {
CACHEKEY key = make_blocknum(i);
u_int32_t fullhash = toku_cachetable_hash(f1, key);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// sleep(1);
}
......
......@@ -39,6 +39,18 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return -42;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -81,13 +93,13 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// verify that get_and_pin waits while the prefetch is in progress
void *v = 0;
long size = 0;
r = toku_cachetable_get_and_pin(f1, key, fullhash, &v, &size, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_get_and_pin(f1, key, fullhash, &v, &size, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
assert(r != 0);
struct timeval tend;
......
......@@ -40,6 +40,18 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return 0;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -82,13 +94,13 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// verify that get_and_pin waits while the prefetch is in progress
void *v = 0;
long size = 0;
r = toku_cachetable_get_and_pin(f1, key, fullhash, &v, &size, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_get_and_pin(f1, key, fullhash, &v, &size, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
assert(r == 0 && v == 0 && size == 1);
struct timeval tend;
......
......@@ -39,6 +39,18 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return 0;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -73,7 +85,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// verify that maybe_get_and_pin returns an error while the prefetch is in progress
......
......@@ -43,6 +43,18 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return 0;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -77,11 +89,11 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// prefetch again. this should do nothing.
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// verify that maybe_get_and_pin returns an error while the prefetch is in progress
......
......@@ -16,6 +16,18 @@ flush (CACHEFILE f __attribute__((__unused__)),
/* Do nothing */
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -44,11 +56,11 @@ cachetable_put_test (int n) {
for (i=1; i<=n; i++) {
u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i));
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_callback, 0);
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_est_callback, pe_callback, 0);
assert(r == 0);
assert(toku_cachefile_count_pinned(f1, 0) == i);
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_callback, 0);
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_est_callback, pe_callback, 0);
assert(r == -1);
assert(toku_cachefile_count_pinned(f1, 0) == i);
......
......@@ -80,6 +80,18 @@ static int r_fetch (CACHEFILE f __attribute__((__unused__)),
return -42;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -122,7 +134,7 @@ static void test_rename (void) {
u_int32_t hnkey = toku_cachetable_hash(f, nkey);
r = toku_cachetable_put(f, nkey, hnkey,
(void*)nval, 1,
r_flush, pe_callback, 0);
r_flush, pe_est_callback, pe_callback, 0);
assert(r==0);
test_mutex_lock();
while (n_keys >= KEYLIMIT) {
......@@ -147,7 +159,7 @@ static void test_rename (void) {
void *current_value;
long current_size;
if (verbose) printf("Rename %" PRIx64 " to %" PRIx64 "\n", okey.b, nkey.b);
r = toku_cachetable_get_and_pin(f, okey, toku_cachetable_hash(f, okey), &current_value, &current_size, r_flush, r_fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachetable_get_and_pin(f, okey, toku_cachetable_hash(f, okey), &current_value, &current_size, r_flush, r_fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0);
if (r == -42) continue;
assert(r==0);
r = toku_cachetable_rename(f, okey, nkey);
......
......@@ -46,6 +46,18 @@ static int f_fetch (CACHEFILE f,
return 0;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -88,7 +100,7 @@ static void writeit (void) {
u_int32_t fullhash = toku_cachetable_hash(f, key);
int j;
for (j=0; j<BLOCKSIZE; j++) ((char*)buf)[j]=(char)((i+j)%256);
r = toku_cachetable_put(f, key, fullhash, buf, BLOCKSIZE, f_flush, pe_callback, 0); assert(r==0);
r = toku_cachetable_put(f, key, fullhash, buf, BLOCKSIZE, f_flush, pe_est_callback, pe_callback, 0); assert(r==0);
r = toku_cachetable_unpin(f, key, fullhash, CACHETABLE_CLEAN, BLOCKSIZE); assert(r==0);
}
gettimeofday(&end, 0);
......@@ -109,7 +121,7 @@ static void readit (void) {
for (i=0; i<N; i++) {
CACHEKEY key = make_blocknum(i*BLOCKSIZE);
u_int32_t fullhash = toku_cachetable_hash(f, key);
r=toku_cachetable_get_and_pin(f, key, fullhash, &block, &current_size, f_flush, f_fetch, pe_callback, pf_req_callback, pf_callback, 0, 0); assert(r==0);
r=toku_cachetable_get_and_pin(f, key, fullhash, &block, &current_size, f_flush, f_fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0); assert(r==0);
r=toku_cachetable_unpin(f, key, fullhash, CACHETABLE_CLEAN, BLOCKSIZE); assert(r==0);
}
r = toku_cachefile_close(&f, 0, FALSE, ZERO_LSN); assert(r == 0);
......
......@@ -35,6 +35,18 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return 0;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -71,7 +83,7 @@ cachetable_test (void) {
//void* v2;
long s1;
//long s2;
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, 8);
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
......
This diff is collapsed.
......@@ -119,6 +119,18 @@ static int fetch_forchain (CACHEFILE f, int UU(fd), CACHEKEY key, u_int32_t full
return 0;
}
// Partial-eviction estimate callback for this test: report that nothing can
// be reclaimed and that partial eviction would be cheap.
static void
pe_est_callback(
void* UU(brtnode_pv),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
*bytes_freed_estimate = 0; // no bytes reclaimable by partial eviction
*cost = PE_CHEAP;          // eviction cost is trivial
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -183,7 +195,7 @@ static void test_chaining (void) {
int fnum = i%N_FILES;
//printf("%s:%d Add %d\n", __FILE__, __LINE__, i);
u_int32_t fhash = toku_cachetable_hash(f[fnum], make_blocknum(i));
r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, pe_callback, (void*)i);
r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, pe_est_callback, pe_callback, (void*)i);
assert(r==0);
item_becomes_present(ct, f[fnum], make_blocknum(i));
r = toku_cachetable_unpin(f[fnum], make_blocknum(i), fhash, CACHETABLE_CLEAN, test_object_size);
......@@ -211,6 +223,7 @@ static void test_chaining (void) {
NULL,
flush_forchain,
fetch_forchain,
pe_est_callback,
pe_callback,
pf_req_callback,
pf_callback,
......@@ -231,7 +244,7 @@ static void test_chaining (void) {
// if i is a duplicate, cachetable_put will return -1
// printf("%s:%d Add {%ld,%p}\n", __FILE__, __LINE__, i, f[fnum]);
u_int32_t fhash = toku_cachetable_hash(f[fnum], make_blocknum(i));
r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, pe_callback, (void*)i);
r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, pe_est_callback, pe_callback, (void*)i);
assert(r==0 || r==-1);
if (r==0) {
item_becomes_present(ct, f[fnum], make_blocknum(i));
......
......@@ -31,6 +31,18 @@ fetch (CACHEFILE f __attribute__((__unused__)),
return 0;
}
// Stub partial-eviction estimator for this test: claims zero bytes would
// be freed and that eviction is cheap (PE_CHEAP), keeping the cachetable's
// eviction logic out of the behavior under test.
static void
pe_est_callback(
    void* UU(brtnode_pv),
    long* bytes_freed_estimate,
    enum partial_eviction_cost *cost,
    void* UU(write_extraargs)
    )
{
    *bytes_freed_estimate = 0;
    *cost = PE_CHEAP;
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -76,7 +88,7 @@ cachetable_unpin_and_remove_test (int n) {
// put the keys into the cachetable
for (i=0; i<n; i++) {
u_int32_t hi = toku_cachetable_hash(f1, make_blocknum(keys[i].b));
r = toku_cachetable_put(f1, make_blocknum(keys[i].b), hi, (void *)(long) keys[i].b, 1, flush, pe_callback, 0);
r = toku_cachetable_put(f1, make_blocknum(keys[i].b), hi, (void *)(long) keys[i].b, 1, flush, pe_est_callback, pe_callback, 0);
assert(r == 0);
}
......@@ -137,7 +149,7 @@ cachetable_put_evict_remove_test (int n) {
// put 0, 1, 2, ... should evict 0
for (i=0; i<n; i++) {
r = toku_cachetable_put(f1, make_blocknum(i), hi[i], (void *)(long)i, 1, flush, pe_callback, 0);
r = toku_cachetable_put(f1, make_blocknum(i), hi[i], (void *)(long)i, 1, flush, pe_est_callback, pe_callback, 0);
assert(r == 0);
r = toku_cachetable_unpin(f1, make_blocknum(i), hi[i], CACHETABLE_CLEAN, 1);
assert(r == 0);
......@@ -145,7 +157,7 @@ cachetable_put_evict_remove_test (int n) {
// get 0
void *v; long s;
r = toku_cachetable_get_and_pin(f1, make_blocknum(0), hi[0], &v, &s, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachetable_get_and_pin(f1, make_blocknum(0), hi[0], &v, &s, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0);
assert(r == 0);
// remove 0
......
......@@ -16,6 +16,18 @@ flush (CACHEFILE f __attribute__((__unused__)),
/* Do nothing */
}
// No-op partial-eviction estimate callback: nothing to free, cost is
// cheap. Supplied only because the cachetable API now requires an
// estimator alongside pe_callback.
static void
pe_est_callback(
    void* UU(brtnode_pv),
    long* bytes_freed_estimate,
    enum partial_eviction_cost *cost,
    void* UU(write_extraargs)
    )
{
    *bytes_freed_estimate = 0;
    *cost = PE_CHEAP;
}
static int
pe_callback (
void *brtnode_pv __attribute__((__unused__)),
......@@ -44,7 +56,7 @@ cachetable_unpin_test (int n) {
for (i=1; i<=n; i++) {
u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i));
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_callback, 0);
r = toku_cachetable_put(f1, make_blocknum(i), hi, (void *)(long)i, 1, flush, pe_est_callback, pe_callback, 0);
assert(r == 0);
assert(toku_cachefile_count_pinned(f1, 0) == i);
......
#ident "$Id: cachetable-clock-eviction.c 32940 2011-07-11 18:24:15Z leifwalsh $"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
// Number of entries currently accounted for in the cachetable: incremented
// after each toku_cachetable_put in cachetable_test, decremented by flush()
// when an entry is written out. Updated with __sync_fetch_and_add because
// flush runs on writer threads.
static int total_size;
// Cachetable size limit handed to toku_create_cachetable in cachetable_test.
static int test_limit;
// Flush callback: when an entry is actually written out (w == TRUE),
// atomically decrement the global total_size counter, sanity-check it,
// and stall for half a second to simulate slow I/O so evictions pile up.
static void
flush (CACHEFILE f __attribute__((__unused__)),
       int UU(fd),
       CACHEKEY k __attribute__((__unused__)),
       void *v __attribute__((__unused__)),
       void *e __attribute__((__unused__)),
       long s __attribute__((__unused__)),
       BOOL w __attribute__((__unused__)),
       BOOL keep __attribute__((__unused__)),
       BOOL c __attribute__((__unused__))
       ) {
    if (!w) {
        return;  // clean eviction: nothing to account for
    }
    int size_before_decrement = __sync_fetch_and_add(&total_size, -1);
    assert(size_before_decrement <= 200);
    usleep(500*1000);
}
// Partial-eviction estimator stub: eviction here is always "cheap" and
// frees nothing, so the cachetable exercises full eviction paths instead.
static void
pe_est_callback(
    void* UU(brtnode_pv),
    long* bytes_freed_estimate,
    enum partial_eviction_cost *cost,
    void* UU(write_extraargs)
    )
{
    *cost = PE_CHEAP;
    *bytes_freed_estimate = 0;
}
// Partial-eviction callback stub: this test never frees anything on a
// partial eviction; report zero bytes freed and success.
static int
pe_callback (
    void *brtnode_pv __attribute__((__unused__)),
    long bytes_to_free __attribute__((__unused__)),
    long* bytes_freed,
    void* extraargs __attribute__((__unused__))
    )
{
    long freed = 0;  // nothing is ever reclaimed by this stub
    *bytes_freed = freed;
    return 0;
}
#if 0
// Dead code (compiled out). If re-enabled, these provide:
//  - fetch: hands back a dummy NULL value of size 1 and marks it dirty;
//  - pf_req_callback: always says no partial fetch is required;
//  - pf_callback: must never be invoked (asserts FALSE).
// NOTE(review): pf_callback has no return statement after assert(FALSE);
// fine while dead, but needs a return (or abort) if ever re-enabled.
static int
fetch (CACHEFILE f __attribute__((__unused__)),
       int UU(fd),
       CACHEKEY k __attribute__((__unused__)),
       u_int32_t fullhash __attribute__((__unused__)),
       void **value __attribute__((__unused__)),
       long *sizep __attribute__((__unused__)),
       int *dirtyp,
       void *extraargs __attribute__((__unused__))
       ) {
    *dirtyp = 1;
    *value = NULL;
    *sizep = 1;
    return 0;
}
static BOOL pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
    return FALSE;
}
static int pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), int UU(fd), long* UU(sizep)) {
    assert(FALSE);
}
#endif
// Put more dirty entries into a tiny cachetable than it can hold, forcing
// evictions (and hence slow flush() calls) on writer threads. The asserts
// bound how far total_size may overshoot test_limit while evictions are
// in flight. Fix: the return codes of put/unpin were assigned to r but
// never checked; they are now asserted like in the sibling tests.
static void
cachetable_test (void) {
    total_size = 0;
    int num_entries = 100;
    test_limit = 6;
    int r;
    CACHETABLE ct;
    r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
    char fname1[] = __FILE__ "test1.dat";
    unlink(fname1);
    CACHEFILE f1;
    r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
    for (int64_t i = 0; i < num_entries; i++) {
        r = toku_cachetable_put(f1, make_blocknum(i), i, NULL, 1, flush, pe_est_callback, pe_callback, NULL);
        assert(r == 0);  // blocknums are unique, so each put must succeed
        int curr_size = __sync_fetch_and_add(&total_size, 1);
        // allow a bounded overshoot while slow evictions drain the table
        assert(curr_size <= test_limit + test_limit/2+1);
        r = toku_cachetable_unpin(f1, make_blocknum(i), i, CACHETABLE_DIRTY, 4);
        assert(r == 0);
    }
    r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
    r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
// Entry point for the test harness: handle the standard -v/-q style
// arguments, run the cachetable stress test once, and report success.
int
test_main(int argc, const char *argv[]) {
    default_parse_args(argc, argv);
    cachetable_test();
    return 0;
}
......@@ -232,6 +232,7 @@ BDB_DONTRUN_TESTS = \
test3522 \
test3522b \
test938c \
test_3645 \
test_3755 \
test_abort1 \
test_abort4 \
......@@ -599,6 +600,7 @@ test_groupcommit_perf.bdbrun test_groupcommit_perf.tdbrun: VGRIND=
diskfull.tdbrun: VGRIND=
stress-gc.tdbrun: VGRIND=
insert-dup-prelock: VGRIND=
test_3645: VGRIND=
test_large_update_broadcast_small_cachetable.tdbrun: VGRIND=
test_update_broadcast_stress.tdbrun: VGRIND=
test_update_stress.tdbrun: VGRIND=
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
//
// This test verifies that running evictions on a writer thread
// are ok. We create a dictionary bigger than the cachetable (around 4x greater).
// Then, we spawn a bunch of pthreads that do the following:
// - scan dictionary forward with bulk fetch
// - scan dictionary forward slowly
// - scan dictionary backward with bulk fetch
// - scan dictionary backward slowly
// - update existing values in the dictionary with db->put(DB_YESOVERWRITE)
// With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes.
// If the test runs to completion without crashing, we consider it a success.
//
// Cleared by the timer thread (test_time) to tell every worker loop to stop.
// NOTE(review): read and written concurrently by multiple threads with no
// atomics/volatile — relies on eventual visibility; confirm this is intended.
BOOL run_test;
// Test duration in seconds; consumed by test_time().
int time_of_test;
// Per-thread arguments handed to the scanner/updater/point-query threads.
struct arg {
    int n;        // number of keys inserted into the dictionary
    DB *db;       // dictionary under test
    DB_ENV* env;  // environment owning db (used to begin transactions)
    BOOL fast;    // scanners: TRUE => bulk fetch (go_fast), FALSE => go_slow
    BOOL fwd;     // scanners: TRUE => c_getf_next, FALSE => c_getf_prev
};
// Bulk-fetch cursor callback: accept the pair and ask the cursor to keep
// streaming rows from the current basement node (TOKUDB_CURSOR_CONTINUE).
static int
go_fast(DBT const *a, DBT const *b, void *c) {
    assert(a != NULL);
    assert(b != NULL);
    assert(c == NULL);
    return TOKUDB_CURSOR_CONTINUE;
}
// Row-at-a-time cursor callback: accept the pair and return 0 so the
// cursor yields exactly one row per c_getf_* call (no bulk fetch).
static int
go_slow(DBT const *a, DBT const *b, void *c) {
    assert(a != NULL);
    assert(b != NULL);
    assert(c == NULL);
    return 0;
}
// Worker thread: repeatedly scan the whole dictionary until run_test is
// cleared. Direction (fwd) and bulk-vs-slow fetch (fast) come from myarg.
// Bug fix: r was left equal to DB_NOTFOUND by the first complete scan, so
// every later pass skipped the inner while-loop entirely and just opened
// and closed a cursor without reading anything. Reset r before each pass.
static void *scan_db(void *arg) {
    struct arg *myarg = (struct arg *) arg;
    DB_ENV* env = myarg->env;
    DB* db = myarg->db;
    DB_TXN* txn = NULL;
    int r = env->txn_begin(env, 0, &txn, DB_READ_UNCOMMITTED); CKERR(r);
    while(run_test) {
        DBC* cursor = NULL;
        CHK(db->cursor(db, txn, &cursor, 0));
        r = 0;  // reset: previous pass ended with r == DB_NOTFOUND
        while (r != DB_NOTFOUND) {
            if (myarg->fwd) {
                r = cursor->c_getf_next(cursor, 0, myarg->fast ? go_fast : go_slow, NULL);
            }
            else {
                r = cursor->c_getf_prev(cursor, 0, myarg->fast ? go_fast : go_slow, NULL);
            }
            assert(r==0 || r==DB_NOTFOUND);
        }
        CHK(cursor->c_close(cursor));
    }
    CHK(txn->commit(txn,0));
    return arg;
}
// Worker thread: issue random point queries against keys in [0, n) until
// run_test is cleared. Every key was inserted up front, so a lookup must
// never report DB_NOTFOUND.
static void *ptquery_db(void *arg) {
    struct arg *myarg = (struct arg *) arg;
    DB_ENV* env = myarg->env;
    DB* db = myarg->db;
    DB_TXN* txn = NULL;
    int n = myarg->n;
    int r = env->txn_begin(env, 0, &txn, DB_READ_UNCOMMITTED); CKERR(r);
    while(run_test) {
        int rand_key = random() % n;
        DBT key, val;
        dbt_init(&key, &rand_key, sizeof(rand_key));
        memset(&val, 0, sizeof(val));
        r = db->get(db, txn, &key, &val, 0);
        assert(r != DB_NOTFOUND);
    }
    CHK(txn->commit(txn,0));
    return arg;
}
// Worker thread: until run_test is cleared, repeatedly overwrite 1000
// random keys with random values inside a single transaction, then commit.
// This is the churn that forces dirty nodes through eviction.
static void *update_db(void *arg) {
    struct arg *myarg = (struct arg *) arg;
    DB_ENV* env = myarg->env;
    DB* db = myarg->db;
    int n = myarg->n;
    DB_TXN* txn = NULL;
    while (run_test) {
        int r = env->txn_begin(env, 0, &txn, DB_READ_UNCOMMITTED); CKERR(r);
        for (u_int32_t i = 0; i < 1000; i++) {
            int rand_key = random() % n;
            int rand_val = random();
            DBT key, val;
            dbt_init(&key, &rand_key, sizeof(rand_key));
            dbt_init(&val, &rand_val, sizeof(rand_val));
            r = db->put(db, txn, &key, &val, 0);
            CKERR(r);
        }
        CHK(txn->commit(txn,0));
    }
    return arg;
}
// Timer thread: sleep for time_of_test seconds, then clear run_test so
// every worker loop winds down and the test can join its threads.
static void *test_time(void *arg) {
    assert(arg == NULL);
    usleep(time_of_test * 1000 * 1000);
    printf("should now end test\n");
    run_test = FALSE;
    return arg;
}
// Build a dictionary ~4x larger than the cachetable, then run concurrent
// scanners, point-queriers, and an updater for nseconds to churn nodes
// through eviction. Success == running to completion without crashing.
static void
test_evictions (int nseconds) {
    int n = 100000;
    if (verbose) printf("test_rand_insert:%d \n", n);
    DB_TXN * const null_txn = 0;
    const char * const fname = "test.bulk_fetch.brt";
    int r;
    r = system("rm -rf " ENVDIR);
    CKERR(r);
    r=toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
    /* create the dup database file */
    DB_ENV *env;
    r = db_env_create(&env, 0); assert(r == 0);
    r=env->set_default_bt_compare(env, int_dbt_cmp); CKERR(r);
    // set the cache size to 100000 bytes (~100KB), deliberately much
    // smaller than the dictionary so that eviction is constant
    r = env->set_cachesize(env, 0, 100000, 1); CKERR(r);
    r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
    DB *db;
    r = db_create(&db, env, 0);
    assert(r == 0);
    r = db->set_flags(db, 0);
    assert(r == 0);
    // small node and basement sizes to get many evictable partitions
    r = db->set_pagesize(db, 4096);
    assert(r == 0);
    r = db->set_readpagesize(db, 1024);
    assert(r == 0);
    r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
    assert(r == 0);
    // NOTE(review): keys[] is a 100000-int VLA (~400KB) on the stack;
    // fine on typical 8MB stacks but worth confirming for small stacks.
    int keys[n];
    for (int i=0; i<n; i++) {
        keys[i] = i;
    }
    // seed the dictionary with keys 0..n-1
    for (int i=0; i<n; i++) {
        DBT key, val;
        r = db->put(db, null_txn, dbt_init(&key, &keys[i], sizeof keys[i]), dbt_init(&val, &i, sizeof i), 0);
        assert(r == 0);
    }
    //
    // the threads that we want:
    // - one thread constantly updating random values
    // - one thread doing table scan with bulk fetch
    // - one thread doing table scan without bulk fetch
    // - one thread doing random point queries
    //
    toku_pthread_t mytids[7];
    struct arg myargs[7];
    for (u_int32_t i = 0; i < sizeof(myargs)/sizeof(myargs[0]); i++) {
        myargs[i].n = n;
        myargs[i].db = db;
        myargs[i].env = env;
        myargs[i].fast = TRUE;
        myargs[i].fwd = TRUE;
    }
    // make the forward fast scanner
    myargs[0].fast = TRUE;
    myargs[0].fwd = TRUE;
    CHK(toku_pthread_create(&mytids[0], NULL, scan_db, &myargs[0]));
    // make the forward slow scanner
    myargs[1].fast = FALSE;
    myargs[1].fwd = TRUE;
    CHK(toku_pthread_create(&mytids[1], NULL, scan_db, &myargs[1]));
    // make the backward fast scanner
    myargs[2].fast = TRUE;
    myargs[2].fwd = FALSE;
    CHK(toku_pthread_create(&mytids[2], NULL, scan_db, &myargs[2]));
    // make the backward slow scanner
    myargs[3].fast = FALSE;
    myargs[3].fwd = FALSE;
    CHK(toku_pthread_create(&mytids[3], NULL, scan_db, &myargs[3]));
    // make the guy that updates the db
    CHK(toku_pthread_create(&mytids[4], NULL, update_db, &myargs[4]));
    // make the guy that does point queries
    CHK(toku_pthread_create(&mytids[5], NULL, ptquery_db, &myargs[5]));
    // start the clock only after all workers are up, then spawn the timer
    run_test = TRUE;
    time_of_test = nseconds;
    // make the guy that sleeps
    CHK(toku_pthread_create(&mytids[6], NULL, test_time, NULL));
    for (u_int32_t i = 0; i < sizeof(myargs)/sizeof(myargs[0]); i++) {
        void *ret;
        r = toku_pthread_join(mytids[i], &ret); assert_zero(r);
    }
    r = db->close(db, 0); CKERR(r);
    r = env->close(env, 0); CKERR(r);
}
// Entry point: parse the standard test options, then run the eviction
// stress test for a fixed 60 seconds.
int
test_main(int argc, char *const argv[]) {
    parse_args(argc, argv);
    test_evictions(60);
    return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment