Commit 3a887ea7 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

refs #5312, merge to main

git-svn-id: file:///svn/toku/tokudb@47022 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9c9aa700
......@@ -8,6 +8,7 @@
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "frwlock.h"
#include "nonblocking_mutex.h"
#include "kibbutz.h"
#include "background_job_manager.h"
......@@ -32,20 +33,20 @@
// - pair_list->pending_lock_cheap
// - cachefile_list->lock
// - PAIR->mutex
// - PAIR->value_nb_mutex
// - PAIR->value_rwlock
// - PAIR->disk_nb_mutex
//
// Here are rules for how the locks interact:
// - To grab any of the pair_list's locks, or the cachefile_list's lock,
// the cachetable must be in existence
// - To grab the PAIR mutex, we must know the PAIR will not dissappear:
// - the PAIR must be pinned (value_nb_mutex or disk_nb_mutex is held)
// - the PAIR must be pinned (value_rwlock or disk_nb_mutex is held)
// - OR, the pair_list's list lock is held
// - As a result, to get rid of a PAIR from the pair_list, we must hold
// both the pair_list's list_lock and the PAIR's mutex
// - To grab PAIR->value_nb_mutex, we must hold the PAIR's mutex
// - To grab PAIR->value_rwlock, we must hold the PAIR's mutex
// - To grab PAIR->disk_nb_mutex, we must hold the PAIR's mutex
// and hold PAIR->value_nb_mutex
// and hold PAIR->value_rwlock
//
// Now let's talk about ordering. Here is an order from outer to inner (top locks must be grabbed first)
// - pair_list->pending_lock_expensive
......@@ -55,7 +56,7 @@
// - pair_list->pending_lock_cheap <-- after grabbing this lock,
// NO other locks
// should be grabbed.
// - when grabbing PAIR->value_nb_mutex or PAIR->disk_nb_mutex,
// - when grabbing PAIR->value_rwlock or PAIR->disk_nb_mutex,
// if the acquisition will not block, then it does not matter if any other locks held,
// BUT if the acquisition will block, then NO other locks may be held besides
// PAIR->mutex.
......@@ -139,7 +140,7 @@ struct ctpair {
long cloned_value_size; // size of cloned_value_data, used for accounting of size_current
void* disk_data; // data used to fetch/flush value_data to and from disk.
// access to these fields are protected by value_nb_mutex
// access to these fields are protected by value_rwlock
void* value_data; // data used by client threads, FTNODEs and ROLLBACK_LOG_NODEs
PAIR_ATTR attr;
enum cachetable_dirty dirty;
......@@ -148,22 +149,22 @@ struct ctpair {
uint32_t count; // clock count
// locks
struct nb_mutex value_nb_mutex; // single writer, protects value_data
toku::frwlock value_rwlock;
struct nb_mutex disk_nb_mutex; // single writer, protects disk_data, is used for writing cloned nodes for checkpoint
toku_mutex_t mutex;
// Access to checkpoint_pending is protected by two mechanisms,
// the value_nb_mutex and the pair_list's pending locks (expensive and cheap).
// the value_rwlock and the pair_list's pending locks (expensive and cheap).
// checkpoint_pending may be true of false.
// Here are the rules for reading/modifying this bit.
// - To transition this field from false to true during begin_checkpoint,
// we must be holding both of the pair_list's pending locks.
// - To transition this field from true to false during end_checkpoint,
// we must be holding the value_nb_mutex.
// we must be holding the value_rwlock.
// - For a non-checkpoint thread to read the value, we must hold both the
// value_nb_mutex and one of the pair_list's pending locks
// value_rwlock and one of the pair_list's pending locks
// - For the checkpoint thread to read the value, we must
// hold the value_nb_mutex
// hold the value_rwlock
//
bool checkpoint_pending; // If this is on, then we have got to resolve checkpointing modifying it.
......
This diff is collapsed.
......@@ -209,6 +209,12 @@ CACHETABLE toku_cachefile_get_cachetable(CACHEFILE cf);
// Effect: Get the cachetable.
typedef enum {
PL_READ = 0,
PL_WRITE_CHEAP,
PL_WRITE_EXPENSIVE
} pair_lock_type;
// put something into the cachetable and checkpoint dependent pairs
// if the checkpointing is necessary
int toku_cachetable_put_with_dep_pairs(
......@@ -265,7 +271,7 @@ int toku_cachetable_get_and_pin_with_dep_pairs_batched (
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
bool may_modify_value,
pair_lock_type lock_type,
void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
......@@ -286,7 +292,7 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
bool may_modify_value,
pair_lock_type lock_type,
void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
CACHEFILE* dependent_cfs, // array of cachefiles of dependent pairs
......@@ -355,7 +361,7 @@ int toku_cachetable_get_and_pin_nonblocking_batched (
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback __attribute__((unused)),
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback __attribute__((unused)),
bool may_modify_value,
pair_lock_type lock_type,
void *read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
UNLOCKERS unlockers
);
......@@ -372,7 +378,7 @@ int toku_cachetable_get_and_pin_nonblocking (
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback __attribute__((unused)),
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback __attribute__((unused)),
bool may_modify_value,
pair_lock_type lock_type,
void *read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
UNLOCKERS unlockers
);
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: frwlock.h 45930 2012-07-19 19:18:35Z zardosht $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_assert.h>
namespace toku {
void frwlock::init(toku_mutex_t *const mutex) {
m_mutex = mutex;
m_num_readers = 0;
m_num_writers = 0;
m_num_want_write = 0;
m_num_want_read = 0;
m_num_signaled_readers = 0;
m_num_expensive_want_write = 0;
toku_cond_init(&m_wait_read, nullptr);
m_queue_item_read = { .cond = &m_wait_read, .next = nullptr };
m_wait_read_is_in_queue = false;
m_current_writer_expensive = false;
m_read_wait_expensive = false;
m_wait_head = nullptr;
m_wait_tail = nullptr;
}
void frwlock::deinit(void) {
toku_cond_destroy(&m_wait_read);
}
inline bool frwlock::queue_is_empty(void) const {
return m_wait_head == nullptr;
}
inline void frwlock::enq_item(queue_item *const item) {
invariant_null(item->next);
if (m_wait_tail != nullptr) {
m_wait_tail->next = item;
} else {
invariant_null(m_wait_head);
m_wait_head = item;
}
m_wait_tail = item;
}
inline toku_cond_t *frwlock::deq_item(void) {
invariant_notnull(m_wait_head);
invariant_notnull(m_wait_tail);
queue_item *item = m_wait_head;
m_wait_head = m_wait_head->next;
if (m_wait_tail == item) {
m_wait_tail = nullptr;
}
return item->cond;
}
// Prerequisite: Holds m_mutex.
inline void frwlock::write_lock(bool expensive) {
if (this->try_write_lock(expensive)) {
return;
}
toku_cond_t cond = TOKU_COND_INITIALIZER;
queue_item item = { .cond = &cond, .next = nullptr };
this->enq_item(&item);
// Wait for our turn.
++m_num_want_write;
if (expensive) {
++m_num_expensive_want_write;
}
toku_cond_wait(&cond, m_mutex);
toku_cond_destroy(&cond);
// Now it's our turn.
invariant(m_num_want_write > 0);
invariant_zero(m_num_readers);
invariant_zero(m_num_writers);
invariant_zero(m_num_signaled_readers);
// Not waiting anymore; grab the lock.
--m_num_want_write;
if (expensive) {
--m_num_expensive_want_write;
}
m_num_writers = 1;
m_current_writer_expensive = expensive;
}
inline bool frwlock::try_write_lock(bool expensive) {
if (m_num_readers > 0 || m_num_writers > 0 || m_num_signaled_readers > 0 || m_num_want_write > 0) {
return false;
}
// No one holds the lock. Grant the write lock.
invariant_zero(m_num_want_write);
invariant_zero(m_num_want_read);
m_num_writers = 1;
m_current_writer_expensive = expensive;
return true;
}
inline void frwlock::read_lock(void) {
if (m_num_writers > 0 || m_num_want_write > 0) {
if (!m_wait_read_is_in_queue) {
// Throw the read cond_t onto the queue.
invariant(m_num_signaled_readers == m_num_want_read);
m_queue_item_read.next = nullptr;
this->enq_item(&m_queue_item_read);
m_wait_read_is_in_queue = true;
invariant(!m_read_wait_expensive);
m_read_wait_expensive = (
m_current_writer_expensive ||
(m_num_expensive_want_write > 0)
);
}
// Wait for our turn.
++m_num_want_read;
toku_cond_wait(&m_wait_read, m_mutex);
// Now it's our turn.
invariant_zero(m_num_writers);
invariant(m_num_want_read > 0);
invariant(m_num_signaled_readers > 0);
// Not waiting anymore; grab the lock.
--m_num_want_read;
--m_num_signaled_readers;
}
++m_num_readers;
}
inline bool frwlock::try_read_lock(void) {
if (m_num_writers > 0 || m_num_want_write > 0) {
return false;
}
// No writer holds the lock.
// No writers are waiting.
// Grant the read lock.
++m_num_readers;
return true;
}
inline void frwlock::maybe_signal_next_writer(void) {
if (m_num_want_write > 0 && m_num_signaled_readers == 0 && m_num_readers == 0) {
toku_cond_t *cond = this->deq_item();
invariant(cond != &m_wait_read);
// Grant write lock to waiting writer.
invariant(m_num_want_write > 0);
toku_cond_signal(cond);
}
}
inline void frwlock::read_unlock(void) {
invariant(m_num_writers == 0);
invariant(m_num_readers > 0);
--m_num_readers;
this->maybe_signal_next_writer();
}
inline bool frwlock::read_lock_is_expensive(void) {
if (m_wait_read_is_in_queue) {
return m_read_wait_expensive;
}
else {
return m_current_writer_expensive || (m_num_expensive_want_write > 0);
}
}
inline void frwlock::maybe_signal_or_broadcast_next(void) {
invariant(m_num_signaled_readers == 0);
if (this->queue_is_empty()) {
invariant(m_num_want_write == 0);
invariant(m_num_want_read == 0);
return;
}
toku_cond_t *cond = this->deq_item();
if (cond == &m_wait_read) {
// Grant read locks to all waiting readers
invariant(m_wait_read_is_in_queue);
invariant(m_num_want_read > 0);
m_num_signaled_readers = m_num_want_read;
m_wait_read_is_in_queue = false;
m_read_wait_expensive = false;
toku_cond_broadcast(cond);
}
else {
// Grant write lock to waiting writer.
invariant(m_num_want_write > 0);
toku_cond_signal(cond);
}
}
inline void frwlock::write_unlock(void) {
invariant(m_num_writers == 1);
m_num_writers = 0;
m_current_writer_expensive = false;
this->maybe_signal_or_broadcast_next();
}
inline bool frwlock::write_lock_is_expensive(void) {
return (m_num_expensive_want_write > 0) || (m_current_writer_expensive);
}
inline uint32_t frwlock::users(void) const {
return m_num_readers + m_num_writers + m_num_want_read + m_num_want_write;
}
inline uint32_t frwlock::blocked_users(void) const {
return m_num_want_read + m_num_want_write;
}
inline uint32_t frwlock::writers(void) const {
return m_num_writers;
}
inline uint32_t frwlock::blocked_writers(void) const {
return m_num_want_write;
}
inline uint32_t frwlock::readers(void) const {
return m_num_readers;
}
inline uint32_t frwlock::blocked_readers(void) const {
return m_num_want_read;
}
} // namespace toku
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ifndef TOKU_FRWLOCK_H
#define TOKU_FRWLOCK_H
#ident "$Id: frwlock.h 45930 2012-07-19 19:18:35Z zardosht $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
#include <toku_pthread.h>
#include <stdbool.h>
#include <stdint.h>
//TODO: update comment, this is from rwlock.h
namespace toku {
class frwlock {
public:
void init(toku_mutex_t *const mutex);
void deinit(void);
inline void write_lock(bool expensive);
inline bool try_write_lock(bool expensive);
inline void write_unlock(void);
// returns true if acquiring a write lock will be expensive
inline bool write_lock_is_expensive(void);
inline void read_lock(void);
inline bool try_read_lock(void);
inline void read_unlock(void);
// returns true if acquiring a read lock will be expensive
inline bool read_lock_is_expensive(void);
inline uint32_t users(void) const;
inline uint32_t blocked_users(void) const;
inline uint32_t writers(void) const;
inline uint32_t blocked_writers(void) const;
inline uint32_t readers(void) const;
inline uint32_t blocked_readers(void) const;
private:
struct queue_item {
toku_cond_t *cond;
struct queue_item *next;
};
inline bool queue_is_empty(void) const;
inline void enq_item(queue_item *const item);
inline toku_cond_t *deq_item(void);
inline void maybe_signal_or_broadcast_next(void);
inline void maybe_signal_next_writer(void);
toku_mutex_t *m_mutex;
uint32_t m_num_readers;
uint32_t m_num_writers;
uint32_t m_num_want_write;
uint32_t m_num_want_read;
uint32_t m_num_signaled_readers;
// number of writers waiting that are expensive
// MUST be < m_num_want_write
uint32_t m_num_expensive_want_write;
// bool that states if the current writer is expensive
// if there is no current writer, then is false
bool m_current_writer_expensive;
// bool that states if waiting for a read
// is expensive
// if there are currently no waiting readers, then set to false
bool m_read_wait_expensive;
toku_cond_t m_wait_read;
queue_item m_queue_item_read;
bool m_wait_read_is_in_queue;
queue_item *m_wait_head;
queue_item *m_wait_tail;
};
static_assert(std::is_pod<frwlock>::value, "not pod");
} // namespace toku
// include the implementation here
#include "frwlock.cc"
#endif
......@@ -61,7 +61,6 @@ cachetable_put_empty_node_with_dep_nodes(
fullhash,
toku_node_save_ct_pair);
assert_zero(r);
*result = new_node;
}
......@@ -129,7 +128,7 @@ toku_pin_ftnode(
ANCESTORS ancestors,
const PIVOT_BOUNDS bounds,
FTNODE_FETCH_EXTRA bfe,
bool may_modify_node,
pair_lock_type lock_type,
bool apply_ancestor_messages, // this bool is probably temporary, for #3972, once we know how range query estimates work, will revisit this
FTNODE *node_p,
bool* msgs_applied)
......@@ -143,7 +142,7 @@ toku_pin_ftnode(
ancestors,
bounds,
bfe,
may_modify_node,
lock_type,
apply_ancestor_messages,
false,
node_p,
......@@ -162,7 +161,7 @@ toku_pin_ftnode_batched(
ANCESTORS ancestors,
const PIVOT_BOUNDS bounds,
FTNODE_FETCH_EXTRA bfe,
bool may_modify_node,
pair_lock_type lock_type,
bool apply_ancestor_messages, // this bool is probably temporary, for #3972, once we know how range query estimates work, will revisit this
bool end_batch_on_success,
FTNODE *node_p,
......@@ -180,7 +179,7 @@ toku_pin_ftnode_batched(
toku_ftnode_fetch_callback,
toku_ftnode_pf_req_callback,
toku_ftnode_pf_callback,
may_modify_node,
lock_type,
bfe, //read_extraargs
unlockers);
if (r==0) {
......@@ -191,7 +190,7 @@ toku_pin_ftnode_batched(
if (apply_ancestor_messages && node->height == 0) {
toku_apply_ancestors_messages_to_node(brt, node, ancestors, bounds, msgs_applied);
}
if (may_modify_node && node->height > 0) {
if ((lock_type != PL_READ) && node->height > 0) {
toku_move_ftnode_messages_to_stale(brt->ft, node);
}
*node_p = node;
......@@ -209,7 +208,7 @@ toku_pin_ftnode_off_client_thread_and_maybe_move_messages(
BLOCKNUM blocknum,
uint32_t fullhash,
FTNODE_FETCH_EXTRA bfe,
bool may_modify_node,
pair_lock_type lock_type,
uint32_t num_dependent_nodes,
FTNODE* dependent_nodes,
FTNODE *node_p,
......@@ -221,7 +220,7 @@ toku_pin_ftnode_off_client_thread_and_maybe_move_messages(
blocknum,
fullhash,
bfe,
may_modify_node,
lock_type,
num_dependent_nodes,
dependent_nodes,
node_p,
......@@ -236,13 +235,13 @@ toku_pin_ftnode_off_client_thread(
BLOCKNUM blocknum,
uint32_t fullhash,
FTNODE_FETCH_EXTRA bfe,
bool may_modify_node,
pair_lock_type lock_type,
uint32_t num_dependent_nodes,
FTNODE* dependent_nodes,
FTNODE *node_p)
{
toku_pin_ftnode_off_client_thread_and_maybe_move_messages(
h, blocknum, fullhash, bfe, may_modify_node, num_dependent_nodes, dependent_nodes, node_p, true);
h, blocknum, fullhash, bfe, lock_type, num_dependent_nodes, dependent_nodes, node_p, true);
}
void
......@@ -251,7 +250,7 @@ toku_pin_ftnode_off_client_thread_batched_and_maybe_move_messages(
BLOCKNUM blocknum,
uint32_t fullhash,
FTNODE_FETCH_EXTRA bfe,
bool may_modify_node,
pair_lock_type lock_type,
uint32_t num_dependent_nodes,
FTNODE* dependent_nodes,
FTNODE *node_p,
......@@ -279,7 +278,7 @@ toku_pin_ftnode_off_client_thread_batched_and_maybe_move_messages(
toku_ftnode_fetch_callback,
toku_ftnode_pf_req_callback,
toku_ftnode_pf_callback,
may_modify_node,
lock_type,
bfe,
num_dependent_nodes,
dependent_cf,
......@@ -289,7 +288,7 @@ toku_pin_ftnode_off_client_thread_batched_and_maybe_move_messages(
);
assert(r==0);
FTNODE node = (FTNODE) node_v;
if (may_modify_node && node->height > 0 && move_messages) {
if ((lock_type != PL_READ) && node->height > 0 && move_messages) {
toku_move_ftnode_messages_to_stale(h, node);
}
*node_p = node;
......@@ -301,23 +300,23 @@ toku_pin_ftnode_off_client_thread_batched(
BLOCKNUM blocknum,
uint32_t fullhash,
FTNODE_FETCH_EXTRA bfe,
bool may_modify_node,
pair_lock_type lock_type,
uint32_t num_dependent_nodes,
FTNODE* dependent_nodes,
FTNODE *node_p)
{
toku_pin_ftnode_off_client_thread_batched_and_maybe_move_messages(
h, blocknum, fullhash, bfe, may_modify_node, num_dependent_nodes, dependent_nodes, node_p, true);
h, blocknum, fullhash, bfe, lock_type, num_dependent_nodes, dependent_nodes, node_p, true);
}
int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *nodep, bool may_modify_node) {
int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *nodep) {
void *node_v;
int r = toku_cachetable_maybe_get_and_pin_clean(ft->cf, blocknum, fullhash, &node_v);
if (r != 0) {
goto cleanup;
}
CAST_FROM_VOIDP(*nodep, node_v);
if (may_modify_node && (*nodep)->height > 0) {
if ((*nodep)->height > 0) {
toku_move_ftnode_messages_to_stale(ft, *nodep);
}
cleanup:
......
......@@ -69,7 +69,7 @@ toku_pin_ftnode(
ANCESTORS ancestors,
const PIVOT_BOUNDS pbounds,
FTNODE_FETCH_EXTRA bfe,
bool may_modify_node,
pair_lock_type lock_type,
bool apply_ancestor_messages, // this bool is probably temporary, for #3972, once we know how range query estimates work, will revisit this
FTNODE *node_p,
bool* msgs_applied
......@@ -88,7 +88,7 @@ toku_pin_ftnode_batched(
ANCESTORS ancestors,
const PIVOT_BOUNDS pbounds,
FTNODE_FETCH_EXTRA bfe,
bool may_modify_node,
pair_lock_type lock_type,
bool apply_ancestor_messages, // this bool is probably temporary, for #3972, once we know how range query estimates work, will revisit this
bool end_batch_on_success,
FTNODE *node_p,
......@@ -108,7 +108,7 @@ toku_pin_ftnode_off_client_thread(
BLOCKNUM blocknum,
uint32_t fullhash,
FTNODE_FETCH_EXTRA bfe,
bool may_modify_node,
pair_lock_type lock_type,
uint32_t num_dependent_nodes,
FTNODE* dependent_nodes,
FTNODE *node_p
......@@ -120,7 +120,7 @@ toku_pin_ftnode_off_client_thread_and_maybe_move_messages(
BLOCKNUM blocknum,
uint32_t fullhash,
FTNODE_FETCH_EXTRA bfe,
bool may_modify_node,
pair_lock_type lock_type,
uint32_t num_dependent_nodes,
FTNODE* dependent_nodes,
FTNODE *node_p,
......@@ -131,7 +131,7 @@ toku_pin_ftnode_off_client_thread_and_maybe_move_messages(
* This function may return a pinned ftnode to the caller, if pinning is cheap.
* If the node is already locked, or is pending a checkpoint, the node is not pinned and -1 is returned.
*/
int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *nodep, bool may_modify_node);
int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *nodep);
/**
* Batched version of toku_pin_ftnode_off_client_thread, see cachetable
......@@ -143,7 +143,7 @@ toku_pin_ftnode_off_client_thread_batched_and_maybe_move_messages(
BLOCKNUM blocknum,
uint32_t fullhash,
FTNODE_FETCH_EXTRA bfe,
bool may_modify_node,
pair_lock_type lock_type,
uint32_t num_dependent_nodes,
FTNODE* dependent_nodes,
FTNODE *node_p,
......@@ -160,7 +160,7 @@ toku_pin_ftnode_off_client_thread_batched(
BLOCKNUM blocknum,
uint32_t fullhash,
FTNODE_FETCH_EXTRA bfe,
bool may_modify_node,
pair_lock_type lock_type,
uint32_t num_dependent_nodes,
FTNODE* dependent_nodes,
FTNODE *node_p
......
......@@ -402,7 +402,7 @@ ct_maybe_merge_child(struct flusher_advice *fa,
toku_calculate_root_offset_pointer(h, &root, &fullhash);
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, h);
toku_pin_ftnode_off_client_thread(h, root, fullhash, &bfe, true, 0, NULL, &root_node);
toku_pin_ftnode_off_client_thread(h, root, fullhash, &bfe, PL_WRITE_EXPENSIVE, 0, NULL, &root_node);
toku_assert_entire_node_in_memory(root_node);
toku_ft_release_treelock(h);
......@@ -1342,7 +1342,7 @@ ft_merge_child(
uint32_t childfullhash = compute_child_fullhash(h->cf, node, childnuma);
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, h);
toku_pin_ftnode_off_client_thread(h, BP_BLOCKNUM(node, childnuma), childfullhash, &bfe, true, 1, &node, &childa);
toku_pin_ftnode_off_client_thread(h, BP_BLOCKNUM(node, childnuma), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &node, &childa);
}
// for test
call_flusher_thread_callback(flt_flush_before_pin_second_node_for_merge);
......@@ -1353,7 +1353,7 @@ ft_merge_child(
uint32_t childfullhash = compute_child_fullhash(h->cf, node, childnumb);
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, h);
toku_pin_ftnode_off_client_thread(h, BP_BLOCKNUM(node, childnumb), childfullhash, &bfe, true, 2, dep_nodes, &childb);
toku_pin_ftnode_off_client_thread(h, BP_BLOCKNUM(node, childnumb), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 2, dep_nodes, &childb);
}
if (toku_bnc_n_entries(BNC(node,childnuma))>0) {
......@@ -1486,7 +1486,7 @@ flush_some_child(
// Note that we don't read the entire node into memory yet.
// The idea is let's try to do the minimum work before releasing the parent lock
fill_bfe_for_min_read(&bfe, h);
toku_pin_ftnode_off_client_thread(h, targetchild, childfullhash, &bfe, true, 1, &parent, &child);
toku_pin_ftnode_off_client_thread(h, targetchild, childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &parent, &child);
// for test
call_flusher_thread_callback(ft_flush_aflter_child_pin);
......@@ -1785,7 +1785,7 @@ flush_node_on_background_thread(FT h, FTNODE parent)
//
FTNODE child;
uint32_t childfullhash = compute_child_fullhash(h->cf, parent, childnum);
int r = toku_maybe_pin_ftnode_clean(h, BP_BLOCKNUM(parent, childnum), childfullhash, &child, true);
int r = toku_maybe_pin_ftnode_clean(h, BP_BLOCKNUM(parent, childnum), childfullhash, &child);
if (r != 0) {
// In this case, we could not lock the child, so just place the parent on the background thread
// In the callback, we will use flush_some_child, which checks to
......
......@@ -277,7 +277,7 @@ toku_ft_hot_optimize(FT_HANDLE brt,
(BLOCKNUM) root_key,
fullhash,
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&root);
......
......@@ -265,7 +265,13 @@ struct ftnode {
// macros for managing a node's clock
// Should be managed by ft-ops.c, NOT by serialize/deserialize
//
#define BP_TOUCH_CLOCK(node, i) ((node)->bp[i].clock_count = 1)
//
// BP_TOUCH_CLOCK uses a compare and swap because multiple threads
// that have a read lock on an internal node may try to touch the clock
// simultaneously
//
#define BP_TOUCH_CLOCK(node, i) ((void) __sync_val_compare_and_swap(&(node)->bp[i].clock_count, 0, 1))
#define BP_SWEEP_CLOCK(node, i) ((node)->bp[i].clock_count = 0)
#define BP_SHOULD_EVICT(node, i) ((node)->bp[i].clock_count == 0)
// not crazy about having these two here, one is for the case where we create new
......
......@@ -1443,9 +1443,9 @@ toku_ft_bn_apply_cmd_once (
}
}
if (workdone) { // test programs may call with NULL
*workdone += workdone_this_le;
if (*workdone > STATUS_VALUE(FT_MAX_WORKDONE))
STATUS_VALUE(FT_MAX_WORKDONE) = *workdone;
uint64_t new_workdone = __sync_add_and_fetch(workdone, workdone_this_le);
if (new_workdone > STATUS_VALUE(FT_MAX_WORKDONE))
STATUS_VALUE(FT_MAX_WORKDONE) = new_workdone;
}
// if we created a new mempool buffer, free the old one
......@@ -2511,7 +2511,7 @@ toku_ft_root_put_cmd (FT ft, FT_MSG_S * cmd)
root_key,
fullhash,
&bfe,
true, // may_modify_node
PL_WRITE_EXPENSIVE, // may_modify_node
0,
NULL,
&node
......@@ -4354,7 +4354,8 @@ struct unlock_ftnode_extra {
// When this is called, the cachetable lock is held
static void
unlock_ftnode_fun (void *v) {
struct unlock_ftnode_extra *CAST_FROM_VOIDP(x, v);
struct unlock_ftnode_extra *x = NULL;
CAST_FROM_VOIDP(x, v);
FT_HANDLE brt = x->ft_handle;
FTNODE node = x->node;
// CT lock is held
......@@ -4392,11 +4393,12 @@ ft_search_child(FT_HANDLE brt, FTNODE node, int childnum, ft_search_t *search, F
);
bool msgs_applied = false;
{
pair_lock_type lock_type = (node->height == 1) ? PL_WRITE_CHEAP : PL_READ;
int rr = toku_pin_ftnode_batched(brt, childblocknum, fullhash,
unlockers,
&next_ancestors, bounds,
&bfe,
(node->height == 1), // may_modify_node true iff child is leaf
lock_type, // may_modify_node true iff child is leaf
true,
(node->height == 1), // end_batch_on_success true iff child is a leaf
&childnode,
......@@ -4745,7 +4747,7 @@ try_again:
root_key,
fullhash,
&bfe,
false, // may_modify_node set to false, because root cannot change during search
PL_READ, // may_modify_node set to false, because root cannot change during search
0,
NULL,
&node
......@@ -5258,7 +5260,7 @@ toku_ft_keyrange_internal (FT_HANDLE brt, FTNODE node,
&next_ancestors,
bounds,
bfe,
false, // may_modify_node is false, because node guaranteed to not change
PL_READ, // may_modify_node is false, because node guaranteed to not change
false,
&childnode,
&msgs_applied
......@@ -5315,7 +5317,7 @@ try_again:
root_key,
fullhash,
&bfe,
false, // may_modify_node, cannot change root during keyrange
PL_READ, // may_modify_node, cannot change root during keyrange
0,
NULL,
&node
......@@ -5361,27 +5363,21 @@ static int
toku_dump_ftnode (FILE *file, FT_HANDLE brt, BLOCKNUM blocknum, int depth, const DBT *lorange, const DBT *hirange) {
int result=0;
FTNODE node;
void* node_v;
toku_get_node_for_verify(blocknum, brt, &node);
result=toku_verify_ftnode(brt, ZERO_MSN, ZERO_MSN, node, -1, lorange, hirange, NULL, NULL, 0, 1, 0);
uint32_t fullhash = toku_cachetable_hash(brt->ft->cf, blocknum);
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->ft);
int r = toku_cachetable_get_and_pin(
brt->ft->cf,
toku_pin_ftnode_off_client_thread(
brt->ft,
blocknum,
fullhash,
&node_v,
&bfe,
PL_WRITE_EXPENSIVE,
0,
NULL,
get_write_callbacks_for_node(brt->ft),
toku_ftnode_fetch_callback,
toku_ftnode_pf_req_callback,
toku_ftnode_pf_callback,
true, // may_modify_value, just safe to set to true, I think it could theoretically be false
&bfe
&node
);
assert_zero(r);
CAST_FROM_VOIDP(node, node_v);
assert(node->fullhash==fullhash);
fprintf(file, "%*sNode=%p\n", depth, "", node);
......@@ -5411,7 +5407,7 @@ toku_dump_ftnode (FILE *file, FT_HANDLE brt, BLOCKNUM blocknum, int depth, const
if (0)
for (int j=0; j<size; j++) {
OMTVALUE v = 0;
r = toku_omt_fetch(BLB_BUFFER(node, i), j, &v);
int r = toku_omt_fetch(BLB_BUFFER(node, i), j, &v);
assert_zero(r);
LEAFENTRY CAST_FROM_VOIDP(le, v);
fprintf(file, " [%d]=", j);
......@@ -5435,8 +5431,7 @@ toku_dump_ftnode (FILE *file, FT_HANDLE brt, BLOCKNUM blocknum, int depth, const
}
}
}
r = toku_cachetable_unpin(brt->ft->cf, node->ct_pair, CACHETABLE_CLEAN, make_ftnode_pair_attr(node));
assert_zero(r);
toku_unpin_ftnode_off_client_thread(brt->ft, node);
return result;
}
......@@ -5590,7 +5585,7 @@ static bool is_empty_fast_iter (FT_HANDLE brt, FTNODE node) {
childblocknum,
fullhash,
&bfe,
false, // may_modify_node set to false, as nodes not modified
PL_READ, // may_modify_node set to false, as nodes not modified
0,
NULL,
&childnode
......@@ -5631,7 +5626,7 @@ bool toku_ft_is_empty_fast (FT_HANDLE brt)
root_key,
fullhash,
&bfe,
false, // may_modify_node set to false, node does not change
PL_READ, // may_modify_node set to false, node does not change
0,
NULL,
&node
......
......@@ -169,7 +169,7 @@ toku_pin_node_with_min_bfe(FTNODE* node, BLOCKNUM b, FT_HANDLE t)
b,
toku_cachetable_hash(t->ft->cf, b),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
node
......
......@@ -234,7 +234,7 @@ toku_get_node_for_verify(
blocknum,
fullhash,
&bfe,
true, // may_modify_node
PL_WRITE_EXPENSIVE, // may_modify_node
0,
NULL,
nodep,
......@@ -446,15 +446,7 @@ toku_verify_ftnode (FT_HANDLE brt,
}
}
done:
{
int r = toku_cachetable_unpin(
brt->ft->cf,
node->ct_pair,
CACHETABLE_CLEAN,
make_ftnode_pair_attr(node)
);
assert_zero(r); // this is a bad failure if it happens.
}
toku_unpin_ftnode(brt->ft, node);
if (result == 0 && progress_callback)
result = progress_callback(progress_extra, 0.0);
......
......@@ -8,7 +8,9 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
#include <valgrind/drd.h>
#include <stdint.h>
#include "memory.h"
#include "growable_array.h"
namespace toku {
......@@ -117,7 +119,10 @@ public:
}
inline uint32_t get_index(void) const {
return m_bitfield & MASK_INDEX;
TOKU_DRD_IGNORE_VAR(m_bitfield);
const uint32_t bits = m_bitfield;
TOKU_DRD_STOP_IGNORING_VAR(m_bitfield);
return bits & MASK_INDEX;
}
inline void set_index(uint32_t index) {
......@@ -126,11 +131,23 @@ public:
}
inline bool get_bit(void) const {
return (m_bitfield & MASK_BIT) != 0;
TOKU_DRD_IGNORE_VAR(m_bitfield);
const uint32_t bits = m_bitfield;
TOKU_DRD_STOP_IGNORING_VAR(m_bitfield);
return (bits & MASK_BIT) != 0;
}
inline void enable_bit(void) {
// These bits may be set by a thread with a write lock on some
// leaf, and the index can be read by another thread with a (read
// or write) lock on another thread. Also, the has_marks_below
// bit can be set by two threads simultaneously. Neither of these
// are real races, so if we are using DRD we should tell it to
// ignore these bits just while we set this bit. If there were a
// race in setting the index, that would be a real race.
TOKU_DRD_IGNORE_VAR(m_bitfield);
m_bitfield |= MASK_BIT;
TOKU_DRD_STOP_IGNORING_VAR(m_bitfield);
}
inline void disable_bit(void) {
......
......@@ -106,8 +106,12 @@ static void rollback_log_create (TOKUTXN txn, BLOCKNUM previous, uint32_t previo
void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
int r;
CACHEFILE cf = txn->logger->rollback_cachefile;
r = toku_cachetable_unpin(cf, log->ct_pair,
(enum cachetable_dirty)log->dirty, rollback_memory_size(log));
r = toku_cachetable_unpin(
cf,
log->ct_pair,
(enum cachetable_dirty)log->dirty,
rollback_memory_size(log)
);
assert(r == 0);
}
......@@ -202,14 +206,15 @@ void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash
void * value;
CACHEFILE cf = txn->logger->rollback_cachefile;
FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
int r = toku_cachetable_get_and_pin(cf, blocknum, hash,
int r = toku_cachetable_get_and_pin_with_dep_pairs(cf, blocknum, hash,
&value, NULL,
get_write_callbacks_for_rollback_log(h),
toku_rollback_fetch_callback,
toku_rollback_pf_req_callback,
toku_rollback_pf_callback,
true, // may_modify_value
h
PL_WRITE_EXPENSIVE, // lock_type
h,
0, NULL, NULL, NULL, NULL
);
assert(r == 0);
ROLLBACK_LOG_NODE CAST_FROM_VOIDP(pinned_log, value);
......
......@@ -152,6 +152,14 @@ static inline int rwlock_writers(RWLOCK rwlock) {
return rwlock->writer;
}
static inline bool rwlock_write_will_block(RWLOCK rwlock) {
return (rwlock->writer > 0 || rwlock->reader > 0);
}
static inline int rwlock_read_will_block(RWLOCK rwlock) {
return (rwlock->writer > 0 || rwlock->want_write > 0);
}
static inline void rwlock_wait_for_users(
RWLOCK rwlock,
toku_mutex_t *mutex
......
......@@ -72,7 +72,7 @@ run_test (void) {
def_fetch,
def_pf_req_callback,
def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
NULL,
&foo
);
......
......@@ -17,7 +17,7 @@ static void *pin_nonblocking(void *arg) {
&v1,
&s1,
def_write_callback(NULL), def_fetch, def_pf_req_callback, def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
NULL,
NULL
);
......
......@@ -17,7 +17,7 @@ static void *pin_nonblocking(void *arg) {
&v1,
&s1,
def_write_callback(NULL), def_fetch, def_pf_req_callback, def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
NULL,
NULL
);
......
......@@ -56,12 +56,12 @@ cachetable_test (enum cachetable_dirty dirty, bool cloneable) {
// test that having a pin that passes false for may_modify_value does not stall behind checkpoint
CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
r = toku_cachetable_begin_checkpoint(cp, NULL); assert_zero(r);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, false, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_READ, NULL, NULL);
assert(r == 0);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r == 0);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
if (dirty == CACHETABLE_DIRTY && !cloneable) {
assert(r == TOKUDB_TRY_AGAIN);
}
......
......@@ -102,7 +102,7 @@ static void cachetable_predef_fetch_maybegetandpin_test (void) {
// now verify that the block we are trying to evict is gone
wc = def_write_callback(NULL);
wc.flush_callback = flush;
r = toku_cachetable_get_and_pin_nonblocking(f1, key, fullhash, &v, &size, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, key, fullhash, &v, &size, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r == TOKUDB_TRY_AGAIN);
r = toku_cachetable_get_and_pin(f1, key, fullhash, &v, &size, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
assert(r == 0 && v == 0 && size == 8);
......
......@@ -117,7 +117,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
def_fetch,
def_pf_req_callback,
def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
NULL,
NULL
);
......
......@@ -49,7 +49,7 @@ cachetable_test (enum pin_evictor_test_type test_type, bool nonblocking) {
if (test_type == pin_in_memory) {
old_num_ev_runs = evictor_test_helpers::get_num_eviction_runs(&ct->ev);
if (nonblocking) {
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert_zero(r);
}
else {
......@@ -64,7 +64,7 @@ cachetable_test (enum pin_evictor_test_type test_type, bool nonblocking) {
else if (test_type == pin_fetch) {
old_num_ev_runs = evictor_test_helpers::get_num_eviction_runs(&ct->ev);
if (nonblocking) {
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(2), 2, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(2), 2, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r == TOKUDB_TRY_AGAIN);
new_num_ev_runs = evictor_test_helpers::get_num_eviction_runs(&ct->ev);
assert(new_num_ev_runs > old_num_ev_runs);
......@@ -81,7 +81,7 @@ cachetable_test (enum pin_evictor_test_type test_type, bool nonblocking) {
else if (test_type == pin_partial_fetch) {
old_num_ev_runs = evictor_test_helpers::get_num_eviction_runs(&ct->ev);
if (nonblocking) {
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, pf_req_callback, pf_callback, true, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, pf_req_callback, pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r == TOKUDB_TRY_AGAIN);
new_num_ev_runs = evictor_test_helpers::get_num_eviction_runs(&ct->ev);
assert(new_num_ev_runs > old_num_ev_runs);
......
......@@ -149,7 +149,7 @@ static void *move_numbers(void *arg) {
&v1,
&s1,
wc, fetch, def_pf_req_callback, def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
NULL,
0, //num_dependent_pairs
NULL,
......@@ -171,7 +171,7 @@ static void *move_numbers(void *arg) {
&v1,
&s1,
wc, fetch, def_pf_req_callback, def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
NULL,
1, //num_dependent_pairs
&f1,
......@@ -205,7 +205,7 @@ static void *move_numbers(void *arg) {
&v1,
&s1,
wc, fetch, def_pf_req_callback, def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
NULL,
1, //num_dependent_pairs
&f1,
......@@ -243,7 +243,7 @@ static void *read_random_numbers(void *arg) {
&v1,
&s1,
wc, fetch, def_pf_req_callback, def_pf_callback,
false,
PL_READ,
NULL,
NULL
);
......
......@@ -50,7 +50,7 @@ run_test (void) {
def_fetch,
def_pf_req_callback,
def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
NULL,
NULL
);
......
......@@ -121,7 +121,7 @@ static void cachetable_prefetch_maybegetandpin_test (bool do_partial_fetch) {
void *v = 0;
long size = 0;
do_pf = false;
r = toku_cachetable_get_and_pin_nonblocking(f1, key, fullhash, &v, &size, wc, fetch, pf_req_callback, pf_callback, true, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, key, fullhash, &v, &size, wc, fetch, pf_req_callback, pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==TOKUDB_TRY_AGAIN);
r = toku_cachetable_get_and_pin(f1, key, fullhash, &v, &size, wc, fetch, pf_req_callback, pf_callback, true, NULL);
assert(r == 0 && v == 0 && size == 2);
......
......@@ -143,7 +143,7 @@ static void move_number_to_child(
&v1,
&s1,
wc, fetch, def_pf_req_callback, def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
NULL,
1, //num_dependent_pairs
&f1,
......@@ -190,7 +190,7 @@ static void *move_numbers(void *arg) {
&v1,
&s1,
wc, fetch, def_pf_req_callback, def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
NULL,
0, //num_dependent_pairs
NULL,
......@@ -256,7 +256,7 @@ static void merge_and_split_child(
&v1,
&s1,
wc, fetch, def_pf_req_callback, def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
NULL,
1, //num_dependent_pairs
&f1,
......@@ -290,7 +290,7 @@ static void merge_and_split_child(
&v1,
&s1,
wc, fetch, def_pf_req_callback, def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
NULL,
2, //num_dependent_pairs
cfs,
......@@ -368,7 +368,7 @@ static void *merge_and_split(void *arg) {
&v1,
&s1,
wc, fetch, def_pf_req_callback, def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
NULL,
0, //num_dependent_pairs
NULL,
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: cachetable-simple-pin-nonblocking.cc 46977 2012-08-19 01:56:34Z zardosht $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
bool pf_called;
static bool true_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) {
return true;
}
static int true_pf_callback(void* UU(ftnode_pv), void* UU(dd), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* sizep) {
*sizep = make_pair_attr(9);
pf_called = true;
return 0;
}
static void kibbutz_work(void *fe_v)
{
CACHEFILE CAST_FROM_VOIDP(f1, fe_v);
sleep(2);
int r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
remove_background_job_from_cf(f1);
}
static void
unlock_dummy (void* UU(v)) {
}
static void reset_unlockers(UNLOCKERS unlockers) {
unlockers->locked = true;
}
static void
run_test (pair_lock_type lock_type) {
const int test_limit = 12;
struct unlockers unlockers = {true, unlock_dummy, NULL, NULL};
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __SRCFILE__ "test1.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
r = toku_cachetable_get_and_pin_with_dep_pairs(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, lock_type, NULL, 0, NULL, NULL, NULL, NULL);
cachefile_kibbutz_enq(f1, kibbutz_work, f1);
reset_unlockers(&unlockers);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, &unlockers);
if (lock_type == PL_WRITE_EXPENSIVE) {
assert(r == TOKUDB_TRY_AGAIN); assert(!unlockers.locked);
}
else {
assert(r == 0); assert(unlockers.locked);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8)); assert(r==0);
}
// now do the same test with a partial fetch required
pf_called = false;
r = toku_cachetable_get_and_pin_with_dep_pairs(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, true_pf_req_callback, true_pf_callback, lock_type, NULL, 0, NULL, NULL, NULL, NULL);
assert(pf_called);
cachefile_kibbutz_enq(f1, kibbutz_work, f1);
reset_unlockers(&unlockers);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, &unlockers);
if (lock_type == PL_WRITE_EXPENSIVE) {
assert(r == TOKUDB_TRY_AGAIN); assert(!unlockers.locked);
}
else {
assert(r == 0); assert(unlockers.locked);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8)); assert(r==0);
}
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, false, ZERO_LSN);
assert(r == 0);
r = toku_cachetable_close(&ct); lazy_assert_zero(r);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
run_test(PL_READ);
run_test(PL_WRITE_CHEAP);
run_test(PL_WRITE_EXPENSIVE);
return 0;
}
......@@ -124,7 +124,7 @@ cachetable_test (bool write_first, bool write_second, bool start_checkpoint) {
&v3,
&s3,
wc, fetch, def_pf_req_callback, def_pf_callback,
true,
PL_WRITE_EXPENSIVE,
&val3,
2, //num_dependent_pairs
dependent_cfs,
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: cachetable-simple-pin-nonblocking.cc 46977 2012-08-19 01:56:34Z zardosht $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void** UU(dd),
void *e __attribute__((__unused__)),
PAIR_ATTR s __attribute__((__unused__)),
PAIR_ATTR* new_size __attribute__((__unused__)),
bool w __attribute__((__unused__)),
bool keep __attribute__((__unused__)),
bool c __attribute__((__unused__)),
bool UU(is_clone)
) {
if (w) {
assert(c);
assert(keep);
}
}
static void kibbutz_work(void *fe_v)
{
CACHEFILE CAST_FROM_VOIDP(f1, fe_v);
sleep(2);
int r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
remove_background_job_from_cf(f1);
}
static void
unlock_dummy (void* UU(v)) {
}
static void reset_unlockers(UNLOCKERS unlockers) {
unlockers->locked = true;
}
static void
run_case_that_should_succeed(CACHEFILE f1, pair_lock_type first_lock, pair_lock_type second_lock) {
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
wc.flush_callback = flush;
struct unlockers unlockers = {true, unlock_dummy, NULL, NULL};
int r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, first_lock, NULL, NULL);
assert(r==0);
cachefile_kibbutz_enq(f1, kibbutz_work, f1);
reset_unlockers(&unlockers);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, second_lock, NULL, &unlockers);
assert(r==0); assert(unlockers.locked);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8)); assert(r==0);
}
static void
run_case_that_should_fail(CACHEFILE f1, pair_lock_type first_lock, pair_lock_type second_lock) {
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
wc.flush_callback = flush;
struct unlockers unlockers = {true, unlock_dummy, NULL, NULL};
int r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, first_lock, NULL, NULL);
assert(r==0);
cachefile_kibbutz_enq(f1, kibbutz_work, f1);
reset_unlockers(&unlockers);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, second_lock, NULL, &unlockers);
assert(r == TOKUDB_TRY_AGAIN); assert(!unlockers.locked);
}
static void
run_test (void) {
const int test_limit = 12;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __SRCFILE__ "test1.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
wc.flush_callback = flush;
//
// test that if we are getting a PAIR for the first time that TOKUDB_TRY_AGAIN is returned
// because the PAIR was not in the cachetable.
//
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==TOKUDB_TRY_AGAIN);
run_case_that_should_succeed(f1, PL_READ, PL_WRITE_CHEAP);
run_case_that_should_succeed(f1, PL_READ, PL_WRITE_EXPENSIVE);
run_case_that_should_succeed(f1, PL_WRITE_CHEAP, PL_READ);
run_case_that_should_succeed(f1, PL_WRITE_CHEAP, PL_WRITE_CHEAP);
run_case_that_should_succeed(f1, PL_WRITE_CHEAP, PL_WRITE_EXPENSIVE);
run_case_that_should_fail(f1, PL_WRITE_EXPENSIVE, PL_READ);
run_case_that_should_fail(f1, PL_WRITE_EXPENSIVE, PL_WRITE_CHEAP);
run_case_that_should_fail(f1, PL_WRITE_EXPENSIVE, PL_WRITE_EXPENSIVE);
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, false, ZERO_LSN);
assert(r == 0);
r = toku_cachetable_close(&ct); lazy_assert_zero(r);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
run_test();
return 0;
}
......@@ -76,15 +76,15 @@ run_test (void) {
// test that if we are getting a PAIR for the first time that TOKUDB_TRY_AGAIN is returned
// because the PAIR was not in the cachetable.
//
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==TOKUDB_TRY_AGAIN);
// now it should succeed
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==0);
foo = false;
cachefile_kibbutz_enq(f1, kibbutz_work, f1);
// because node is in use, should return TOKUDB_TRY_AGAIN
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==TOKUDB_TRY_AGAIN);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
assert(foo);
......@@ -92,24 +92,24 @@ run_test (void) {
// now make sure we get TOKUDB_TRY_AGAIN when a partial fetch is involved
// first make sure value is there
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==0);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8)); assert(r==0);
// now make sure that we get TOKUDB_TRY_AGAIN for the partial fetch
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, true_def_pf_req_callback, true_def_pf_callback, true, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, true_def_pf_req_callback, true_def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==TOKUDB_TRY_AGAIN);
//
// now test that if there is a checkpoint pending,
// first pin and unpin with dirty
//
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==0);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8)); assert(r==0);
// this should mark the PAIR as pending
CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
r = toku_cachetable_begin_checkpoint(cp, NULL); assert(r == 0);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL, NULL);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, PL_WRITE_EXPENSIVE, NULL, NULL);
assert(r==TOKUDB_TRY_AGAIN);
r = toku_cachetable_end_checkpoint(
cp,
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: cachetable-simple-pin.cc 46797 2012-08-15 01:56:49Z zardosht $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
bool pf_called;
bool fetch_called;
CACHEFILE f1;
static int
sleep_fetch (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
uint32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
void **dd __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
sleep(2);
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
fetch_called = true;
return 0;
}
static bool sleep_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) {
return true;
}
static int sleep_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* sizep) {
sleep(2);
*sizep = make_pair_attr(8);
pf_called = true;
return 0;
}
static void *run_expensive_pf(void *arg) {
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
int r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, sleep_fetch, sleep_pf_req_callback, sleep_pf_callback, PL_READ, NULL, NULL);
assert(r == TOKUDB_TRY_AGAIN);
assert(pf_called);
return arg;
}
static void *run_expensive_fetch(void *arg) {
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
int r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, sleep_fetch, sleep_pf_req_callback, sleep_pf_callback, PL_READ, NULL, NULL);
assert(fetch_called);
assert(r == TOKUDB_TRY_AGAIN);
return arg;
}
static void
run_test (void) {
const int test_limit = 12;
int r;
void *ret;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __SRCFILE__ "test1.dat";
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
toku_pthread_t fetch_tid;
fetch_called = false;
r = toku_pthread_create(&fetch_tid, NULL, run_expensive_fetch, NULL);
sleep(1);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, sleep_fetch, def_pf_req_callback, def_pf_callback, false, NULL);
assert_zero(r);
assert(fetch_called);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
r = toku_pthread_join(fetch_tid, &ret);
assert_zero(r);
// call with may_modify_node = false twice, make sure we can get it
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, sleep_fetch, def_pf_req_callback, def_pf_callback, PL_READ, NULL, NULL);
assert_zero(r);
r = toku_cachetable_get_and_pin_nonblocking(f1, make_blocknum(1), 1, &v1, &s1, wc, sleep_fetch, def_pf_req_callback, def_pf_callback, PL_READ, NULL, NULL);
assert_zero(r);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
toku_pthread_t pf_tid;
pf_called = false;
r = toku_pthread_create(&pf_tid, NULL, run_expensive_pf, NULL);
sleep(1);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, sleep_fetch, def_pf_req_callback, def_pf_callback, false, NULL);
assert_zero(r);
assert(pf_called);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
r = toku_pthread_join(pf_tid, &ret);
assert_zero(r);
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, false, ZERO_LSN); assert(r == 0);
r = toku_cachetable_close(&ct); lazy_assert_zero(r);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
run_test();
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: cachetable-simple-pin.cc 46797 2012-08-15 01:56:49Z zardosht $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
bool pf_called;
bool fetch_called;
CACHEFILE f1;
static int
sleep_fetch (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
uint32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
void **dd __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
sleep(2);
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
fetch_called = true;
return 0;
}
static bool sleep_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) {
return true;
}
static int sleep_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* sizep) {
sleep(2);
*sizep = make_pair_attr(8);
pf_called = true;
return 0;
}
static void *run_expensive_pf(void *arg) {
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
int r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, sleep_fetch, sleep_pf_req_callback, sleep_pf_callback, false, NULL);
assert_zero(r);
assert(pf_called);
return arg;
}
static void *run_expensive_fetch(void *arg) {
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
int r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, sleep_fetch, sleep_pf_req_callback, sleep_pf_callback, false, NULL);
assert_zero(r);
assert(fetch_called);
return arg;
}
static void
run_test (void) {
const int test_limit = 12;
int r;
void *ret;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __SRCFILE__ "test1.dat";
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
toku_pthread_t fetch_tid;
fetch_called = false;
r = toku_pthread_create(&fetch_tid, NULL, run_expensive_fetch, NULL);
sleep(1);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, sleep_fetch, def_pf_req_callback, def_pf_callback, false, NULL);
assert_zero(r);
assert(fetch_called);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
r = toku_pthread_join(fetch_tid, &ret);
assert_zero(r);
// call with may_modify_node = false twice, make sure we can get it
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, sleep_fetch, def_pf_req_callback, def_pf_callback, false, NULL);
assert_zero(r);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, sleep_fetch, def_pf_req_callback, def_pf_callback, false, NULL);
assert_zero(r);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
toku_pthread_t pf_tid;
pf_called = false;
r = toku_pthread_create(&pf_tid, NULL, run_expensive_pf, NULL);
sleep(1);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, sleep_fetch, def_pf_req_callback, def_pf_callback, false, NULL);
assert_zero(r);
assert(pf_called);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
r = toku_pthread_join(pf_tid, &ret);
assert_zero(r);
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, false, ZERO_LSN); assert(r == 0);
r = toku_cachetable_close(&ct); lazy_assert_zero(r);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
run_test();
return 0;
}
......@@ -64,233 +64,14 @@ struct item {
};
static volatile int expect_n_flushes=0;
static CACHEKEY flushes[100];
static void expect_init(void) {
test_mutex_lock();
expect_n_flushes = 0;
test_mutex_unlock();
}
static void expect1(int64_t blocknum_n) {
test_mutex_lock();
expect_n_flushes=1;
flushes[0].b=blocknum_n;
//if (verbose) printf("%s:%d %lld\n", __FUNCTION__, 0, key.b);
test_mutex_unlock();
}
static void expectN(int64_t blocknum_n) {
test_mutex_lock();
//if (verbose) printf("%s:%d %lld\n", __FUNCTION__, expect_n_flushes, key);
flushes[expect_n_flushes++].b=blocknum_n;
test_mutex_unlock();
}
static CACHEFILE expect_f;
static void flush (CACHEFILE f,
int UU(fd),
CACHEKEY key,
void*value,
void** UU(dd),
void *extra __attribute__((__unused__)),
PAIR_ATTR size __attribute__((__unused__)),
PAIR_ATTR* new_size __attribute__((__unused__)),
bool write_me __attribute__((__unused__)),
bool keep_me __attribute__((__unused__)),
bool for_checkpoint __attribute__((__unused__)),
bool UU(is_clone)
) {
struct item *CAST_FROM_VOIDP(it, value);
int i;
if (keep_me) return;
if (verbose) printf("Flushing %" PRId64 " (it=>key=%" PRId64 ")\n", key.b, it->key.b);
test_mutex_lock();
if (write_me) assert(expect_f==f);
assert(strcmp(it->something,"something")==0);
assert(it->key.b==key.b);
/* Verify that we expected the flush. */
for (i=0; i<expect_n_flushes; i++) {
if (key.b==flushes[i].b) {
flushes[i] = flushes[expect_n_flushes-1];
expect_n_flushes--;
goto found_flush;
}
}
fprintf(stderr, "%" PRId64 " was flushed, but I didn't expect it\n", key.b);
abort();
found_flush:
test_mutex_unlock();
toku_free(value);
}
static struct item *make_item (uint64_t key) {
struct item *MALLOC(it);
it->key.b=key;
it->something="something";
return it;
}
static CACHEKEY did_fetch={-1};
static int fetch (CACHEFILE f, PAIR UU(p), int UU(fd), CACHEKEY key, uint32_t fullhash __attribute__((__unused__)), void**value, void** UU(dd), PAIR_ATTR *sizep __attribute__((__unused__)), int *dirtyp, void*extraargs) {
if (verbose) printf("Fetch %" PRId64 "\n", key.b);
assert (expect_f==f);
assert((long)extraargs==23);
*value = make_item(key.b);
*sizep = make_pair_attr(test_object_size);
*dirtyp = 0;
did_fetch=key;
return 0;
}
static void maybe_flush(CACHETABLE t) {
toku_cachetable_maybe_flush_some(t);
}
// verify that a sequence of cachetable operations causes a particular sequence of
// callbacks
static void test0 (void) {
void* t3=(void*)23;
CACHETABLE t;
CACHEFILE f;
int r;
char fname[] = __SRCFILE__ "test.dat";
r=toku_create_cachetable(&t, 5, ZERO_LSN, NULL_LOGGER);
assert(r==0);
unlink(fname);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO);
assert(r==0);
TOKULOGGER logger = toku_cachefile_logger(f);
assert(logger == NULL_LOGGER);
expect_f = f;
expect_n_flushes=0;
uint32_t h1 = toku_cachetable_hash(f, make_blocknum(1));
uint32_t h2 = toku_cachetable_hash(f, make_blocknum(2));
uint32_t h3 = toku_cachetable_hash(f, make_blocknum(3));
uint32_t h4 = toku_cachetable_hash(f, make_blocknum(4));
uint32_t h5 = toku_cachetable_hash(f, make_blocknum(5));
uint32_t h6 = toku_cachetable_hash(f, make_blocknum(6));
uint32_t h7 = toku_cachetable_hash(f, make_blocknum(7));
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(t3);
wc.flush_callback = flush;
r=toku_cachetable_put(f, make_blocknum(1), h1, make_item(1), make_pair_attr(test_object_size), wc, put_callback_nop); /* 1P */ /* this is the lru list. 1 is pinned. */
assert(r==0);
assert(expect_n_flushes==0);
expect_init();
r=toku_cachetable_put(f, make_blocknum(2), h2, make_item(2), make_pair_attr(test_object_size), wc, put_callback_nop);
assert(r==0);
r=toku_test_cachetable_unpin(f, make_blocknum(2), h2, CACHETABLE_DIRTY, make_pair_attr(1)); /* 2U 1P */
assert(expect_n_flushes==0);
expect_init();
r=toku_cachetable_put(f, make_blocknum(3), h3, make_item(3), make_pair_attr(test_object_size), wc, put_callback_nop);
assert(r==0);
assert(expect_n_flushes==0); /* 3P 2U 1P */ /* 3 is most recently used (pinned), 2 is next (unpinned), 1 is least recent (pinned) */
expect_init();
r=toku_cachetable_put(f, make_blocknum(4), h4, make_item(4), make_pair_attr(test_object_size), wc, put_callback_nop);
assert(r==0);
assert(expect_n_flushes==0); /* 4P 3P 2U 1P */
expect_init();
r=toku_cachetable_put(f, make_blocknum(5), h5, make_item(5), make_pair_attr(test_object_size), wc, put_callback_nop);
assert(r==0);
r=toku_test_cachetable_unpin(f, make_blocknum(5), h5, CACHETABLE_DIRTY, make_pair_attr(test_object_size));
assert(r==0);
r=toku_test_cachetable_unpin(f, make_blocknum(3), h3, CACHETABLE_DIRTY, make_pair_attr(test_object_size));
assert(r==0);
assert(expect_n_flushes==0); /* 5U 4P 3U 2U 1P */
expect1(2); /* 2 is the oldest unpinned item. */
r=toku_cachetable_put(f, make_blocknum(6), h6, make_item(6), make_pair_attr(test_object_size), wc, put_callback_nop); /* 6P 5U 4P 3U 1P */
assert(r==0);
test_mutex_lock();
while (expect_n_flushes != 0) {
test_mutex_unlock(); toku_pthread_yield(); maybe_flush(t); test_mutex_lock();
}
assert(expect_n_flushes==0);
test_mutex_unlock();
expect1(3);
r=toku_cachetable_put(f, make_blocknum(7), h7, make_item(7), make_pair_attr(test_object_size), wc, put_callback_nop);
assert(r==0);
test_mutex_lock();
while (expect_n_flushes != 0) {
test_mutex_unlock(); toku_pthread_yield(); maybe_flush(t); test_mutex_lock();
}
assert(expect_n_flushes==0);
test_mutex_unlock();
r=toku_test_cachetable_unpin(f, make_blocknum(7), h7, CACHETABLE_DIRTY, make_pair_attr(test_object_size)); /* 7U 6P 5U 4P 1P */
assert(r==0);
{
void *item_v=0;
expect_init();
r=toku_cachetable_get_and_pin(f, make_blocknum(5), toku_cachetable_hash(f, make_blocknum(5)), &item_v, NULL, wc, fetch, def_pf_req_callback, def_pf_callback, true, t3); /* 5P 7U 6P 4P 1P */
assert(r==0);
assert(((struct item *)item_v)->key.b==5);
assert(strcmp(((struct item *)item_v)->something,"something")==0);
test_mutex_lock();
assert(expect_n_flushes==0);
test_mutex_unlock();
}
{
void *item_v=0;
r=toku_test_cachetable_unpin(f, make_blocknum(4), h4, CACHETABLE_DIRTY, make_pair_attr(test_object_size));
assert(r==0);
expect1(4);
did_fetch=make_blocknum(-1);
CACHETABLE_WRITE_CALLBACK wc2 = def_write_callback(t3);
wc2.flush_callback = flush;
r=toku_cachetable_get_and_pin(f, make_blocknum(2), toku_cachetable_hash(f, make_blocknum(2)), &item_v, NULL, wc2, fetch, def_pf_req_callback, def_pf_callback, true, t3); /* 2p 5P 7U 6P 1P */
assert(r==0);
assert(did_fetch.b==2); /* Expect that 2 is fetched in. */
assert(((struct item *)item_v)->key.b==2);
assert(strcmp(((struct item *)item_v)->something,"something")==0);
test_mutex_lock();
while (expect_n_flushes != 0) {
test_mutex_unlock(); toku_pthread_yield(); maybe_flush(t); test_mutex_lock();
}
assert(expect_n_flushes==0);
test_mutex_unlock();
}
r=toku_test_cachetable_unpin(f, make_blocknum(2), h2, CACHETABLE_DIRTY, make_pair_attr(test_object_size));
assert(r==0);
r=toku_test_cachetable_unpin(f, make_blocknum(5), h5, CACHETABLE_DIRTY, make_pair_attr(test_object_size));
assert(r==0);
r=toku_test_cachetable_unpin(f, make_blocknum(6), h6, CACHETABLE_DIRTY, make_pair_attr(test_object_size));
assert(r==0);
r=toku_test_cachetable_unpin(f, make_blocknum(1), h1, CACHETABLE_DIRTY, make_pair_attr(test_object_size));
assert(r==0);
r=toku_cachetable_assert_all_unpinned(t);
assert(r==0);
if (verbose) printf("Closing\n");
expect1(2);
expectN(5);
expectN(7);
expectN(6);
expectN(1);
r=toku_cachefile_close(&f, 0, false, ZERO_LSN);
assert(r==0);
r=toku_cachetable_close(&t);
assert(r==0);
assert(expect_n_flushes==0);
expect_f = 0;
}
static void flush_n (CACHEFILE f __attribute__((__unused__)), int UU(fd), CACHEKEY key __attribute__((__unused__)),
void *value,
......@@ -767,7 +548,6 @@ test_main (int argc, const char *argv[]) {
if (do_malloc_fail)
test_cachetable_create_no_memory(); // fails with valgrind
for (i=0; i<1; i++) {
test0();
test_nested_pin();
#if !TOKU_WINDOWS
test_multi_filehandles ();
......
/* Fair readers writer lock implemented using condition variables.
* This is maintained so that we can measure the performance of a relatively simple implementation (this one)
* compared to a fast one that uses compare-and-swap (the one in ../toku_rwlock.c)
* For now it's only for testing.
*/
#ident "$Id$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
// Fair readers/writer locks. These are fair (meaning first-come first-served. No reader starvation, and no writer starvation). And they are
// probably faster than the linux readers/writer locks (pthread_rwlock_t).
struct toku_cv_fair_rwlock_waiter_state; // this structure is used internally.
typedef struct toku_cv_fair_rwlock_s {
toku_mutex_t mutex;
int state; // 0 means no locks, + is number of readers locked, -1 is a writer
struct toku_cv_fair_rwlock_waiter_state *waiters_head, *waiters_tail;
} toku_cv_fair_rwlock_t;
void toku_cv_fair_rwlock_init (toku_cv_fair_rwlock_t *rwlock);
void toku_cv_fair_rwlock_destroy (toku_cv_fair_rwlock_t *rwlock);
int toku_cv_fair_rwlock_rdlock (toku_cv_fair_rwlock_t *rwlock);
int toku_cv_fair_rwlock_wrlock (toku_cv_fair_rwlock_t *rwlock);
int toku_cv_fair_rwlock_unlock (toku_cv_fair_rwlock_t *rwlock);
struct toku_cv_fair_rwlock_waiter_state {
char is_read;
struct toku_cv_fair_rwlock_waiter_state *next;
toku_cond_t cond;
};
static __thread struct toku_cv_fair_rwlock_waiter_state waitstate = {0, NULL, {PTHREAD_COND_INITIALIZER} };
void toku_cv_fair_rwlock_init (toku_cv_fair_rwlock_t *rwlock) {
rwlock->state=0;
rwlock->waiters_head = NULL;
rwlock->waiters_tail = NULL;
toku_mutex_init(&rwlock->mutex, NULL);
}
void toku_cv_fair_rwlock_destroy (toku_cv_fair_rwlock_t *rwlock) {
toku_mutex_destroy(&rwlock->mutex);
}
int toku_cv_fair_rwlock_rdlock (toku_cv_fair_rwlock_t *rwlock) {
toku_mutex_lock(&rwlock->mutex);
if (rwlock->waiters_head!=NULL || rwlock->state<0) {
// Someone is ahead of me in the queue, or someone has a lock.
// We use per-thread-state for the condition variable. A thread cannot get control and try to reuse the waiter state for something else.
if (rwlock->waiters_tail) {
rwlock->waiters_tail->next = &waitstate;
} else {
rwlock->waiters_head = &waitstate;
}
rwlock->waiters_tail = &waitstate;
waitstate.next = NULL;
waitstate.is_read = 1;
do {
toku_cond_wait(&waitstate.cond, &rwlock->mutex);
} while (rwlock->waiters_head!=&waitstate || rwlock->state<0);
rwlock->state++;
rwlock->waiters_head=waitstate.next;
if (waitstate.next==NULL) rwlock->waiters_tail=NULL;
if (rwlock->waiters_head && rwlock->waiters_head->is_read) {
toku_cond_signal(&rwlock->waiters_head->cond);
}
} else {
// No one is waiting, and any holders are readers.
rwlock->state++;
}
toku_mutex_unlock(&rwlock->mutex);
return 0;
}
int toku_cv_fair_rwlock_wrlock (toku_cv_fair_rwlock_t *rwlock) {
toku_mutex_lock(&rwlock->mutex);
if (rwlock->waiters_head!=NULL || rwlock->state!=0) {
// Someone else is ahead of me, or someone has a lock the lock, so we must wait our turn.
if (rwlock->waiters_tail) {
rwlock->waiters_tail->next = &waitstate;
} else {
rwlock->waiters_head = &waitstate;
}
rwlock->waiters_tail = &waitstate;
waitstate.next = NULL;
waitstate.is_read = 0;
do {
toku_cond_wait(&waitstate.cond, &rwlock->mutex);
} while (rwlock->waiters_head!=&waitstate || rwlock->state!=0);
rwlock->waiters_head = waitstate.next;
if (waitstate.next==NULL) rwlock->waiters_tail=NULL;
}
rwlock->state = -1;
toku_mutex_unlock(&rwlock->mutex);
return 0;
}
int toku_cv_fair_rwlock_unlock (toku_cv_fair_rwlock_t *rwlock) {
toku_mutex_lock(&rwlock->mutex);
assert(rwlock->state!=0);
if (rwlock->state>0) {
rwlock->state--;
} else {
rwlock->state=0;
}
if (rwlock->state==0 && rwlock->waiters_head) {
toku_cond_signal(&rwlock->waiters_head->cond);
} else {
// printf(" No one to wake\n");
}
toku_mutex_unlock(&rwlock->mutex);
return 0;
}
......@@ -150,7 +150,7 @@ doit (bool after_child_pin) {
node_root,
toku_cachetable_hash(t->ft->cf, node_root),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -169,7 +169,7 @@ doit (bool after_child_pin) {
node_root,
toku_cachetable_hash(t->ft->cf, node_root),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -206,7 +206,7 @@ doit (bool after_child_pin) {
node_root,
toku_cachetable_hash(c_ft->ft->cf, node_root),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -227,7 +227,7 @@ doit (bool after_child_pin) {
node_leaf,
toku_cachetable_hash(c_ft->ft->cf, node_root),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......
......@@ -168,7 +168,7 @@ doit (int state) {
node_root,
toku_cachetable_hash(t->ft->cf, node_root),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -186,7 +186,7 @@ doit (int state) {
node_root,
toku_cachetable_hash(t->ft->cf, node_root),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -225,7 +225,7 @@ doit (int state) {
node_root,
toku_cachetable_hash(c_ft->ft->cf, node_root),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -255,7 +255,7 @@ doit (int state) {
left_child,
toku_cachetable_hash(c_ft->ft->cf, left_child),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -271,7 +271,7 @@ doit (int state) {
right_child,
toku_cachetable_hash(c_ft->ft->cf, right_child),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -288,7 +288,7 @@ doit (int state) {
left_child,
toku_cachetable_hash(c_ft->ft->cf, left_child),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......
......@@ -188,7 +188,7 @@ doit (int state) {
node_root,
toku_cachetable_hash(t->ft->cf, node_root),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -206,7 +206,7 @@ doit (int state) {
node_root,
toku_cachetable_hash(t->ft->cf, node_root),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -245,7 +245,7 @@ doit (int state) {
node_root,
toku_cachetable_hash(c_ft->ft->cf, node_root),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -266,7 +266,7 @@ doit (int state) {
left_child,
toku_cachetable_hash(c_ft->ft->cf, left_child),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -282,7 +282,7 @@ doit (int state) {
right_child,
toku_cachetable_hash(c_ft->ft->cf, right_child),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......
......@@ -164,7 +164,7 @@ doit (bool after_split) {
node_root,
toku_cachetable_hash(t->ft->cf, node_root),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -182,7 +182,7 @@ doit (bool after_split) {
node_root,
toku_cachetable_hash(t->ft->cf, node_root),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -221,7 +221,7 @@ doit (bool after_split) {
node_root,
toku_cachetable_hash(c_ft->ft->cf, node_root),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -249,7 +249,7 @@ doit (bool after_split) {
left_child,
toku_cachetable_hash(c_ft->ft->cf, left_child),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -265,7 +265,7 @@ doit (bool after_split) {
right_child,
toku_cachetable_hash(c_ft->ft->cf, right_child),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -282,7 +282,7 @@ doit (bool after_split) {
left_child,
toku_cachetable_hash(c_ft->ft->cf, left_child),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......
......@@ -166,7 +166,7 @@ doit (void) {
node_leaf,
toku_cachetable_hash(brt->ft->cf, node_leaf),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -195,7 +195,7 @@ doit (void) {
node_leaf,
toku_cachetable_hash(brt->ft->cf, node_leaf),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -215,7 +215,7 @@ doit (void) {
node_internal,
toku_cachetable_hash(brt->ft->cf, node_internal),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -239,7 +239,7 @@ doit (void) {
node_internal,
toku_cachetable_hash(brt->ft->cf, node_internal),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......
......@@ -172,7 +172,7 @@ doit (bool keep_other_bn_in_memory) {
node_leaf,
toku_cachetable_hash(brt->ft->cf, node_leaf),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -220,7 +220,7 @@ doit (bool keep_other_bn_in_memory) {
node_leaf,
toku_cachetable_hash(brt->ft->cf, node_leaf),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -245,7 +245,7 @@ doit (bool keep_other_bn_in_memory) {
node_internal,
toku_cachetable_hash(brt->ft->cf, node_internal),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -269,7 +269,7 @@ doit (bool keep_other_bn_in_memory) {
node_internal,
toku_cachetable_hash(brt->ft->cf, node_internal),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......
......@@ -158,7 +158,7 @@ doit (void) {
node_internal,
toku_cachetable_hash(brt->ft->cf, node_internal),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......@@ -181,7 +181,7 @@ doit (void) {
node_internal,
toku_cachetable_hash(brt->ft->cf, node_internal),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: test-rwlock.cc 46971 2012-08-18 22:03:43Z zardosht $"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#include <toku_pthread.h>
#include <toku_portability.h>
#include <toku_time.h>
#include <toku_assert.h>
#include <toku_portability.h>
#include <sys/time.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include "rwlock.h"
#include <sys/types.h>
#include "rwlock_condvar.h"
#include "toku_fair_rwlock.h"
#include "frwlock.h"
toku_mutex_t mutex;
toku::frwlock w;
static void grab_write_lock(bool expensive) {
toku_mutex_lock(&mutex);
w.write_lock(expensive);
toku_mutex_unlock(&mutex);
}
static void release_write_lock(void) {
toku_mutex_lock(&mutex);
w.write_unlock();
toku_mutex_unlock(&mutex);
}
static void grab_read_lock(void) {
toku_mutex_lock(&mutex);
w.read_lock();
toku_mutex_unlock(&mutex);
}
static void release_read_lock(void) {
toku_mutex_lock(&mutex);
w.read_unlock();
toku_mutex_unlock(&mutex);
}
static void *do_cheap_wait(void *arg) {
grab_write_lock(false);
release_write_lock();
return arg;
}
static void *do_expensive_wait(void *arg) {
grab_write_lock(true);
release_write_lock();
return arg;
}
static void *do_read_wait(void *arg) {
grab_read_lock();
release_read_lock();
return arg;
}
static void launch_cheap_waiter(void) {
toku_pthread_t tid;
int r = toku_pthread_create(&tid, NULL, do_cheap_wait, NULL);
assert_zero(r);
toku_pthread_detach(tid);
sleep(1);
}
static void launch_expensive_waiter(void) {
toku_pthread_t tid;
int r = toku_pthread_create(&tid, NULL, do_expensive_wait, NULL);
assert_zero(r);
toku_pthread_detach(tid);
sleep(1);
}
static void launch_reader(void) {
toku_pthread_t tid;
int r = toku_pthread_create(&tid, NULL, do_read_wait, NULL);
assert_zero(r);
toku_pthread_detach(tid);
sleep(1);
}
static void test_write_cheapness(void) {
toku_mutex_init(&mutex, NULL);
w.init(&mutex);
// single expensive write lock
grab_write_lock(true);
assert(w.write_lock_is_expensive());
assert(w.read_lock_is_expensive());
release_write_lock();
assert(!w.write_lock_is_expensive());
assert(!w.read_lock_is_expensive());
// single cheap write lock
grab_write_lock(false);
assert(!w.write_lock_is_expensive());
assert(!w.read_lock_is_expensive());
release_write_lock();
assert(!w.write_lock_is_expensive());
assert(!w.read_lock_is_expensive());
// multiple read locks
grab_read_lock();
assert(!w.write_lock_is_expensive());
assert(!w.read_lock_is_expensive());
grab_read_lock();
grab_read_lock();
assert(!w.write_lock_is_expensive());
assert(!w.read_lock_is_expensive());
release_read_lock();
release_read_lock();
release_read_lock();
assert(!w.write_lock_is_expensive());
assert(!w.read_lock_is_expensive());
// expensive write lock and cheap writers waiting
grab_write_lock(true);
launch_cheap_waiter();
assert(w.write_lock_is_expensive());
assert(w.read_lock_is_expensive());
launch_cheap_waiter();
launch_cheap_waiter();
assert(w.write_lock_is_expensive());
assert(w.read_lock_is_expensive());
release_write_lock();
sleep(1);
assert(!w.write_lock_is_expensive());
assert(!w.read_lock_is_expensive());
// cheap write lock and expensive writer waiter
grab_write_lock(false);
launch_expensive_waiter();
assert(w.write_lock_is_expensive());
assert(w.read_lock_is_expensive());
release_write_lock();
sleep(1);
// expensive write lock and expensive waiter
grab_write_lock(true);
launch_expensive_waiter();
assert(w.write_lock_is_expensive());
assert(w.read_lock_is_expensive());
release_write_lock();
sleep(1);
// cheap write lock and cheap waiter
grab_write_lock(false);
launch_cheap_waiter();
assert(!w.write_lock_is_expensive());
assert(!w.read_lock_is_expensive());
release_write_lock();
sleep(1);
// read lock held and cheap waiter
grab_read_lock();
launch_cheap_waiter();
assert(!w.write_lock_is_expensive());
assert(!w.read_lock_is_expensive());
// add expensive waiter
launch_expensive_waiter();
assert(w.write_lock_is_expensive());
assert(w.read_lock_is_expensive());
release_read_lock();
sleep(1);
// read lock held and expensive waiter
grab_read_lock();
launch_expensive_waiter();
assert(w.write_lock_is_expensive());
assert(w.read_lock_is_expensive());
// add expensive waiter
launch_cheap_waiter();
assert(w.write_lock_is_expensive());
assert(w.read_lock_is_expensive());
release_read_lock();
sleep(1);
// cheap write lock held and waiting read
grab_write_lock(false);
launch_reader();
assert(!w.write_lock_is_expensive());
assert(!w.read_lock_is_expensive());
launch_expensive_waiter();
assert(w.write_lock_is_expensive());
// tricky case here, because we have a launched reader
// that should be in the queue, a new read lock
// should piggy back off that
assert(!w.read_lock_is_expensive());
release_write_lock();
sleep(1);
// expensive write lock held and waiting read
grab_write_lock(true);
launch_reader();
assert(w.write_lock_is_expensive());
assert(w.read_lock_is_expensive());
launch_cheap_waiter();
assert(w.write_lock_is_expensive());
assert(w.read_lock_is_expensive());
release_write_lock();
sleep(1);
w.deinit();
toku_mutex_destroy(&mutex);
}
int main (int UU(argc), const char* UU(argv[])) {
test_write_cheapness();
return 0;
}
This diff is collapsed.
......@@ -77,7 +77,7 @@ doit (void) {
node_internal,
toku_cachetable_hash(t->ft->cf, node_internal),
&bfe,
true,
PL_WRITE_EXPENSIVE,
0,
NULL,
&node
......
......@@ -39,6 +39,7 @@
#include <stdlib.h>
#include <errno.h>
#include "../../ft/rwlock.h"
#include "../../ft/frwlock.h"
#include "toku_fair_rwlock.h"
#include <sys/types.h>
......@@ -311,6 +312,10 @@ void time_toku_cv_fair_rwlock (void) {
#define N_LOG_ENTRIES (L*N*4)
static toku_fair_rwlock_t rwlock;
static toku::frwlock frwlock;
static toku_mutex_t fmutex;
static bool use_frwlock_for_locking;
static struct log_s {
int threadid, loopid;
......@@ -344,24 +349,44 @@ static void logit (int threadid, int loopid, char action) {
static void grab_rdlock (int threadid, int iteration) {
logit(threadid, iteration, 't');
{ int r = toku_fair_rwlock_rdlock(&rwlock); assert(r==0); }
if (use_frwlock_for_locking) {
toku_mutex_lock(&fmutex);
frwlock.read_lock();
toku_mutex_unlock(&fmutex);
}
else { int r = toku_fair_rwlock_rdlock(&rwlock); assert(r==0); }
logit(threadid, iteration, 'R');
}
static void release_rdlock (int threadid, int iteration) {
logit(threadid, iteration, 'u');
{ int r = toku_fair_rwlock_unlock(&rwlock); assert(r==0); }
if (use_frwlock_for_locking) {
toku_mutex_lock(&fmutex);
frwlock.read_unlock();
toku_mutex_unlock(&fmutex);
}
else { int r = toku_fair_rwlock_unlock(&rwlock); assert(r==0); }
}
static void grab_wrlock (int threadid, int iteration) {
logit(threadid, iteration, 'T');
{ int r = toku_fair_rwlock_wrlock(&rwlock); assert(r==0); }
if (use_frwlock_for_locking) {
toku_mutex_lock(&fmutex);
frwlock.write_lock(true);
toku_mutex_unlock(&fmutex);
}
else { int r = toku_fair_rwlock_wrlock(&rwlock); assert(r==0); }
logit(threadid, iteration, 'W');
}
static void release_wrlock (int threadid, int iteration) {
logit(threadid, iteration, 'U');
{ int r = toku_fair_rwlock_unlock(&rwlock); assert(r==0);}
if (use_frwlock_for_locking) {
toku_mutex_lock(&fmutex);
frwlock.write_unlock();
toku_mutex_unlock(&fmutex);
}
else { int r = toku_fair_rwlock_unlock(&rwlock); assert(r==0);}
}
static void *start_thread (void *vv) {
......@@ -394,18 +419,23 @@ static void *start_thread (void *vv) {
static void *start_thread_random (void *vv) {
int *vp=(int*)vv;
int v=*vp;
int wait;
for (int i=0; i<L; i++) {
if (random()%2==0) {
grab_rdlock(v, i);
for (int j=0; j<random()%20; j++) sched_yield();
wait = random() % 20;
for (int j=0; j<wait; j++) sched_yield();
release_rdlock(v, i);
for (int j=0; j<random()%20; j++) sched_yield();
wait = random() % 20;
for (int j=0; j<wait; j++) sched_yield();
} else {
grab_wrlock(v, i);
for (int j=0; j<random()%20; j++) sched_yield();
wait = random() % 20;
for (int j=0; j<wait; j++) sched_yield();
release_wrlock(v, i);
for (int j=0; j<random()%20; j++) sched_yield();
wait = random() % 20;
for (int j=0; j<wait; j++) sched_yield();
}
}
return NULL;
......@@ -470,12 +500,19 @@ static void check_actionlog (int expected_writer_max_count,
}
static void test_rwlock_internal (void *(*start_th)(void*), int max_wr, int min_rd, int max_rd) {
static void test_rwlock_internal (void *(*start_th)(void*), bool use_frwlock, int max_wr, int min_rd, int max_rd) {
if (verbose>=2) printf("Running threads:\n");
log_counter=0;
pthread_t threads[N];
int v[N];
use_frwlock_for_locking = use_frwlock;
if (use_frwlock_for_locking) {
fmutex = TOKU_MUTEX_INITIALIZER;
frwlock.init(&fmutex);
}
else {
toku_fair_rwlock_init(&rwlock);
}
for (int i=0; i<N; i++) {
v[i]=i;
int r = pthread_create(&threads[i], NULL, start_th, &v[i]);
......@@ -493,14 +530,20 @@ static void test_rwlock_internal (void *(*start_th)(void*), int max_wr, int min_
}
}
check_actionlog(max_wr, min_rd, max_rd);
toku_fair_rwlock_destroy(&rwlock);
if (use_frwlock_for_locking) {
frwlock.deinit();
toku_mutex_destroy(&fmutex);
}
else {
toku_fair_rwlock_destroy(&rwlock);
}
if (verbose>2) printf("OK\n");
}
static void test_rwlock (void) {
test_rwlock_internal(start_thread, 1, 2, 3);
static void test_rwlock (bool use_frwlock) {
test_rwlock_internal(start_thread, use_frwlock, 1, 2, 3);
for (int i=0; i<10; i++) {
test_rwlock_internal(start_thread_random, 1, 0, N);
test_rwlock_internal(start_thread_random, use_frwlock, 1, 0, N);
}
}
int main (int argc, const char *argv[]) {
......@@ -527,7 +570,8 @@ int main (int argc, const char *argv[]) {
printf("// Best fair fast rwlock time=%10.6fns\n", best_fair_rwlock_time);
}
} else {
test_rwlock();
test_rwlock(true);
test_rwlock(false);
}
return 0;
}
......
......@@ -36,8 +36,10 @@ typedef struct toku_mutex {
#if defined(__APPLE__)
static const toku_mutex_t ZERO_MUTEX_INITIALIZER = {{0}};
static const toku_mutex_t TOKU_MUTEX_INITIALIZER = { .pmutex = PTHREAD_MUTEX_INITIALIZER };
#else
static const toku_mutex_t ZERO_MUTEX_INITIALIZER = {{{0}}};
static const toku_mutex_t TOKU_MUTEX_INITIALIZER = { .pmutex = PTHREAD_MUTEX_INITIALIZER };
#endif
static inline void
......@@ -95,6 +97,8 @@ typedef struct toku_cond {
pthread_cond_t pcond;
} toku_cond_t;
#define TOKU_COND_INITIALIZER {PTHREAD_COND_INITIALIZER}
static inline void
toku_cond_init(toku_cond_t *cond, const toku_pthread_condattr_t *attr) {
int r = pthread_cond_init(&cond->pcond, attr);
......@@ -205,6 +209,11 @@ toku_pthread_join(toku_pthread_t thread, void **value_ptr) {
return pthread_join(thread, value_ptr);
}
static inline int
toku_pthread_detach(toku_pthread_t thread) {
return pthread_detach(thread);
}
static inline int
toku_pthread_key_create(toku_pthread_key_t *key, void (*destroyf)(void *)) {
return pthread_key_create(key, destroyf);
......
......@@ -924,6 +924,10 @@ static int UU() scan_op_no_check(DB_TXN *txn, ARG arg, void* operation_extra, vo
return 0;
}
static int dbt_do_nothing (DBT const *UU(key), DBT const *UU(row), void *UU(context)) {
return 0;
}
static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, bool check) {
int r;
int rand_key = myrandom_r(arg->random_data);
......@@ -933,7 +937,14 @@ static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, bool ch
DBT key, val;
dbt_init(&key, &rand_key, sizeof rand_key);
dbt_init(&val, NULL, 0);
r = db->get(db, txn, &key, &val, 0);
r = db->getf_set(
db,
txn,
0,
&key,
dbt_do_nothing,
NULL
);
if (check) assert(r != DB_NOTFOUND);
r = 0;
return r;
......
......@@ -126,10 +126,14 @@ size_t toku_memory_footprint(void * p, size_t touched);
# define HELGRIND_ANNOTATE_NEW_MEMORY(p, size) ANNOTATE_NEW_MEMORY(p, size)
# define HELGRIND_VALGRIND_HG_ENABLE_CHECKING(p, size) VALGRIND_HG_ENABLE_CHECKING(p, size)
# define HELGRIND_VALGRIND_HG_DISABLE_CHECKING(p, size) VALGRIND_HG_DISABLE_CHECKING(p, size)
# define TOKU_DRD_IGNORE_VAR(v) DRD_IGNORE_VAR(v)
# define TOKU_DRD_STOP_IGNORING_VAR(v) DRD_STOP_IGNORING_VAR(v)
#else
# define HELGRIND_ANNOTATE_NEW_MEMORY(p, size) ((void) 0)
# define HELGRIND_VALGRIND_HG_ENABLE_CHECKING(p, size) ((void) 0)
# define HELGRIND_VALGRIND_HG_DISABLE_CHECKING(p, size) ((void) 0)
# define TOKU_DRD_IGNORE_VAR(v)
# define TOKU_DRD_STOP_IGNORING_VAR(v)
#endif
......
......@@ -54,6 +54,7 @@ extern void (*do_assert_hook)(void); // Set this to a function you want called a
#else
#define assert(expr) ((expr) ? (void)0 : toku_do_assert_fail(#expr, __FUNCTION__, __FILE__, __LINE__, get_maybe_error_errno()))
#define assert_zero(expr) ((expr) == 0 ? (void)0 : toku_do_assert_zero_fail((uintptr_t)(expr), #expr, __FUNCTION__, __FILE__, __LINE__, get_maybe_error_errno()))
#define assert_null(expr) ((expr) == nullptr ? (void)0 : toku_do_assert_zero_fail((uintptr_t)(expr), #expr, __FUNCTION__, __FILE__, __LINE__, get_maybe_error_errno()))
#endif
#ifdef GCOV
......@@ -67,7 +68,7 @@ extern void (*do_assert_hook)(void); // Set this to a function you want called a
#define lazy_assert(a) assert(a) // indicates code is incomplete
#define lazy_assert_zero(a) assert_zero(a) // indicates code is incomplete
#define invariant(a) assert(a) // indicates a code invariant that must be true
#define invariant_null(a) assert_zero(a) // indicates a code invariant that must be true
#define invariant_null(a) assert_null(a) // indicates a code invariant that must be true
#define invariant_notnull(a) assert(a) // indicates a code invariant that must be true
#define invariant_zero(a) assert_zero(a) // indicates a code invariant that must be true
#define resource_assert(a) assert(a) // indicates resource must be available, otherwise unrecoverable
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment