Commit f941a12b authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

[t:5027] some work on making threaded_stress_test_helpers better for benchmarking

also created perf_insert.tdb


git-svn-id: file:///svn/toku/tokudb@44332 c7de825b-a66e-492c-adef-691d508d4ae1
parent decb4dd9
......@@ -92,6 +92,7 @@ if(BUILD_TESTING)
mvcc-many-committed.c
perf_checkpoint_var.c
perf_cursor_nop.c
perf_insert.c
perf_malloc_free.c
perf_nop.c
perf_ptquery.c
......
......@@ -123,7 +123,7 @@ static void checkpoint_callback_2(void * extra) {
// - number of elements
//
static int checkpoint_var(DB_TXN *txn, ARG arg, void* operation_extra) {
static int checkpoint_var(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
int db_index = random()%arg->num_DBs;
int r = 0;
int val_size = *(int *)operation_extra;
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "$Id$"
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
#include "threaded_stress_test_helpers.h"
// The intent of this test is to measure the throughput of db->puts
// with multiple threads.
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
int n = cli_args->num_elements;
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_put_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env, cli_args);
}
for (int i = 0; i < num_threads; i++) {
myargs[i].operation = random_put_op;
}
run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args);
}
int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args);
stress_test_main(&args);
return 0;
}
......@@ -16,7 +16,7 @@
#include "threaded_stress_test_helpers.h"
static int ptquery_op2(DB_TXN *txn, ARG arg, void* operation_extra) {
static int ptquery_op2(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
int db_index = *(int *)operation_extra;
DB* db = arg->dbp[db_index];
return ptquery_and_maybe_check_op(db, txn, arg, TRUE);
......
......@@ -55,7 +55,7 @@ static void checkpoint_callback2(void* UU(extra)) {
}
}
static int manual_checkpoint(DB_TXN *UU(txn), ARG UU(arg), void* operation_extra) {
static int manual_checkpoint(DB_TXN *UU(txn), ARG UU(arg), void* operation_extra, void *UU(stats_extra)) {
DB_ENV* env = operation_extra;
int r = env->txn_checkpoint(env,0,0,0);
assert_zero(r);
......
......@@ -18,7 +18,7 @@
// The intent of this test is to measure create and abort transactions
// with garbage collection verification on
static int random_sleep(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra)) {
static int random_sleep(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
usleep(random()%2000);
return 0;
}
......
......@@ -54,9 +54,25 @@ myrandom_r(struct random_data *buf)
return x;
}
#else
# error "no suitable reentrant random function available (checked random_r and jrand48)"
# error "no suitable reentrant random function available (checked random_r and nrand48)"
#endif
static inline uint64_t
randu62(struct random_data *buf)
{
uint64_t a = myrandom_r(buf);
uint64_t b = myrandom_r(buf);
return (a | (b << 31));
}
static inline uint64_t
randu64(struct random_data *buf)
{
uint64_t r62 = randu62(buf);
uint64_t c = myrandom_r(buf);
return (r62 | ((c & 0x3) << 62));
}
#if !defined(HAVE_MEMALIGN)
# if defined(HAVE_VALLOC)
static void *
......@@ -72,10 +88,17 @@ memalign(size_t UU(alignment), size_t size)
volatile bool run_test; // should be volatile since we are communicating through this variable.
typedef struct arg *ARG;
typedef int (*operation_t)(DB_TXN *txn, ARG arg, void* operation_extra);
typedef int (*operation_t)(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra);
typedef int (*test_update_callback_f)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra);
enum operation_type {
OPERATION = 0,
PUTS,
NUM_OPERATION_TYPES
};
static void increment_counter(void *extra, enum operation_type type, uint64_t inc);
enum stress_lock_type {
STRESS_LOCK_NONE = 0,
STRESS_LOCK_SHARED,
......@@ -98,7 +121,10 @@ struct arg {
enum stress_lock_type lock_type; // states if operation must be exclusive, shared, or does not require locking
bool crash_on_operation_failure; // true if we should crash if operation returns non-zero, false otherwise
struct random_data *random_data; // state for random_r
u_int32_t txn_size; // specifies number of updates/puts/whatevers per txn
bool single_txn;
int thread_idx;
int num_threads;
};
struct env_args {
......@@ -123,11 +149,12 @@ struct cli_args {
bool do_test_and_crash; // true if we should crash after running stress test. For recovery tests.
bool do_recover; // true if we should run recover
int num_update_threads; // number of threads running updates
int num_put_threads; // number of threads running puts
bool crash_on_update_failure;
bool print_performance;
bool print_thread_performance;
int performance_period;
u_int32_t update_txn_size; // for clients that do updates, specifies number of updates per txn
u_int32_t txn_size; // specifies number of updates/puts/whatevers per txn
u_int32_t key_size; // number of bytes in vals. Must be at least 4
u_int32_t val_size; // number of bytes in vals. Must be at least 4
struct env_args env_args; // specifies environment variables
......@@ -147,6 +174,7 @@ static void arg_init(struct arg *arg, int num_elements, DB **dbp, DB_ENV *env, s
arg->lock_type = STRESS_LOCK_NONE;
arg->txn_type = DB_TXN_SNAPSHOT;
arg->crash_on_operation_failure = cli_args->crash_on_update_failure;
arg->txn_size = cli_args->txn_size;
arg->single_txn = cli_args->single_txn;
arg->operation_extra = NULL;
}
......@@ -155,7 +183,7 @@ struct worker_extra {
struct arg* thread_arg;
toku_mutex_t *operation_lock_mutex;
struct rwlock *operation_lock;
int64_t num_operations_completed;
uint64_t *counters;
int64_t pad[4]; // pad to 64 bytes
};
......@@ -189,14 +217,22 @@ static void unlock_worker_op(struct worker_extra* we) {
}
}
static void increment_counter(void *extra, enum operation_type type, uint64_t inc) {
invariant(type != OPERATION);
int t = (int) type;
invariant(extra);
invariant(t >= 0 && t < (int) NUM_OPERATION_TYPES);
struct worker_extra *we = extra;
we->counters[t] += inc;
}
static void *worker(void *arg_v) {
int r;
struct worker_extra* we = arg_v;
ARG arg = we->thread_arg;
struct random_data random_data;
memset(&random_data, 0, sizeof random_data);
char *random_buf = toku_xmalloc(8);
memset(random_buf, 0, 8);
ZERO_STRUCT(random_data);
char *XCALLOC_N(8, random_buf);
r = initstate_r(random(), random_buf, 8, &random_data);
assert_zero(r);
arg->random_data = &random_data;
......@@ -213,7 +249,7 @@ static void *worker(void *arg_v) {
if (!arg->single_txn) {
r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r);
}
r = arg->operation(txn, arg, arg->operation_extra);
r = arg->operation(txn, arg, arg->operation_extra, we);
if (r == 0) {
if (!arg->single_txn) {
CHK(txn->commit(txn,0));
......@@ -228,7 +264,7 @@ static void *worker(void *arg_v) {
}
}
unlock_worker_op(we);
(void) __sync_fetch_and_add(&we->num_operations_completed, 1);
we->counters[OPERATION]++;
if (arg->sleep_ms) {
usleep(arg->sleep_ms * 1000);
}
......@@ -325,11 +361,11 @@ static int generate_row_for_put(
return 0;
}
static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra)) {
static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
return 0;
}
static int UU() xmalloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra)) {
static int UU() xmalloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
size_t s = 256;
void *p = toku_xmalloc(s);
toku_free(p);
......@@ -337,7 +373,7 @@ static int UU() xmalloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation
}
#if DONT_DEPRECATE_MALLOC
static int UU() malloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra)) {
static int UU() malloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
size_t s = 256;
void *p = malloc(s);
free(p);
......@@ -345,7 +381,35 @@ static int UU() malloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_
}
#endif
static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra)) {
static int UU() random_put_op(DB_TXN *txn, ARG arg, void *UU(operation_extra), void *stats_extra) {
int r = 0;
//int db_index = myrandom_r(arg->random_data)%arg->num_DBs;
int db_index = arg->thread_idx%arg->num_DBs;
DB* db = arg->dbp[db_index];
char buf[100];
ZERO_ARRAY(buf);
uint64_t i;
for (i = 0; i < arg->txn_size; ++i) {
union {
uint64_t key;
uint16_t i[4];
} rand_key;
rand_key.key = randu64(arg->random_data);
rand_key.i[0] = arg->thread_idx;
DBT key, val;
dbt_init(&key, &rand_key, sizeof rand_key);
dbt_init(&val, buf, sizeof buf);
r = db->put(db, txn, &key, &val, 0);
if (r != 0) {
goto cleanup;
}
}
cleanup:
increment_counter(stats_extra, PUTS, i);
return r;
}
static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
DB_ENV* env = arg->env;
int r;
for (int num = 0; num < 2; num++) {
......@@ -377,7 +441,7 @@ static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra)) {
return 0;
}
static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r;
// callback is designed to run on tests with one DB
// no particular reason why, just the way it was
......@@ -397,7 +461,7 @@ static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
return r;
}
static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra)) {
static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
int r = 0;
for (int i = 0; i < arg->num_DBs; i++) {
DB* db = arg->dbp[i];
......@@ -407,7 +471,7 @@ static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra
return r;
}
static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra) {
static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra, void *UU(stats_extra)) {
struct scan_op_extra* extra = operation_extra;
for (int i = 0; i < arg->num_DBs; i++) {
int r = scan_op_and_maybe_check_sum(arg->dbp[i], txn, extra, true);
......@@ -416,7 +480,7 @@ static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra) {
return 0;
}
static int UU() scan_op_no_check(DB_TXN *txn, ARG arg, void* operation_extra) {
static int UU() scan_op_no_check(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
struct scan_op_extra* extra = operation_extra;
for (int i = 0; i < arg->num_DBs; i++) {
int r = scan_op_and_maybe_check_sum(arg->dbp[i], txn, extra, false);
......@@ -432,27 +496,27 @@ static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, BOOL ch
rand_key = rand_key % arg->num_elements;
}
DBT key, val;
memset(&val, 0, sizeof val);
dbt_init(&key, &rand_key, sizeof rand_key);
dbt_init(&val, NULL, 0);
r = db->get(db, txn, &key, &val, 0);
if (check) assert(r != DB_NOTFOUND);
r = 0;
return r;
}
static int UU() ptquery_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
static int UU() ptquery_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int db_index = myrandom_r(arg->random_data)%arg->num_DBs;
DB* db = arg->dbp[db_index];
return ptquery_and_maybe_check_op(db, txn, arg, TRUE);
}
static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int db_index = myrandom_r(arg->random_data)%arg->num_DBs;
DB* db = arg->dbp[db_index];
return ptquery_and_maybe_check_op(db, txn, arg, FALSE);
}
static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int db_index = arg->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->num_DBs : 0;
DB* db = arg->dbp[db_index];
DBC* cursor = NULL;
......@@ -485,7 +549,6 @@ struct update_op_extra {
struct update_op_args {
int *update_history_buffer;
u_int32_t update_txn_size;
int update_pad_frequency;
};
......@@ -493,7 +556,6 @@ static struct update_op_args UU() get_update_op_args(struct cli_args* cli_args,
struct update_op_args uoe;
uoe.update_history_buffer = update_history_buffer;
uoe.update_pad_frequency = cli_args->num_elements/100; // arbitrary
uoe.update_txn_size = cli_args->update_txn_size;
return uoe;
}
......@@ -532,13 +594,13 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key),
DBT new_val;
u_int32_t data_size = sizeof(int) + e->pad_bytes;
char* data [data_size];
memset(data, 0, data_size);
ZERO_ARRAY(data);
memcpy(data, &new_int_val, sizeof(new_int_val));
set_val(dbt_init(&new_val, data, data_size), set_extra);
return 0;
}
static int UU()update_op2(DB_TXN* txn, ARG arg, void* operation_extra) {
static int UU()update_op2(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r;
int db_index = myrandom_r(arg->random_data)%arg->num_DBs;
DB* db = arg->dbp[db_index];
......@@ -546,13 +608,12 @@ static int UU()update_op2(DB_TXN* txn, ARG arg, void* operation_extra) {
DBT key, val;
int rand_key;
int rand_key2;
struct update_op_args* op_args = operation_extra;
update_count++;
struct update_op_extra extra;
memset(&extra, 0, sizeof(extra));
ZERO_STRUCT(extra);
extra.type = UPDATE_ADD_DIFF;
extra.pad_bytes = 0;
for (u_int32_t i = 0; i < op_args->update_txn_size; i++) {
for (u_int32_t i = 0; i < arg->txn_size; i++) {
rand_key = myrandom_r(arg->random_data);
if (arg->bounded_element_range) {
rand_key = rand_key % (arg->num_elements/2);
......@@ -586,7 +647,7 @@ static int UU()update_op2(DB_TXN* txn, ARG arg, void* operation_extra) {
return r;
}
static int UU()update_op(DB_TXN *txn, ARG arg, void* operation_extra) {
static int UU()update_op(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
int r;
int db_index = myrandom_r(arg->random_data)%arg->num_DBs;
DB* db = arg->dbp[db_index];
......@@ -596,7 +657,7 @@ static int UU()update_op(DB_TXN *txn, ARG arg, void* operation_extra) {
update_count++;
struct update_op_args* op_args = operation_extra;
struct update_op_extra extra;
memset(&extra, 0, sizeof(extra));
ZERO_STRUCT(extra);
extra.type = UPDATE_ADD_DIFF;
extra.pad_bytes = 0;
if (op_args->update_pad_frequency) {
......@@ -604,7 +665,7 @@ static int UU()update_op(DB_TXN *txn, ARG arg, void* operation_extra) {
extra.pad_bytes = 100;
}
}
for (u_int32_t i = 0; i < op_args->update_txn_size; i++) {
for (u_int32_t i = 0; i < arg->txn_size; i++) {
rand_key = myrandom_r(arg->random_data);
if (arg->bounded_element_range) {
rand_key = rand_key % arg->num_elements;
......@@ -648,7 +709,7 @@ static int UU()update_op(DB_TXN *txn, ARG arg, void* operation_extra) {
return r;
}
static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_extra) {
static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
struct update_op_args* op_args = operation_extra;
assert(arg->bounded_element_range);
assert(op_args->update_history_buffer);
......@@ -659,7 +720,7 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext
DBT key, val;
int rand_key;
struct update_op_extra extra;
memset(&extra, 0, sizeof(extra));
ZERO_STRUCT(extra);
extra.type = UPDATE_WITH_HISTORY;
update_count++;
extra.pad_bytes = 0;
......@@ -667,9 +728,8 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext
if (update_count % (2*op_args->update_pad_frequency) != update_count%op_args->update_pad_frequency) {
extra.pad_bytes = 500;
}
}
for (u_int32_t i = 0; i < op_args->update_txn_size; i++) {
for (u_int32_t i = 0; i < arg->txn_size; i++) {
rand_key = myrandom_r(arg->random_data) % arg->num_elements;
extra.u.h.new = myrandom_r(arg->random_data) % MAX_RANDOM_VAL;
// just make every other value random
......@@ -714,9 +774,9 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext
return r;
}
static int UU() update_broadcast_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
static int UU() update_broadcast_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
struct update_op_extra extra;
memset(&extra, 0, sizeof(extra));
ZERO_STRUCT(extra);
int db_index = myrandom_r(arg->random_data)%arg->num_DBs;
DB* db = arg->dbp[db_index];
extra.type = UPDATE_NEGATE;
......@@ -727,7 +787,7 @@ static int UU() update_broadcast_op(DB_TXN *txn, ARG arg, void* UU(operation_ext
return r;
}
static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra)) {
static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
int r;
for (int i = 0; i < arg->num_DBs; i++) {
DB* db = arg->dbp[i];
......@@ -737,14 +797,14 @@ static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra))
return 0;
}
static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra)) {
static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r;
int db_index = myrandom_r(arg->random_data)%arg->num_DBs;
DB* db = arg->dbp[db_index];
r = (db)->close(db, 0); CKERR(r);
char name[30];
memset(name, 0, sizeof(name));
ZERO_ARRAY(name);
snprintf(name, sizeof(name), "main%d", db_index);
r = arg->env->dbremove(arg->env, null_txn, name, NULL, 0);
......@@ -780,34 +840,28 @@ static void *test_time(void *arg) {
if (verbose) {
printf("Sleeping for %d seconds\n", num_seconds);
}
int64_t num_operations_completed_total[tte->num_wes];
memset(num_operations_completed_total, 0, sizeof num_operations_completed_total);
uint64_t last_counter_values[tte->num_wes][(int) NUM_OPERATION_TYPES];
ZERO_ARRAY(last_counter_values);
for (int i = 0; i < num_seconds; i += tte->performance_period) {
usleep(tte->performance_period*1000*1000);
int64_t total_operations_in_period = 0;
uint64_t period_totals[(int) NUM_OPERATION_TYPES];
ZERO_ARRAY(period_totals);
for (int we = 0; we < tte->num_wes; ++we) {
int64_t last = num_operations_completed_total[we];
int64_t current = __sync_fetch_and_add(&tte->wes[we].num_operations_completed, 0);
if (tte->print_thread_performance) {
printf("Thread %d Iteration %d Operations %"PRId64"\n", we, i, current - last);
for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
int64_t last = last_counter_values[we][op];
int64_t current = tte->wes[we].counters[op];
if (tte->print_thread_performance) {
printf("Thread %d Iteration %d Operations %"PRId64"\n", we, i, current - last);
}
period_totals[op] += (current - last);
last_counter_values[we][op] = current;
}
total_operations_in_period += (current - last);
num_operations_completed_total[we] = current;
}
if (tte->print_performance) {
printf("Iteration %d Total_Operations %"PRId64"\n", i, total_operations_in_period);
}
}
int64_t total_operations_in_test = 0;
for (int we = 0; we < tte->num_wes; ++we) {
int64_t current = __sync_fetch_and_add(&tte->wes[we].num_operations_completed, 0);
if (tte->print_thread_performance) {
printf("TOTAL Thread %d Operations %"PRId64"\n", we, current);
for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
printf("Iteration %d Total_Operations %"PRId64"\n", i, period_totals[op]);
}
}
total_operations_in_test += current;
}
if (tte->print_performance) {
printf("Total_Operations %"PRId64"\n", total_operations_in_test);
}
if (verbose) {
......@@ -850,10 +904,12 @@ static int run_workers(
tte.performance_period = cli_args->performance_period;
run_test = true;
for (int i = 0; i < num_threads; ++i) {
thread_args[i].thread_idx = i;
thread_args[i].num_threads = num_threads;
worker_extra[i].thread_arg = &thread_args[i];
worker_extra[i].operation_lock = &rwlock;
worker_extra[i].operation_lock_mutex = &mutex;
worker_extra[i].num_operations_completed = 0;
XCALLOC_N((int) NUM_OPERATION_TYPES, worker_extra[i].counters);
CHK(toku_pthread_create(&tids[i], NULL, worker, &worker_extra[i]));
if (verbose)
printf("%lu created\n", (unsigned long) tids[i]);
......@@ -870,6 +926,27 @@ static int run_workers(
if (verbose)
printf("%lu joined\n", (unsigned long) tids[i]);
}
uint64_t overall_totals[(int) NUM_OPERATION_TYPES];
ZERO_ARRAY(overall_totals);
for (int we = 0; we < num_threads; ++we) {
for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
uint64_t current = worker_extra[we].counters[op];
if (cli_args->print_thread_performance) {
printf("TOTAL Thread %d Operations %"PRId64"\n", we, current);
}
overall_totals[op] += current;
}
}
if (cli_args->print_performance) {
for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
printf("Total_Operations %"PRId64"\n", overall_totals[op]);
}
}
for (int i = 0; i < num_threads; ++i) {
toku_free(worker_extra[i].counters);
}
if (verbose)
printf("ending test, pthreads have joined\n");
rwlock_destroy(&rwlock);
......@@ -1060,11 +1137,12 @@ static struct cli_args UU() get_default_args(void) {
.do_test_and_crash = false,
.do_recover = false,
.num_update_threads = 1,
.num_put_threads = 1,
.crash_on_update_failure = true,
.print_performance = false,
.print_thread_performance = false,
.performance_period = 1,
.update_txn_size = 1000,
.txn_size = 1000,
.key_size = MIN_KEY_SIZE,
.val_size = MIN_VAL_SIZE,
.env_args = DEFAULT_ENV_ARGS,
......@@ -1109,7 +1187,8 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
fprintf(stderr, "\t--update_broadcast_period INT (default %dms)\n", default_args.update_broadcast_period_ms);
fprintf(stderr, "\t--num_ptquery_threads INT (default %d threads)\n", default_args.num_ptquery_threads);
fprintf(stderr, "\t--num_update_threads INT (default %d threads)\n", default_args.num_update_threads);
fprintf(stderr, "\t--update_txn_size INT (default %d rows)\n", default_args.update_txn_size);
fprintf(stderr, "\t--num_put_threads INT (default %d threads)\n", default_args.num_put_threads);
fprintf(stderr, "\t--txn_size INT (default %d rows)\n", default_args.txn_size);
fprintf(stderr, "\t--key_size INT (default %d, minimum %ld)\n", default_args.key_size, MIN_KEY_SIZE);
fprintf(stderr, "\t--val_size INT (default %d, minimum %ld)\n", default_args.val_size, MIN_VAL_SIZE);
fprintf(stderr, "\t--[no-]crash_on_update_failure BOOL (default %s)\n", default_args.crash_on_update_failure ? "yes" : "no");
......@@ -1168,6 +1247,10 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
argc--; argv++;
args->num_update_threads = atoi(argv[1]);
}
else if (strcmp(argv[1], "--num_put_threads") == 0 && argc > 1) {
argc--; argv++;
args->num_put_threads = atoi(argv[1]);
}
else if (strcmp(argv[1], "--crash_on_update_failure") == 0 && argc > 1) {
args->crash_on_update_failure = true;
}
......@@ -1184,9 +1267,9 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
argc--; argv++;
args->performance_period = atoi(argv[1]);
}
else if (strcmp(argv[1], "--update_txn_size") == 0 && argc > 1) {
else if (strcmp(argv[1], "--txn_size") == 0 && argc > 1) {
argc--; argv++;
args->update_txn_size = atoi(argv[1]);
args->txn_size = atoi(argv[1]);
}
else if (strcmp(argv[1], "--key_size") == 0 && argc > 1) {
argc--; argv++;
......@@ -1264,7 +1347,7 @@ do_warm_cache(DB_ENV *env, DB **dbs, struct cli_args *args)
scan_arg.lock_type = STRESS_LOCK_NONE;
DB_TXN* txn = NULL;
int r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
scan_op_no_check(txn, &scan_arg, &soe);
scan_op_no_check(txn, &scan_arg, &soe, NULL);
r = txn->commit(txn,0); CKERR(r);
}
......@@ -1319,7 +1402,7 @@ UU() stress_recover(struct cli_args *args) {
soe.fast = TRUE;
soe.fwd = TRUE;
soe.prefetch = FALSE;
r = scan_op(txn, &recover_args, &soe);
r = scan_op(txn, &recover_args, &soe, NULL);
CKERR(r);
CHK(txn->commit(txn,0));
CHK(close_tables(env, dbs, args->num_DBs));
......
......@@ -70,6 +70,11 @@ size_t toku_malloc_usable_size(void *p) __attribute__((__visibility__("default")
#define XCALLOC(v) XCALLOC_N(1,(v))
#define XREALLOC_N(n,v) v = cast_to_typeof(v) toku_xrealloc(v, (n)*sizeof(*v))
// ZERO_ARRAY writes zeroes to a stack-allocated array
#define ZERO_ARRAY(o) do { memset((o), 0, sizeof (o)); } while (0)
// ZERO_STRUCT writes zeroes to a stack-allocated struct
#define ZERO_STRUCT(o) do { memset(&(o), 0, sizeof (o)); } while (0)
/* Copy memory. Analogous to strdup() */
void *toku_memdup (const void *v, size_t len);
/* Toku-version of strdup. Use this so that it calls toku_malloc() */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment