Commit 38354089 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4031], merge changes to main

git-svn-id: file:///svn/toku/tokudb@38244 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2198a741
......@@ -304,7 +304,6 @@ verify_valid_blocknum (struct translation *t, BLOCKNUM b) {
//Can be freed
static inline void
verify_valid_freeable_blocknum (struct translation *t, BLOCKNUM b) {
assert(t->type == TRANSLATION_CURRENT);
assert(b.b >= RESERVED_BLOCKNUMS);
assert(b.b < t->smallest_never_used_blocknum.b);
......@@ -556,21 +555,33 @@ toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h) {
}
static void
free_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *bp, struct brt_header * h) {
// Effect: Free a blocknum.
// If the blocknum holds the only reference to a block on disk, free that block
assert(bt->is_locked);
BLOCKNUM b = *bp;
bp->b = 0; //Remove caller's reference.
struct translation *t = &bt->current;
free_blocknum_in_translation(struct translation *t, BLOCKNUM b)
{
verify_valid_freeable_blocknum(t, b);
struct block_translation_pair old_pair = t->block_translation[b.b];
assert(old_pair.size != size_is_free);
PRNTF("free_blocknum", b.b, t->block_translation[b.b].size, t->block_translation[b.b].u.diskoff, bt);
PRNTF("free_blocknum", b.b, t->block_translation[b.b].size, t->block_translation[b.b].u.diskoff, bt);
t->block_translation[b.b].size = size_is_free;
t->block_translation[b.b].u.next_free_blocknum = t->blocknum_freelist_head;
t->blocknum_freelist_head = b;
}
static void
free_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *bp, struct brt_header * h, BOOL for_checkpoint) {
// Effect: Free a blocknum.
// If the blocknum holds the only reference to a block on disk, free that block
assert(bt->is_locked);
BLOCKNUM b = *bp;
bp->b = 0; //Remove caller's reference.
struct block_translation_pair old_pair = bt->current.block_translation[b.b];
free_blocknum_in_translation(&bt->current, b);
if (for_checkpoint) {
assert(h->checkpoint_header->type == BRTHEADER_CHECKPOINT_INPROGRESS);
free_blocknum_in_translation(&bt->inprogress, b);
}
//If the size is 0, no disk block has ever been assigned to this blocknum.
if (old_pair.size > 0) {
......@@ -584,13 +595,13 @@ PRNTF("free_blocknum_free", b.b, old_pair.size, old_pair.u.diskoff, bt);
}
}
else assert(old_pair.size==0 && old_pair.u.diskoff == diskoff_unused);
brtheader_set_dirty(h, FALSE);
brtheader_set_dirty(h, for_checkpoint);
}
void
toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *bp, struct brt_header * h) {
toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *bp, struct brt_header * h, BOOL for_checkpoint) {
lock_for_blocktable(bt);
free_blocknum_unlocked(bt, bp, h);
free_blocknum_unlocked(bt, bp, h, for_checkpoint);
unlock_for_blocktable(bt);
}
......@@ -606,7 +617,7 @@ toku_block_translation_truncate_unlocked(BLOCK_TABLE bt, int fd, struct brt_head
int64_t i;
for (i=RESERVED_BLOCKNUMS; i<t->smallest_never_used_blocknum.b; i++) {
BLOCKNUM b = make_blocknum(i);
if (t->block_translation[i].size >= 0) free_blocknum_unlocked(bt, &b, h);
if (t->block_translation[i].size >= 0) free_blocknum_unlocked(bt, &b, h, FALSE);
}
maybe_truncate_cachefile(bt, fd, h, allocated_limit_at_start);
}
......
......@@ -37,7 +37,7 @@ void toku_maybe_truncate_cachefile_on_open(BLOCK_TABLE bt, int fd, struct brt_he
//Blocknums
void toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h);
void toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h);
void toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *b, struct brt_header * h);
void toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *b, struct brt_header * h, BOOL for_checkpoint);
void toku_verify_blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b);
void toku_block_verify_no_data_blocks_except_root_unlocked(BLOCK_TABLE bt, BLOCKNUM root);
void toku_block_verify_no_free_blocknums(BLOCK_TABLE bt);
......
......@@ -247,31 +247,6 @@ toku_pin_brtnode_off_client_thread(
*node_p = node;
}
void
checkpoint_nodes(struct brt_header* h,
u_int32_t num_dependent_nodes,
BRTNODE* dependent_nodes)
{
CACHEFILE dependent_cf[num_dependent_nodes];
BLOCKNUM dependent_keys[num_dependent_nodes];
u_int32_t dependent_fullhash[num_dependent_nodes];
enum cachetable_dirty dependent_dirty_bits[num_dependent_nodes];
for (u_int32_t i = 0; i < num_dependent_nodes; i++) {
dependent_cf[i] = h->cf;
dependent_keys[i] = dependent_nodes[i]->thisnodename;
dependent_fullhash[i] = toku_cachetable_hash(h->cf, dependent_nodes[i]->thisnodename);
dependent_dirty_bits[i] = (enum cachetable_dirty) dependent_nodes[i]->dirty;
}
toku_checkpoint_pairs(
h->cf,
num_dependent_nodes,
dependent_cf,
dependent_keys,
dependent_fullhash,
dependent_dirty_bits
);
}
void
toku_unpin_brtnode_off_client_thread(struct brt_header* h, BRTNODE node)
{
......
......@@ -54,16 +54,6 @@ toku_create_new_brtnode (
int n_children
);
/**
* write nodes for checkpoint, if necessary
*/
void
checkpoint_nodes(
struct brt_header* h,
u_int32_t num_dependent_nodes,
BRTNODE* dependent_nodes
);
/**
* The intent of toku_pin_brtnode(_holding_lock) is to abstract the
* process of retrieving a node from the rest of brt.c, so that there is
......
......@@ -997,6 +997,16 @@ maybe_merge_pinned_nodes(
}
}
static void merge_remove_key_callback(
BLOCKNUM* bp,
BOOL for_checkpoint,
void* extra
)
{
struct brt_header* h = extra;
toku_free_blocknum(h->blocktable, bp, h, for_checkpoint);
}
//
// Takes as input a locked node and a childnum_to_merge
// As output, two of node's children are merged or rebalanced, and node is unlocked
......@@ -1058,16 +1068,6 @@ brt_merge_child(
flush_this_child(h, node, childb, childnumb, started_at_root, brt_status);
}
//
//prelock cachetable, do checkpointing
//
toku_cachetable_prelock(h->cf);
BRTNODE dependent_nodes[3];
dependent_nodes[0] = node;
dependent_nodes[1] = childa;
dependent_nodes[2] = childb;
checkpoint_nodes(h, 3, dependent_nodes);
// now we have both children pinned in main memory, and cachetable locked,
// so no checkpoints will occur.
......@@ -1112,12 +1112,16 @@ brt_merge_child(
// now we possibly flush the children
//
if (did_merge) {
BLOCKNUM bn = childb->thisnodename;
int rrb = toku_cachetable_unpin_and_remove(h->cf, bn, TRUE);
assert(rrb==0);
toku_free_blocknum(h->blocktable, &bn, h);
// unlock cachetable
toku_cachetable_unlock(h->cf);
BLOCKNUM bn = childb->thisnodename;
// merge_remove_key_callback will free the blocknum
int rrb = toku_cachetable_unpin_and_remove(
h->cf,
bn,
merge_remove_key_callback,
h
);
assert(rrb==0);
// for test
call_flusher_thread_callback(ft_flush_after_merge);
......@@ -1126,8 +1130,6 @@ brt_merge_child(
toku_unpin_brtnode_off_client_thread(h, node);
}
else {
// unlock cachetable
toku_cachetable_unlock(h->cf);
// for test
call_flusher_thread_callback(ft_flush_after_rebalance);
......
......@@ -1829,26 +1829,6 @@ static void checkpoint_dependent_pairs(
}
}
void toku_checkpoint_pairs(
CACHEFILE cf,
u_int32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
u_int32_t* dependent_fullhash, //array of fullhashes of dependent pairs
enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
)
{
checkpoint_dependent_pairs(
cf->cachetable,
num_dependent_pairs,
dependent_cfs,
dependent_keys,
dependent_fullhash,
dependent_dirty
);
}
int toku_cachetable_put_with_dep_pairs(
CACHEFILE cachefile,
CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash,
......@@ -2373,13 +2353,6 @@ int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, CACHEKEY ke
return cachetable_unpin_internal(cachefile, key, fullhash, dirty, attr, TRUE, FALSE);
}
void toku_cachetable_prelock(CACHEFILE cf) {
cachetable_lock(cf->cachetable);
}
void toku_cachetable_unlock(CACHEFILE cf) {
cachetable_unlock(cf->cachetable);
}
static void
run_unlockers (UNLOCKERS unlockers) {
while (unlockers) {
......@@ -2991,19 +2964,61 @@ toku_cachetable_close (CACHETABLE *ctp) {
return 0;
}
int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key, BOOL ct_prelocked) {
int toku_cachetable_unpin_and_remove (
CACHEFILE cachefile,
CACHEKEY key,
CACHETABLE_REMOVE_KEY remove_key,
void* remove_key_extra
)
{
int r = ENOENT;
// Removing something already present is OK.
CACHETABLE ct = cachefile->cachetable;
PAIR p;
int count = 0;
if (!ct_prelocked) cachetable_lock(ct);
cachetable_lock(ct);
u_int32_t fullhash = toku_cachetable_hash(cachefile, key);
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
p->dirty = CACHETABLE_CLEAN; // clear the dirty bit. We're just supposed to remove it.
assert(nb_mutex_writers(&p->nb_mutex));
//
// take care of key removal
//
BOOL for_checkpoint = p->checkpoint_pending;
// now let's wipe out the pending bit, because we are
// removing the PAIR
p->checkpoint_pending = FALSE;
//
// Here is a tricky thing.
// In the code below, we may release the
// cachetable lock if there are blocked writers
// on this pair. While the cachetable lock is released,
// we may theoretically begin another checkpoint, or start
// a cleaner thread.
// So, in order for this PAIR to not be marked
// for the impending checkpoint, we mark the
// PAIR as clean. For the PAIR to not be picked by the
// cleaner thread, we mark the cachepressure_size to be 0
//
p->dirty = CACHETABLE_CLEAN;
CACHEKEY key_to_remove = key;
p->attr.cache_pressure_size = 0;
//
// callback for removing the key
// for BRTNODEs, this leads to calling
// toku_free_blocknum
//
if (remove_key) {
remove_key(
&key_to_remove,
for_checkpoint,
remove_key_extra
);
}
nb_mutex_write_unlock(&p->nb_mutex);
//
// need to find a way to assert that
......@@ -3050,6 +3065,13 @@ int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key, BOOL ct
cachetable_lock(ct);
assert(nb_mutex_writers(&p->nb_mutex) == 1);
BOOL destroyed = FALSE;
// let's also assert that this PAIR was not somehow marked
// as pending a checkpoint. Above, when calling
// remove_key(), we cleared the dirty bit so that
// this PAIR cannot be marked for checkpoint, so let's
// make sure that our assumption is valid.
assert(!p->checkpoint_pending);
assert(p->attr.cache_pressure_size == 0);
// Because we assume it is just the checkpoint thread
// that may have been blocked (as argued above),
// it is safe to simply remove the PAIR from the
......@@ -3073,7 +3095,7 @@ int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key, BOOL ct
}
done:
note_hash_count(count);
if (!ct_prelocked) cachetable_unlock(ct);
cachetable_unlock(ct);
return r;
}
......@@ -3781,6 +3803,13 @@ toku_cleaner_thread (void *cachetable_v)
// here we select a PAIR for cleaning
// look at some number of PAIRS, and
// pick what we think is the best one for cleaning
//***** IMPORTANT ******
// we MUST not pick a PAIR whose rating is 0. We have
// numerous assumptions in other parts of the code that
// this is the case:
// - this is how rollback nodes and leaf nodes are not selected for cleaning
// - this is how a thread that is calling unpin_and_remove will prevent
// the cleaner thread from picking its PAIR (see comments in that function)
do {
if (nb_mutex_users(&ct->cleaner_head->nb_mutex) > 0 || ct->cleaner_head->cachefile->is_flushing) {
goto next_pair;
......@@ -3800,6 +3829,8 @@ toku_cleaner_thread (void *cachetable_v)
//
if (best_pair) {
nb_mutex_write_lock(&best_pair->nb_mutex, ct->mutex);
// verify a key assumption.
assert(cleaner_thread_rate_pair(best_pair) > 0);
// the order of operations for these two pieces is important
// we must add the background job first, while we still have the
// cachetable lock and we are assured that the best_pair's
......
......@@ -181,9 +181,10 @@ typedef int (*CACHETABLE_PARTIAL_FETCH_CALLBACK)(void *brtnode_pv, void *read_ex
// TODO(leif) XXX TODO XXX
typedef int (*CACHETABLE_CLEANER_CALLBACK)(void *brtnode_pv, BLOCKNUM blocknum, u_int32_t fullhash, void *write_extraargs);
typedef void (*CACHETABLE_GET_KEY_AND_FULLHASH)(CACHEKEY* cachekey, u_int32_t* fullhash, void* extra);
typedef void (*CACHETABLE_REMOVE_KEY)(CACHEKEY* cachekey, BOOL for_checkpoint, void* extra);
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
int (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
int (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*),
......@@ -204,15 +205,6 @@ void *toku_cachefile_get_userdata(CACHEFILE);
CACHETABLE toku_cachefile_get_cachetable(CACHEFILE cf);
// Effect: Get the cachetable.
void toku_checkpoint_pairs(
CACHEFILE cf,
u_int32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
u_int32_t* dependent_fullhash, //array of fullhashes of dependent pairs
enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
);
// put something into the cachetable and checkpoint dependent pairs
// if the checkpointing is necessary
......@@ -359,14 +351,7 @@ int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE, CACHEKEY, u_int32_t f
// Effect: The same as tokud_cachetable_unpin, except that the ct must not be locked.
// Requires: The ct is NOT locked.
void toku_cachetable_prelock(CACHEFILE cf);
// Effect: locks cachetable
void toku_cachetable_unlock(CACHEFILE cf);
// Effect: unlocks cachetable
int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY, BOOL); /* Removing something already present is OK. */
int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY, CACHETABLE_REMOVE_KEY, void*); /* Removing something already present is OK. */
// Effect: Remove an object from the cachetable. Don't write it back.
// Requires: The object must be pinned exactly once.
......
......@@ -56,9 +56,9 @@ toku_delete_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
if (txn->pinned_inprogress_rollback_log == log) {
txn->pinned_inprogress_rollback_log = NULL;
}
r = toku_cachetable_unpin_and_remove (cf, log->thislogname, FALSE);
r = toku_cachetable_unpin_and_remove (cf, log->thislogname, NULL, NULL);
assert(r==0);
toku_free_blocknum(h->blocktable, &to_free, h);
toku_free_blocknum(h->blocktable, &to_free, h, FALSE);
return r;
}
......
#ident "$Id: cachetable-simple-verify.c 36689 2011-11-07 22:08:05Z zardosht $"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)),
PAIR_ATTR s __attribute__((__unused__)),
PAIR_ATTR* new_size __attribute__((__unused__)),
BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__))
) {
/* Do nothing */
if (verbose) { printf("FLUSH: %d\n", (int)k.b); }
//usleep (5*1024*1024);
}
static int
fetch (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
return 0;
}
static void remove_key_expect_checkpoint(
CACHEKEY* UU(cachekey),
BOOL for_checkpoint,
void* UU(extra)
)
{
assert(for_checkpoint);
}
static void remove_key_expect_no_checkpoint(
CACHEKEY* UU(cachekey),
BOOL for_checkpoint,
void* UU(extra)
)
{
assert(!for_checkpoint);
}
static void
cachetable_test (void) {
const int test_limit = 120;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
//void* v2;
long s1;
//long s2;
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback, NULL, NULL);
r = toku_cachetable_begin_checkpoint(ct, NULL); assert(r == 0);
r = toku_cachetable_unpin_and_remove(f1, make_blocknum(1), remove_key_expect_checkpoint, NULL);
r = toku_cachetable_end_checkpoint(
ct,
NULL,
fake_ydb_lock,
fake_ydb_unlock,
NULL,
NULL
);
assert(r==0);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback, NULL, NULL);
r = toku_cachetable_unpin_and_remove(f1, make_blocknum(1), remove_key_expect_no_checkpoint, NULL);
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); lazy_assert_zero(r);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
cachetable_test();
return 0;
}
......@@ -52,7 +52,7 @@ cachetable_unpin_and_remove_test (int n) {
while (nkeys > 0) {
i = random() % nkeys;
u_int32_t hi = toku_cachetable_hash(f1, make_blocknum(testkeys[i].b));
r = toku_cachetable_unpin_and_remove(f1, testkeys[i], FALSE);
r = toku_cachetable_unpin_and_remove(f1, testkeys[i], NULL, NULL);
assert(r == 0);
toku_cachefile_verify(f1);
......@@ -67,7 +67,7 @@ cachetable_unpin_and_remove_test (int n) {
// verify that all are really removed
for (i=0; i<n; i++) {
r = toku_cachetable_unpin_and_remove(f1, keys[i], FALSE);
r = toku_cachetable_unpin_and_remove(f1, keys[i], NULL, NULL);
// assert(r != 0);
if (r == 0) printf("%s:%d warning %d\n", __FILE__, __LINE__, r);
}
......@@ -115,7 +115,7 @@ cachetable_put_evict_remove_test (int n) {
assert(r == 0);
// remove 0
r = toku_cachetable_unpin_and_remove(f1, make_blocknum(0), FALSE);
r = toku_cachetable_unpin_and_remove(f1, make_blocknum(0), NULL, NULL);
assert(r == 0);
char *error_string;
......
......@@ -57,7 +57,7 @@ run_test (void) {
// give checkpoint thread a chance to start waiting on lock
sleep(1);
r = toku_cachetable_unpin_and_remove(f1, make_blocknum(1), FALSE);
r = toku_cachetable_unpin_and_remove(f1, make_blocknum(1), NULL, NULL);
assert(r==0);
void* ret;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment