Commit b66f4e0c authored by Rich Prohaska, committed by Yoni Fogel

#4443 try to speed up cursor create and close on the mainline. the read lock...

#4443 Try to speed up cursor create and close on the mainline. The read lock on the directory is a BIG bottleneck. refs[t:4443]

git-svn-id: file:///svn/toku/tokudb@39683 c7de825b-a66e-492c-adef-691d508d4ae1
parent 77775ddc
......@@ -9,6 +9,7 @@
#include <pthread.h>
#include <time.h>
#include <stdint.h>
#include "toku_assert.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
......@@ -119,6 +120,56 @@ toku_pthread_mutex_unlock(toku_pthread_mutex_t *mutex) {
return pthread_mutex_unlock(mutex);
}
// Initialize *mutex with the given attributes (forwarded unchanged to
// pthread_mutex_init).  The pthread return code is checked with assert_zero,
// so no error code is handed back to the caller.
static inline void
toku_mutex_init(toku_pthread_mutex_t *mutex, const toku_pthread_mutexattr_t *attr) {
    const int rc = pthread_mutex_init(mutex, attr);
    assert_zero(rc);
}
// Destroy *mutex via pthread_mutex_destroy.  The pthread return code is
// checked with assert_zero instead of being returned.
static inline void
toku_mutex_destroy(toku_pthread_mutex_t *mutex) {
    const int rc = pthread_mutex_destroy(mutex);
    assert_zero(rc);
}
// Acquire *mutex via pthread_mutex_lock.  The pthread return code is
// checked with assert_zero instead of being returned.
static inline void
toku_mutex_lock(toku_pthread_mutex_t *mutex) {
    const int rc = pthread_mutex_lock(mutex);
    assert_zero(rc);
}
// Release *mutex via pthread_mutex_unlock.  The pthread return code is
// checked with assert_zero instead of being returned.
static inline void
toku_mutex_unlock(toku_pthread_mutex_t *mutex) {
    const int rc = pthread_mutex_unlock(mutex);
    assert_zero(rc);
}
// Spinlock type taken by the toku_spin_* wrappers; an alias for pthread_spinlock_t.
typedef pthread_spinlock_t toku_spinlock_t;
// Initialize *lock via pthread_spin_init; pshared is forwarded unchanged.
// The pthread return code is checked with assert_zero instead of being
// returned.
static inline void
toku_spin_init(toku_spinlock_t *lock, int pshared) {
    const int rc = pthread_spin_init(lock, pshared);
    assert_zero(rc);
}
// Destroy *lock via pthread_spin_destroy.  The pthread return code is
// checked with assert_zero instead of being returned.
static inline void
toku_spin_destroy(toku_spinlock_t *lock) {
    const int rc = pthread_spin_destroy(lock);
    assert_zero(rc);
}
// Acquire *lock via pthread_spin_lock.  The pthread return code is
// checked with assert_zero instead of being returned.
static inline void
toku_spin_lock(toku_spinlock_t *lock) {
    const int rc = pthread_spin_lock(lock);
    assert_zero(rc);
}
// Release *lock via pthread_spin_unlock.  The pthread return code is
// checked with assert_zero instead of being returned.
static inline void
toku_spin_unlock(toku_spinlock_t *lock) {
    const int rc = pthread_spin_unlock(lock);
    assert_zero(rc);
}
static inline int
toku_pthread_cond_init(toku_pthread_cond_t *cond, const toku_pthread_condattr_t *attr) {
return pthread_cond_init(cond, attr);
......
......@@ -413,6 +413,7 @@ struct brt {
// The header is shared. It is also ephemeral.
struct brt_header *h;
toku_spinlock_t cursors_lock;
struct toku_list cursors;
unsigned int nodesize;
......
......@@ -3478,6 +3478,8 @@ static void (*callback_db_set_brt)(DB *db, BRT brt) = NULL;
static void
brt_redirect_cursors (BRT brt_to, BRT brt_from) {
assert(brt_to->db == brt_from->db);
toku_spin_lock(&brt_to->cursors_lock);
toku_spin_lock(&brt_from->cursors_lock);
while (!toku_list_empty(&brt_from->cursors)) {
struct toku_list * c_list = toku_list_head(&brt_from->cursors);
BRT_CURSOR c = toku_list_struct(c_list, struct brt_cursor, cursors_link);
......@@ -3488,6 +3490,8 @@ brt_redirect_cursors (BRT brt_to, BRT brt_from) {
c->brt = brt_to;
}
toku_spin_unlock(&brt_to->cursors_lock);
toku_spin_unlock(&brt_from->cursors_lock);
}
static void
......@@ -4092,9 +4096,10 @@ int toku_close_brt_lsn (BRT brt, char **error_string, BOOL oplsn_valid, LSN opls
int r;
while (!toku_list_empty(&brt->cursors)) {
BRT_CURSOR c = toku_list_struct(toku_list_pop(&brt->cursors), struct brt_cursor, cursors_link);
r=toku_brt_cursor_close(c);
r = toku_brt_cursor_close(c);
if (r!=0) return r;
}
toku_spin_destroy(&brt->cursors_lock);
// Must do this work before closing the cf
r=toku_txn_note_close_brt(brt);
......@@ -4133,6 +4138,10 @@ int toku_brt_create(BRT *brt_ptr) {
brt->update_fun = NULL;
int r = toku_omt_create(&brt->txns);
if (r!=0) { toku_free(brt); return r; }
pthread_mutexattr_t attr;
r = pthread_mutexattr_init(&attr); assert_zero(r);
r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP); assert_zero(r);
toku_spin_init(&brt->cursors_lock, 0);
*brt_ptr = brt;
return 0;
}
......@@ -4236,7 +4245,11 @@ int toku_brt_cursor (
cursor->is_leaf_mode = FALSE;
cursor->ttxn = ttxn;
cursor->disable_prefetching = disable_prefetching;
if (1) {
toku_spin_lock(&brt->cursors_lock);
toku_list_push(&brt->cursors, &cursor->cursors_link);
toku_spin_unlock(&brt->cursors_lock);
}
*cursorptr = cursor;
return 0;
}
......@@ -4278,8 +4291,6 @@ toku_brt_cursor_set_range_lock(BRT_CURSOR cursor, const DBT *left, const DBT *ri
}
}
//TODO: #1378 When we split the ydb lock, touching cursor->cursors_link
//is not thread safe.
int toku_brt_cursor_close(BRT_CURSOR cursor) {
brt_cursor_cleanup_dbts(cursor);
if (cursor->range_lock_left_key.data) {
......@@ -4290,8 +4301,12 @@ int toku_brt_cursor_close(BRT_CURSOR cursor) {
toku_free(cursor->range_lock_right_key.data);
toku_destroy_dbt(&cursor->range_lock_right_key);
}
if (1) {
toku_spin_lock(&cursor->brt->cursors_lock);
toku_list_remove(&cursor->cursors_link);
toku_free_n(cursor, sizeof *cursor);
toku_spin_unlock(&cursor->brt->cursors_lock);
}
toku_free(cursor);
return 0;
}
......@@ -5738,9 +5753,10 @@ BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c) {
// Return the number of cursors currently linked on brt->cursors.
//
// The list is walked exactly once while holding brt->cursors_lock, the same
// spinlock that toku_brt_cursor and toku_brt_cursor_close take around their
// list push/remove.  The lock is acquired once before the walk and released
// once after it; acquiring it inside the loop would re-lock a spinlock the
// caller already holds and leave lock/unlock unbalanced for an empty list.
int toku_brt_get_cursor_count (BRT brt) {
    int n = 0;
    toku_spin_lock(&brt->cursors_lock);
    for (struct toku_list *list = brt->cursors.next; list != &brt->cursors; list = list->next) {
        n += 1;
    }
    toku_spin_unlock(&brt->cursors_lock);
    return n;
}
......
......@@ -14,28 +14,8 @@
#include "threaded_stress_test_helpers.h"
//
// This test is a form of stress that does operations on a single dictionary:
// We create a dictionary bigger than the cachetable (around 4x greater).
// Then, we spawn a bunch of pthreads that do the following:
// - scan dictionary forward with bulk fetch
// - scan dictionary forward slowly
// - scan dictionary backward with bulk fetch
// - scan dictionary backward slowly
// - Grow the dictionary with insertions
// - do random point queries into the dictionary
// With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes.
// If the test runs to completion without crashing, we consider it a success. It also tests that snapshots
// work correctly by verifying that table scans sum their vals to 0.
//
// This does NOT test:
// - splits and merges
// - multiple DBs
//
// Variables that are interesting to tweak and run:
// - small cachetable
// - number of elements
//
// The intent of this test is to measure the throughput of cursor create and close
// with multiple threads.
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
......
......@@ -15,40 +15,12 @@
#include "threaded_stress_test_helpers.h"
//
// This test is a form of stress that does operations on a single dictionary:
// We create a dictionary bigger than the cachetable (around 4x greater).
// Then, we spawn a bunch of pthreads that do the following:
// - scan dictionary forward with bulk fetch
// - scan dictionary forward slowly
// - scan dictionary backward with bulk fetch
// - scan dictionary backward slowly
// - Grow the dictionary with insertions
// - do random point queries into the dictionary
// With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes.
// If the test runs to completion without crashing, we consider it a success. It also tests that snapshots
// work correctly by verifying that table scans sum their vals to 0.
//
// This does NOT test:
// - splits and merges
// - multiple DBs
//
// Variables that are interesting to tweak and run:
// - small cachetable
// - number of elements
//
// The intent of this test is to measure the throughput of malloc and free
// with multiple threads.
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
int n = cli_args->num_elements;
//
// the threads that we want:
// - some threads constantly updating random values
// - one thread doing table scan with bulk fetch
// - one thread doing table scan without bulk fetch
// - some threads doing random point queries
//
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
......
......@@ -14,40 +14,12 @@
#include "threaded_stress_test_helpers.h"
//
// This test is a form of stress that does operations on a single dictionary:
// We create a dictionary bigger than the cachetable (around 4x greater).
// Then, we spawn a bunch of pthreads that do the following:
// - scan dictionary forward with bulk fetch
// - scan dictionary forward slowly
// - scan dictionary backward with bulk fetch
// - scan dictionary backward slowly
// - Grow the dictionary with insertions
// - do random point queries into the dictionary
// With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes.
// If the test runs to completion without crashing, we consider it a success. It also tests that snapshots
// work correctly by verifying that table scans sum their vals to 0.
//
// This does NOT test:
// - splits and merges
// - multiple DBs
//
// Variables that are interesting to tweak and run:
// - small cachetable
// - number of elements
//
// The intent of this test is to measure the throughput of the test infrastructure executing a nop
// on multiple threads.
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
int n = cli_args->num_elements;
//
// the threads that we want:
// - some threads constantly updating random values
// - one thread doing table scan with bulk fetch
// - one thread doing table scan without bulk fetch
// - some threads doing random point queries
//
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
......
......@@ -14,40 +14,12 @@
#include "threaded_stress_test_helpers.h"
//
// This test is a form of stress that does operations on a single dictionary:
// We create a dictionary bigger than the cachetable (around 4x greater).
// Then, we spawn a bunch of pthreads that do the following:
// - scan dictionary forward with bulk fetch
// - scan dictionary forward slowly
// - scan dictionary backward with bulk fetch
// - scan dictionary backward slowly
// - Grow the dictionary with insertions
// - do random point queries into the dictionary
// With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes.
// If the test runs to completion without crashing, we consider it a success. It also tests that snapshots
// work correctly by verifying that table scans sum their vals to 0.
//
// This does NOT test:
// - splits and merges
// - multiple DBs
//
// Variables that are interesting to tweak and run:
// - small cachetable
// - number of elements
//
// The intent of this test is to measure the throughput of toku_malloc and toku_free
// with multiple threads.
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
int n = cli_args->num_elements;
//
// the threads that we want:
// - some threads constantly updating random values
// - one thread doing table scan with bulk fetch
// - one thread doing table scan without bulk fetch
// - some threads doing random point queries
//
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
......
......@@ -458,9 +458,7 @@ static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t f
static int toku_db_update(DB *db, DB_TXN *txn, const DBT *key, const DBT *update_function_extra, u_int32_t flags);
static int toku_db_update_broadcast(DB *db, DB_TXN *txn, const DBT *update_function_extra, u_int32_t flags);
static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags);
static int toku_db_cursor(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags, int is_temporary_cursor);
/* txn methods */
static int toku_db_cursor_internal(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags, int is_temporary_cursor, bool holds_ydb_lock);
/* lightweight cursor methods. */
static int toku_c_getf_first(DBC *c, u_int32_t flag, YDB_CALLBACK_FUNCTION f, void *extra);
......@@ -4111,7 +4109,10 @@ c_getf_set_range_reverse_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, b
return r;
}
static int toku_c_close(DBC * c) {
// Close a cursor.
// Does not require the ydb lock held when called.
static int
toku_c_close(DBC * c) {
HANDLE_PANICKED_DB(c->dbp);
HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
int r = toku_brt_cursor_close(dbc_struct_i(c)->c);
......@@ -4150,7 +4151,7 @@ toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) {
// lock_flags |= DB_PRELOCKED
//}
r = toku_db_cursor(cursor->dbp, dbc_struct_i(cursor)->txn, &count_cursor, DBC_DISABLE_PREFETCHING, 0);
r = toku_db_cursor_internal(cursor->dbp, dbc_struct_i(cursor)->txn, &count_cursor, DBC_DISABLE_PREFETCHING, 0, true);
if (r != 0) goto finish;
r = toku_c_getf_set(count_cursor, lock_flags, &currentkey, ydb_getf_do_nothing, NULL);
......@@ -4179,7 +4180,7 @@ db_getf_set(DB *db, DB_TXN *txn, u_int32_t flags, DBT *key, YDB_CALLBACK_FUNCTIO
DBC *c;
uint32_t create_flags = flags & (DB_ISOLATION_FLAGS | DB_RMW);
flags &= ~DB_ISOLATION_FLAGS;
int r = toku_db_cursor(db, txn, &c, create_flags | DBC_DISABLE_PREFETCHING, 1);
int r = toku_db_cursor_internal(db, txn, &c, create_flags | DBC_DISABLE_PREFETCHING, 1, true);
if (r==0) {
r = toku_c_getf_set(c, flags, key, f, extra);
int r2 = toku_c_close(c);
......@@ -4401,11 +4402,6 @@ locked_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
return r;
}
// Close cursor c by calling toku_c_close with the ydb lock held.
// Returns whatever toku_c_close returns.
static int
locked_c_close(DBC * c) {
    toku_ydb_lock();
    int result = toku_c_close(c);
    toku_ydb_unlock();
    return result;
}
static int
locked_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) {
toku_ydb_lock(); int r = toku_c_count(cursor, count, flags); toku_ydb_unlock(); return r;
......@@ -4419,11 +4415,10 @@ locked_c_del(DBC * c, u_int32_t flags) {
static int locked_c_pre_acquire_range_lock(DBC *dbc, const DBT *key_left, const DBT *key_right);
static int
toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int is_temporary_cursor) {
toku_db_cursor_internal(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int is_temporary_cursor, bool holds_ydb_lock) {
HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
DB_ENV* env = db->dbenv;
int r;
if (flags & ~(DB_SERIALIZABLE | DB_INHERIT_ISOLATION | DB_RMW | DBC_DISABLE_PREFETCHING)) {
return toku_ydb_do_error(
......@@ -4433,17 +4428,22 @@ toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int is_temporar
);
}
int r = 0;
if (1) {
if (!holds_ydb_lock) toku_ydb_lock();
r = toku_grab_read_lock_on_directory(db, txn);
if (!holds_ydb_lock) toku_ydb_unlock();
if (r != 0)
return r;
}
struct __toku_dbc_external *XMALLOC(eresult); // so the internal stuff is stuck on the end
memset(eresult, 0, sizeof(*eresult));
DBC *result = &eresult->external_part;
// methods that grab the ydb lock
#define SCRS(name) result->name = locked_ ## name
SCRS(c_get);
SCRS(c_close);
SCRS(c_del);
SCRS(c_count);
SCRS(c_getf_first);
......@@ -4457,6 +4457,8 @@ toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int is_temporar
SCRS(c_getf_set_range_reverse);
SCRS(c_pre_acquire_range_lock);
#undef SCRS
// unlocked methods
result->c_close = toku_c_close;
#if !TOKUDB_NATIVE_H
MALLOC(result->i); // otherwise it is allocated as part of result->ii
......@@ -4528,7 +4530,7 @@ toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
DBC *dbc;
r = toku_db_cursor(db, txn, &dbc, iso_flags | DBC_DISABLE_PREFETCHING, 1);
r = toku_db_cursor_internal(db, txn, &dbc, iso_flags | DBC_DISABLE_PREFETCHING, 1, true);
if (r!=0) return r;
u_int32_t c_get_flags = DB_SET;
r = toku_c_get(dbc, key, data, c_get_flags | lock_flags);
......@@ -5874,12 +5876,15 @@ autotxn_db_cursor(DB *db, DB_TXN *txn, DBC **c, u_int32_t flags) {
return toku_ydb_do_error(db->dbenv, EINVAL,
"Cursors in a transaction environment must have transactions.\n");
}
return toku_db_cursor(db, txn, c, flags, 0);
return toku_db_cursor_internal(db, txn, c, flags, 0, false);
}
// Create a cursor on a db.
// Called without holding the ydb lock.
static int
locked_db_cursor(DB *db, DB_TXN *txn, DBC **c, u_int32_t flags) {
toku_ydb_lock(); int r = autotxn_db_cursor(db, txn, c, flags); toku_ydb_unlock(); return r;
toku_db_cursor(DB *db, DB_TXN *txn, DBC **c, u_int32_t flags) {
int r = autotxn_db_cursor(db, txn, c, flags);
return r;
}
static inline int
......@@ -6352,10 +6357,10 @@ toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
}
memset(result, 0, sizeof *result);
result->dbenv = env;
// methods that grab the ydb lock
#define SDB(name) result->name = locked_db_ ## name
SDB(key_range64);
SDB(close);
SDB(cursor);
SDB(del);
SDB(get);
// SDB(key_range);
......@@ -6389,6 +6394,9 @@ toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
SDB(get_indexer);
SDB(verify_with_progress);
#undef SDB
// unlocked methods
result->cursor = toku_db_cursor;
result->dbt_pos_infty = toku_db_dbt_pos_infty;
result->dbt_neg_infty = toku_db_dbt_neg_infty;
MALLOC(result->i);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment