Commit 63f93f3e authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#4470 add a nop performance test and don't share per thread worker_extra in...

#4470 add a nop performance test and don't share per thread worker_extra in processor cache lines closes[t:4470]

git-svn-id: file:///svn/toku/tokudb@39614 c7de825b-a66e-492c-adef-691d508d4ae1
parent 26497442
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#ident "$Id: test_stress1.c 39258 2012-01-27 13:51:58Z zardosht $"
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
#include "threaded_stress_test_helpers.h"
//
// This test is a form of stress that does operations on a single dictionary:
// We create a dictionary bigger than the cachetable (around 4x greater).
// Then, we spawn a bunch of pthreads that do the following:
// - scan dictionary forward with bulk fetch
// - scan dictionary forward slowly
// - scan dictionary backward with bulk fetch
// - scan dictionary backward slowly
// - Grow the dictionary with insertions
// - do random point queries into the dictionary
// With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes.
// If the test runs to completion without crashing, we consider it a success. It also tests that snapshots
// work correctly by verifying that table scans sum their vals to 0.
//
// This does NOT test:
// - splits and merges
// - multiple DBs
//
// Variables that are interesting to tweak and run:
// - small cachetable
// - number of elements
//
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
int n = cli_args->num_elements;
//
// the threads that we want:
// - some threads constantly updating random values
// - one thread doing table scan with bulk fetch
// - one thread doing table scan without bulk fetch
// - some threads doing random point queries
//
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env, cli_args);
}
for (int i = 0; i < num_threads; i++) {
myargs[i].operation = nop;
}
run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args);
}
int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args);
stress_test_main(&args);
return 0;
}
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include <memory.h> #include <memory.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <db.h> #include <db.h>
#include <malloc.h>
static inline int32_t static inline int32_t
myrandom_r(struct random_data *buf) myrandom_r(struct random_data *buf)
...@@ -58,6 +58,7 @@ struct arg { ...@@ -58,6 +58,7 @@ struct arg {
enum stress_lock_type lock_type; // states if operation must be exclusive, shared, or does not require locking enum stress_lock_type lock_type; // states if operation must be exclusive, shared, or does not require locking
bool crash_on_operation_failure; // true if we should crash if operation returns non-zero, false otherwise bool crash_on_operation_failure; // true if we should crash if operation returns non-zero, false otherwise
struct random_data *random_data; // state for random_r struct random_data *random_data; // state for random_r
bool single_txn;
}; };
struct env_args { struct env_args {
...@@ -90,6 +91,7 @@ struct cli_args { ...@@ -90,6 +91,7 @@ struct cli_args {
u_int32_t key_size; // number of bytes in vals. Must be at least 4 u_int32_t key_size; // number of bytes in vals. Must be at least 4
u_int32_t val_size; // number of bytes in vals. Must be at least 4 u_int32_t val_size; // number of bytes in vals. Must be at least 4
struct env_args env_args; // specifies environment variables struct env_args env_args; // specifies environment variables
bool single_txn;
}; };
DB_TXN * const null_txn = 0; DB_TXN * const null_txn = 0;
...@@ -104,6 +106,7 @@ static void arg_init(struct arg *arg, int num_elements, DB **dbp, DB_ENV *env, s ...@@ -104,6 +106,7 @@ static void arg_init(struct arg *arg, int num_elements, DB **dbp, DB_ENV *env, s
arg->lock_type = STRESS_LOCK_NONE; arg->lock_type = STRESS_LOCK_NONE;
arg->txn_type = DB_TXN_SNAPSHOT; arg->txn_type = DB_TXN_SNAPSHOT;
arg->crash_on_operation_failure = cli_args->crash_on_update_failure; arg->crash_on_operation_failure = cli_args->crash_on_update_failure;
arg->single_txn = cli_args->single_txn;
arg->operation_extra = NULL; arg->operation_extra = NULL;
} }
...@@ -111,7 +114,8 @@ struct worker_extra { ...@@ -111,7 +114,8 @@ struct worker_extra {
struct arg* thread_arg; struct arg* thread_arg;
toku_pthread_mutex_t *operation_lock_mutex; toku_pthread_mutex_t *operation_lock_mutex;
struct rwlock *operation_lock; struct rwlock *operation_lock;
int num_operations_completed; int64_t num_operations_completed;
int64_t pad[4]; // pad to 64 bytes
}; };
static void lock_worker_op(struct worker_extra* we) { static void lock_worker_op(struct worker_extra* we) {
...@@ -158,17 +162,26 @@ static void *worker(void *arg_v) { ...@@ -158,17 +162,26 @@ static void *worker(void *arg_v) {
if (verbose) { if (verbose) {
printf("%lu starting %p\n", toku_pthread_self(), arg->operation); printf("%lu starting %p\n", toku_pthread_self(), arg->operation);
} }
if (arg->single_txn) {
r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r);
}
while (run_test) { while (run_test) {
lock_worker_op(we); lock_worker_op(we);
r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r); if (!arg->single_txn) {
r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r);
}
r = arg->operation(txn, arg, arg->operation_extra); r = arg->operation(txn, arg, arg->operation_extra);
if (r == 0) { if (r == 0) {
CHK(txn->commit(txn,0)); if (!arg->single_txn) {
CHK(txn->commit(txn,0));
}
} else { } else {
if (arg->crash_on_operation_failure) { if (arg->crash_on_operation_failure) {
CKERR(r); CKERR(r);
} else { } else {
CHK(txn->abort(txn)); if (!arg->single_txn) {
CHK(txn->abort(txn));
}
} }
} }
unlock_worker_op(we); unlock_worker_op(we);
...@@ -177,6 +190,9 @@ static void *worker(void *arg_v) { ...@@ -177,6 +190,9 @@ static void *worker(void *arg_v) {
usleep(arg->sleep_ms * 1000); usleep(arg->sleep_ms * 1000);
} }
} }
if (arg->single_txn) {
CHK(txn->commit(txn, 0));
}
if (verbose) if (verbose)
printf("%lu returning\n", toku_pthread_self()); printf("%lu returning\n", toku_pthread_self());
toku_free(random_buf); toku_free(random_buf);
...@@ -266,7 +282,11 @@ static int generate_row_for_put( ...@@ -266,7 +282,11 @@ static int generate_row_for_put(
return 0; return 0;
} }
static int UU() loader_op(DB_TXN* txn, ARG arg, void* UU(operation_extra)) { static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra)) {
return 0;
}
static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra)) {
DB_ENV* env = arg->env; DB_ENV* env = arg->env;
int r; int r;
for (int num = 0; num < 2; num++) { for (int num = 0; num < 2; num++) {
...@@ -373,6 +393,15 @@ static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_ext ...@@ -373,6 +393,15 @@ static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_ext
return ptquery_and_maybe_check_op(db, txn, arg, FALSE); return ptquery_and_maybe_check_op(db, txn, arg, FALSE);
} }
static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
int db_index = arg->num_DBs > 1 ? random()%arg->num_DBs : 0;
DB* db = arg->dbp[db_index];
DBC* cursor = NULL;
int r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
r = cursor->c_close(cursor); assert(r == 0);
return 0;
}
#define MAX_RANDOM_VAL 10000 #define MAX_RANDOM_VAL 10000
enum update_type { enum update_type {
...@@ -704,7 +733,7 @@ static void *test_time(void *arg) { ...@@ -704,7 +733,7 @@ static void *test_time(void *arg) {
if (verbose) { if (verbose) {
printf("Sleeping for %d seconds\n", num_seconds); printf("Sleeping for %d seconds\n", num_seconds);
} }
int num_operations_completed_total[tte->num_wes]; int64_t num_operations_completed_total[tte->num_wes];
memset(num_operations_completed_total, 0, sizeof num_operations_completed_total); memset(num_operations_completed_total, 0, sizeof num_operations_completed_total);
for (int i = 0; i < num_seconds; i += tte->performance_period) { for (int i = 0; i < num_seconds; i += tte->performance_period) {
usleep(tte->performance_period*1000*1000); usleep(tte->performance_period*1000*1000);
...@@ -762,7 +791,8 @@ static int run_workers( ...@@ -762,7 +791,8 @@ static int run_workers(
rwlock_init(&rwlock); rwlock_init(&rwlock);
toku_pthread_t tids[num_threads]; toku_pthread_t tids[num_threads];
toku_pthread_t time_tid; toku_pthread_t time_tid;
struct worker_extra worker_extra[num_threads]; struct worker_extra *worker_extra = (struct worker_extra *)
memalign(64, num_threads * sizeof (struct worker_extra)); // allocate worker_extra's on cache line boundaries
struct test_time_extra tte; struct test_time_extra tte;
tte.num_seconds = num_seconds; tte.num_seconds = num_seconds;
tte.crash_at_end = crash_at_end; tte.crash_at_end = crash_at_end;
...@@ -797,6 +827,7 @@ static int run_workers( ...@@ -797,6 +827,7 @@ static int run_workers(
printf("ending test, pthreads have joined\n"); printf("ending test, pthreads have joined\n");
rwlock_destroy(&rwlock); rwlock_destroy(&rwlock);
toku_pthread_mutex_destroy(&mutex); toku_pthread_mutex_destroy(&mutex);
toku_free(worker_extra);
return r; return r;
} }
...@@ -990,6 +1021,7 @@ static struct cli_args UU() get_default_args(void) { ...@@ -990,6 +1021,7 @@ static struct cli_args UU() get_default_args(void) {
.key_size = MIN_KEY_SIZE, .key_size = MIN_KEY_SIZE,
.val_size = MIN_VAL_SIZE, .val_size = MIN_VAL_SIZE,
.env_args = DEFAULT_ENV_ARGS, .env_args = DEFAULT_ENV_ARGS,
.single_txn = false,
}; };
return DEFAULT_ARGS; return DEFAULT_ARGS;
} }
...@@ -1078,7 +1110,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -1078,7 +1110,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
argc--; argv++; argc--; argv++;
args->update_broadcast_period_ms = atoi(argv[1]); args->update_broadcast_period_ms = atoi(argv[1]);
} }
else if (strcmp(argv[1], "--num_ptquery_threads") == 0) { else if (strcmp(argv[1], "--num_ptquery_threads") == 0 || strcmp(argv[1], "--num_threads") == 0) {
argc--; argv++; argc--; argv++;
args->num_ptquery_threads = atoi(argv[1]); args->num_ptquery_threads = atoi(argv[1]);
} }
...@@ -1132,6 +1164,9 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -1132,6 +1164,9 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
argc--; argv++; argc--; argv++;
args->env_args.envdir = argv[1]; args->env_args.envdir = argv[1];
} }
else if (strcmp(argv[1], "--single_txn") == 0) {
args->single_txn = true;
}
else { else {
resultcode=1; resultcode=1;
goto do_usage; goto do_usage;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment