Commit ad0121f9 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

[t:4144] add command line flags --num_update_threads, --update_txn_size,

and --no-crash_on_update_failure


git-svn-id: file:///svn/toku/tokudb@36985 c7de825b-a66e-492c-adef-691d508d4ae1
parent 94500089
...@@ -50,10 +50,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -50,10 +50,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
// //
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 5 + cli_args->num_ptquery_threads; const int num_threads = 4 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads]; struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
// make the forward fast scanner // make the forward fast scanner
...@@ -77,10 +77,12 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -77,10 +77,12 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[3].operation = scan_op; myargs[3].operation = scan_op;
// make the guy that updates the db // make the guy that updates the db
myargs[4].operation = update_op; for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].operation = update_op;
}
// make the guy that does point queries // make the guy that does point queries
for (int i = 5; i < num_threads; i++) { for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op; myargs[i].operation = ptquery_op;
} }
......
...@@ -20,14 +20,14 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -20,14 +20,14 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
int n = cli_args->num_elements; int n = cli_args->num_elements;
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 1; const int num_threads = cli_args->num_update_threads;
struct arg myargs[num_threads]; struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env); arg_init(&myargs[i], n, dbp, env);
// make the guy that updates the db
myargs[i].operation = update_op;
} }
// make the guy that updates the db
myargs[0].operation = update_op;
int num_seconds = random() % cli_args->time_of_test; int num_seconds = random() % cli_args->time_of_test;
run_workers(myargs, num_threads, num_seconds, true); run_workers(myargs, num_threads, num_seconds, true);
......
...@@ -43,17 +43,17 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -43,17 +43,17 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
// //
// the threads that we want: // the threads that we want:
// - one thread constantly updating random values // - some threads constantly updating random values
// - one thread doing table scan with bulk fetch // - one thread doing table scan with bulk fetch
// - one thread doing table scan without bulk fetch // - one thread doing table scan without bulk fetch
// - one thread doing random point queries // - some threads doing random point queries
// //
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 5 + cli_args->num_ptquery_threads; const int num_threads = 4 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads]; struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
// make the forward fast scanner // make the forward fast scanner
...@@ -77,10 +77,12 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -77,10 +77,12 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[3].operation = scan_op; myargs[3].operation = scan_op;
// make the guy that updates the db // make the guy that updates the db
myargs[4].operation = update_op; for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].operation = update_op;
}
// make the guy that does point queries // make the guy that does point queries
for (int i = 5; i < num_threads; i++) { for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op; myargs[i].operation = ptquery_op;
} }
......
...@@ -44,10 +44,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -44,10 +44,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
// //
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 5 + cli_args->num_ptquery_threads; const int num_threads = 4 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads]; struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
// make the forward fast scanner // make the forward fast scanner
...@@ -71,11 +71,13 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -71,11 +71,13 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[3].operation = scan_op; myargs[3].operation = scan_op;
// make the guy that updates the db // make the guy that updates the db
myargs[4].bounded_update_range = false; for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[4].operation = update_op; myargs[i].bounded_update_range = false;
myargs[i].operation = update_op;
}
// make the guy that does point queries // make the guy that does point queries
for (int i = 5; i < num_threads; i++) { for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].bounded_update_range = false; myargs[i].bounded_update_range = false;
myargs[i].operation = ptquery_op_no_check; myargs[i].operation = ptquery_op_no_check;
} }
......
...@@ -42,10 +42,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -42,10 +42,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
// - one thread doing random point queries // - one thread doing random point queries
// //
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 6 + cli_args->num_ptquery_threads; const int num_threads = 5 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads]; struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
// make the forward fast scanner // make the forward fast scanner
...@@ -69,16 +69,18 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -69,16 +69,18 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[3].operation = scan_op; myargs[3].operation = scan_op;
// make the guy that updates the db // make the guy that updates the db
myargs[4].lock_type = STRESS_LOCK_SHARED; for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[4].operation = update_op; myargs[i].lock_type = STRESS_LOCK_SHARED;
myargs[i].operation = update_op;
}
// make the guy that sends update broadcasts // make the guy that sends update broadcasts
myargs[5].lock_type = STRESS_LOCK_EXCL; myargs[4 + cli_args->num_update_threads].lock_type = STRESS_LOCK_EXCL;
myargs[5].sleep_ms = cli_args->update_broadcast_period_ms; myargs[4 + cli_args->num_update_threads].sleep_ms = cli_args->update_broadcast_period_ms;
myargs[5].operation = update_broadcast_op; myargs[4 + cli_args->num_update_threads].operation = update_broadcast_op;
// make the guys that do point queries // make the guys that do point queries
for (int i = 6; i < num_threads; i++) { for (int i = 5 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op; myargs[i].operation = ptquery_op;
} }
......
...@@ -42,10 +42,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -42,10 +42,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
// - one thread doing random point queries // - one thread doing random point queries
// //
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 5 + cli_args->num_ptquery_threads; const int num_threads = 4 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads]; struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
// make the forward fast scanner // make the forward fast scanner
...@@ -69,18 +69,22 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -69,18 +69,22 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[3].operation = scan_op_no_check; myargs[3].operation = scan_op_no_check;
// make the guy that updates the db // make the guy that updates the db
myargs[4].update_history_buffer = toku_xmalloc(n * (sizeof myargs[4].update_history_buffer[0])); for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
memset(myargs[4].update_history_buffer, 0, n * (sizeof myargs[4].update_history_buffer[0])); myargs[i].update_history_buffer = toku_xmalloc(n * (sizeof myargs[i].update_history_buffer[0]));
myargs[4].operation = update_with_history_op; memset(myargs[i].update_history_buffer, 0, n * (sizeof myargs[i].update_history_buffer[0]));
myargs[i].operation = update_with_history_op;
}
// make the guys that do point queries // make the guys that do point queries
for (int i = 5; i < num_threads; i++) { for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op; myargs[i].operation = ptquery_op;
} }
run_workers(myargs, num_threads, cli_args->time_of_test, false); run_workers(myargs, num_threads, cli_args->time_of_test, false);
toku_free(myargs[4].update_history_buffer); for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
toku_free(myargs[i].update_history_buffer);
}
} }
int int
......
...@@ -24,10 +24,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -24,10 +24,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
// //
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 5 + cli_args->num_ptquery_threads; const int num_threads = 4 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads]; struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
// make the forward fast scanner // make the forward fast scanner
myargs[0].fast = TRUE; myargs[0].fast = TRUE;
...@@ -40,12 +40,15 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -40,12 +40,15 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[1].operation = scan_op; myargs[1].operation = scan_op;
// make the guy that updates the db // make the guy that updates the db
myargs[2].operation = update_op; myargs[2].operation = loader_op;
myargs[3].operation = loader_op; myargs[3].operation = keyrange_op;
myargs[4].operation = keyrange_op;
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].operation = update_op;
}
// make the guy that does point queries // make the guy that does point queries
for (int i = 5; i < num_threads; i++) { for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op; myargs[i].operation = ptquery_op;
} }
run_workers(myargs, num_threads, cli_args->time_of_test, false); run_workers(myargs, num_threads, cli_args->time_of_test, false);
......
...@@ -44,10 +44,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -44,10 +44,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
// //
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 7 + cli_args->num_ptquery_threads; const int num_threads = 6 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads]; struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
// make the forward fast scanner // make the forward fast scanner
...@@ -74,22 +74,24 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -74,22 +74,24 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[3].lock_type = STRESS_LOCK_SHARED; myargs[3].lock_type = STRESS_LOCK_SHARED;
myargs[3].operation = scan_op; myargs[3].operation = scan_op;
// make the guy that updates the db
myargs[4].bounded_update_range = false;
myargs[4].lock_type = STRESS_LOCK_SHARED;
myargs[4].operation = update_op;
// make the guy that removes and recreates the db // make the guy that removes and recreates the db
myargs[4].lock_type = STRESS_LOCK_EXCL;
myargs[4].sleep_ms = 2000; // maybe make this a runtime param at some point
myargs[4].operation = remove_and_recreate_me;
myargs[5].lock_type = STRESS_LOCK_EXCL; myargs[5].lock_type = STRESS_LOCK_EXCL;
myargs[5].sleep_ms = 2000; // maybe make this a runtime param at some point myargs[5].sleep_ms = 2000; // maybe make this a runtime param at some point
myargs[5].operation = remove_and_recreate_me; myargs[5].operation = truncate_me;
myargs[6].lock_type = STRESS_LOCK_EXCL; // make the guy that updates the db
myargs[6].sleep_ms = 2000; // maybe make this a runtime param at some point for (int i = 6; i < 6 + cli_args->num_update_threads; ++i) {
myargs[6].operation = truncate_me; myargs[i].bounded_update_range = false;
myargs[i].lock_type = STRESS_LOCK_SHARED;
myargs[i].operation = update_op;
}
// make the guy that does point queries // make the guy that does point queries
for (int i = 7; i < num_threads; i++) { for (int i = 6 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].lock_type = STRESS_LOCK_SHARED; myargs[i].lock_type = STRESS_LOCK_SHARED;
myargs[i].bounded_update_range = false; myargs[i].bounded_update_range = false;
myargs[i].operation = ptquery_op_no_check; myargs[i].operation = ptquery_op_no_check;
......
...@@ -49,11 +49,35 @@ struct arg { ...@@ -49,11 +49,35 @@ struct arg {
toku_pthread_mutex_t *broadcast_lock_mutex; toku_pthread_mutex_t *broadcast_lock_mutex;
struct rwlock *broadcast_lock; struct rwlock *broadcast_lock;
int update_pad_frequency; int update_pad_frequency;
bool crash_on_update_failure;
u_int32_t update_txn_size;
};
struct cli_args {
int num_elements;
int time_of_test;
int node_size;
int basement_node_size;
u_int64_t cachetable_size;
bool only_create;
bool only_stress;
int checkpointing_period;
int cleaner_period;
int cleaner_iterations;
int update_broadcast_period_ms;
int num_ptquery_threads;
test_update_callback_f update_function;
bool do_test_and_crash;
bool do_recover;
char *envdir;
int num_update_threads;
bool crash_on_update_failure;
u_int32_t update_txn_size;
}; };
DB_TXN * const null_txn = 0; DB_TXN * const null_txn = 0;
static void arg_init(struct arg *arg, int n, DB **dbp, DB_ENV *env) { static void arg_init(struct arg *arg, int n, DB **dbp, DB_ENV *env, struct cli_args *cli_args) {
arg->n = n; arg->n = n;
arg->dbp = dbp; arg->dbp = dbp;
arg->env = env; arg->env = env;
...@@ -66,6 +90,8 @@ static void arg_init(struct arg *arg, int n, DB **dbp, DB_ENV *env) { ...@@ -66,6 +90,8 @@ static void arg_init(struct arg *arg, int n, DB **dbp, DB_ENV *env) {
arg->txn_type = DB_TXN_SNAPSHOT; arg->txn_type = DB_TXN_SNAPSHOT;
arg->update_history_buffer = NULL; arg->update_history_buffer = NULL;
arg->update_pad_frequency = n/100; // bit arbitrary. Just want dictionary to grow and shrink so splits and merges occur arg->update_pad_frequency = n/100; // bit arbitrary. Just want dictionary to grow and shrink so splits and merges occur
arg->crash_on_update_failure = cli_args->crash_on_update_failure;
arg->update_txn_size = cli_args->update_txn_size;
} }
static void *worker(void *arg_v) { static void *worker(void *arg_v) {
...@@ -89,8 +115,16 @@ static void *worker(void *arg_v) { ...@@ -89,8 +115,16 @@ static void *worker(void *arg_v) {
} }
int r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r); int r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r);
r = arg->operation(env, dbp, txn, arg); CKERR(r); r = arg->operation(env, dbp, txn, arg);
if (r == 0) {
CHK(txn->commit(txn,0)); CHK(txn->commit(txn,0));
} else {
if (arg->crash_on_update_failure) {
CKERR(r);
} else {
CHK(txn->abort(txn));
}
}
toku_pthread_mutex_lock(arg->broadcast_lock_mutex); toku_pthread_mutex_lock(arg->broadcast_lock_mutex);
if (arg->lock_type == STRESS_LOCK_SHARED) { if (arg->lock_type == STRESS_LOCK_SHARED) {
...@@ -337,7 +371,7 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { ...@@ -337,7 +371,7 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
memset(&extra, 0, sizeof(extra)); memset(&extra, 0, sizeof(extra));
extra.type = UPDATE_ADD_DIFF; extra.type = UPDATE_ADD_DIFF;
extra.pad_bytes = 0; extra.pad_bytes = 0;
for (u_int32_t i = 0; i < 500; i++) { for (u_int32_t i = 0; i < arg->update_txn_size; i++) {
rand_key = random(); rand_key = random();
if (arg->bounded_update_range) { if (arg->bounded_update_range) {
rand_key = rand_key % (arg->n/2); rand_key = rand_key % (arg->n/2);
...@@ -353,6 +387,9 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { ...@@ -353,6 +387,9 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
dbt_init(&val, &extra, sizeof extra), dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0 && !arg->crash_on_update_failure) {
return r;
}
CKERR(r); CKERR(r);
extra.u.d.diff = -1; extra.u.d.diff = -1;
r = db->update( r = db->update(
...@@ -362,6 +399,9 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { ...@@ -362,6 +399,9 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
dbt_init(&val, &extra, sizeof extra), dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0 && !arg->crash_on_update_failure) {
return r;
}
CKERR(r); CKERR(r);
} }
return r; return r;
...@@ -382,9 +422,8 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { ...@@ -382,9 +422,8 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
if (update_count % (2*arg->update_pad_frequency) == update_count%arg->update_pad_frequency) { if (update_count % (2*arg->update_pad_frequency) == update_count%arg->update_pad_frequency) {
extra.pad_bytes = 100; extra.pad_bytes = 100;
} }
} }
for (u_int32_t i = 0; i < 1000; i++) { for (u_int32_t i = 0; i < arg->update_txn_size; i++) {
rand_key = random(); rand_key = random();
if (arg->bounded_update_range) { if (arg->bounded_update_range) {
rand_key = rand_key % arg->n; rand_key = rand_key % arg->n;
...@@ -402,6 +441,9 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { ...@@ -402,6 +441,9 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
dbt_init(&val, &extra, sizeof extra), dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0 && !arg->crash_on_update_failure) {
return r;
}
CKERR(r); CKERR(r);
} }
// //
...@@ -419,6 +461,9 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { ...@@ -419,6 +461,9 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
dbt_init(&val, &extra, sizeof extra), dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0 && !arg->crash_on_update_failure) {
return r;
}
CKERR(r); CKERR(r);
return r; return r;
...@@ -443,7 +488,7 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A ...@@ -443,7 +488,7 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A
} }
} }
for (u_int32_t i = 0; i < 1000; i++) { for (u_int32_t i = 0; i < arg->update_txn_size; i++) {
rand_key = random() % arg->n; rand_key = random() % arg->n;
extra.u.h.new = random() % MAX_RANDOM_VAL; extra.u.h.new = random() % MAX_RANDOM_VAL;
// just make every other value random // just make every other value random
...@@ -460,6 +505,9 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A ...@@ -460,6 +505,9 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A
dbt_init(&val, &extra, sizeof extra), dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0 && !arg->crash_on_update_failure) {
return r;
}
CKERR(r); CKERR(r);
} }
// //
...@@ -479,6 +527,9 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A ...@@ -479,6 +527,9 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A
dbt_init(&val, &extra, sizeof extra), dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0 && !arg->crash_on_update_failure) {
return r;
}
CKERR(r); CKERR(r);
return r; return r;
...@@ -727,25 +778,6 @@ static int close_table(DB_ENV *env, DB *db) { ...@@ -727,25 +778,6 @@ static int close_table(DB_ENV *env, DB *db) {
return r; return r;
} }
struct cli_args {
int num_elements;
int time_of_test;
int node_size;
int basement_node_size;
u_int64_t cachetable_size;
bool only_create;
bool only_stress;
int checkpointing_period;
int cleaner_period;
int cleaner_iterations;
int update_broadcast_period_ms;
int num_ptquery_threads;
test_update_callback_f update_function;
bool do_test_and_crash;
bool do_recover;
char *envdir;
};
static const struct cli_args DEFAULT_ARGS = { static const struct cli_args DEFAULT_ARGS = {
.num_elements = 150000, .num_elements = 150000,
.time_of_test = 180, .time_of_test = 180,
...@@ -763,6 +795,9 @@ static const struct cli_args DEFAULT_ARGS = { ...@@ -763,6 +795,9 @@ static const struct cli_args DEFAULT_ARGS = {
.do_test_and_crash = false, .do_test_and_crash = false,
.do_recover = false, .do_recover = false,
.envdir = ENVDIR, .envdir = ENVDIR,
.num_update_threads = 1,
.crash_on_update_failure = true,
.update_txn_size = 1000,
}; };
static inline void parse_stress_test_args (int argc, char *const argv[], struct cli_args *args) { static inline void parse_stress_test_args (int argc, char *const argv[], struct cli_args *args) {
...@@ -789,6 +824,9 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -789,6 +824,9 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
fprintf(stderr, "\t--cleaner_iterations INT (default %ds)\n", DEFAULT_ARGS.cleaner_iterations); fprintf(stderr, "\t--cleaner_iterations INT (default %ds)\n", DEFAULT_ARGS.cleaner_iterations);
fprintf(stderr, "\t--update_broadcast_period INT (default %dms)\n", DEFAULT_ARGS.update_broadcast_period_ms); fprintf(stderr, "\t--update_broadcast_period INT (default %dms)\n", DEFAULT_ARGS.update_broadcast_period_ms);
fprintf(stderr, "\t--num_ptquery_threads INT (default %d threads)\n", DEFAULT_ARGS.num_ptquery_threads); fprintf(stderr, "\t--num_ptquery_threads INT (default %d threads)\n", DEFAULT_ARGS.num_ptquery_threads);
fprintf(stderr, "\t--num_update_threads INT (default %d threads)\n", DEFAULT_ARGS.num_update_threads);
fprintf(stderr, "\t--update_txn_size INT (default %d rows)\n", DEFAULT_ARGS.update_txn_size);
fprintf(stderr, "\t--[no-]crash_on_update_failure BOOL (default %s)\n", DEFAULT_ARGS.crash_on_update_failure ? "yes" : "no");
exit(resultcode); exit(resultcode);
} }
else if (strcmp(argv[1], "--num_elements") == 0) { else if (strcmp(argv[1], "--num_elements") == 0) {
...@@ -831,6 +869,20 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -831,6 +869,20 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
argc--; argv++; argc--; argv++;
args->num_ptquery_threads = atoi(argv[1]); args->num_ptquery_threads = atoi(argv[1]);
} }
else if (strcmp(argv[1], "--num_update_threads") == 0) {
argc--; argv++;
args->num_update_threads = atoi(argv[1]);
}
else if (strcmp(argv[1], "--crash_on_update_failure") == 0) {
args->crash_on_update_failure = true;
}
else if (strcmp(argv[1], "--no-crash_on_update_failure") == 0) {
args->crash_on_update_failure = false;
}
else if (strcmp(argv[1], "--update_txn_size") == 0) {
argc--; argv++;
args->update_txn_size = atoi(argv[1]);
}
else if (strcmp(argv[1], "--only_create") == 0) { else if (strcmp(argv[1], "--only_create") == 0) {
args->only_create = true; args->only_create = true;
} }
...@@ -913,7 +965,7 @@ UU() stress_recover(struct cli_args *args) { ...@@ -913,7 +965,7 @@ UU() stress_recover(struct cli_args *args) {
DB_TXN* txn = NULL; DB_TXN* txn = NULL;
struct arg recover_args; struct arg recover_args;
arg_init(&recover_args, args->num_elements, &db, env); arg_init(&recover_args, args->num_elements, &db, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type); int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
CKERR(r); CKERR(r);
r = scan_op_and_maybe_check_sum(env, &db, txn, &recover_args, true); r = scan_op_and_maybe_check_sum(env, &db, txn, &recover_args, true);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment