Commit 789cb4ad authored by Zardosht Kasheff, committed by Yoni Fogel

[t:4357], [t:4364], [t:4365], merge fixes to main

git-svn-id: file:///svn/toku/tokudb@38595 c7de825b-a66e-492c-adef-691d508d4ae1
parent 830de7e5
@@ -756,11 +756,21 @@ void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size, void **m
// from the OMT (which items refer to items in the old mempool) into the new mempool.
// If MAYBE_FREE is NULL then free the old mempool's space.
// Otherwise, store the old mempool's space in maybe_free.
void
toku_get_node_for_verify(
BLOCKNUM blocknum,
BRT brt,
BRTNODE* nodep
);
int toku_verify_brtnode (BRT brt, MSN rootmsn, MSN parentmsn,
BLOCKNUM blocknum, int height, struct kv_pair *lesser_pivot, struct kv_pair *greatereq_pivot,
int (*progress_callback)(void *extra, float progress), void *extra,
int recurse, int verbose, int keep_on_going)
int
toku_verify_brtnode (BRT brt,
MSN rootmsn, MSN parentmsn,
BRTNODE node, int height,
struct kv_pair *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
struct kv_pair *greatereq_pivot, // Everything in the subtree should be <= greatereq_pivot. (greatereq_pivot==NULL if there is no greatereq pivot.)
int (*progress_callback)(void *extra, float progress), void *progress_extra,
int recurse, int verbose, int keep_going_on_failure)
__attribute__ ((warn_unused_result));
void toku_brtheader_free (struct brt_header *h);
......
@@ -13,6 +13,7 @@
#include "includes.h"
#include <brt-flusher.h>
#include "brt-cachetable-wrappers.h"
static int
compare_pairs (BRT brt, struct kv_pair *a, struct kv_pair *b) {
@@ -197,44 +198,42 @@ count_eq_key_msn(BRT brt, FIFO fifo, OMT mt, const void *key, size_t keylen, MSN
return count;
}
void
toku_get_node_for_verify(
BLOCKNUM blocknum,
BRT brt,
BRTNODE* nodep
)
{
u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
toku_pin_brtnode_off_client_thread(
brt->h,
blocknum,
fullhash,
&bfe,
0,
NULL,
nodep
);
}
// Input is a pinned node; on exit, the node is unpinned.
int
toku_verify_brtnode (BRT brt,
MSN rootmsn, MSN parentmsn,
BLOCKNUM blocknum, int height,
BRTNODE node, int height,
struct kv_pair *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
struct kv_pair *greatereq_pivot, // Everything in the subtree should be <= greatereq_pivot. (greatereq_pivot==NULL if there is no greatereq pivot.)
int (*progress_callback)(void *extra, float progress), void *progress_extra,
int recurse, int verbose, int keep_going_on_failure)
{
int result=0;
BRTNODE node;
void *node_v;
MSN this_msn;
BLOCKNUM blocknum = node->thisnodename;
u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum);
{
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
int r = toku_cachetable_get_and_pin(
brt->cf,
blocknum,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_est_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
toku_brtnode_cleaner_callback,
&bfe,
brt->h
);
assert_zero(r); // this is a bad failure if it happens.
}
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node = node_v;
toku_assert_entire_node_in_memory(node);
this_msn = node->max_msn_applied_to_node_on_disk;
if (rootmsn.msn == ZERO_MSN.msn) {
@@ -243,7 +242,6 @@ toku_verify_brtnode (BRT brt,
parentmsn = this_msn;
}
invariant(node->fullhash == fullhash); // this is a bad failure if wrong
if (height >= 0) {
invariant(height == node->height); // this is a bad failure if wrong
}
@@ -326,7 +324,7 @@ toku_verify_brtnode (BRT brt,
}
last_msn = msn;
}));
struct verify_message_tree_extra extra = { .fifo = bnc->buffer, .broadcast = false, .is_fresh = true, .i = i, .verbose = verbose, .blocknum = blocknum, .keep_going_on_failure = keep_going_on_failure };
struct verify_message_tree_extra extra = { .fifo = bnc->buffer, .broadcast = false, .is_fresh = true, .i = i, .verbose = verbose, .blocknum = node->thisnodename, .keep_going_on_failure = keep_going_on_failure };
int r = toku_omt_iterate(bnc->fresh_message_tree, verify_message_tree, &extra);
if (r != 0) { result = r; goto done; }
extra.is_fresh = false;
@@ -361,8 +359,10 @@ toku_verify_brtnode (BRT brt,
// Verify that the subtrees have the right properties.
if (recurse && node->height > 0) {
for (int i = 0; i < node->n_children; i++) {
BRTNODE child_node;
toku_get_node_for_verify(BP_BLOCKNUM(node, i), brt, &child_node);
int r = toku_verify_brtnode(brt, rootmsn, this_msn,
BP_BLOCKNUM(node, i), node->height-1,
child_node, node->height-1,
(i==0) ? lesser_pivot : node->childkeys[i-1],
(i==node->n_children-1) ? greatereq_pivot : node->childkeys[i],
progress_callback, progress_extra,
@@ -375,7 +375,13 @@ toku_verify_brtnode (BRT brt,
}
done:
{
int r = toku_cachetable_unpin(brt->cf, blocknum, fullhash, CACHETABLE_CLEAN, make_brtnode_pair_attr(node));
int r = toku_cachetable_unpin(
brt->cf,
node->thisnodename,
toku_cachetable_hash(brt->cf, node->thisnodename),
CACHETABLE_CLEAN,
make_brtnode_pair_attr(node)
);
assert_zero(r); // this is a bad failure if it happens.
}
@@ -388,9 +394,13 @@ done:
int
toku_verify_brt_with_progress (BRT brt, int (*progress_callback)(void *extra, float progress), void *progress_extra, int verbose, int keep_on_going) {
assert(brt->h);
toku_cachetable_call_ydb_lock(brt->cf);
u_int32_t root_hash;
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt->h, &root_hash);
int r = toku_verify_brtnode(brt, ZERO_MSN, ZERO_MSN, *rootp, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going);
BRTNODE root_node;
toku_get_node_for_verify(*rootp, brt, &root_node);
toku_cachetable_call_ydb_unlock(brt->cf);
int r = toku_verify_brtnode(brt, ZERO_MSN, ZERO_MSN, root_node, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going);
if (r == 0) {
toku_brtheader_lock(brt->h);
brt->h->time_of_last_verification = time(NULL);
......
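The new contract, as a minimal sketch (this assumes the internal TokuDB headers and the functions declared above; the wrapper name verify_one_node is hypothetical): the caller pins the node, and toku_verify_brtnode consumes the pin, unpinning the node on exit and pinning children itself when recursing.

// Sketch only: verify a single node without recursing.
static int
verify_one_node (BRT brt, BLOCKNUM blocknum) {
    BRTNODE node;
    toku_get_node_for_verify(blocknum, brt, &node);   // pins the node
    // toku_verify_brtnode consumes the pin: the node is unpinned at done:,
    // whether or not verification failed.
    return toku_verify_brtnode(brt, ZERO_MSN, ZERO_MSN, node, -1,
                               NULL, NULL,  // no pivot bounds
                               NULL, NULL,  // no progress callback/extra
                               0,           // recurse = no
                               1,           // verbose
                               0);          // keep_going_on_failure = no
}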
@@ -5729,9 +5729,10 @@ static int
toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_pair *lorange, struct kv_pair *hirange) {
int result=0;
BRTNODE node;
void *node_v;
void* node_v;
toku_get_node_for_verify(blocknum, brt, &node);
result=toku_verify_brtnode(brt, ZERO_MSN, ZERO_MSN, node, -1, lorange, hirange, NULL, NULL, 0, 1, 0);
u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum);
result=toku_verify_brtnode(brt, ZERO_MSN, ZERO_MSN, blocknum, -1, lorange, hirange, NULL, NULL, 0, 1, 0);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
int r = toku_cachetable_get_and_pin(
......
@@ -1234,17 +1234,8 @@ static void cachetable_remove_pair (CACHETABLE ct, PAIR p) {
p->already_removed = TRUE;
}
// Maybe remove a pair from the cachetable and free it, depending on whether
// or not there are any threads interested in the pair. The flush callback
// is called with write_me and keep_me both false, and the pair is destroyed.
// The sole purpose of this function is to remove the node, so the write_me
// argument to the flush callback is false, and the flush callback won't do
// anything except destroy the node.
static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p, BOOL* destroyed) {
*destroyed = FALSE;
if (nb_mutex_users(&p->nb_mutex) == 0) {
cachetable_remove_pair(ct, p);
static void cachetable_free_pair(CACHETABLE ct, PAIR p) {
// helgrind: copy the fields we need into locals before the pair is destroyed
CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback;
CACHEFILE cachefile = p->cachefile;
@@ -1265,6 +1256,19 @@ static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p, BOOL*
rwlock_read_unlock(&cachefile->fdlock);
ctpair_destroy(p);
}
// Maybe remove a pair from the cachetable and free it, depending on whether
// or not there are any threads interested in the pair. The flush callback
// is called with write_me and keep_me both false, and the pair is destroyed.
// The sole purpose of this function is to remove the node, so the write_me
// argument to the flush callback is false, and the flush callback won't do
// anything except destroy the node.
static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p, BOOL* destroyed) {
*destroyed = FALSE;
if (nb_mutex_users(&p->nb_mutex) == 0) {
cachetable_remove_pair(ct, p);
cachetable_free_pair(ct, p);
*destroyed = TRUE;
}
}
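The split above, reduced to a self-contained sketch (every name here is a hypothetical stand-in, not a TokuDB type): unlinking a pair from the table is now separate from freeing it, so a caller that has already unlinked the pair, such as unpin_and_remove below, can free it later once the last blocked writer is gone.

#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

struct pair { int users; struct pair *next; };
static struct pair *table;   // toy intrusive list standing in for the hash table

// Unlink only; the memory stays valid for threads still referencing it.
static void remove_pair (struct pair *p) {
    for (struct pair **pp = &table; *pp; pp = &(*pp)->next) {
        if (*pp == p) { *pp = p->next; return; }
    }
}

// Destroy; the caller guarantees nobody is using the pair anymore.
static void free_pair (struct pair *p) {
    assert(p->users == 0);
    free(p);
}

// The old combined helper becomes a composition of the two steps.
static bool maybe_remove_and_free_pair (struct pair *p) {
    if (p->users == 0) {
        remove_pair(p);
        free_pair(p);
        return true;
    }
    return false;
}

int main (void) {
    struct pair *p = calloc(1, sizeof *p);
    p->next = table; table = p;               // insert
    assert(maybe_remove_and_free_pair(p));    // no users: removed and freed
    return 0;
}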
@@ -1404,6 +1408,7 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p, BOOL remove_me) {
if (p->cq)
workqueue_enq(p->cq, &p->asyncwork, 1);
else {
p->state = CTPAIR_IDLE;
BOOL destroyed;
cachetable_complete_write_pair(ct, p, remove_me, &destroyed);
}
@@ -1415,8 +1420,6 @@ static void cachetable_write_pair (CACHETABLE ct, PAIR p, BOOL remove_me) {
static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remove, BOOL* destroyed) {
p->cq = 0;
p->state = CTPAIR_IDLE;
nb_mutex_write_unlock(&p->nb_mutex);
if (do_remove) {
cachetable_maybe_remove_and_free_pair(ct, p, destroyed);
@@ -1592,6 +1595,16 @@ static void maybe_flush_some (CACHETABLE ct, long size) {
workqueue_enq(&ct->wq, wi, 0);
}
else {
// maybe_flush_some always runs on a client thread.
// As a result, the cachefile cannot be in the process of closing,
// and therefore a completion queue has not been set up for
// closing the cachefile.
// Also, because we locked this PAIR when there were no
// other users trying to get access, no thread running
// unpin_and_remove can have gotten in here and
// set up a completion queue.
// So, a completion queue cannot exist.
assert(!curr_in_clock->cq);
nb_mutex_write_unlock(&curr_in_clock->nb_mutex);
}
}
@@ -2341,6 +2354,15 @@ cachetable_unpin_internal(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
assert(nb_mutex_writers(&p->nb_mutex)>0);
// This is a client thread (a cleaner, flusher, or
// get_and_pin thread) unlocking the PAIR, so there must
// not be a completion queue lying around.
// Cachefile closes wait for the client threads to complete,
// and unpin_and_remove cannot be running because
// unpin_and_remove starts by holding the PAIR lock.
// So, we assert that a completion queue does not exist.
assert(!p->cq);
nb_mutex_write_unlock(&p->nb_mutex);
if (dirty) p->dirty = CACHETABLE_DIRTY;
PAIR_ATTR old_attr = p->attr;
@@ -2483,8 +2505,28 @@ int toku_cachetable_get_and_pin_nonblocking (
cachetable_wait_writing++;
}
nb_mutex_write_lock(&p->nb_mutex, ct->mutex);
// The deadlock discovered in #4357 shows we need
// to do this. After running unlockers and waiting
// on the PAIR lock, a flusher thread may come
// along and try to unpin_and_remove this PAIR.
// In that case, the thread running unpin_and_remove
// sets up a completion queue, and we must transfer ownership
// of this PAIR lock to that thread via the completion
// queue.
if (p->cq) {
// while we wait on the PAIR lock, a thread may come in and
// call toku_cachetable_unpin_and_remove on this PAIR.
// In that case, we must do NOTHING with the PAIR, as
// it has been removed from the cachetable's data structures.
// So, we should just pass the PAIR over to the completion
// queue.
workitem_init(&p->asyncwork, NULL, p);
workqueue_enq(p->cq, &p->asyncwork, 1);
}
else {
nb_mutex_write_unlock(&p->nb_mutex);
}
}
cachetable_unlock(ct);
if (ct->ydb_lock_callback) ct->ydb_lock_callback();
return TOKUDB_TRY_AGAIN;
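As a usage note, callers are expected to loop on TOKUDB_TRY_AGAIN; by the time it is returned, the unlockers have run and the PAIR has been released or handed off. A minimal sketch, reusing the callback set from the tests below (the three trailing NULL arguments match those tests):

// Sketch: retry the nonblocking pin until it succeeds.
void *v;
long size;
int r;
do {
    r = toku_cachetable_get_and_pin_nonblocking(
        f1,
        make_blocknum(1),
        toku_cachetable_hash(f1, make_blocknum(1)),
        &v,
        &size,
        flush, fetch, def_pe_est_callback, def_pe_callback,
        def_pf_req_callback, def_pf_callback, def_cleaner_callback,
        NULL, NULL, NULL);   // same trailing arguments as in the tests below
} while (r == TOKUDB_TRY_AGAIN);
assert(r == 0);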
@@ -2601,7 +2643,13 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
if (doing_prefetch) {
*doing_prefetch = TRUE;
}
} else if (nb_mutex_users(&p->nb_mutex)==0) {
}
else if (nb_mutex_users(&p->nb_mutex)==0) {
// The client should not be trying to prefetch a node that either
// belongs to a cachefile being flushed or to a PAIR being
// unpinned and removed.
assert(!p->cq);
// nobody else is using the node, so we should go ahead and prefetch
nb_mutex_write_lock(&p->nb_mutex, ct->mutex);
BOOL partial_fetch_required = pf_req_callback(p->value, read_extraargs);
@@ -2619,6 +2667,9 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
}
}
else {
// Sanity check: we already have this assert
// before locking the PAIR.
assert(!p->cq);
nb_mutex_write_unlock(&p->nb_mutex);
}
}
@@ -3018,15 +3069,19 @@ int toku_cachetable_unpin_and_remove (
p->checkpoint_pending = FALSE;
//
// Here is a tricky thing.
// In the code below, we may release the
// cachetable lock if there are blocked writers
// on this pair. While the cachetable lock is released,
// Later on in this function, we may release the
// cachetable lock if other threads are blocked
// on this pair, trying to acquire the PAIR lock.
// While the cachetable lock is released,
// we may theoretically begin another checkpoint, or start
// a cleaner thread.
// So, in order for this PAIR to not be marked
// So, just to be sure this PAIR won't be marked
// for the impending checkpoint, we mark the
// PAIR as clean. For the PAIR to not be picked by the
// cleaner thread, we set its cache_pressure_size to 0.
// This should not be an issue because we call
// cachetable_remove_pair before
// releasing the cachetable lock.
//
p->dirty = CACHETABLE_CLEAN;
CACHEKEY key_to_remove = key;
@@ -3043,25 +3098,54 @@ int toku_cachetable_unpin_and_remove (
remove_key_extra
);
}
// we must not have a completion queue
// lying around, as we may create one now
assert(!p->cq);
nb_mutex_write_unlock(&p->nb_mutex);
//
// need to find a way to assert that
// ONLY the checkpoint thread may be blocked here
//
// The assumption here is that only the checkpoint thread may
// be blocked here. No writer thread may be a blocked writer,
// because the writer thread has only locked PAIRs.
// The writer thread does not try to acquire a lock. It cannot be a
// client thread either, because no client thread should be trying
// to lock a node that another thread is trying to remove
// from the cachetable. It cannot be a kibbutz thread either
// because the client controls what work is done on the kibbutz,
// and should be smart enough to make sure that no other thread
// tries to lock a PAIR while trying to unpin_and_remove it. So,
// the only thread that is left that can possibly be a blocked
// writer is the checkpoint thread.
// As of Dr. Noga, only these threads may be
// blocked waiting to lock this PAIR:
// - the checkpoint thread (because a checkpoint is in progress
// and the PAIR was in the list of pending pairs)
// - a client thread running get_and_pin_nonblocking, who
// ran unlockers, then waited on the PAIR lock.
// While waiting on a PAIR lock, another thread comes in,
// locks the PAIR, and ends up calling unpin_and_remove,
// all while get_and_pin_nonblocking is waiting on the PAIR lock.
// We did not realize this at first, which caused bug #4357
// The following threads CANNOT be blocked waiting on
// the PAIR lock:
// - a thread trying to run eviction via maybe_flush_some.
// That cannot happen because maybe_flush_some only
// attempts to lock PAIRS that are not locked, and this PAIR
// is locked.
// - cleaner thread, for the same reason as a thread running
// eviction
// - client thread doing a normal get_and_pin. The client is smart
// enough to not try to lock a PAIR that another client thread
// is trying to unpin and remove. Note that this includes work
// done on kibbutzes.
// - writer thread. Writer threads do not grab PAIR locks. They
// get PAIR locks transferred to them by client threads.
//
// The first thing we do is remove the PAIR from the various
// cachetable data structures, so no other thread can possibly
// access it. We do not want to risk some other thread
// trying to lock this PAIR if we release the cachetable lock
// below. If some thread is already waiting on the lock,
// then we let that thread grab the lock and finish, but
// we don't want any NEW threads to try to grab the PAIR
// lock.
//
// Because we call cachetable_remove_pair and set up a completion queue,
// any threads that may be waiting
// on this PAIR lock must be careful to do NOTHING with the PAIR once
// they notice the completion queue. As per our analysis above, we only need
// to make sure the checkpoint thread and get_and_pin_nonblocking do
// nothing, and looking at those functions, it is clear they do nothing.
//
cachetable_remove_pair(ct, p);
if (nb_mutex_blocked_writers(&p->nb_mutex)>0) {
struct workqueue cq;
workqueue_init(&cq);
@@ -3071,7 +3155,7 @@ int toku_cachetable_unpin_and_remove (
//They are still blocked because we have not released the
//cachetable lock.
//If we freed the memory for the pair we would have dangling
//pointers. We need to let the checkpoint thread finish up with
//pointers. We need to let the other threads finish up with
//this pair.
p->cq = &cq;
@@ -3089,7 +3173,6 @@ int toku_cachetable_unpin_and_remove (
//We are holding the write lock on the pair
cachetable_lock(ct);
assert(nb_mutex_writers(&p->nb_mutex) == 1);
BOOL destroyed = FALSE;
// let's also assert that this PAIR was not somehow marked
// as pending a checkpoint. Above, when calling
// remove_key(), we cleared the dirty bit so that
@@ -3097,12 +3180,13 @@
// make sure that our assumption is valid.
assert(!p->checkpoint_pending);
assert(p->attr.cache_pressure_size == 0);
nb_mutex_write_unlock(&p->nb_mutex);
// Because we assume it is just the checkpoint thread
// that may have been blocked (as argued above),
// it is safe to simply remove the PAIR from the
// cachetable. We don't need to write anything out.
cachetable_complete_write_pair(ct, p, TRUE, &destroyed);
if (destroyed) {
if (nb_mutex_blocked_writers(&p->nb_mutex) == 0) {
cachetable_free_pair(ct, p);
break;
}
}
@@ -3110,9 +3194,7 @@
}
else {
//Remove pair.
BOOL destroyed = FALSE;;
cachetable_maybe_remove_and_free_pair(ct, p, &destroyed);
assert(destroyed);
cachetable_free_pair(ct, p);
}
r = 0;
goto done;
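The fix for #4357 reduces to a general pattern: a thread that wakes up holding a lock must check whether the object was logically removed while it slept, and if so surrender its claim through a flag or queue rather than touch the object. A self-contained pthread sketch of that pattern (every name here is hypothetical, not TokuDB's):

#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

struct obj {
    pthread_mutex_t lock;
    bool removed;       // set by the remover while it holds the lock
    bool handed_off;    // the waiter signals the remover that it saw 'removed'
    pthread_cond_t cv;
};

static struct obj o = {
    PTHREAD_MUTEX_INITIALIZER, false, false, PTHREAD_COND_INITIALIZER
};

static void *waiter (void *arg) {
    pthread_mutex_lock(&o.lock);   // blocks while the remover holds the lock
    if (o.removed) {
        // Do NOT use the object: hand ownership back and retry from scratch.
        o.handed_off = true;
        pthread_cond_signal(&o.cv);
    }
    pthread_mutex_unlock(&o.lock);
    return arg;
}

int main (void) {
    pthread_t t;
    pthread_mutex_lock(&o.lock);             // "pin" the object
    pthread_create(&t, NULL, waiter, NULL);  // waiter now blocks on the lock
    usleep(100 * 1000);                      // let the waiter reach the lock
    o.removed = true;                        // "unpin_and_remove": mark removed...
    pthread_mutex_unlock(&o.lock);           // ...and let the waiter in
    pthread_mutex_lock(&o.lock);
    while (!o.handed_off)                    // wait for the waiter to surrender
        pthread_cond_wait(&o.cv, &o.lock);
    pthread_mutex_unlock(&o.lock);
    pthread_join(t, NULL);
    printf("hand-off complete; the object can now be freed\n");
    return 0;
}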
@@ -3899,6 +3981,7 @@ toku_cleaner_thread (void *cachetable_v)
// The cleaner callback must have unlocked the pair, so we
// don't need to unlock it if the cleaner callback is called.
if (!cleaner_callback_called) {
assert(!best_pair->cq);
nb_mutex_write_unlock(&best_pair->nb_mutex);
}
rwlock_read_unlock(&cf->fdlock);
......
#ident "$Id: cachetable-simple-verify.c 36689 2011-11-07 22:08:05Z zardosht $"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)),
PAIR_ATTR s __attribute__((__unused__)),
PAIR_ATTR* new_size __attribute__((__unused__)),
BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__))
) {
/* Do nothing */
if (verbose) { printf("FLUSH: %d\n", (int)k.b); }
//usleep (5*1024*1024);
}
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
return 0;
}
CACHEFILE f1;
static void *pin_nonblocking(void *arg) {
void* v1;
long s1;
int r = toku_cachetable_get_and_pin_nonblocking(
f1,
make_blocknum(1),
toku_cachetable_hash(f1, make_blocknum(1)),
&v1,
&s1,
flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
NULL,
NULL,
NULL
);
assert(r==TOKUDB_TRY_AGAIN);
return arg;
}
static void
cachetable_test (void) {
const int test_limit = 12;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
long s1;
r = toku_cachetable_get_and_pin(
f1,
make_blocknum(1),
toku_cachetable_hash(f1, make_blocknum(1)),
&v1,
&s1,
flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
NULL,
NULL
);
toku_pthread_t pin_nonblocking_tid;
r = toku_pthread_create(&pin_nonblocking_tid, NULL, pin_nonblocking, NULL);
assert_zero(r);
// sleep about 3 seconds to let pin_nonblocking block on the PAIR lock
usleep(3*1024*1024);
r = toku_cachetable_unpin_and_remove(f1, make_blocknum(1), NULL, NULL);
assert_zero(r);
void *ret;
r = toku_pthread_join(pin_nonblocking_tid, &ret);
assert_zero(r);
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); lazy_assert_zero(r);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
cachetable_test();
return 0;
}
#ident "$Id: cachetable-simple-verify.c 36689 2011-11-07 22:08:05Z zardosht $"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)),
PAIR_ATTR s __attribute__((__unused__)),
PAIR_ATTR* new_size __attribute__((__unused__)),
BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__))
) {
/* Do nothing */
if (verbose) { printf("FLUSH: %d\n", (int)k.b); }
//usleep (5*1024*1024);
}
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
return 0;
}
CACHEFILE f1;
static void *pin_nonblocking(void *arg) {
void* v1;
long s1;
int r = toku_cachetable_get_and_pin_nonblocking(
f1,
make_blocknum(1),
toku_cachetable_hash(f1, make_blocknum(1)),
&v1,
&s1,
flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
NULL,
NULL,
NULL
);
assert(r==TOKUDB_TRY_AGAIN);
return arg;
}
static void *put_same_key(void *arg) {
int r = toku_cachetable_put(
f1,
make_blocknum(1),
toku_cachetable_hash(f1,make_blocknum(1)),
NULL,
make_pair_attr(4),
flush, def_pe_est_callback, def_pe_callback, def_cleaner_callback,
NULL
);
assert(r==0);
return arg;
}
toku_pthread_t put_tid;
static void test_remove_key(CACHEKEY* UU(cachekey), BOOL UU(for_checkpoint), void* UU(extra)) {
int r = toku_pthread_create(&put_tid, NULL, put_same_key, NULL);
assert_zero(r);
}
static void
cachetable_test (void) {
const int test_limit = 12;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
long s1;
r = toku_cachetable_get_and_pin(
f1,
make_blocknum(1),
toku_cachetable_hash(f1, make_blocknum(1)),
&v1,
&s1,
flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
NULL,
NULL
);
toku_pthread_t pin_nonblocking_tid;
r = toku_pthread_create(&pin_nonblocking_tid, NULL, pin_nonblocking, NULL);
assert_zero(r);
// sleep about 3 seconds to let pin_nonblocking block on the PAIR lock
usleep(3*1024*1024);
r = toku_cachetable_unpin_and_remove(f1, make_blocknum(1), test_remove_key, NULL);
assert_zero(r);
void *ret;
r = toku_pthread_join(pin_nonblocking_tid, &ret);
assert_zero(r);
r = toku_pthread_join(put_tid, &ret);
assert_zero(r);
r = toku_cachetable_unpin(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), CACHETABLE_CLEAN, make_pair_attr(2));
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); lazy_assert_zero(r);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
for (int i = 0; i < 20; i++) {
cachetable_test();
}
return 0;
}
@@ -6476,20 +6476,17 @@ struct ydb_verify_context {
static int
ydb_verify_progress_callback(void *extra, float progress) {
struct ydb_verify_context *context = (struct ydb_verify_context *) extra;
toku_ydb_unlock_and_yield(1000);
int r = 0;
if (context->progress_callback)
if (context->progress_callback) {
r = context->progress_callback(context->progress_extra, progress);
toku_ydb_lock();
}
return r;
}
static int
locked_db_verify_with_progress(DB *db, int (*progress_callback)(void *extra, float progress), void *progress_extra, int verbose, int keep_going) {
struct ydb_verify_context context = { progress_callback, progress_extra };
toku_ydb_lock();
int r = toku_verify_brt_with_progress(db->i->brt, ydb_verify_progress_callback, &context, verbose, keep_going);
toku_ydb_unlock();
return r;
}
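The locking change above follows a simple rule: hold the big lock only for the short step that needs it (pinning the root), and run the long traversal, including user progress callbacks, unlocked. A self-contained sketch of that shape (all names hypothetical):

#include <pthread.h>

static pthread_mutex_t big_lock = PTHREAD_MUTEX_INITIALIZER; // stands in for the ydb lock

static void pin_root (void)    { /* short step: needs big_lock */ }
static int  verify_tree (void) { /* long step: pins children itself, calls user callbacks */ return 0; }

static int verify (void) {
    pthread_mutex_lock(&big_lock);
    pin_root();                      // only the short step runs locked
    pthread_mutex_unlock(&big_lock);
    return verify_tree();            // callbacks need no unlock/relock dance
}

int main (void) { return verify(); }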
......