Commit cc600a42 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

refs #5634, merge bucket mutexes to main

git-svn-id: file:///svn/toku/tokudb@49391 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4b673e11
......@@ -152,7 +152,7 @@ struct ctpair {
// locks
toku::frwlock value_rwlock;
struct nb_mutex disk_nb_mutex; // single writer, protects disk_data, is used for writing cloned nodes for checkpoint
toku_mutex_t mutex;
toku_mutex_t* mutex; // gotten from the pair list
// Access to checkpoint_pending is protected by two mechanisms,
// the value_rwlock and the pair_list's pending locks (expensive and cheap).
......@@ -215,7 +215,9 @@ public:
//
uint32_t m_n_in_table; // number of pairs in the hash table
uint32_t m_table_size; // number of buckets in the hash table
uint32_t m_num_locks;
PAIR *m_table; // hash table
toku_mutex_aligned_t *m_mutexes;
//
// The following fields are the heads of various linked lists.
// They also protected by the list lock, but their
......@@ -232,6 +234,7 @@ public:
//
PAIR m_clock_head; // of clock . head is the next thing to be up for decrement.
PAIR m_cleaner_head; // for cleaner thread. head is the next thing to look at for possible cleaning.
PAIR m_checkpoint_head; // for begin checkpoint to iterate over PAIRs and mark as pending_checkpoint
PAIR m_pending_head; // list of pairs marked with checkpoint_pending
// this field is public so we are still POD
......@@ -281,10 +284,12 @@ public:
void read_pending_cheap_unlock();
void write_pending_cheap_lock();
void write_pending_cheap_unlock();
toku_mutex_t* get_mutex_for_pair(uint32_t fullhash);
void pair_lock_by_fullhash(uint32_t fullhash);
void pair_unlock_by_fullhash(uint32_t fullhash);
private:
void pair_remove (PAIR p);
void rehash (uint32_t newtable_size);
void add_to_clock (PAIR p);
PAIR remove_from_hash_chain (PAIR remove_me, PAIR list);
};
......
This diff is collapsed.
......@@ -166,7 +166,7 @@ typedef int (*CACHETABLE_PARTIAL_FETCH_CALLBACK)(void *value_data, void* disk_da
// The cachetable calls the put callback during a cachetable_put command to provide the opaque PAIR.
// The PAIR can then be used to later unpin the pair.
// Returns: 0 if success, otherwise an error number.
typedef void (*CACHETABLE_PUT_CALLBACK)(void *value_data, PAIR p);
typedef void (*CACHETABLE_PUT_CALLBACK)(CACHEKEY key, void *value_data, PAIR p);
// TODO(leif) XXX TODO XXX
typedef int (*CACHETABLE_CLEANER_CALLBACK)(void *ftnode_pv, BLOCKNUM blocknum, uint32_t fullhash, void *write_extraargs);
......@@ -226,9 +226,7 @@ void toku_cachetable_put_with_dep_pairs(
CACHETABLE_WRITE_CALLBACK write_callback,
void *get_key_and_fullhash_extra,
uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
uint32_t* dependent_fullhash, //array of fullhashes of dependent pairs
PAIR* dependent_pairs,
enum cachetable_dirty* dependent_dirty, // array stating dirty/cleanness of dependent pairs
CACHEKEY* key,
uint32_t* fullhash,
......@@ -255,8 +253,6 @@ void toku_cachetable_put(CACHEFILE cf, CACHEKEY key, uint32_t fullhash,
// then the required PAIRs are written to disk for checkpoint.
// KEY PROPERTY OF DEPENDENT PAIRS: They are already locked by the client
// Returns: 0 if the memory object is in memory, otherwise an error number.
// Requires: toku_cachetable_begin_batched_pin must have been called before entering this function.
// Requires: toku_cachetable_end_batched_pin must be called after this function.
// Rationale:
// begin_batched_pin and end_batched_pin take and release a read lock on the pair list.
// Normally, that would be done within this get_and_pin, but we want to pin multiple nodes with a single acquisition of the read lock.
......@@ -273,9 +269,7 @@ int toku_cachetable_get_and_pin_with_dep_pairs_batched (
pair_lock_type lock_type,
void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
uint32_t* dependent_fullhash, //array of fullhashes of dependent pairs
PAIR* dependent_pairs,
enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
);
......@@ -294,9 +288,7 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
pair_lock_type lock_type,
void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
uint32_t* dependent_fullhash, //array of fullhashes of dependent pairs
PAIR* dependent_pairs,
enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
);
......@@ -332,21 +324,13 @@ void toku_cachetable_pf_pinned_pair(
struct unlockers {
bool locked;
void (*f)(void*extra);
void (*f)(PAIR p, void* extra);
void *extra;
UNLOCKERS next;
};
// Effect: Makes necessary preparations (grabs locks) for pinning multiple nodes.
void toku_cachetable_begin_batched_pin(CACHEFILE cf);
// Effect: Clean up (release locks) after pinning multiple nodes.
void toku_cachetable_end_batched_pin(CACHEFILE cf);
// Effect: If the block is in the cachetable, then return it.
// Otherwise call the functions in unlockers, fetch the data (but don't pin it, since we'll just end up pinning it again later), and return TOKUDB_TRY_AGAIN.
// Requires: toku_cachetable_begin_batched_pin must have been called before entering this function.
// Requires: toku_cachetable_end_batched_pin must be called after this function.
// Rationale:
// begin_batched_pin and end_batched_pin take and release a read lock on the pair list.
// Normally, that would be done within this get_and_pin, but we want to pin multiple nodes with a single acquisition of the read lock.
......@@ -399,7 +383,7 @@ int toku_cachetable_unpin(CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATT
// Returns: 0 if success, otherwise returns an error number.
// Requires: The ct is locked.
int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATTR size);
int toku_cachetable_unpin_ct_prelocked_no_flush(PAIR, CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATTR size);
// Effect: The same as tokud_cachetable_unpin, except that the ct must not be locked.
// Requires: The ct is NOT locked.
......
......@@ -34,14 +34,10 @@ cachetable_put_empty_node_with_dep_nodes(
FTNODE* result)
{
FTNODE XMALLOC(new_node);
CACHEFILE dependent_cf[num_dependent_nodes];
BLOCKNUM dependent_keys[num_dependent_nodes];
uint32_t dependent_fullhash[num_dependent_nodes];
PAIR dependent_pairs[num_dependent_nodes];
enum cachetable_dirty dependent_dirty_bits[num_dependent_nodes];
for (uint32_t i = 0; i < num_dependent_nodes; i++) {
dependent_cf[i] = h->cf;
dependent_keys[i] = dependent_nodes[i]->thisnodename;
dependent_fullhash[i] = toku_cachetable_hash(h->cf, dependent_nodes[i]->thisnodename);
dependent_pairs[i] = dependent_nodes[i]->ct_pair;
dependent_dirty_bits[i] = (enum cachetable_dirty) dependent_nodes[i]->dirty;
}
......@@ -53,9 +49,7 @@ cachetable_put_empty_node_with_dep_nodes(
get_write_callbacks_for_node(h),
h,
num_dependent_nodes,
dependent_cf,
dependent_keys,
dependent_fullhash,
dependent_pairs,
dependent_dirty_bits,
name,
fullhash,
......@@ -126,7 +120,6 @@ toku_pin_ftnode_batched(
FTNODE_FETCH_EXTRA bfe,
pair_lock_type lock_type,
bool apply_ancestor_messages, // this bool is probably temporary, for #3972, once we know how range query estimates work, will revisit this
bool end_batch_on_success,
FTNODE *node_p,
bool* msgs_applied)
{
......@@ -159,9 +152,6 @@ try_again_for_write_lock:
goto try_again_for_write_lock;
}
}
if (end_batch_on_success) {
toku_cachetable_end_batched_pin(brt->ft->cf);
}
if (apply_ancestor_messages && node->height == 0) {
if (needs_ancestors_messages) {
invariant(needed_lock_type != PL_READ);
......@@ -219,7 +209,6 @@ toku_pin_ftnode_off_client_thread_and_maybe_move_messages(
FTNODE *node_p,
bool move_messages)
{
toku_cachetable_begin_batched_pin(h->cf);
toku_pin_ftnode_off_client_thread_batched_and_maybe_move_messages(
h,
blocknum,
......@@ -231,7 +220,6 @@ toku_pin_ftnode_off_client_thread_and_maybe_move_messages(
node_p,
move_messages
);
toku_cachetable_end_batched_pin(h->cf);
}
void
......@@ -262,14 +250,10 @@ toku_pin_ftnode_off_client_thread_batched_and_maybe_move_messages(
bool move_messages)
{
void *node_v;
CACHEFILE dependent_cf[num_dependent_nodes];
BLOCKNUM dependent_keys[num_dependent_nodes];
uint32_t dependent_fullhash[num_dependent_nodes];
PAIR dependent_pairs[num_dependent_nodes];
enum cachetable_dirty dependent_dirty_bits[num_dependent_nodes];
for (uint32_t i = 0; i < num_dependent_nodes; i++) {
dependent_cf[i] = h->cf;
dependent_keys[i] = dependent_nodes[i]->thisnodename;
dependent_fullhash[i] = toku_cachetable_hash(h->cf, dependent_nodes[i]->thisnodename);
dependent_pairs[i] = dependent_nodes[i]->ct_pair;
dependent_dirty_bits[i] = (enum cachetable_dirty) dependent_nodes[i]->dirty;
}
......@@ -286,9 +270,7 @@ toku_pin_ftnode_off_client_thread_batched_and_maybe_move_messages(
lock_type,
bfe,
num_dependent_nodes,
dependent_cf,
dependent_keys,
dependent_fullhash,
dependent_pairs,
dependent_dirty_bits
);
assert(r==0);
......
......@@ -68,7 +68,6 @@ toku_pin_ftnode_batched(
FTNODE_FETCH_EXTRA bfe,
pair_lock_type lock_type,
bool apply_ancestor_messages, // this bool is probably temporary, for #3972, once we know how range query estimates work, will revisit this
bool end_batch_on_success,
FTNODE *node_p,
bool* msgs_applied
);
......
......@@ -4340,13 +4340,14 @@ struct unlock_ftnode_extra {
};
// When this is called, the cachetable lock is held
static void
unlock_ftnode_fun (void *v) {
unlock_ftnode_fun (PAIR p, void *v) {
struct unlock_ftnode_extra *x = NULL;
CAST_FROM_VOIDP(x, v);
FT_HANDLE brt = x->ft_handle;
FTNODE node = x->node;
// CT lock is held
int r = toku_cachetable_unpin_ct_prelocked_no_flush(
p,
brt->ft->cf,
node->ct_pair,
(enum cachetable_dirty) node->dirty,
......@@ -4386,13 +4387,9 @@ ft_search_child(FT_HANDLE brt, FTNODE node, int childnum, ft_search_t *search, F
&bfe,
PL_READ, // we try to get a read lock, but we may upgrade to a write lock on a leaf for message application.
true,
(node->height == 1), // end_batch_on_success true iff child is a leaf
&childnode,
&msgs_applied);
if (rr==TOKUDB_TRY_AGAIN) {
// We're going to try again, so we aren't pinning any more
// nodes in this batch.
toku_cachetable_end_batched_pin(brt->ft->cf);
return rr;
}
// We end the batch before applying ancestor messages if we get
......@@ -4573,10 +4570,6 @@ ft_search_node(
// At this point, we must have the necessary partition available to continue the search
//
assert(BP_STATE(node,child_to_search) == PT_AVAIL);
// When we enter, we are in a batch. If we search a node but get
// DB_NOTFOUND and need to search the next node, we'll need to start
// another batch.
bool must_begin_batch = false;
while (child_to_search >= 0 && child_to_search < node->n_children) {
//
// Normally, the child we want to use is available, as we checked
......@@ -4592,10 +4585,6 @@ ft_search_node(
}
const struct pivot_bounds next_bounds = next_pivot_keys(node, child_to_search, bounds);
if (node->height > 0) {
if (must_begin_batch) {
toku_cachetable_begin_batched_pin(brt->ft->cf);
must_begin_batch = false;
}
r = ft_search_child(
brt,
node,
......@@ -4655,7 +4644,6 @@ ft_search_node(
maybe_search_save_bound(node, child_to_search, search);
// We're about to pin some more nodes, but we thought we were done before.
must_begin_batch = true;
if (search->direction == FT_SEARCH_LEFT) {
child_to_search++;
}
......@@ -4722,11 +4710,6 @@ try_again:
uint32_t fullhash;
CACHEKEY root_key;
toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
// Begin a batch of pins here. If a child gets TOKUDB_TRY_AGAIN
// it must immediately end the batch. Otherwise, it must end the
// batch as soon as it pins the leaf. The batch will never be
// ended in this function.
toku_cachetable_begin_batched_pin(ft->cf);
toku_pin_ftnode_off_client_thread_batched(
ft,
root_key,
......@@ -4737,12 +4720,6 @@ try_again:
NULL,
&node
);
if (node->height == 0) {
// The root is a leaf, must end the batch now because we
// won't apply ancestor messages, which is where we usually
// end it.
toku_cachetable_end_batched_pin(ft->cf);
}
}
uint tree_height = node->height + 1; // How high is the tree? This is the height of the root node plus one (leaf is at height 0).
......@@ -5248,7 +5225,6 @@ toku_ft_keyrange_internal (FT_HANDLE brt, FTNODE node,
bfe,
PL_READ, // may_modify_node is false, because node guaranteed to not change
false,
false,
&childnode,
&msgs_applied
);
......@@ -5296,7 +5272,6 @@ try_again:
uint32_t fullhash;
CACHEKEY root_key;
toku_calculate_root_offset_pointer(brt->ft, &root_key, &fullhash);
toku_cachetable_begin_batched_pin(brt->ft->cf);
toku_pin_ftnode_off_client_thread_batched(
brt->ft,
root_key,
......@@ -5321,7 +5296,6 @@ try_again:
numrows,
&bfe, &unlockers, (ANCESTORS)NULL, &infinite_bounds);
assert(r == 0 || r == TOKUDB_TRY_AGAIN);
toku_cachetable_end_batched_pin(brt->ft->cf);
if (r == TOKUDB_TRY_AGAIN) {
assert(!unlockers.locked);
goto try_again;
......
......@@ -291,7 +291,7 @@ static void ft_note_unpin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v
// End of Functions that are callbacks to the cachefile
/////////////////////////////////////////////////////////////////////////
void toku_node_save_ct_pair(void *value_data, PAIR p) {
void toku_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) {
FTNODE CAST_FROM_VOIDP(node, value_data);
node->ct_pair = p;
}
......
......@@ -102,7 +102,7 @@ void toku_ft_set_basementnodesize(FT ft, unsigned int basementnodesize);
void toku_ft_get_basementnodesize(FT ft, unsigned int *basementnodesize);
void toku_ft_set_compression_method(FT ft, enum toku_compression_method method);
void toku_ft_get_compression_method(FT ft, enum toku_compression_method *methodp);
void toku_node_save_ct_pair(void *value_data, PAIR p);
void toku_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p);
// mark the ft as a blackhole. any message injections will be a no op.
void toku_ft_set_blackhole(FT_HANDLE ft_handle);
......
......@@ -64,7 +64,7 @@ rollback_memory_size(ROLLBACK_LOG_NODE log) {
return make_rollback_pair_attr(size);
}
static void toku_rollback_node_save_ct_pair(void *value_data, PAIR p) {
static void toku_rollback_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) {
ROLLBACK_LOG_NODE CAST_FROM_VOIDP(log, value_data);
log->ct_pair = p;
}
......@@ -256,7 +256,7 @@ void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash
toku_rollback_pf_callback,
PL_WRITE_CHEAP, // lock_type
h,
0, NULL, NULL, NULL, NULL
0, NULL, NULL
);
assert(r == 0);
ROLLBACK_LOG_NODE CAST_FROM_VOIDP(pinned_log, value);
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
#include "cachetable-test.h"
CACHETABLE ct;
CACHEFILE f1;
static void
unlock_test_fun (void *v) {
assert(v == NULL);
// CT lock is held
int r = toku_test_cachetable_unpin_ct_prelocked_no_flush(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
}
static void
run_test (void) {
const int test_limit = 20;
int r;
ct = NULL;
toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
char fname1[] = __SRCFILE__ "test1.dat";
unlink(fname1);
f1 = NULL;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
create_dummy_functions(f1);
void* v1;
void* v2;
long s1;
long s2;
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, def_write_callback(NULL), def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8)); assert(r==0);
for (int i = 0; i < 20; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, def_write_callback(NULL), def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
r = toku_test_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, make_pair_attr(8)); assert(r==0);
}
//
// so at this point, we have 16 bytes in the cachetable that has a limit of 20 bytes
// block 2 has been touched much more than block 1, so if one had to be evicted,
// it would be block 2
//
// pin 1 and 2
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v2, &s2, def_write_callback(NULL), def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
toku_cachetable_begin_checkpoint(cp, NULL);
// mark nodes as pending a checkpoint, so that get_and_pin_nonblocking on block 1 will return TOKUDB_TRY_AGAIN
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8)); assert(r==0);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, def_write_callback(NULL), def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
// now we try to pin 1, and it should get evicted out from under us
struct unlockers foo;
foo.extra = NULL;
foo.locked = true;
foo.f = unlock_test_fun;
foo.next = NULL;
r = toku_cachetable_get_and_pin_nonblocking(
f1,
make_blocknum(1),
1,
&v1,
&s1,
def_write_callback(NULL),
def_fetch,
def_pf_req_callback,
def_pf_callback,
PL_WRITE_EXPENSIVE,
NULL,
&foo
);
assert(r==TOKUDB_TRY_AGAIN);
toku_cachetable_end_checkpoint(
cp,
NULL,
NULL,
NULL
);
toku_cachetable_verify(ct);
toku_cachefile_close(&f1, false, ZERO_LSN);
toku_cachetable_close(&ct);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
run_test();
return 0;
}
......@@ -109,6 +109,7 @@ void checkpointer_test::test_pending_bits() {
// 2. One entry in pair chain
//
struct cachefile cf;
cf.cachetable = &ctbl;
memset(&cf, 0, sizeof(cf));
cf.next = NULL;
cf.for_checkpoint = true;
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: cachetable-cleaner-thread-simple.cc 48237 2012-09-24 18:27:59Z esmet $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
//
// This test verifies that the cleaner thread doesn't call the callback if
// nothing needs flushing.
//
CACHEFILE f1;
bool my_cleaner_callback_called;
static int
my_cleaner_callback(
void* UU(ftnode_pv),
BLOCKNUM blocknum,
uint32_t fullhash,
void* UU(extraargs)
)
{
PAIR_ATTR attr = make_pair_attr(8);
attr.cache_pressure_size = 0;
int r = toku_test_cachetable_unpin(f1, blocknum, fullhash, CACHETABLE_CLEAN, attr);
my_cleaner_callback_called = true;
return r;
}
// point of this test is to have two pairs that have the same fullhash,
// and therefore, the same bucket mutex
static void
run_test (void) {
const int test_limit = 1000;
int r;
CACHETABLE ct;
toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
my_cleaner_callback_called = false;
char fname1[] = __SRCFILE__ "test1.dat";
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* vs[5];
//void* v2;
long ss[5];
//long s2;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
wc.cleaner_callback = my_cleaner_callback;
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &vs[0], &ss[0],
wc,
def_fetch,
def_pf_req_callback,
def_pf_callback,
true,
NULL);
PAIR_ATTR attr = make_pair_attr(8);
attr.cache_pressure_size = 100;
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, attr);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 1, &vs[1], &ss[1],
wc,
def_fetch,
def_pf_req_callback,
def_pf_callback,
true,
NULL);
attr = make_pair_attr(8);
attr.cache_pressure_size = 50;
r = toku_test_cachetable_unpin(f1, make_blocknum(2), 1, CACHETABLE_CLEAN, attr);
toku_cleaner_thread_for_test(ct);
assert(my_cleaner_callback_called);
toku_cachetable_verify(ct);
toku_cachefile_close(&f1, false, ZERO_LSN);
toku_cachetable_close(&ct);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
run_test();
return 0;
}
......@@ -65,13 +65,8 @@ cachetable_test (enum cachetable_dirty dirty, bool cloneable) {
assert(r == 0);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
if (dirty == CACHETABLE_DIRTY && !cloneable) {
assert(r == TOKUDB_TRY_AGAIN);
}
else {
assert(r == 0);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
}
toku_cachetable_end_checkpoint(
cp,
......
......@@ -20,6 +20,7 @@
int64_t data[NUM_ELEMENTS];
int64_t checkpointed_data[NUM_ELEMENTS];
PAIR data_pair[NUM_ELEMENTS];
uint32_t time_of_test;
bool run_test;
......@@ -70,7 +71,7 @@ flush (CACHEFILE f __attribute__((__unused__)),
static int
fetch (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
PAIR p,
int UU(fd),
CACHEKEY k,
uint32_t fullhash __attribute__((__unused__)),
......@@ -87,6 +88,7 @@ fetch (CACHEFILE f __attribute__((__unused__)),
int64_t* XMALLOC(data_val);
usleep(10);
*data_val = data[data_index];
data_pair[data_index] = p;
*value = data_val;
*sizep = make_pair_attr(8);
return 0;
......@@ -153,8 +155,6 @@ static void *move_numbers(void *arg) {
NULL,
0, //num_dependent_pairs
NULL,
NULL,
NULL,
NULL
);
assert(r==0);
......@@ -164,6 +164,7 @@ static void *move_numbers(void *arg) {
greater_key.b = greater;
uint32_t greater_fullhash = greater;
enum cachetable_dirty greater_dirty = CACHETABLE_DIRTY;
PAIR dep_pair = data_pair[less];
r = toku_cachetable_get_and_pin_with_dep_pairs(
f1,
make_blocknum(greater),
......@@ -174,9 +175,7 @@ static void *move_numbers(void *arg) {
PL_WRITE_CHEAP,
NULL,
1, //num_dependent_pairs
&f1,
&less_key,
&less_fullhash,
&dep_pair,
&less_dirty
);
assert(r==0);
......@@ -196,6 +195,7 @@ static void *move_numbers(void *arg) {
third = (random() % (num_possible_values)) + greater + 1;
CACHEKEY third_key;
third_key.b = third;
dep_pair = data_pair[greater];
uint32_t third_fullhash = third;
enum cachetable_dirty third_dirty = CACHETABLE_DIRTY;
r = toku_cachetable_get_and_pin_with_dep_pairs(
......@@ -208,9 +208,7 @@ static void *move_numbers(void *arg) {
PL_WRITE_CHEAP,
NULL,
1, //num_dependent_pairs
&f1,
&greater_key,
&greater_fullhash,
&dep_pair,
&greater_dirty
);
assert(r==0);
......
......@@ -23,10 +23,21 @@
int64_t data[NUM_ELEMENTS];
int64_t checkpointed_data[NUM_ELEMENTS];
PAIR data_pair[NUM_ELEMENTS];
uint32_t time_of_test;
bool run_test;
static void
put_callback_pair(
CACHEKEY key,
void *UU(v),
PAIR p)
{
int64_t data_index = key.b;
data_pair[data_index] = p;
}
static void
clone_callback(
void* value_data,
......@@ -72,7 +83,7 @@ flush (CACHEFILE f __attribute__((__unused__)),
static int
fetch (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
PAIR p,
int UU(fd),
CACHEKEY k,
uint32_t fullhash __attribute__((__unused__)),
......@@ -92,6 +103,7 @@ fetch (CACHEFILE f __attribute__((__unused__)),
int64_t* XMALLOC(data_val);
usleep(10);
*data_val = data[data_index];
data_pair[data_index] = p;
*value = data_val;
*sizep = make_pair_attr(8);
return 0;
......@@ -136,6 +148,7 @@ static void move_number_to_child(
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
wc.flush_callback = flush;
wc.clone_callback = clone_callback;
PAIR dep_pair = data_pair[parent];
r = toku_cachetable_get_and_pin_with_dep_pairs(
f1,
child_key,
......@@ -146,9 +159,7 @@ static void move_number_to_child(
PL_WRITE_CHEAP,
NULL,
1, //num_dependent_pairs
&f1,
&parent_key,
&parent_fullhash,
&dep_pair,
&parent_dirty
);
assert(r==0);
......@@ -194,8 +205,6 @@ static void *move_numbers(void *arg) {
NULL,
0, //num_dependent_pairs
NULL,
NULL,
NULL,
NULL
);
assert(r==0);
......@@ -249,6 +258,7 @@ static void merge_and_split_child(
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
wc.flush_callback = flush;
wc.clone_callback = clone_callback;
PAIR dep_pair = data_pair[parent];
r = toku_cachetable_get_and_pin_with_dep_pairs(
f1,
child_key,
......@@ -259,9 +269,7 @@ static void merge_and_split_child(
PL_WRITE_CHEAP,
NULL,
1, //num_dependent_pairs
&f1,
&parent_key,
&parent_fullhash,
&dep_pair,
&parent_dirty
);
assert(r==0);
......@@ -270,18 +278,12 @@ static void merge_and_split_child(
CACHEKEY other_child_key;
other_child_key.b = other_child;
uint32_t other_child_fullhash = toku_cachetable_hash(f1, other_child_key);
CACHEFILE cfs[2];
cfs[0] = f1;
cfs[1] = f1;
CACHEKEY keys[2];
keys[0] = parent_key;
keys[1] = child_key;
uint32_t hashes[2];
hashes[0] = parent_fullhash;
hashes[1] = child_fullhash;
enum cachetable_dirty dirties[2];
dirties[0] = parent_dirty;
dirties[1] = child_dirty;
PAIR dep_pairs[2];
dep_pairs[0] = data_pair[parent];
dep_pairs[1] = data_pair[child];
r = toku_cachetable_get_and_pin_with_dep_pairs(
f1,
......@@ -293,9 +295,7 @@ static void merge_and_split_child(
PL_WRITE_CHEAP,
NULL,
2, //num_dependent_pairs
cfs,
keys,
hashes,
dep_pairs,
dirties
);
assert(r==0);
......@@ -323,13 +323,11 @@ static void merge_and_split_child(
wc,
&other_child,
2, // number of dependent pairs that we may need to checkpoint
cfs,
keys,
hashes,
dep_pairs,
dirties,
&new_key,
&new_fullhash,
put_callback_nop
put_callback_pair
);
assert(new_key.b == other_child);
assert(new_fullhash == other_child_fullhash);
......@@ -372,8 +370,6 @@ static void *merge_and_split(void *arg) {
NULL,
0, //num_dependent_pairs
NULL,
NULL,
NULL,
NULL
);
assert(r==0);
......
......@@ -27,7 +27,7 @@ static void kibbutz_work(void *fe_v)
}
static void
unlock_dummy (void* UU(v)) {
unlock_dummy (PAIR UU(p), void* UU(v)) {
}
static void reset_unlockers(UNLOCKERS unlockers) {
......@@ -49,7 +49,7 @@ run_test (pair_lock_type lock_type) {
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
r = toku_cachetable_get_and_pin_with_dep_pairs(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, lock_type, NULL, 0, NULL, NULL, NULL, NULL);
r = toku_cachetable_get_and_pin_with_dep_pairs(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, lock_type, NULL, 0, NULL, NULL);
cachefile_kibbutz_enq(f1, kibbutz_work, f1);
reset_unlockers(&unlockers);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, &unlockers);
......@@ -67,7 +67,7 @@ run_test (pair_lock_type lock_type) {
// now do the same test with a partial fetch required
pf_called = false;
r = toku_cachetable_get_and_pin_with_dep_pairs(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, true_pf_req_callback, true_pf_callback, lock_type, NULL, 0, NULL, NULL, NULL, NULL);
r = toku_cachetable_get_and_pin_with_dep_pairs(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, true_pf_req_callback, true_pf_callback, lock_type, NULL, 0, NULL, NULL);
assert(pf_called);
cachefile_kibbutz_enq(f1, kibbutz_work, f1);
reset_unlockers(&unlockers);
......
......@@ -13,6 +13,7 @@ uint64_t val2;
uint64_t val3;
bool check_me;
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
......@@ -46,9 +47,11 @@ flush (CACHEFILE f __attribute__((__unused__)),
}
}
PAIR* dest_pair;
static int
fetch (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
PAIR p,
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
uint32_t fullhash __attribute__((__unused__)),
......@@ -61,6 +64,7 @@ fetch (CACHEFILE f __attribute__((__unused__)),
*dirtyp = 0;
*value = extraargs;
*sizep = make_pair_attr(8);
*dest_pair = p;
return 0;
}
......@@ -82,22 +86,16 @@ cachetable_test (bool write_first, bool write_second, bool start_checkpoint) {
long s1;
long s2;
long s3;
PAIR dependent_pairs[2];
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(&val1);
wc.flush_callback = flush;
wc.write_extraargs = &val1;
dest_pair = &dependent_pairs[0];
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, fetch, def_pf_req_callback, def_pf_callback, true, &val1);
dest_pair = &dependent_pairs[1];
wc.write_extraargs = &val2;
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, wc, fetch, def_pf_req_callback, def_pf_callback, true, &val2);
CACHEFILE dependent_cfs[2];
dependent_cfs[0] = f1;
dependent_cfs[1] = f1;
CACHEKEY dependent_keys[2];
dependent_keys[0] = make_blocknum(1);
dependent_keys[1] = make_blocknum(2);
uint32_t dependent_fullhash[2];
dependent_fullhash[0] = 1;
dependent_fullhash[1] = 2;
// now we set the dirty state of these two.
enum cachetable_dirty cd[2];
cd[0] = write_first ? CACHETABLE_DIRTY : CACHETABLE_CLEAN;
......@@ -126,9 +124,7 @@ cachetable_test (bool write_first, bool write_second, bool start_checkpoint) {
PL_WRITE_EXPENSIVE,
&val3,
2, //num_dependent_pairs
dependent_cfs,
dependent_keys,
dependent_fullhash,
dependent_pairs,
cd
);
if (start_checkpoint) {
......
......@@ -35,7 +35,7 @@ static void kibbutz_work(void *fe_v)
}
static void
unlock_dummy (void* UU(v)) {
unlock_dummy (PAIR UU(p), void* UU(v)) {
}
static void reset_unlockers(UNLOCKERS unlockers) {
......
......@@ -100,25 +100,6 @@ run_test (void) {
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, true_def_pf_req_callback, true_def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==TOKUDB_TRY_AGAIN);
//
// now test that if there is a checkpoint pending,
// first pin and unpin with dirty
//
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==0);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8)); assert(r==0);
// this should mark the PAIR as pending
CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
toku_cachetable_begin_checkpoint(cp, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==TOKUDB_TRY_AGAIN);
toku_cachetable_end_checkpoint(
cp,
NULL,
NULL,
NULL
);
toku_cachetable_verify(ct);
toku_cachefile_close(&f1, false, ZERO_LSN);
toku_cachetable_close(&ct);
......
......@@ -12,6 +12,17 @@ bool v2_written;
uint64_t val2;
uint64_t val3;
bool check_me;
PAIR* dest_pair;
static void
put_callback_pair(
CACHEKEY UU(key),
void *UU(v),
PAIR p)
{
*dest_pair = p;
}
static void
flush (CACHEFILE f __attribute__((__unused__)),
......@@ -61,6 +72,7 @@ fetch (CACHEFILE f __attribute__((__unused__)),
*dirtyp = 0;
*value = extraargs;
*sizep = make_pair_attr(8);
*dest_pair = p;
return 0;
}
......@@ -87,22 +99,16 @@ cachetable_test (bool write_first, bool write_second, bool start_checkpoint) {
void* v2;
long s1;
long s2;
PAIR dependent_pairs[2];
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
wc.flush_callback = flush;
dest_pair = &dependent_pairs[0];
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, fetch, def_pf_req_callback, def_pf_callback, true, &val1);
assert(r==0);
dest_pair = &dependent_pairs[1];
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, wc, fetch, def_pf_req_callback, def_pf_callback, true, &val2);
assert(r==0);
CACHEFILE dependent_cfs[2];
dependent_cfs[0] = f1;
dependent_cfs[1] = f1;
CACHEKEY dependent_keys[2];
dependent_keys[0] = make_blocknum(1);
dependent_keys[1] = make_blocknum(2);
uint32_t dependent_fullhash[2];
dependent_fullhash[0] = 1;
dependent_fullhash[1] = 2;
// now we set the dirty state of these two.
enum cachetable_dirty cd[2];
cd[0] = write_first ? CACHETABLE_DIRTY : CACHETABLE_CLEAN;
......@@ -123,6 +129,8 @@ cachetable_test (bool write_first, bool write_second, bool start_checkpoint) {
CACHEKEY put_key;
uint32_t put_fullhash;
PAIR dummy_pair;
dest_pair = &dummy_pair;
toku_cachetable_put_with_dep_pairs(
f1,
get_key_and_fullhash,
......@@ -131,13 +139,11 @@ cachetable_test (bool write_first, bool write_second, bool start_checkpoint) {
wc,
NULL,
2, //num_dependent_pairs
dependent_cfs,
dependent_keys,
dependent_fullhash,
dependent_pairs,
cd,
&put_key,
&put_fullhash,
put_callback_nop
put_callback_pair
);
assert(put_key.b == 3);
assert(put_fullhash == 3);
......
......@@ -41,7 +41,7 @@ cachetable_test (void) {
long s1;
//long s2;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
toku_cachetable_begin_checkpoint(cp, NULL);
r = toku_test_cachetable_unpin_and_remove(f1, make_blocknum(1), remove_key_expect_checkpoint, NULL);
......@@ -52,7 +52,7 @@ cachetable_test (void) {
NULL
);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
r = toku_test_cachetable_unpin_and_remove(f1, make_blocknum(1), remove_key_expect_no_checkpoint, NULL);
toku_cachetable_verify(ct);
......
......@@ -185,6 +185,7 @@ def_fetch (CACHEFILE f __attribute__((__unused__)),
static UU() void
put_callback_nop(
CACHEKEY UU(key),
void *UU(v),
PAIR UU(p)) {
}
......
......@@ -37,6 +37,10 @@ typedef struct toku_mutex {
#endif
} toku_mutex_t;
typedef struct toku_mutex_aligned {
toku_mutex_t aligned_mutex __attribute__((__aligned__(64)));
} toku_mutex_aligned_t;
#if defined(__FreeBSD__)
# define TOKU_MUTEX_ADAPTIVE PTHREAD_MUTEX_ADAPTIVE_NP
static const toku_mutex_t ZERO_MUTEX_INITIALIZER = {0};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment