Commit ddfd46fb authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

refs #5634, merge bucket mutexes to main

git-svn-id: file:///svn/toku/tokudb@49391 c7de825b-a66e-492c-adef-691d508d4ae1
parent 72de8ca9
......@@ -152,7 +152,7 @@ struct ctpair {
// locks
toku::frwlock value_rwlock;
struct nb_mutex disk_nb_mutex; // single writer, protects disk_data, is used for writing cloned nodes for checkpoint
toku_mutex_t mutex;
toku_mutex_t* mutex; // gotten from the pair list
// Access to checkpoint_pending is protected by two mechanisms,
// the value_rwlock and the pair_list's pending locks (expensive and cheap).
......@@ -215,7 +215,9 @@ public:
//
uint32_t m_n_in_table; // number of pairs in the hash table
uint32_t m_table_size; // number of buckets in the hash table
uint32_t m_num_locks;
PAIR *m_table; // hash table
toku_mutex_aligned_t *m_mutexes;
//
// The following fields are the heads of various linked lists.
// They also protected by the list lock, but their
......@@ -232,6 +234,7 @@ public:
//
PAIR m_clock_head; // of clock . head is the next thing to be up for decrement.
PAIR m_cleaner_head; // for cleaner thread. head is the next thing to look at for possible cleaning.
PAIR m_checkpoint_head; // for begin checkpoint to iterate over PAIRs and mark as pending_checkpoint
PAIR m_pending_head; // list of pairs marked with checkpoint_pending
// this field is public so we are still POD
......@@ -281,10 +284,12 @@ public:
void read_pending_cheap_unlock();
void write_pending_cheap_lock();
void write_pending_cheap_unlock();
toku_mutex_t* get_mutex_for_pair(uint32_t fullhash);
void pair_lock_by_fullhash(uint32_t fullhash);
void pair_unlock_by_fullhash(uint32_t fullhash);
private:
void pair_remove (PAIR p);
void rehash (uint32_t newtable_size);
void add_to_clock (PAIR p);
PAIR remove_from_hash_chain (PAIR remove_me, PAIR list);
};
......
......@@ -84,18 +84,17 @@ static PAIR_ATTR const zero_attr = {
static inline void ctpair_destroy(PAIR p) {
toku_mutex_destroy(&p->mutex);
p->value_rwlock.deinit();
nb_mutex_destroy(&p->disk_nb_mutex);
toku_free(p);
}
static inline void pair_lock(PAIR p) {
toku_mutex_lock(&p->mutex);
toku_mutex_lock(p->mutex);
}
static inline void pair_unlock(PAIR p) {
toku_mutex_unlock(&p->mutex);
toku_mutex_unlock(p->mutex);
}
void
......@@ -665,7 +664,7 @@ static void cachetable_write_locked_pair(
// then we may try to evict a PAIR that is in the process
// of having its clone be written out
pair_lock(p);
nb_mutex_lock(&p->disk_nb_mutex, &p->mutex);
nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
pair_unlock(p);
// make sure that assumption about cloned_value_data is true
// if we have grabbed the disk_nb_mutex, then that means that
......@@ -756,8 +755,9 @@ void pair_init(PAIR p,
p->count = 0; // <CER> Is zero the correct init value?
p->checkpoint_pending = false;
toku_mutex_init(&p->mutex, NULL);
p->value_rwlock.init(&p->mutex);
p->mutex = list->get_mutex_for_pair(fullhash);
assert(p->mutex);
p->value_rwlock.init(p->mutex);
nb_mutex_init(&p->disk_nb_mutex);
p->size_evicting_estimate = 0; // <CER> Is zero the correct init value?
......@@ -775,7 +775,8 @@ void pair_init(PAIR p,
// Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
//
// Requires pair list's write lock to be held on entry.
// On exit, get pair with mutex held
// the pair's mutex must be held as wel
//
//
static PAIR cachetable_insert_at(CACHETABLE ct,
CACHEFILE cachefile, CACHEKEY key, void *value,
......@@ -803,6 +804,8 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
return p;
}
// on input, the write list lock must be held AND
// the pair's mutex must be held as wel
static void cachetable_insert_pair_at(CACHETABLE ct, PAIR p, PAIR_ATTR attr) {
ct->list.put(p);
ct->ev.add_pair_attr(attr);
......@@ -833,7 +836,7 @@ static void cachetable_put_internal(
//invariant_null(dummy_p);
cachetable_insert_pair_at(ct, p, attr);
invariant_notnull(put_callback);
put_callback(value, p);
put_callback(p->key, value, p);
}
// Pair mutex (p->mutex) is may or may not be held on entry,
......@@ -915,7 +918,7 @@ write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p, bool checkpoint_pending)
if (p->dirty && checkpoint_pending) {
if (p->clone_callback) {
pair_lock(p);
nb_mutex_lock(&p->disk_nb_mutex, &p->mutex);
nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
pair_unlock(p);
assert(!p->cloned_value_data);
clone_pair(&ct->ev, p);
......@@ -951,7 +954,7 @@ write_pair_for_checkpoint_thread (evictor* ev, PAIR p)
p->value_rwlock.write_lock(false);
if (p->dirty && p->checkpoint_pending) {
if (p->clone_callback) {
nb_mutex_lock(&p->disk_nb_mutex, &p->mutex);
nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
assert(!p->cloned_value_data);
clone_pair(ev, p);
assert(p->cloned_value_data);
......@@ -1026,62 +1029,6 @@ static void checkpoint_dependent_pairs(
}
}
//
// must be holding a lock on the pair_list's list_lock on entry
//
static void get_pairs(
pair_list* pl,
uint32_t num_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* cfs, // array of cachefiles of dependent pairs
CACHEKEY* keys, // array of cachekeys of dependent pairs
uint32_t* fullhash, //array of fullhashes of dependent pairs
PAIR* out_pairs
)
{
for (uint32_t i =0; i < num_pairs; i++) {
out_pairs[i] = pl->find_pair(
cfs[i],
keys[i],
fullhash[i]
);
assert(out_pairs[i] != NULL);
// pair had better be locked, as we are assuming
// to own the write lock
assert(out_pairs[i]->value_rwlock.writers());
}
}
// does NOT include the actual key and fullhash we eventually want
// a helper function for the two cachetable_put functions below
static inline PAIR malloc_and_init_pair(
CACHEFILE cachefile,
void *value,
PAIR_ATTR attr,
CACHETABLE_WRITE_CALLBACK write_callback
)
{
CACHETABLE ct = cachefile->cachetable;
CACHEKEY dummy_key = {0};
uint32_t dummy_fullhash = 0;
PAIR XMALLOC(p);
memset(p, 0, sizeof *p);
pair_init(p,
cachefile,
dummy_key,
value,
attr,
CACHETABLE_DIRTY,
dummy_fullhash,
write_callback,
&ct->ev,
&ct->list
);
pair_lock(p);
p->value_rwlock.write_lock(true);
pair_unlock(p);
return p;
}
void toku_cachetable_put_with_dep_pairs(
CACHEFILE cachefile,
CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash,
......@@ -1090,9 +1037,7 @@ void toku_cachetable_put_with_dep_pairs(
CACHETABLE_WRITE_CALLBACK write_callback,
void *get_key_and_fullhash_extra,
uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
uint32_t* dependent_fullhash, //array of fullhashes of dependent pairs
PAIR* dependent_pairs,
enum cachetable_dirty* dependent_dirty, // array stating dirty/cleanness of dependent pairs
CACHEKEY* key,
uint32_t* fullhash,
......@@ -1110,12 +1055,26 @@ void toku_cachetable_put_with_dep_pairs(
ct->ev.signal_eviction_thread();
}
PAIR p = malloc_and_init_pair(cachefile, value, attr, write_callback);
PAIR p = NULL;
XMALLOC(p);
memset(p, 0, sizeof *p);
ct->list.write_list_lock();
get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra);
p->key.b = key->b;
p->fullhash = *fullhash;
pair_init(
p,
cachefile,
*key,
value,
attr,
CACHETABLE_DIRTY,
*fullhash,
write_callback,
&ct->ev,
&ct->list
);
pair_lock(p);
p->value_rwlock.write_lock(true);
cachetable_put_internal(
cachefile,
p,
......@@ -1123,15 +1082,7 @@ void toku_cachetable_put_with_dep_pairs(
attr,
put_callback
);
PAIR dependent_pairs[num_dependent_pairs];
get_pairs(
&ct->list,
num_dependent_pairs,
dependent_cfs,
dependent_keys,
dependent_fullhash,
dependent_pairs
);
pair_unlock(p);
bool checkpoint_pending[num_dependent_pairs];
ct->list.write_pending_cheap_lock();
for (uint32_t i = 0; i < num_dependent_pairs; i++) {
......@@ -1165,11 +1116,26 @@ void toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, v
if (ct->ev.should_client_wake_eviction_thread()) {
ct->ev.signal_eviction_thread();
}
PAIR p = malloc_and_init_pair(cachefile, value, attr, write_callback);
PAIR p = NULL;
XMALLOC(p);
memset(p, 0, sizeof *p);
ct->list.write_list_lock();
p->key.b = key.b;
p->fullhash = fullhash;
pair_init(
p,
cachefile,
key,
value,
attr,
CACHETABLE_DIRTY,
fullhash,
write_callback,
&ct->ev,
&ct->list
);
pair_lock(p);
p->value_rwlock.write_lock(true);
cachetable_put_internal(
cachefile,
p,
......@@ -1177,6 +1143,7 @@ void toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, v
attr,
put_callback
);
pair_unlock(p);
ct->list.write_list_unlock();
}
......@@ -1210,7 +1177,7 @@ do_partial_fetch(
assert(!p->dirty);
pair_lock(p);
nb_mutex_lock(&p->disk_nb_mutex, &p->mutex);
nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
pair_unlock(p);
int r = pf_callback(p->value_data, p->disk_data, read_extraargs, cachefile->fd, &new_attr);
lazy_assert_zero(r);
......@@ -1236,15 +1203,12 @@ void toku_cachetable_pf_pinned_pair(
PAIR_ATTR attr;
PAIR p = NULL;
CACHETABLE ct = cf->cachetable;
ct->list.read_list_lock();
ct->list.pair_lock_by_fullhash(fullhash);
p = ct->list.find_pair(cf, key, fullhash);
assert(p != NULL);
assert(p->value_data == value);
assert(p->value_rwlock.writers());
ct->list.read_list_unlock();
pair_lock(p);
nb_mutex_lock(&p->disk_nb_mutex, &p->mutex);
nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
pair_unlock(p);
int fd = cf->fd;
......@@ -1291,9 +1255,7 @@ int toku_cachetable_get_and_pin (
lock_type,
read_extraargs,
0, // number of dependent pairs that we may need to checkpoint
NULL, // array of cachefiles of dependent pairs
NULL, // array of cachekeys of dependent pairs
NULL, //array of fullhashes of dependent pairs
NULL, // array of dependent pairs
NULL // array stating dirty/cleanness of dependent pairs
);
}
......@@ -1321,7 +1283,7 @@ static void cachetable_fetch_pair(
int dirty = 0;
pair_lock(p);
nb_mutex_lock(&p->disk_nb_mutex, &p->mutex);
nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
pair_unlock(p);
int r;
......@@ -1352,9 +1314,6 @@ static bool get_checkpoint_pending(PAIR p, pair_list* pl) {
return checkpoint_pending;
}
static bool resolve_checkpointing_fast(PAIR p, bool checkpoint_pending) {
return !(checkpoint_pending && (p->dirty == CACHETABLE_DIRTY) && !p->clone_callback);
}
static void checkpoint_pair_and_dependent_pairs(
CACHETABLE ct,
PAIR p,
......@@ -1413,13 +1372,10 @@ static void unpin_pair(PAIR p, bool read_lock_grabbed) {
// on output, the pair's mutex is not held.
// if true, we must try again, and pair is not pinned
// if false, we succeeded, the pair is pinned
// NOTE: On entry, the read list lock may be held (and have_read_list_lock must be set accordingly).
// On exit, the read list lock is held.
static bool try_pin_pair(
PAIR p,
CACHETABLE ct,
CACHEFILE cachefile,
bool have_read_list_lock,
pair_lock_type lock_type,
uint32_t num_dependent_pairs,
PAIR* dependent_pairs,
......@@ -1432,32 +1388,15 @@ static bool try_pin_pair(
{
bool dep_checkpoint_pending[num_dependent_pairs];
bool try_again = true;
bool reacquire_lock = !have_read_list_lock;
bool expensive = (lock_type == PL_WRITE_EXPENSIVE);
if (lock_type != PL_READ) {
if (!p->value_rwlock.try_write_lock(expensive)) {
reacquire_lock = true;
if (have_read_list_lock) {
ct->list.read_list_unlock();
}
p->value_rwlock.write_lock(expensive);
}
}
else {
if (!p->value_rwlock.try_read_lock()) {
reacquire_lock = true;
if (have_read_list_lock) {
ct->list.read_list_unlock();
}
p->value_rwlock.read_lock();
}
}
pair_touch(p);
pair_unlock(p);
// reacquire the read list lock here, we hold it for the rest of the function.
if (reacquire_lock) {
ct->list.read_list_lock();
}
bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
......@@ -1483,9 +1422,6 @@ static bool try_pin_pair(
// so we do a sanity check here.
assert(!p->dirty);
// This may be slow, better release and re-grab the
// read list lock.
ct->list.read_list_unlock();
if (lock_type == PL_READ) {
pair_lock(p);
p->value_rwlock.read_unlock();
......@@ -1525,7 +1461,6 @@ static bool try_pin_pair(
// followed by a relock, so we do it again.
bool pf_required = pf_req_callback(p->value_data,read_extraargs);
assert(!pf_required);
ct->list.read_list_lock();
}
if (lock_type != PL_READ) {
......@@ -1566,9 +1501,7 @@ int toku_cachetable_get_and_pin_with_dep_pairs_batched (
pair_lock_type lock_type,
void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
uint32_t* dependent_fullhash, //array of fullhashes of dependent pairs
PAIR* dependent_pairs,
enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
)
// See cachetable.h
......@@ -1576,7 +1509,6 @@ int toku_cachetable_get_and_pin_with_dep_pairs_batched (
CACHETABLE ct = cachefile->cachetable;
bool wait = false;
bool already_slept = false;
PAIR dependent_pairs[num_dependent_pairs];
bool dep_checkpoint_pending[num_dependent_pairs];
//
......@@ -1589,31 +1521,19 @@ beginning:
if (wait) {
// We shouldn't be holding the read list lock while
// waiting for the evictor to remove pairs.
ct->list.read_list_unlock();
already_slept = true;
ct->ev.wait_for_cache_pressure_to_subside();
ct->list.read_list_lock();
}
get_pairs(
&ct->list,
num_dependent_pairs,
dependent_cfs,
dependent_keys,
dependent_fullhash,
dependent_pairs
);
ct->list.pair_lock_by_fullhash(fullhash);
PAIR p = ct->list.find_pair(cachefile, key, fullhash);
if (p) {
pair_lock(p);
// on entry, holds p->mutex and read list lock
// on exit, does not hold p->mutex, holds read list lock
// on entry, holds p->mutex (which is locked via pair_lock_by_fullhash)
// on exit, does not hold p->mutex
bool try_again = try_pin_pair(
p,
ct,
cachefile,
true,
lock_type,
num_dependent_pairs,
dependent_pairs,
......@@ -1632,6 +1552,7 @@ beginning:
}
}
else {
ct->list.pair_unlock_by_fullhash(fullhash);
// we only want to sleep once per call to get_and_pin. If we have already
// slept and there is still cache pressure, then we might as
// well just complete the call, because the sleep did not help
......@@ -1649,21 +1570,17 @@ beginning:
// Since the pair was not found, we need the write list
// lock to add it. So, we have to release the read list lock
// first.
ct->list.read_list_unlock();
ct->list.write_list_lock();
ct->list.pair_lock_by_fullhash(fullhash);
p = ct->list.find_pair(cachefile, key, fullhash);
if (p != NULL) {
pair_lock(p);
ct->list.write_list_unlock();
// we will gain the read_list_lock again before exiting try_pin_pair
// on entry, holds p->mutex,
// on exit, does not hold p->mutex, holds read list lock
// on exit, does not hold p->mutex
bool try_again = try_pin_pair(
p,
ct,
cachefile,
false,
lock_type,
num_dependent_pairs,
dependent_pairs,
......@@ -1698,10 +1615,10 @@ beginning:
invariant_notnull(p);
// Pin the pair.
pair_lock(p);
p->value_rwlock.write_lock(true);
pair_unlock(p);
if (lock_type != PL_READ) {
ct->list.read_pending_cheap_lock();
invariant(!p->checkpoint_pending);
......@@ -1711,7 +1628,6 @@ beginning:
}
ct->list.read_pending_cheap_unlock();
}
// We should release the lock before we perform
// these expensive operations.
ct->list.write_list_unlock();
......@@ -1755,11 +1671,6 @@ beginning:
bool pf_required = pf_req_callback(p->value_data,read_extraargs);
assert(!pf_required);
}
// We need to be holding the read list lock when we exit.
// We grab it here because we released it earlier to
// grab the write list lock because the checkpointing and
// fetching are expensive/slow.
ct->list.read_list_lock();
goto got_value;
}
got_value:
......@@ -1781,14 +1692,11 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
pair_lock_type lock_type,
void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
uint32_t* dependent_fullhash, //array of fullhashes of dependent pairs
PAIR* dependent_pairs,
enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
)
// See cachetable.h
{
toku_cachetable_begin_batched_pin(cachefile);
int r = toku_cachetable_get_and_pin_with_dep_pairs_batched(
cachefile,
key,
......@@ -1802,12 +1710,9 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
lock_type,
read_extraargs,
num_dependent_pairs,
dependent_cfs,
dependent_keys,
dependent_fullhash,
dependent_pairs,
dependent_dirty
);
toku_cachetable_end_batched_pin(cachefile);
return r;
}
......@@ -1824,12 +1729,9 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, void**value) {
CACHETABLE ct = cachefile->cachetable;
int r = -1;
ct->list.read_list_lock();
ct->list.pair_lock_by_fullhash(fullhash);
PAIR p = ct->list.find_pair(cachefile, key, fullhash);
if (p) {
pair_lock(p);
ct->list.read_list_unlock();
if (p->value_rwlock.try_write_lock(true)) {
if (p && p->value_rwlock.try_write_lock(true)) {
// we got the write lock fast, so continue
ct->list.read_pending_cheap_lock();
//
......@@ -1847,11 +1749,10 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, uint32
r = 0;
}
ct->list.read_pending_cheap_unlock();
}
pair_unlock(p);
}
else {
ct->list.read_list_unlock();
ct->list.pair_unlock_by_fullhash(fullhash);
}
return r;
}
......@@ -1862,12 +1763,9 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, uint32
int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, void**value) {
CACHETABLE ct = cachefile->cachetable;
int r = -1;
ct->list.read_list_lock();
ct->list.pair_lock_by_fullhash(fullhash);
PAIR p = ct->list.find_pair(cachefile, key, fullhash);
if (p) {
pair_lock(p);
ct->list.read_list_unlock();
if (p->value_rwlock.try_write_lock(true)) {
if (p && p->value_rwlock.try_write_lock(true)) {
// got the write lock fast, so continue
ct->list.read_pending_cheap_lock();
//
......@@ -1877,19 +1775,25 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key,
// because it is expensive
//
if (p->checkpoint_pending) {
if (p->dirty) {
p->value_rwlock.write_unlock();
r = -1;
}
else {
p->checkpoint_pending = false;
*value = p->value_data;
r = 0;
}
ct->list.read_pending_cheap_unlock();
}
else {
*value = p->value_data;
r = 0;
}
ct->list.read_pending_cheap_unlock();
pair_unlock(p);
}
else {
ct->list.read_list_unlock();
ct->list.pair_unlock_by_fullhash(fullhash);
}
return r;
}
......@@ -1906,6 +1810,7 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key,
//
static int
cachetable_unpin_internal(
PAIR locked_p,
CACHEFILE cachefile,
PAIR p,
enum cachetable_dirty dirty,
......@@ -1918,7 +1823,10 @@ cachetable_unpin_internal(
CACHETABLE ct = cachefile->cachetable;
bool added_data_to_cachetable = false;
// hack for #3969, only exists in case where we run unlockers
if (!locked_p || locked_p->mutex != p->mutex) {
pair_lock(p);
}
PAIR_ATTR old_attr = p->attr;
PAIR_ATTR new_attr = attr;
if (dirty) {
......@@ -1929,7 +1837,9 @@ cachetable_unpin_internal(
}
bool read_lock_grabbed = p->value_rwlock.readers() != 0;
unpin_pair(p, read_lock_grabbed);
if (!locked_p || locked_p->mutex != p->mutex) {
pair_unlock(p);
}
if (attr.is_valid) {
if (new_attr.size > old_attr.size) {
......@@ -1951,18 +1861,18 @@ cachetable_unpin_internal(
}
int toku_cachetable_unpin(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
return cachetable_unpin_internal(cachefile, p, dirty, attr, true);
return cachetable_unpin_internal(NULL, cachefile, p, dirty, attr, true);
}
int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
return cachetable_unpin_internal(cachefile, p, dirty, attr, false);
int toku_cachetable_unpin_ct_prelocked_no_flush(PAIR locked_p, CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
return cachetable_unpin_internal(locked_p, cachefile, p, dirty, attr, false);
}
static void
run_unlockers (UNLOCKERS unlockers) {
run_unlockers (PAIR p, UNLOCKERS unlockers) {
while (unlockers) {
assert(unlockers->locked);
unlockers->locked = false;
unlockers->f(unlockers->extra);
unlockers->f(p, unlockers->extra);
unlockers=unlockers->next;
}
}
......@@ -1974,33 +1884,18 @@ run_unlockers (UNLOCKERS unlockers) {
// pins the pair, then releases the pin,
// and then returns TOKUDB_TRY_AGAIN
//
// on entry and exit, pair mutex is NOT held
// on entry and exit, the list read lock is held
// on entry, pair mutex is held,
// on exit, pair mutex is NOT held
static int
maybe_pin_pair(
PAIR p,
CACHETABLE ct,
pair_lock_type lock_type,
UNLOCKERS unlockers
)
{
int retval = 0;
bool expensive = (lock_type == PL_WRITE_EXPENSIVE);
pair_lock(p);
//
// first try to acquire the necessary locks without releasing the read_list_lock
//
if (lock_type == PL_READ && p->value_rwlock.try_read_lock()) {
pair_unlock(p);
goto exit;
}
if (lock_type != PL_READ && p->value_rwlock.try_write_lock(expensive)){
pair_unlock(p);
goto exit;
}
ct->list.read_list_unlock();
// now that we have released the read_list_lock,
// we can pin the PAIR. In each case, we check to see
// if acquiring the pin is expensive. If so, we run the unlockers, set the
// retval to TOKUDB_TRY_AGAIN, pin AND release the PAIR.
......@@ -2008,30 +1903,22 @@ maybe_pin_pair(
// run the unlockers, as we intend to return the value to the user
if (lock_type == PL_READ) {
if (p->value_rwlock.read_lock_is_expensive()) {
run_unlockers(unlockers);
run_unlockers(p, unlockers);
retval = TOKUDB_TRY_AGAIN;
}
p->value_rwlock.read_lock();
}
else if (lock_type == PL_WRITE_EXPENSIVE || lock_type == PL_WRITE_CHEAP){
if (p->value_rwlock.write_lock_is_expensive()) {
run_unlockers(unlockers);
run_unlockers(p, unlockers);
retval = TOKUDB_TRY_AGAIN;
}
p->value_rwlock.write_lock(expensive);
}
else {
assert(false);
}
// If we are going to be returning TOKUDB_TRY_AGAIN, we might
// as well resolve the checkpointing given the chance. This step is
// not necessary for correctness, it is just an opportunistic optimization.
if (lock_type != PL_READ && retval == TOKUDB_TRY_AGAIN) {
bool checkpoint_pending = get_checkpoint_pending(p, &ct->list);
pair_unlock(p);
write_locked_pair_for_checkpoint(ct, p, checkpoint_pending);
pair_lock(p);
abort();
}
if (retval == TOKUDB_TRY_AGAIN) {
unpin_pair(p, (lock_type == PL_READ));
}
......@@ -2040,23 +1927,9 @@ maybe_pin_pair(
assert(retval == 0);
}
pair_unlock(p);
ct->list.read_list_lock();
exit:
return retval;
}
void toku_cachetable_begin_batched_pin(CACHEFILE cf)
// See cachetable.h.
{
cf->cachetable->list.read_list_lock();
}
void toku_cachetable_end_batched_pin(CACHEFILE cf)
// See cachetable.h.
{
cf->cachetable->list.read_list_unlock();
}
int toku_cachetable_get_and_pin_nonblocking_batched(
CACHEFILE cf,
CACHEKEY key,
......@@ -2079,12 +1952,13 @@ int toku_cachetable_get_and_pin_nonblocking_batched(
lock_type == PL_WRITE_EXPENSIVE
);
try_again:
ct->list.pair_lock_by_fullhash(fullhash);
PAIR p = ct->list.find_pair(cf, key, fullhash);
if (p == NULL) {
// Not found
ct->list.read_list_unlock();
ct->list.pair_unlock_by_fullhash(fullhash);
ct->list.write_list_lock();
ct->list.pair_lock_by_fullhash(fullhash);
p = ct->list.find_pair(cf, key, fullhash);
if (p != NULL) {
// we just did another search with the write list lock and
......@@ -2094,7 +1968,7 @@ try_again:
// the cachetable. For simplicity, we just return
// to the top and restart the function
ct->list.write_list_unlock();
ct->list.read_list_lock();
ct->list.pair_unlock_by_fullhash(fullhash);
goto try_again;
}
......@@ -2109,7 +1983,6 @@ try_again:
CACHETABLE_CLEAN
);
assert(p);
pair_lock(p);
// grab expensive write lock, because we are about to do a fetch
// off disk
// No one can access this pair because
......@@ -2118,7 +1991,7 @@ try_again:
// will not block.
p->value_rwlock.write_lock(true);
pair_unlock(p);
run_unlockers(unlockers); // we hold the write list_lock.
run_unlockers(NULL, unlockers); // we hold the write list_lock.
ct->list.write_list_unlock();
// at this point, only the pair is pinned,
......@@ -2136,14 +2009,10 @@ try_again:
ct->ev.signal_eviction_thread();
}
// We need to be holding the read list lock on exit,
// and we don't want to hold during our wait for
// cache pressure to subside.
ct->list.read_list_lock();
return TOKUDB_TRY_AGAIN;
}
else {
int r = maybe_pin_pair(p, ct, lock_type, unlockers);
int r = maybe_pin_pair(p, lock_type, unlockers);
if (r == TOKUDB_TRY_AGAIN) {
return TOKUDB_TRY_AGAIN;
}
......@@ -2151,26 +2020,7 @@ try_again:
if (lock_type != PL_READ) {
bool checkpoint_pending = get_checkpoint_pending(p, &ct->list);
bool is_checkpointing_fast = resolve_checkpointing_fast(
p,
checkpoint_pending
);
if (!is_checkpointing_fast) {
run_unlockers(unlockers);
}
// We hold the read list lock throughout this call.
// This is O.K. because in production, this function
// should always put the write on a background thread.
write_locked_pair_for_checkpoint(ct, p, checkpoint_pending);
if (!is_checkpointing_fast) {
pair_lock(p);
p->value_rwlock.write_unlock();
pair_unlock(p);
return TOKUDB_TRY_AGAIN;
}
}
// At this point, we have pinned the PAIR
......@@ -2180,12 +2030,7 @@ try_again:
// still check for partial fetch
bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
if (partial_fetch_required) {
// Since we have to do disk I/O we should temporarily
// release the read list lock.
ct->list.read_list_unlock();
// we can unpin without the read list lock
run_unlockers(unlockers);
run_unlockers(NULL, unlockers);
// we are now getting an expensive write lock, because we
// are doing a partial fetch. So, if we previously have
......@@ -2222,10 +2067,6 @@ try_again:
ct->ev.signal_eviction_thread();
}
// We need to be holding the read list lock on exit,
// and we don't want to hold during neither our wait for
// cache pressure to subside, nor our partial fetch.
ct->list.read_list_lock();
return TOKUDB_TRY_AGAIN;
}
else {
......@@ -2254,7 +2095,6 @@ int toku_cachetable_get_and_pin_nonblocking (
// See cachetable.h.
{
int r = 0;
toku_cachetable_begin_batched_pin(cf);
r = toku_cachetable_get_and_pin_nonblocking_batched(
cf,
key,
......@@ -2269,7 +2109,6 @@ int toku_cachetable_get_and_pin_nonblocking (
read_extraargs,
unlockers
);
toku_cachetable_end_batched_pin(cf);
return r;
}
......@@ -2330,17 +2169,17 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, uint32_t fullhash,
if (ct->ev.should_client_thread_sleep()) {
goto exit;
}
ct->list.read_list_lock();
ct->list.pair_lock_by_fullhash(fullhash);
// lookup
p = ct->list.find_pair(cf, key, fullhash);
// if not found then create a pair in the READING state and fetch it
// if not found then create a pair and fetch it
if (p == NULL) {
cachetable_prefetches++;
ct->list.read_list_unlock();
ct->list.pair_unlock_by_fullhash(fullhash);
ct->list.write_list_lock();
ct->list.pair_lock_by_fullhash(fullhash);
p = ct->list.find_pair(cf, key, fullhash);
if (p != NULL) {
pair_lock(p);
ct->list.write_list_unlock();
goto found_pair;
}
......@@ -2358,7 +2197,6 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, uint32_t fullhash,
CACHETABLE_CLEAN
);
assert(p);
pair_lock(p);
p->value_rwlock.write_lock(true);
pair_unlock(p);
ct->list.write_list_unlock();
......@@ -2373,8 +2211,6 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, uint32_t fullhash,
}
goto exit;
}
pair_lock(p);
ct->list.read_list_unlock();
found_pair:
// at this point, p is found, pair's mutex is grabbed, and
......@@ -2595,7 +2431,7 @@ int toku_test_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, uint32_t fullh
int toku_test_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) {
// We hold the cachetable mutex.
PAIR p = test_get_pair(cachefile, key, fullhash, true);
return toku_cachetable_unpin_ct_prelocked_no_flush(cachefile, p, dirty, attr);
return toku_cachetable_unpin_ct_prelocked_no_flush(NULL, cachefile, p, dirty, attr);
}
//test-only wrapper
......@@ -2626,7 +2462,7 @@ int toku_cachetable_unpin_and_remove (
// out a cloned value completes
pair_lock(p);
assert(p->value_rwlock.writers());
nb_mutex_lock(&p->disk_nb_mutex, &p->mutex);
nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
pair_unlock(p);
assert(p->cloned_value_data == NULL);
......@@ -3118,6 +2954,22 @@ int cleaner::run_cleaner(void) {
// - this is how a thread that is calling unpin_and_remove will prevent
// the cleaner thread from picking its PAIR (see comments in that function)
do {
//
// We are already holding onto best_pair, if we run across a pair that
// has the same mutex due to a collision in the hashtable, we need
// to be careful.
//
if (best_pair && m_pl->m_cleaner_head->mutex == best_pair->mutex) {
// Advance the cleaner head.
long score = 0;
score = cleaner_thread_rate_pair(m_pl->m_cleaner_head);
if (score > best_score) {
best_score = score;
best_pair = m_pl->m_cleaner_head;
}
m_pl->m_cleaner_head = m_pl->m_cleaner_head->clock_next;
continue;
}
pair_lock(m_pl->m_cleaner_head);
if (m_pl->m_cleaner_head->value_rwlock.users() > 0) {
pair_unlock(m_pl->m_cleaner_head);
......@@ -3217,15 +3069,19 @@ int cleaner::run_cleaner(void) {
static_assert(std::is_pod<pair_list>::value, "pair_list isn't POD");
const uint32_t INITIAL_PAIR_LIST_SIZE = 4;
const uint32_t INITIAL_PAIR_LIST_SIZE = 1<<20;
const uint32_t PAIR_LOCK_SIZE = 1<<20;
// Allocates the hash table of pairs inside this pair list.
//
void pair_list::init() {
m_table_size = INITIAL_PAIR_LIST_SIZE;
m_num_locks = PAIR_LOCK_SIZE;
m_n_in_table = 0;
m_clock_head = NULL;
m_cleaner_head = NULL;
m_checkpoint_head = NULL;
m_pending_head = NULL;
m_table = NULL;
......@@ -3242,6 +3098,10 @@ void pair_list::init() {
toku_pthread_rwlock_init(&m_pending_lock_expensive, &attr);
toku_pthread_rwlock_init(&m_pending_lock_cheap, &attr);
XCALLOC_N(m_table_size, m_table);
XCALLOC_N(m_num_locks, m_mutexes);
for (uint64_t i = 0; i < m_num_locks; i++) {
toku_mutex_init(&m_mutexes[i].aligned_mutex, NULL);
}
}
// Frees the pair_list hash table. It is expected to be empty by
......@@ -3252,15 +3112,20 @@ void pair_list::destroy() {
for (uint32_t i = 0; i < m_table_size; ++i) {
invariant_null(m_table[i]);
}
for (uint64_t i = 0; i < m_num_locks; i++) {
toku_mutex_destroy(&m_mutexes[i].aligned_mutex);
}
toku_pthread_rwlock_destroy(&m_list_lock);
toku_pthread_rwlock_destroy(&m_pending_lock_expensive);
toku_pthread_rwlock_destroy(&m_pending_lock_cheap);
toku_free(m_table);
toku_free(m_mutexes);
}
// This places the given pair inside of the pair list.
//
// requires caller to have grabbed write lock on list.
// requires caller to have p->mutex held as well
//
void pair_list::put(PAIR p) {
// sanity check to make sure that the PAIR does not already exist
......@@ -3272,10 +3137,6 @@ void pair_list::put(PAIR p) {
p->hash_chain = m_table[h];
m_table[h] = p;
m_n_in_table++;
if (m_n_in_table > m_table_size) {
this->rehash(m_table_size * 2);
}
}
// This removes the given pair from the pair list.
......@@ -3292,11 +3153,6 @@ void pair_list::evict(PAIR p) {
// Remove it from the hash chain.
unsigned int h = p->fullhash&(m_table_size - 1);
m_table[h] = this->remove_from_hash_chain(p, m_table[h]);
// possibly rehash
if ((4 * m_n_in_table < m_table_size) && m_table_size > 4) {
this->rehash(m_table_size / 2);
}
}
PAIR pair_list::remove_from_hash_chain (PAIR remove_me, PAIR list) {
......@@ -3318,8 +3174,10 @@ void pair_list::pair_remove (PAIR p) {
invariant(m_clock_head == p);
invariant(p->clock_next == p);
invariant(m_cleaner_head == p);
invariant(m_checkpoint_head == p);
m_clock_head = NULL;
m_cleaner_head = NULL;
m_checkpoint_head = NULL;
}
else {
if (p == m_clock_head) {
......@@ -3328,6 +3186,9 @@ void pair_list::pair_remove (PAIR p) {
if (p == m_cleaner_head) {
m_cleaner_head = m_cleaner_head->clock_next;
}
if (p == m_checkpoint_head) {
m_checkpoint_head = m_checkpoint_head->clock_next;
}
p->clock_prev->clock_next = p->clock_next;
p->clock_next->clock_prev = p->clock_prev;
......@@ -3357,8 +3218,8 @@ void pair_list::pending_pairs_remove (PAIR p) {
// Returns a pair from the pair list, using the given
// pair. If the pair cannot be found, null is returned.
//
//
// requires caller to have grabbed read lock on list.
// requires caller to have grabbed either a read lock on the list or
// bucket's mutex.
//
PAIR pair_list::find_pair(CACHEFILE file, CACHEKEY key, uint32_t fullhash) {
PAIR found_pair = nullptr;
......@@ -3371,34 +3232,6 @@ PAIR pair_list::find_pair(CACHEFILE file, CACHEKEY key, uint32_t fullhash) {
return found_pair;
}
// has ct locked on entry
// This function MUST NOT release and reacquire the cachetable lock
// Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
//
// requires caller to have grabbed write lock on list.
//
void pair_list::rehash (uint32_t newtable_size) {
assert(newtable_size >= 4 && ((newtable_size & (newtable_size - 1))==0));
PAIR *XCALLOC_N(newtable_size, newtable);
assert(newtable!=0);
uint32_t oldtable_size = m_table_size;
m_table_size = newtable_size;
for (uint32_t i = 0; i < newtable_size; i++) {
newtable[i] = 0;
}
for (uint32_t i = 0; i < oldtable_size; i++) {
PAIR p;
while ((p = m_table[i]) != 0) {
unsigned int h = p->fullhash&(newtable_size - 1);
m_table[i] = p->hash_chain;
p->hash_chain = newtable[h];
newtable[h] = p;
}
}
toku_free(m_table);
m_table = newtable;
}
// Add PAIR to linked list shared by cleaner thread and clock
//
// requires caller to have grabbed write lock on list.
......@@ -3412,6 +3245,7 @@ void pair_list::add_to_clock (PAIR p) {
// tail and head exist
if (m_clock_head) {
assert(m_cleaner_head);
assert(m_checkpoint_head);
// insert right before the head
p->clock_next = m_clock_head;
p->clock_prev = m_clock_head->clock_prev;
......@@ -3425,6 +3259,7 @@ void pair_list::add_to_clock (PAIR p) {
m_clock_head = p;
p->clock_next = p->clock_prev = m_clock_head;
m_cleaner_head = p;
m_checkpoint_head = p;
}
}
......@@ -3538,6 +3373,18 @@ void pair_list::write_pending_cheap_unlock() {
toku_pthread_rwlock_wrunlock(&m_pending_lock_cheap);
}
toku_mutex_t* pair_list::get_mutex_for_pair(uint32_t fullhash) {
return &m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex;
}
void pair_list::pair_lock_by_fullhash(uint32_t fullhash) {
toku_mutex_lock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex);
}
void pair_list::pair_unlock_by_fullhash(uint32_t fullhash) {
toku_mutex_unlock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex);
}
ENSURE_POD(evictor);
......@@ -3998,7 +3845,7 @@ void evictor::evict_pair(PAIR p, bool for_checkpoint) {
// the pair's mutex, then grab the write list lock, then regrab the
// pair's mutex. The pair cannot go anywhere because
// the pair is still pinned
nb_mutex_lock(&p->disk_nb_mutex, &p->mutex);
nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
pair_unlock(p);
m_pl->write_list_lock();
pair_lock(p);
......@@ -4322,9 +4169,9 @@ void checkpointer::log_begin_checkpoint() {
// both pending locks are grabbed
//
void checkpointer::turn_on_pending_bits() {
for (uint32_t i = 0; i < m_list->m_table_size; i++) {
PAIR p;
for (p = m_list->m_table[i]; p; p = p->hash_chain) {
PAIR p = NULL;
uint32_t i;
for (i = 0, p = m_list->m_checkpoint_head; i < m_list->m_n_in_table; i++, p = p->clock_next) {
assert(!p->checkpoint_pending);
//Only include pairs belonging to cachefiles in the checkpoint
if (!p->cachefile->for_checkpoint) {
......@@ -4347,7 +4194,7 @@ void checkpointer::turn_on_pending_bits() {
p->pending_prev = NULL;
m_list->m_pending_head = p;
}
}
invariant(p == m_list->m_checkpoint_head);
}
void checkpointer::add_background_job() {
......
......@@ -166,7 +166,7 @@ typedef int (*CACHETABLE_PARTIAL_FETCH_CALLBACK)(void *value_data, void* disk_da
// The cachetable calls the put callback during a cachetable_put command to provide the opaque PAIR.
// The PAIR can then be used to later unpin the pair.
// Returns: 0 if success, otherwise an error number.
typedef void (*CACHETABLE_PUT_CALLBACK)(void *value_data, PAIR p);
typedef void (*CACHETABLE_PUT_CALLBACK)(CACHEKEY key, void *value_data, PAIR p);
// TODO(leif) XXX TODO XXX
typedef int (*CACHETABLE_CLEANER_CALLBACK)(void *ftnode_pv, BLOCKNUM blocknum, uint32_t fullhash, void *write_extraargs);
......@@ -226,9 +226,7 @@ void toku_cachetable_put_with_dep_pairs(
CACHETABLE_WRITE_CALLBACK write_callback,
void *get_key_and_fullhash_extra,
uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
uint32_t* dependent_fullhash, //array of fullhashes of dependent pairs
PAIR* dependent_pairs,
enum cachetable_dirty* dependent_dirty, // array stating dirty/cleanness of dependent pairs
CACHEKEY* key,
uint32_t* fullhash,
......@@ -255,8 +253,6 @@ void toku_cachetable_put(CACHEFILE cf, CACHEKEY key, uint32_t fullhash,
// then the required PAIRs are written to disk for checkpoint.
// KEY PROPERTY OF DEPENDENT PAIRS: They are already locked by the client
// Returns: 0 if the memory object is in memory, otherwise an error number.
// Requires: toku_cachetable_begin_batched_pin must have been called before entering this function.
// Requires: toku_cachetable_end_batched_pin must be called after this function.
// Rationale:
// begin_batched_pin and end_batched_pin take and release a read lock on the pair list.
// Normally, that would be done within this get_and_pin, but we want to pin multiple nodes with a single acquisition of the read lock.
......@@ -273,9 +269,7 @@ int toku_cachetable_get_and_pin_with_dep_pairs_batched (
pair_lock_type lock_type,
void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
uint32_t* dependent_fullhash, //array of fullhashes of dependent pairs
PAIR* dependent_pairs,
enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
);
......@@ -294,9 +288,7 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
pair_lock_type lock_type,
void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
CACHEKEY* dependent_keys, // array of cachekeys of dependent pairs
uint32_t* dependent_fullhash, //array of fullhashes of dependent pairs
PAIR* dependent_pairs,
enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
);
......@@ -332,21 +324,13 @@ void toku_cachetable_pf_pinned_pair(
struct unlockers {
bool locked;
void (*f)(void*extra);
void (*f)(PAIR p, void* extra);
void *extra;
UNLOCKERS next;
};
// Effect: Makes necessary preparations (grabs locks) for pinning multiple nodes.
void toku_cachetable_begin_batched_pin(CACHEFILE cf);
// Effect: Clean up (release locks) after pinning multiple nodes.
void toku_cachetable_end_batched_pin(CACHEFILE cf);
// Effect: If the block is in the cachetable, then return it.
// Otherwise call the functions in unlockers, fetch the data (but don't pin it, since we'll just end up pinning it again later), and return TOKUDB_TRY_AGAIN.
// Requires: toku_cachetable_begin_batched_pin must have been called before entering this function.
// Requires: toku_cachetable_end_batched_pin must be called after this function.
// Rationale:
// begin_batched_pin and end_batched_pin take and release a read lock on the pair list.
// Normally, that would be done within this get_and_pin, but we want to pin multiple nodes with a single acquisition of the read lock.
......@@ -399,7 +383,7 @@ int toku_cachetable_unpin(CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATT
// Returns: 0 if success, otherwise returns an error number.
// Requires: The ct is locked.
int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATTR size);
int toku_cachetable_unpin_ct_prelocked_no_flush(PAIR, CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATTR size);
// Effect: The same as tokud_cachetable_unpin, except that the ct must not be locked.
// Requires: The ct is NOT locked.
......
......@@ -34,14 +34,10 @@ cachetable_put_empty_node_with_dep_nodes(
FTNODE* result)
{
FTNODE XMALLOC(new_node);
CACHEFILE dependent_cf[num_dependent_nodes];
BLOCKNUM dependent_keys[num_dependent_nodes];
uint32_t dependent_fullhash[num_dependent_nodes];
PAIR dependent_pairs[num_dependent_nodes];
enum cachetable_dirty dependent_dirty_bits[num_dependent_nodes];
for (uint32_t i = 0; i < num_dependent_nodes; i++) {
dependent_cf[i] = h->cf;
dependent_keys[i] = dependent_nodes[i]->thisnodename;
dependent_fullhash[i] = toku_cachetable_hash(h->cf, dependent_nodes[i]->thisnodename);
dependent_pairs[i] = dependent_nodes[i]->ct_pair;
dependent_dirty_bits[i] = (enum cachetable_dirty) dependent_nodes[i]->dirty;
}
......@@ -53,9 +49,7 @@ cachetable_put_empty_node_with_dep_nodes(
get_write_callbacks_for_node(h),
h,
num_dependent_nodes,
dependent_cf,
dependent_keys,
dependent_fullhash,
dependent_pairs,
dependent_dirty_bits,
name,
fullhash,
......@@ -126,7 +120,6 @@ toku_pin_ftnode_batched(
FTNODE_FETCH_EXTRA bfe,
pair_lock_type lock_type,
bool apply_ancestor_messages, // this bool is probably temporary, for #3972, once we know how range query estimates work, will revisit this
bool end_batch_on_success,
FTNODE *node_p,
bool* msgs_applied)
{
......@@ -159,9 +152,6 @@ try_again_for_write_lock:
goto try_again_for_write_lock;
}
}
if (end_batch_on_success) {
toku_cachetable_end_batched_pin(brt->ft->cf);
}
if (apply_ancestor_messages && node->height == 0) {
if (needs_ancestors_messages) {
invariant(needed_lock_type != PL_READ);
......@@ -219,7 +209,6 @@ toku_pin_ftnode_off_client_thread_and_maybe_move_messages(
FTNODE *node_p,
bool move_messages)
{
toku_cachetable_begin_batched_pin(h->cf);
toku_pin_ftnode_off_client_thread_batched_and_maybe_move_messages(
h,
blocknum,
......@@ -231,7 +220,6 @@ toku_pin_ftnode_off_client_thread_and_maybe_move_messages(
node_p,
move_messages
);
toku_cachetable_end_batched_pin(h->cf);
}
void
......@@ -262,14 +250,10 @@ toku_pin_ftnode_off_client_thread_batched_and_maybe_move_messages(
bool move_messages)
{
void *node_v;
CACHEFILE dependent_cf[num_dependent_nodes];
BLOCKNUM dependent_keys[num_dependent_nodes];
uint32_t dependent_fullhash[num_dependent_nodes];
PAIR dependent_pairs[num_dependent_nodes];
enum cachetable_dirty dependent_dirty_bits[num_dependent_nodes];
for (uint32_t i = 0; i < num_dependent_nodes; i++) {
dependent_cf[i] = h->cf;
dependent_keys[i] = dependent_nodes[i]->thisnodename;
dependent_fullhash[i] = toku_cachetable_hash(h->cf, dependent_nodes[i]->thisnodename);
dependent_pairs[i] = dependent_nodes[i]->ct_pair;
dependent_dirty_bits[i] = (enum cachetable_dirty) dependent_nodes[i]->dirty;
}
......@@ -286,9 +270,7 @@ toku_pin_ftnode_off_client_thread_batched_and_maybe_move_messages(
lock_type,
bfe,
num_dependent_nodes,
dependent_cf,
dependent_keys,
dependent_fullhash,
dependent_pairs,
dependent_dirty_bits
);
assert(r==0);
......
......@@ -68,7 +68,6 @@ toku_pin_ftnode_batched(
FTNODE_FETCH_EXTRA bfe,
pair_lock_type lock_type,
bool apply_ancestor_messages, // this bool is probably temporary, for #3972, once we know how range query estimates work, will revisit this
bool end_batch_on_success,
FTNODE *node_p,
bool* msgs_applied
);
......
......@@ -4340,13 +4340,14 @@ struct unlock_ftnode_extra {
};
// When this is called, the cachetable lock is held
static void
unlock_ftnode_fun (void *v) {
unlock_ftnode_fun (PAIR p, void *v) {
struct unlock_ftnode_extra *x = NULL;
CAST_FROM_VOIDP(x, v);
FT_HANDLE brt = x->ft_handle;
FTNODE node = x->node;
// CT lock is held
int r = toku_cachetable_unpin_ct_prelocked_no_flush(
p,
brt->ft->cf,
node->ct_pair,
(enum cachetable_dirty) node->dirty,
......@@ -4386,13 +4387,9 @@ ft_search_child(FT_HANDLE brt, FTNODE node, int childnum, ft_search_t *search, F
&bfe,
PL_READ, // we try to get a read lock, but we may upgrade to a write lock on a leaf for message application.
true,
(node->height == 1), // end_batch_on_success true iff child is a leaf
&childnode,
&msgs_applied);
if (rr==TOKUDB_TRY_AGAIN) {
// We're going to try again, so we aren't pinning any more
// nodes in this batch.
toku_cachetable_end_batched_pin(brt->ft->cf);
return rr;
}
// We end the batch before applying ancestor messages if we get
......@@ -4573,10 +4570,6 @@ ft_search_node(
// At this point, we must have the necessary partition available to continue the search
//
assert(BP_STATE(node,child_to_search) == PT_AVAIL);
// When we enter, we are in a batch. If we search a node but get
// DB_NOTFOUND and need to search the next node, we'll need to start
// another batch.
bool must_begin_batch = false;
while (child_to_search >= 0 && child_to_search < node->n_children) {
//
// Normally, the child we want to use is available, as we checked
......@@ -4592,10 +4585,6 @@ ft_search_node(
}
const struct pivot_bounds next_bounds = next_pivot_keys(node, child_to_search, bounds);
if (node->height > 0) {
if (must_begin_batch) {
toku_cachetable_begin_batched_pin(brt->ft->cf);
must_begin_batch = false;
}
r = ft_search_child(
brt,
node,
......@@ -4655,7 +4644,6 @@ ft_search_node(
maybe_search_save_bound(node, child_to_search, search);
// We're about to pin some more nodes, but we thought we were done before.
must_begin_batch = true;
if (search->direction == FT_SEARCH_LEFT) {
child_to_search++;
}
......@@ -4722,11 +4710,6 @@ try_again:
uint32_t fullhash;
CACHEKEY root_key;
toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
// Begin a batch of pins here. If a child gets TOKUDB_TRY_AGAIN
// it must immediately end the batch. Otherwise, it must end the
// batch as soon as it pins the leaf. The batch will never be
// ended in this function.
toku_cachetable_begin_batched_pin(ft->cf);
toku_pin_ftnode_off_client_thread_batched(
ft,
root_key,
......@@ -4737,12 +4720,6 @@ try_again:
NULL,
&node
);
if (node->height == 0) {
// The root is a leaf, must end the batch now because we
// won't apply ancestor messages, which is where we usually
// end it.
toku_cachetable_end_batched_pin(ft->cf);
}
}
uint tree_height = node->height + 1; // How high is the tree? This is the height of the root node plus one (leaf is at height 0).
......@@ -5248,7 +5225,6 @@ toku_ft_keyrange_internal (FT_HANDLE brt, FTNODE node,
bfe,
PL_READ, // may_modify_node is false, because node guaranteed to not change
false,
false,
&childnode,
&msgs_applied
);
......@@ -5296,7 +5272,6 @@ try_again:
uint32_t fullhash;
CACHEKEY root_key;
toku_calculate_root_offset_pointer(brt->ft, &root_key, &fullhash);
toku_cachetable_begin_batched_pin(brt->ft->cf);
toku_pin_ftnode_off_client_thread_batched(
brt->ft,
root_key,
......@@ -5321,7 +5296,6 @@ try_again:
numrows,
&bfe, &unlockers, (ANCESTORS)NULL, &infinite_bounds);
assert(r == 0 || r == TOKUDB_TRY_AGAIN);
toku_cachetable_end_batched_pin(brt->ft->cf);
if (r == TOKUDB_TRY_AGAIN) {
assert(!unlockers.locked);
goto try_again;
......
......@@ -291,7 +291,7 @@ static void ft_note_unpin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v
// End of Functions that are callbacks to the cachefile
/////////////////////////////////////////////////////////////////////////
void toku_node_save_ct_pair(void *value_data, PAIR p) {
void toku_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) {
FTNODE CAST_FROM_VOIDP(node, value_data);
node->ct_pair = p;
}
......
......@@ -102,7 +102,7 @@ void toku_ft_set_basementnodesize(FT ft, unsigned int basementnodesize);
void toku_ft_get_basementnodesize(FT ft, unsigned int *basementnodesize);
void toku_ft_set_compression_method(FT ft, enum toku_compression_method method);
void toku_ft_get_compression_method(FT ft, enum toku_compression_method *methodp);
void toku_node_save_ct_pair(void *value_data, PAIR p);
void toku_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p);
// mark the ft as a blackhole. any message injections will be a no op.
void toku_ft_set_blackhole(FT_HANDLE ft_handle);
......
......@@ -64,7 +64,7 @@ rollback_memory_size(ROLLBACK_LOG_NODE log) {
return make_rollback_pair_attr(size);
}
static void toku_rollback_node_save_ct_pair(void *value_data, PAIR p) {
static void toku_rollback_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) {
ROLLBACK_LOG_NODE CAST_FROM_VOIDP(log, value_data);
log->ct_pair = p;
}
......@@ -256,7 +256,7 @@ void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash
toku_rollback_pf_callback,
PL_WRITE_CHEAP, // lock_type
h,
0, NULL, NULL, NULL, NULL
0, NULL, NULL
);
assert(r == 0);
ROLLBACK_LOG_NODE CAST_FROM_VOIDP(pinned_log, value);
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
#include "cachetable-test.h"
CACHETABLE ct;
CACHEFILE f1;
static void
unlock_test_fun (void *v) {
assert(v == NULL);
// CT lock is held
int r = toku_test_cachetable_unpin_ct_prelocked_no_flush(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
}
static void
run_test (void) {
const int test_limit = 20;
int r;
ct = NULL;
toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
char fname1[] = __SRCFILE__ "test1.dat";
unlink(fname1);
f1 = NULL;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
create_dummy_functions(f1);
void* v1;
void* v2;
long s1;
long s2;
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, def_write_callback(NULL), def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8)); assert(r==0);
for (int i = 0; i < 20; i++) {
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, def_write_callback(NULL), def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
r = toku_test_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, make_pair_attr(8)); assert(r==0);
}
//
// so at this point, we have 16 bytes in the cachetable that has a limit of 20 bytes
// block 2 has been touched much more than block 1, so if one had to be evicted,
// it would be block 2
//
// pin 1 and 2
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v2, &s2, def_write_callback(NULL), def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
toku_cachetable_begin_checkpoint(cp, NULL);
// mark nodes as pending a checkpoint, so that get_and_pin_nonblocking on block 1 will return TOKUDB_TRY_AGAIN
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8)); assert(r==0);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, def_write_callback(NULL), def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
// now we try to pin 1, and it should get evicted out from under us
struct unlockers foo;
foo.extra = NULL;
foo.locked = true;
foo.f = unlock_test_fun;
foo.next = NULL;
r = toku_cachetable_get_and_pin_nonblocking(
f1,
make_blocknum(1),
1,
&v1,
&s1,
def_write_callback(NULL),
def_fetch,
def_pf_req_callback,
def_pf_callback,
PL_WRITE_EXPENSIVE,
NULL,
&foo
);
assert(r==TOKUDB_TRY_AGAIN);
toku_cachetable_end_checkpoint(
cp,
NULL,
NULL,
NULL
);
toku_cachetable_verify(ct);
toku_cachefile_close(&f1, false, ZERO_LSN);
toku_cachetable_close(&ct);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
run_test();
return 0;
}
......@@ -109,6 +109,7 @@ void checkpointer_test::test_pending_bits() {
// 2. One entry in pair chain
//
struct cachefile cf;
cf.cachetable = &ctbl;
memset(&cf, 0, sizeof(cf));
cf.next = NULL;
cf.for_checkpoint = true;
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: cachetable-cleaner-thread-simple.cc 48237 2012-09-24 18:27:59Z esmet $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
//
// This test verifies that the cleaner thread doesn't call the callback if
// nothing needs flushing.
//
CACHEFILE f1;
bool my_cleaner_callback_called;
static int
my_cleaner_callback(
void* UU(ftnode_pv),
BLOCKNUM blocknum,
uint32_t fullhash,
void* UU(extraargs)
)
{
PAIR_ATTR attr = make_pair_attr(8);
attr.cache_pressure_size = 0;
int r = toku_test_cachetable_unpin(f1, blocknum, fullhash, CACHETABLE_CLEAN, attr);
my_cleaner_callback_called = true;
return r;
}
// point of this test is to have two pairs that have the same fullhash,
// and therefore, the same bucket mutex
static void
run_test (void) {
const int test_limit = 1000;
int r;
CACHETABLE ct;
toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
my_cleaner_callback_called = false;
char fname1[] = __SRCFILE__ "test1.dat";
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* vs[5];
//void* v2;
long ss[5];
//long s2;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
wc.cleaner_callback = my_cleaner_callback;
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &vs[0], &ss[0],
wc,
def_fetch,
def_pf_req_callback,
def_pf_callback,
true,
NULL);
PAIR_ATTR attr = make_pair_attr(8);
attr.cache_pressure_size = 100;
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, attr);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 1, &vs[1], &ss[1],
wc,
def_fetch,
def_pf_req_callback,
def_pf_callback,
true,
NULL);
attr = make_pair_attr(8);
attr.cache_pressure_size = 50;
r = toku_test_cachetable_unpin(f1, make_blocknum(2), 1, CACHETABLE_CLEAN, attr);
toku_cleaner_thread_for_test(ct);
assert(my_cleaner_callback_called);
toku_cachetable_verify(ct);
toku_cachefile_close(&f1, false, ZERO_LSN);
toku_cachetable_close(&ct);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
run_test();
return 0;
}
......@@ -65,13 +65,8 @@ cachetable_test (enum cachetable_dirty dirty, bool cloneable) {
assert(r == 0);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
if (dirty == CACHETABLE_DIRTY && !cloneable) {
assert(r == TOKUDB_TRY_AGAIN);
}
else {
assert(r == 0);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
}
toku_cachetable_end_checkpoint(
cp,
......
......@@ -20,6 +20,7 @@
int64_t data[NUM_ELEMENTS];
int64_t checkpointed_data[NUM_ELEMENTS];
PAIR data_pair[NUM_ELEMENTS];
uint32_t time_of_test;
bool run_test;
......@@ -70,7 +71,7 @@ flush (CACHEFILE f __attribute__((__unused__)),
static int
fetch (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
PAIR p,
int UU(fd),
CACHEKEY k,
uint32_t fullhash __attribute__((__unused__)),
......@@ -87,6 +88,7 @@ fetch (CACHEFILE f __attribute__((__unused__)),
int64_t* XMALLOC(data_val);
usleep(10);
*data_val = data[data_index];
data_pair[data_index] = p;
*value = data_val;
*sizep = make_pair_attr(8);
return 0;
......@@ -153,8 +155,6 @@ static void *move_numbers(void *arg) {
NULL,
0, //num_dependent_pairs
NULL,
NULL,
NULL,
NULL
);
assert(r==0);
......@@ -164,6 +164,7 @@ static void *move_numbers(void *arg) {
greater_key.b = greater;
uint32_t greater_fullhash = greater;
enum cachetable_dirty greater_dirty = CACHETABLE_DIRTY;
PAIR dep_pair = data_pair[less];
r = toku_cachetable_get_and_pin_with_dep_pairs(
f1,
make_blocknum(greater),
......@@ -174,9 +175,7 @@ static void *move_numbers(void *arg) {
PL_WRITE_CHEAP,
NULL,
1, //num_dependent_pairs
&f1,
&less_key,
&less_fullhash,
&dep_pair,
&less_dirty
);
assert(r==0);
......@@ -196,6 +195,7 @@ static void *move_numbers(void *arg) {
third = (random() % (num_possible_values)) + greater + 1;
CACHEKEY third_key;
third_key.b = third;
dep_pair = data_pair[greater];
uint32_t third_fullhash = third;
enum cachetable_dirty third_dirty = CACHETABLE_DIRTY;
r = toku_cachetable_get_and_pin_with_dep_pairs(
......@@ -208,9 +208,7 @@ static void *move_numbers(void *arg) {
PL_WRITE_CHEAP,
NULL,
1, //num_dependent_pairs
&f1,
&greater_key,
&greater_fullhash,
&dep_pair,
&greater_dirty
);
assert(r==0);
......
......@@ -23,10 +23,21 @@
int64_t data[NUM_ELEMENTS];
int64_t checkpointed_data[NUM_ELEMENTS];
PAIR data_pair[NUM_ELEMENTS];
uint32_t time_of_test;
bool run_test;
static void
put_callback_pair(
CACHEKEY key,
void *UU(v),
PAIR p)
{
int64_t data_index = key.b;
data_pair[data_index] = p;
}
static void
clone_callback(
void* value_data,
......@@ -72,7 +83,7 @@ flush (CACHEFILE f __attribute__((__unused__)),
static int
fetch (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
PAIR p,
int UU(fd),
CACHEKEY k,
uint32_t fullhash __attribute__((__unused__)),
......@@ -92,6 +103,7 @@ fetch (CACHEFILE f __attribute__((__unused__)),
int64_t* XMALLOC(data_val);
usleep(10);
*data_val = data[data_index];
data_pair[data_index] = p;
*value = data_val;
*sizep = make_pair_attr(8);
return 0;
......@@ -136,6 +148,7 @@ static void move_number_to_child(
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
wc.flush_callback = flush;
wc.clone_callback = clone_callback;
PAIR dep_pair = data_pair[parent];
r = toku_cachetable_get_and_pin_with_dep_pairs(
f1,
child_key,
......@@ -146,9 +159,7 @@ static void move_number_to_child(
PL_WRITE_CHEAP,
NULL,
1, //num_dependent_pairs
&f1,
&parent_key,
&parent_fullhash,
&dep_pair,
&parent_dirty
);
assert(r==0);
......@@ -194,8 +205,6 @@ static void *move_numbers(void *arg) {
NULL,
0, //num_dependent_pairs
NULL,
NULL,
NULL,
NULL
);
assert(r==0);
......@@ -249,6 +258,7 @@ static void merge_and_split_child(
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
wc.flush_callback = flush;
wc.clone_callback = clone_callback;
PAIR dep_pair = data_pair[parent];
r = toku_cachetable_get_and_pin_with_dep_pairs(
f1,
child_key,
......@@ -259,9 +269,7 @@ static void merge_and_split_child(
PL_WRITE_CHEAP,
NULL,
1, //num_dependent_pairs
&f1,
&parent_key,
&parent_fullhash,
&dep_pair,
&parent_dirty
);
assert(r==0);
......@@ -270,18 +278,12 @@ static void merge_and_split_child(
CACHEKEY other_child_key;
other_child_key.b = other_child;
uint32_t other_child_fullhash = toku_cachetable_hash(f1, other_child_key);
CACHEFILE cfs[2];
cfs[0] = f1;
cfs[1] = f1;
CACHEKEY keys[2];
keys[0] = parent_key;
keys[1] = child_key;
uint32_t hashes[2];
hashes[0] = parent_fullhash;
hashes[1] = child_fullhash;
enum cachetable_dirty dirties[2];
dirties[0] = parent_dirty;
dirties[1] = child_dirty;
PAIR dep_pairs[2];
dep_pairs[0] = data_pair[parent];
dep_pairs[1] = data_pair[child];
r = toku_cachetable_get_and_pin_with_dep_pairs(
f1,
......@@ -293,9 +295,7 @@ static void merge_and_split_child(
PL_WRITE_CHEAP,
NULL,
2, //num_dependent_pairs
cfs,
keys,
hashes,
dep_pairs,
dirties
);
assert(r==0);
......@@ -323,13 +323,11 @@ static void merge_and_split_child(
wc,
&other_child,
2, // number of dependent pairs that we may need to checkpoint
cfs,
keys,
hashes,
dep_pairs,
dirties,
&new_key,
&new_fullhash,
put_callback_nop
put_callback_pair
);
assert(new_key.b == other_child);
assert(new_fullhash == other_child_fullhash);
......@@ -372,8 +370,6 @@ static void *merge_and_split(void *arg) {
NULL,
0, //num_dependent_pairs
NULL,
NULL,
NULL,
NULL
);
assert(r==0);
......
......@@ -27,7 +27,7 @@ static void kibbutz_work(void *fe_v)
}
static void
unlock_dummy (void* UU(v)) {
unlock_dummy (PAIR UU(p), void* UU(v)) {
}
static void reset_unlockers(UNLOCKERS unlockers) {
......@@ -49,7 +49,7 @@ run_test (pair_lock_type lock_type) {
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
r = toku_cachetable_get_and_pin_with_dep_pairs(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, lock_type, NULL, 0, NULL, NULL, NULL, NULL);
r = toku_cachetable_get_and_pin_with_dep_pairs(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, lock_type, NULL, 0, NULL, NULL);
cachefile_kibbutz_enq(f1, kibbutz_work, f1);
reset_unlockers(&unlockers);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, &unlockers);
......@@ -67,7 +67,7 @@ run_test (pair_lock_type lock_type) {
// now do the same test with a partial fetch required
pf_called = false;
r = toku_cachetable_get_and_pin_with_dep_pairs(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, true_pf_req_callback, true_pf_callback, lock_type, NULL, 0, NULL, NULL, NULL, NULL);
r = toku_cachetable_get_and_pin_with_dep_pairs(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, true_pf_req_callback, true_pf_callback, lock_type, NULL, 0, NULL, NULL);
assert(pf_called);
cachefile_kibbutz_enq(f1, kibbutz_work, f1);
reset_unlockers(&unlockers);
......
......@@ -13,6 +13,7 @@ uint64_t val2;
uint64_t val3;
bool check_me;
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
......@@ -46,9 +47,11 @@ flush (CACHEFILE f __attribute__((__unused__)),
}
}
PAIR* dest_pair;
static int
fetch (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
PAIR p,
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
uint32_t fullhash __attribute__((__unused__)),
......@@ -61,6 +64,7 @@ fetch (CACHEFILE f __attribute__((__unused__)),
*dirtyp = 0;
*value = extraargs;
*sizep = make_pair_attr(8);
*dest_pair = p;
return 0;
}
......@@ -82,22 +86,16 @@ cachetable_test (bool write_first, bool write_second, bool start_checkpoint) {
long s1;
long s2;
long s3;
PAIR dependent_pairs[2];
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(&val1);
wc.flush_callback = flush;
wc.write_extraargs = &val1;
dest_pair = &dependent_pairs[0];
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, fetch, def_pf_req_callback, def_pf_callback, true, &val1);
dest_pair = &dependent_pairs[1];
wc.write_extraargs = &val2;
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, wc, fetch, def_pf_req_callback, def_pf_callback, true, &val2);
CACHEFILE dependent_cfs[2];
dependent_cfs[0] = f1;
dependent_cfs[1] = f1;
CACHEKEY dependent_keys[2];
dependent_keys[0] = make_blocknum(1);
dependent_keys[1] = make_blocknum(2);
uint32_t dependent_fullhash[2];
dependent_fullhash[0] = 1;
dependent_fullhash[1] = 2;
// now we set the dirty state of these two.
enum cachetable_dirty cd[2];
cd[0] = write_first ? CACHETABLE_DIRTY : CACHETABLE_CLEAN;
......@@ -126,9 +124,7 @@ cachetable_test (bool write_first, bool write_second, bool start_checkpoint) {
PL_WRITE_EXPENSIVE,
&val3,
2, //num_dependent_pairs
dependent_cfs,
dependent_keys,
dependent_fullhash,
dependent_pairs,
cd
);
if (start_checkpoint) {
......
......@@ -35,7 +35,7 @@ static void kibbutz_work(void *fe_v)
}
static void
unlock_dummy (void* UU(v)) {
unlock_dummy (PAIR UU(p), void* UU(v)) {
}
static void reset_unlockers(UNLOCKERS unlockers) {
......
......@@ -100,25 +100,6 @@ run_test (void) {
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, true_def_pf_req_callback, true_def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==TOKUDB_TRY_AGAIN);
//
// now test that if there is a checkpoint pending,
// first pin and unpin with dirty
//
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==0);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8)); assert(r==0);
// this should mark the PAIR as pending
CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
toku_cachetable_begin_checkpoint(cp, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==TOKUDB_TRY_AGAIN);
toku_cachetable_end_checkpoint(
cp,
NULL,
NULL,
NULL
);
toku_cachetable_verify(ct);
toku_cachefile_close(&f1, false, ZERO_LSN);
toku_cachetable_close(&ct);
......
......@@ -12,6 +12,17 @@ bool v2_written;
uint64_t val2;
uint64_t val3;
bool check_me;
PAIR* dest_pair;
static void
put_callback_pair(
CACHEKEY UU(key),
void *UU(v),
PAIR p)
{
*dest_pair = p;
}
static void
flush (CACHEFILE f __attribute__((__unused__)),
......@@ -61,6 +72,7 @@ fetch (CACHEFILE f __attribute__((__unused__)),
*dirtyp = 0;
*value = extraargs;
*sizep = make_pair_attr(8);
*dest_pair = p;
return 0;
}
......@@ -87,22 +99,16 @@ cachetable_test (bool write_first, bool write_second, bool start_checkpoint) {
void* v2;
long s1;
long s2;
PAIR dependent_pairs[2];
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
wc.flush_callback = flush;
dest_pair = &dependent_pairs[0];
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, fetch, def_pf_req_callback, def_pf_callback, true, &val1);
assert(r==0);
dest_pair = &dependent_pairs[1];
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, wc, fetch, def_pf_req_callback, def_pf_callback, true, &val2);
assert(r==0);
CACHEFILE dependent_cfs[2];
dependent_cfs[0] = f1;
dependent_cfs[1] = f1;
CACHEKEY dependent_keys[2];
dependent_keys[0] = make_blocknum(1);
dependent_keys[1] = make_blocknum(2);
uint32_t dependent_fullhash[2];
dependent_fullhash[0] = 1;
dependent_fullhash[1] = 2;
// now we set the dirty state of these two.
enum cachetable_dirty cd[2];
cd[0] = write_first ? CACHETABLE_DIRTY : CACHETABLE_CLEAN;
......@@ -123,6 +129,8 @@ cachetable_test (bool write_first, bool write_second, bool start_checkpoint) {
CACHEKEY put_key;
uint32_t put_fullhash;
PAIR dummy_pair;
dest_pair = &dummy_pair;
toku_cachetable_put_with_dep_pairs(
f1,
get_key_and_fullhash,
......@@ -131,13 +139,11 @@ cachetable_test (bool write_first, bool write_second, bool start_checkpoint) {
wc,
NULL,
2, //num_dependent_pairs
dependent_cfs,
dependent_keys,
dependent_fullhash,
dependent_pairs,
cd,
&put_key,
&put_fullhash,
put_callback_nop
put_callback_pair
);
assert(put_key.b == 3);
assert(put_fullhash == 3);
......
......@@ -41,7 +41,7 @@ cachetable_test (void) {
long s1;
//long s2;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
toku_cachetable_begin_checkpoint(cp, NULL);
r = toku_test_cachetable_unpin_and_remove(f1, make_blocknum(1), remove_key_expect_checkpoint, NULL);
......@@ -52,7 +52,7 @@ cachetable_test (void) {
NULL
);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
r = toku_test_cachetable_unpin_and_remove(f1, make_blocknum(1), remove_key_expect_no_checkpoint, NULL);
toku_cachetable_verify(ct);
......
......@@ -185,6 +185,7 @@ def_fetch (CACHEFILE f __attribute__((__unused__)),
static UU() void
put_callback_nop(
CACHEKEY UU(key),
void *UU(v),
PAIR UU(p)) {
}
......
......@@ -37,6 +37,10 @@ typedef struct toku_mutex {
#endif
} toku_mutex_t;
typedef struct toku_mutex_aligned {
toku_mutex_t aligned_mutex __attribute__((__aligned__(64)));
} toku_mutex_aligned_t;
#if defined(__FreeBSD__)
# define TOKU_MUTEX_ADAPTIVE PTHREAD_MUTEX_ADAPTIVE_NP
static const toku_mutex_t ZERO_MUTEX_INITIALIZER = {0};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment