Commit 775aeb0a authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4311], write pair for checkpointing on cleaner thread if necessary, before...

[t:4311], write pair for checkpointing on cleaner thread if necessary, before calling cleaner_callback

git-svn-id: file:///svn/toku/tokudb@38187 c7de825b-a66e-492c-adef-691d508d4ae1
parent e4d6d69f
...@@ -409,8 +409,6 @@ u_int32_t toku_get_cleaner_iterations_unlocked (CACHETABLE ct) { ...@@ -409,8 +409,6 @@ u_int32_t toku_get_cleaner_iterations_unlocked (CACHETABLE ct) {
// reserve 25% as "unreservable". The loader cannot have it. // reserve 25% as "unreservable". The loader cannot have it.
#define unreservable_memory(size) ((size)/4) #define unreservable_memory(size) ((size)/4)
static int cleaner_thread (void *cachetable_v);
int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_lsn), TOKULOGGER logger) { int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_lsn), TOKULOGGER logger) {
CACHETABLE MALLOC(ct); CACHETABLE MALLOC(ct);
if (ct == 0) return ENOMEM; if (ct == 0) return ENOMEM;
...@@ -429,7 +427,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l ...@@ -429,7 +427,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l
ct->kibbutz = toku_kibbutz_create(toku_os_get_number_active_processors()); ct->kibbutz = toku_kibbutz_create(toku_os_get_number_active_processors());
toku_minicron_setup(&ct->checkpointer, 0, checkpoint_thread, ct); // default is no checkpointing toku_minicron_setup(&ct->checkpointer, 0, checkpoint_thread, ct); // default is no checkpointing
toku_minicron_setup(&ct->cleaner, 0, cleaner_thread, ct); // default is no cleaner, for now toku_minicron_setup(&ct->cleaner, 0, toku_cleaner_thread, ct); // default is no cleaner, for now
ct->cleaner_iterations = 1; // default is one iteration ct->cleaner_iterations = 1; // default is one iteration
r = toku_omt_create(&ct->reserved_filenums); assert(r==0); r = toku_omt_create(&ct->reserved_filenums); assert(r==0);
ct->env_dir = toku_xstrdup("."); ct->env_dir = toku_xstrdup(".");
...@@ -3752,8 +3750,8 @@ cleaner_thread_rate_pair(PAIR p) ...@@ -3752,8 +3750,8 @@ cleaner_thread_rate_pair(PAIR p)
static int const CLEANER_N_TO_CHECK = 8; static int const CLEANER_N_TO_CHECK = 8;
static int int
cleaner_thread (void *cachetable_v) toku_cleaner_thread (void *cachetable_v)
// Effect: runs a cleaner. // Effect: runs a cleaner.
// //
// We look through some number of nodes, the first N that we see which are // We look through some number of nodes, the first N that we see which are
...@@ -3795,24 +3793,47 @@ cleaner_thread (void *cachetable_v) ...@@ -3795,24 +3793,47 @@ cleaner_thread (void *cachetable_v)
} while (ct->cleaner_head != first_pair && n_seen < CLEANER_N_TO_CHECK); } while (ct->cleaner_head != first_pair && n_seen < CLEANER_N_TO_CHECK);
if (best_pair) { if (best_pair) {
nb_mutex_write_lock(&best_pair->nb_mutex, ct->mutex); nb_mutex_write_lock(&best_pair->nb_mutex, ct->mutex);
// the order of operations for these two pieces is important
// we must add the background job first, while we still have the
// cachetable lock and we are assured that the best_pair's
// cachefile is not flushing. Once we add the background
// job, we know that flushing a cachefile will wait on
// this background job to be completed.
// If we were to add the background job after
// writing a PAIR for checkpoint, then we risk
// releasing the cachetable lock during the write
// and allowing a cachefile flush to sneak in
add_background_job(best_pair->cachefile, true); add_background_job(best_pair->cachefile, true);
if (best_pair->checkpoint_pending) {
write_locked_pair_for_checkpoint(ct, best_pair);
}
} }
cachetable_unlock(ct); cachetable_unlock(ct);
if (best_pair) { if (best_pair) {
// it's theoretically possible that after writing a PAIR for checkpoint, the
// PAIR's heuristic tells us nothing needs to be done. It is not possible
// in Dr. Noga, but unit tests verify this behavior works properly.
CACHEFILE cf = best_pair->cachefile; CACHEFILE cf = best_pair->cachefile;
if (cleaner_thread_rate_pair(best_pair) > 0) {
int r = best_pair->cleaner_callback(best_pair->value, int r = best_pair->cleaner_callback(best_pair->value,
best_pair->key, best_pair->key,
best_pair->fullhash, best_pair->fullhash,
best_pair->write_extraargs); best_pair->write_extraargs);
assert_zero(r); assert_zero(r);
// The cleaner callback must have unlocked the pair, so we
// don't need to unlock it here.
}
else {
cachetable_lock(ct);
nb_mutex_write_unlock(&best_pair->nb_mutex);
cachetable_unlock(ct);
}
// We need to make sure the cachefile sticks around so a close // We need to make sure the cachefile sticks around so a close
// can't come destroy it. That's the purpose of this // can't come destroy it. That's the purpose of this
// "add/remove_background_job" business, which means the // "add/remove_background_job" business, which means the
// cachefile is still valid here, even though the cleaner // cachefile is still valid here, even though the cleaner
// callback unlocks the pair. // callback unlocks the pair.
remove_background_job(cf, false); remove_background_job(cf, false);
// The cleaner callback must have unlocked the pair, so we
// don't need to unlock it here.
} else { } else {
// If we didn't find anything this time around the cachetable, // If we didn't find anything this time around the cachetable,
// we probably won't find anything if we run around again, so // we probably won't find anything if we run around again, so
......
...@@ -557,6 +557,10 @@ void remove_background_job (CACHEFILE cf, bool already_locked); ...@@ -557,6 +557,10 @@ void remove_background_job (CACHEFILE cf, bool already_locked);
// test-only function // test-only function
extern int toku_cachetable_get_checkpointing_user_data_status(void); extern int toku_cachetable_get_checkpointing_user_data_status(void);
int
toku_cleaner_thread (void *cachetable_v);
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus) || defined(__cilkplusplus)
}; };
#endif #endif
......
#ident "$Id: cachetable-simple-verify.c 36689 2011-11-07 22:08:05Z zardosht $"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
CACHEFILE f1;
BOOL flush_called;
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)),
PAIR_ATTR s __attribute__((__unused__)),
PAIR_ATTR* new_size __attribute__((__unused__)),
BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__))
) {
/* Do nothing */
if (verbose) { printf("FLUSH: %d\n", (int)k.b); }
//usleep (5*1024*1024);
PAIR_ATTR attr = make_pair_attr(8);
attr.cache_pressure_size = 8;
*new_size = attr;
if (w) {
assert(!flush_called);
assert(c);
flush_called = TRUE;
}
}
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
return 0;
}
BOOL cleaner_called;
static int
cleaner_callback(
void* UU(brtnode_pv),
BLOCKNUM blocknum,
u_int32_t fullhash,
void* UU(extraargs)
)
{
assert(blocknum.b == 1);
assert(fullhash == 1);
assert(!cleaner_called);
assert(flush_called);
cleaner_called = TRUE;
int r = toku_cachetable_unpin(f1, blocknum, fullhash, CACHETABLE_CLEAN, make_pair_attr(8));
assert_zero(r);
return 0;
}
static void
cachetable_test (void) {
const int test_limit = 12;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
//void* v2;
long s1;
//long s2;
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, cleaner_callback, NULL, NULL);
PAIR_ATTR attr = make_pair_attr(8);
attr.cache_pressure_size = 8;
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, attr);
cleaner_called = FALSE;
r = toku_cachetable_begin_checkpoint(ct, NULL);
assert_zero(r);
toku_cleaner_thread(ct);
assert(cleaner_called);
r = toku_cachetable_end_checkpoint(
ct,
NULL,
fake_ydb_lock,
fake_ydb_unlock,
NULL,
NULL
);
assert(r==0);
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); lazy_assert_zero(r);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
cachetable_test();
return 0;
}
#ident "$Id: cachetable-simple-verify.c 36689 2011-11-07 22:08:05Z zardosht $"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
CACHEFILE f1;
BOOL flush_called;
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)),
PAIR_ATTR s __attribute__((__unused__)),
PAIR_ATTR* new_size __attribute__((__unused__)),
BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__))
) {
/* Do nothing */
if (verbose) { printf("FLUSH: %d\n", (int)k.b); }
//usleep (5*1024*1024);
PAIR_ATTR attr = make_pair_attr(8);
attr.cache_pressure_size = 0;
*new_size = attr;
if (w) {
assert(!flush_called);
assert(c);
flush_called = TRUE;
}
}
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
return 0;
}
BOOL cleaner_called;
static int
cleaner_callback(
void* UU(brtnode_pv),
BLOCKNUM blocknum,
u_int32_t fullhash,
void* UU(extraargs)
)
{
assert(blocknum.b == 1);
assert(fullhash == 1);
assert(!cleaner_called);
assert(flush_called);
cleaner_called = TRUE;
int r = toku_cachetable_unpin(f1, blocknum, fullhash, CACHETABLE_CLEAN, make_pair_attr(8));
assert_zero(r);
return 0;
}
static void
cachetable_test (void) {
const int test_limit = 12;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
//void* v2;
long s1;
//long s2;
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, cleaner_callback, NULL, NULL);
PAIR_ATTR attr = make_pair_attr(8);
attr.cache_pressure_size = 8;
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, attr);
cleaner_called = FALSE;
r = toku_cachetable_begin_checkpoint(ct, NULL);
assert_zero(r);
toku_cleaner_thread(ct);
assert(!cleaner_called);
r = toku_cachetable_end_checkpoint(
ct,
NULL,
fake_ydb_lock,
fake_ydb_unlock,
NULL,
NULL
);
assert(r==0);
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); lazy_assert_zero(r);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
cachetable_test();
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment