Commit 45d038fa authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4427], merge changes to main

git-svn-id: file:///svn/toku/tokudb@39258 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2f43b601
...@@ -63,21 +63,23 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -63,21 +63,23 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
} }
// make the guy that updates the db // make the guy that updates the db
struct update_op_args uoe = get_update_op_args(cli_args, NULL);
myargs[0].operation_extra = &uoe;
myargs[0].operation = update_op; myargs[0].operation = update_op;
//myargs[0].update_pad_frequency = 0; //myargs[0].update_pad_frequency = 0;
db_env_set_flusher_thread_callback(ft_callback, env); db_env_set_flusher_thread_callback(ft_callback, env);
run_workers(myargs, num_threads, cli_args->time_of_test, true); run_workers(myargs, num_threads, cli_args->time_of_test, true, cli_args);
} }
static int static int
run_recover_ft_test(int argc, char *const argv[]) { run_recover_ft_test(int argc, char *const argv[]) {
struct cli_args args = DEFAULT_ARGS; struct cli_args args = get_default_args();
// make test time arbitrarily high because we expect a crash // make test time arbitrarily high because we expect a crash
args.time_of_test = 1000000000; args.time_of_test = 1000000000;
args.num_elements = 2000; args.num_elements = 2000;
// we want to induce a checkpoint // we want to induce a checkpoint
args.checkpointing_period = 0; args.env_args.checkpointing_period = 0;
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
if (args.do_test_and_crash) { if (args.do_test_and_crash) {
stress_test_main(&args); stress_test_main(&args);
......
...@@ -55,29 +55,40 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -55,29 +55,40 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env, cli_args); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
struct scan_op_extra soe[4];
// make the forward fast scanner // make the forward fast scanner
myargs[0].fast = TRUE; soe[0].fast = TRUE;
myargs[0].fwd = TRUE; soe[0].fwd = TRUE;
soe[0].prefetch = FALSE;
myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op; myargs[0].operation = scan_op;
// make the forward slow scanner // make the forward slow scanner
myargs[1].fast = FALSE; soe[1].fast = FALSE;
myargs[1].fwd = TRUE; soe[1].fwd = TRUE;
soe[1].prefetch = FALSE;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op; myargs[1].operation = scan_op;
// make the backward fast scanner // make the backward fast scanner
myargs[2].fast = TRUE; soe[2].fast = TRUE;
myargs[2].fwd = FALSE; soe[2].fwd = FALSE;
soe[2].prefetch = FALSE;
myargs[2].operation_extra = &soe[2];
myargs[2].operation = scan_op; myargs[2].operation = scan_op;
// make the backward slow scanner // make the backward slow scanner
myargs[3].fast = FALSE; soe[3].fast = FALSE;
myargs[3].fwd = FALSE; soe[3].fwd = FALSE;
soe[3].prefetch = FALSE;
myargs[3].operation_extra = &soe[3];
myargs[3].operation = scan_op; myargs[3].operation = scan_op;
struct update_op_args uoe = get_update_op_args(cli_args, NULL);
// make the guy that updates the db // make the guy that updates the db
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].operation_extra = &uoe;
myargs[i].operation = update_op; myargs[i].operation = update_op;
} }
...@@ -87,13 +98,13 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -87,13 +98,13 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
} }
int num_seconds = random() % cli_args->time_of_test; int num_seconds = random() % cli_args->time_of_test;
run_workers(myargs, num_threads, num_seconds, true); run_workers(myargs, num_threads, num_seconds, true, cli_args);
} }
int int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = DEFAULT_ARGS; struct cli_args args = get_default_args();
args.checkpointing_period = 1; args.env_args.checkpointing_period = 1;
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
if (args.do_test_and_crash) { if (args.do_test_and_crash) {
stress_test_main(&args); stress_test_main(&args);
......
...@@ -22,21 +22,23 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -22,21 +22,23 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_update_threads; const int num_threads = cli_args->num_update_threads;
struct arg myargs[num_threads]; struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) { struct update_op_args uoe = get_update_op_args(cli_args, NULL);
// make the guy that updates the db
for (int i = 0; i < 0 + cli_args->num_update_threads; ++i) {
arg_init(&myargs[i], n, dbp, env, cli_args); arg_init(&myargs[i], n, dbp, env, cli_args);
// make the guy that updates the db myargs[i].operation_extra = &uoe;
myargs[i].operation = update_op; myargs[i].operation = update_op;
} }
int num_seconds = random() % cli_args->time_of_test; int num_seconds = random() % cli_args->time_of_test;
run_workers(myargs, num_threads, num_seconds, true); run_workers(myargs, num_threads, num_seconds, true, cli_args);
} }
int int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = DEFAULT_ARGS; struct cli_args args = get_default_args();
args.checkpointing_period = 1; args.env_args.checkpointing_period = 1;
args.num_elements = 2000; args.num_elements = 2000;
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
if (args.do_test_and_crash) { if (args.do_test_and_crash) {
......
...@@ -55,29 +55,40 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -55,29 +55,40 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env, cli_args); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
struct scan_op_extra soe[4];
// make the forward fast scanner // make the forward fast scanner
myargs[0].fast = TRUE; soe[0].fast = TRUE;
myargs[0].fwd = TRUE; soe[0].fwd = TRUE;
soe[0].prefetch = FALSE;
myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op; myargs[0].operation = scan_op;
// make the forward slow scanner // make the forward slow scanner
myargs[1].fast = FALSE; soe[1].fast = FALSE;
myargs[1].fwd = TRUE; soe[1].fwd = TRUE;
soe[1].prefetch = FALSE;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op; myargs[1].operation = scan_op;
// make the backward fast scanner // make the backward fast scanner
myargs[2].fast = TRUE; soe[2].fast = TRUE;
myargs[2].fwd = FALSE; soe[2].fwd = FALSE;
soe[2].prefetch = FALSE;
myargs[2].operation_extra = &soe[2];
myargs[2].operation = scan_op; myargs[2].operation = scan_op;
// make the backward slow scanner // make the backward slow scanner
myargs[3].fast = FALSE; soe[3].fast = FALSE;
myargs[3].fwd = FALSE; soe[3].fwd = FALSE;
soe[3].prefetch = FALSE;
myargs[3].operation_extra = &soe[3];
myargs[3].operation = scan_op; myargs[3].operation = scan_op;
struct update_op_args uoe = get_update_op_args(cli_args, NULL);
// make the guy that updates the db // make the guy that updates the db
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].operation_extra = &uoe;
myargs[i].operation = update_op; myargs[i].operation = update_op;
} }
...@@ -86,12 +97,12 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -86,12 +97,12 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[i].operation = ptquery_op; myargs[i].operation = ptquery_op;
} }
run_workers(myargs, num_threads, cli_args->time_of_test, false); run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args);
} }
int int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = DEFAULT_ARGS; struct cli_args args = get_default_args();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); stress_test_main(&args);
return 0; return 0;
......
...@@ -49,45 +49,56 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -49,45 +49,56 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env, cli_args); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
struct scan_op_extra soe[4];
// make the forward fast scanner // make the forward fast scanner
myargs[0].fast = TRUE; soe[0].fast = TRUE;
myargs[0].fwd = TRUE; soe[0].fwd = TRUE;
soe[0].prefetch = FALSE;
myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op; myargs[0].operation = scan_op;
// make the forward slow scanner // make the forward slow scanner
myargs[1].fast = FALSE; soe[1].fast = FALSE;
myargs[1].fwd = TRUE; soe[1].fwd = TRUE;
soe[1].prefetch = FALSE;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op; myargs[1].operation = scan_op;
// make the backward fast scanner // make the backward fast scanner
myargs[2].fast = TRUE; soe[2].fast = TRUE;
myargs[2].fwd = FALSE; soe[2].fwd = FALSE;
soe[2].prefetch = FALSE;
myargs[2].operation_extra = &soe[2];
myargs[2].operation = scan_op; myargs[2].operation = scan_op;
// make the backward slow scanner // make the backward slow scanner
myargs[3].fast = FALSE; soe[3].fast = FALSE;
myargs[3].fwd = FALSE; soe[3].fwd = FALSE;
soe[3].prefetch = FALSE;
myargs[3].operation_extra = &soe[3];
myargs[3].operation = scan_op; myargs[3].operation = scan_op;
struct update_op_args uoe = get_update_op_args(cli_args, NULL);
// make the guy that updates the db // make the guy that updates the db
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].bounded_update_range = false; myargs[i].operation_extra = &uoe;
myargs[i].bounded_element_range = false;
myargs[i].operation = update_op; myargs[i].operation = update_op;
} }
// make the guy that does point queries // make the guy that does point queries
for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) { for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].bounded_update_range = false; myargs[i].bounded_element_range = false;
myargs[i].operation = ptquery_op_no_check; myargs[i].operation = ptquery_op_no_check;
} }
run_workers(myargs, num_threads, cli_args->time_of_test, false); run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args);
} }
int int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = DEFAULT_ARGS; struct cli_args args = get_default_args();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); stress_test_main(&args);
return 0; return 0;
......
...@@ -48,28 +48,40 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -48,28 +48,40 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
arg_init(&myargs[i], n, dbp, env, cli_args); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
struct scan_op_extra soe[4];
// make the forward fast scanner // make the forward fast scanner
myargs[0].fast = TRUE; soe[0].fast = TRUE;
myargs[0].fwd = TRUE; soe[0].fwd = TRUE;
soe[0].prefetch = FALSE;
myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op; myargs[0].operation = scan_op;
// make the forward slow scanner // make the forward slow scanner
myargs[1].fast = FALSE; soe[1].fast = FALSE;
myargs[1].fwd = TRUE; soe[1].fwd = TRUE;
soe[1].prefetch = FALSE;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op; myargs[1].operation = scan_op;
// make the backward fast scanner // make the backward fast scanner
myargs[2].fast = TRUE; soe[2].fast = TRUE;
myargs[2].fwd = FALSE; soe[2].fwd = FALSE;
soe[2].prefetch = FALSE;
myargs[2].operation_extra = &soe[2];
myargs[2].operation = scan_op; myargs[2].operation = scan_op;
// make the backward slow scanner // make the backward slow scanner
myargs[3].fast = FALSE; soe[3].fast = FALSE;
myargs[3].fwd = FALSE; soe[3].fwd = FALSE;
soe[3].prefetch = FALSE;
myargs[3].operation_extra = &soe[3];
myargs[3].operation = scan_op; myargs[3].operation = scan_op;
struct update_op_args uoe = get_update_op_args(cli_args, NULL);
// make the guy that updates the db // make the guy that updates the db
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].operation_extra = &uoe;
myargs[i].lock_type = STRESS_LOCK_SHARED; myargs[i].lock_type = STRESS_LOCK_SHARED;
myargs[i].operation = update_op; myargs[i].operation = update_op;
} }
...@@ -84,12 +96,12 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -84,12 +96,12 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[i].operation = ptquery_op; myargs[i].operation = ptquery_op;
} }
run_workers(myargs, num_threads, cli_args->time_of_test, false); run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args);
} }
int int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = DEFAULT_ARGS; struct cli_args args = get_default_args();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); stress_test_main(&args);
return 0; return 0;
......
...@@ -48,30 +48,42 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -48,30 +48,42 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
arg_init(&myargs[i], n, dbp, env, cli_args); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
struct scan_op_extra soe[4];
// make the forward fast scanner // make the forward fast scanner
myargs[0].fast = TRUE; soe[0].fast = TRUE;
myargs[0].fwd = TRUE; soe[0].fwd = TRUE;
soe[0].prefetch = FALSE;
myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op_no_check; myargs[0].operation = scan_op_no_check;
// make the forward slow scanner // make the forward slow scanner
myargs[1].fast = FALSE; soe[1].fast = FALSE;
myargs[1].fwd = TRUE; soe[1].fwd = TRUE;
soe[1].prefetch = FALSE;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op_no_check; myargs[1].operation = scan_op_no_check;
// make the backward fast scanner // make the backward fast scanner
myargs[2].fast = TRUE; soe[2].fast = TRUE;
myargs[2].fwd = FALSE; soe[2].fwd = FALSE;
soe[2].prefetch = FALSE;
myargs[2].operation_extra = &soe[2];
myargs[2].operation = scan_op_no_check; myargs[2].operation = scan_op_no_check;
// make the backward slow scanner // make the backward slow scanner
myargs[3].fast = FALSE; soe[3].fast = FALSE;
myargs[3].fwd = FALSE; soe[3].fwd = FALSE;
soe[3].prefetch = FALSE;
myargs[3].operation_extra = &soe[3];
myargs[3].operation = scan_op_no_check; myargs[3].operation = scan_op_no_check;
struct update_op_args uoe[cli_args->num_update_threads];
// make the guy that updates the db // make the guy that updates the db
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].update_history_buffer = toku_xmalloc(n * (sizeof myargs[i].update_history_buffer[0])); int* update_history_buffer = toku_xmalloc(n * (sizeof uoe[i-4].update_history_buffer[0]));
memset(myargs[i].update_history_buffer, 0, n * (sizeof myargs[i].update_history_buffer[0])); uoe[i-4] = get_update_op_args(cli_args,update_history_buffer);
memset(uoe[i-4].update_history_buffer, 0, n * (sizeof uoe[i-4].update_history_buffer[0]));
myargs[i].operation = update_with_history_op; myargs[i].operation = update_with_history_op;
} }
...@@ -80,16 +92,16 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -80,16 +92,16 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[i].operation = ptquery_op; myargs[i].operation = ptquery_op;
} }
run_workers(myargs, num_threads, cli_args->time_of_test, false); run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args);
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { for (int i = 0; i < cli_args->num_update_threads; ++i) {
toku_free(myargs[i].update_history_buffer); toku_free(uoe[i].update_history_buffer);
} }
} }
int int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = DEFAULT_ARGS; struct cli_args args = get_default_args();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); stress_test_main(&args);
return 0; return 0;
......
...@@ -29,21 +29,30 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -29,21 +29,30 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env, cli_args); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
struct scan_op_extra soe[2];
// make the forward fast scanner // make the forward fast scanner
myargs[0].fast = TRUE; soe[0].fast = TRUE;
myargs[0].fwd = TRUE; soe[0].fwd = TRUE;
soe[0].prefetch = FALSE;
myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op; myargs[0].operation = scan_op;
// make the backward slow scanner // make the forward slow scanner
myargs[1].fast = FALSE; soe[1].fast = FALSE;
myargs[1].fwd = FALSE; soe[1].fwd = TRUE;
soe[1].prefetch = FALSE;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op; myargs[1].operation = scan_op;
// make the guy that updates the db // make the guy that updates the db
myargs[2].operation = loader_op; myargs[2].operation = loader_op;
myargs[3].operation = keyrange_op; myargs[3].operation = keyrange_op;
struct update_op_args uoe = get_update_op_args(cli_args, NULL);
// make the guy that updates the db
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].operation_extra = &uoe;
myargs[i].operation = update_op; myargs[i].operation = update_op;
} }
...@@ -51,14 +60,14 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -51,14 +60,14 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) { for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op; myargs[i].operation = ptquery_op;
} }
run_workers(myargs, num_threads, cli_args->time_of_test, false); run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args);
} }
int int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = DEFAULT_ARGS; struct cli_args args = get_default_args();
// let's make default checkpointing period really slow // let's make default checkpointing period really slow
args.checkpointing_period = 1; args.env_args.checkpointing_period = 1;
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); stress_test_main(&args);
return 0; return 0;
......
...@@ -50,28 +50,38 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -50,28 +50,38 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
arg_init(&myargs[i], n, dbp, env, cli_args); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
struct scan_op_extra soe[4];
// make the forward fast scanner // make the forward fast scanner
myargs[0].fast = TRUE; soe[0].fast = TRUE;
myargs[0].fwd = TRUE; soe[0].fwd = TRUE;
soe[0].prefetch = FALSE;
myargs[0].lock_type = STRESS_LOCK_SHARED; myargs[0].lock_type = STRESS_LOCK_SHARED;
myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op; myargs[0].operation = scan_op;
// make the forward slow scanner // make the forward slow scanner
myargs[1].fast = FALSE; soe[1].fast = FALSE;
myargs[1].fwd = TRUE; soe[1].fwd = TRUE;
soe[1].prefetch = FALSE;
myargs[1].lock_type = STRESS_LOCK_SHARED; myargs[1].lock_type = STRESS_LOCK_SHARED;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op; myargs[1].operation = scan_op;
// make the backward fast scanner // make the backward fast scanner
myargs[2].fast = TRUE; soe[2].fast = TRUE;
myargs[2].fwd = FALSE; soe[2].fwd = FALSE;
soe[2].prefetch = FALSE;
myargs[2].lock_type = STRESS_LOCK_SHARED; myargs[2].lock_type = STRESS_LOCK_SHARED;
myargs[2].operation_extra = &soe[2];
myargs[2].operation = scan_op; myargs[2].operation = scan_op;
// make the backward slow scanner // make the backward slow scanner
myargs[3].fast = FALSE; soe[3].fast = FALSE;
myargs[3].fwd = FALSE; soe[3].fwd = FALSE;
soe[3].prefetch = FALSE;
myargs[3].lock_type = STRESS_LOCK_SHARED; myargs[3].lock_type = STRESS_LOCK_SHARED;
myargs[3].operation_extra = &soe[3];
myargs[3].operation = scan_op; myargs[3].operation = scan_op;
// make the guy that removes and recreates the db // make the guy that removes and recreates the db
...@@ -84,25 +94,27 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -84,25 +94,27 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[5].operation = truncate_me; myargs[5].operation = truncate_me;
// make the guy that updates the db // make the guy that updates the db
struct update_op_args uoe = get_update_op_args(cli_args, NULL);
for (int i = 6; i < 6 + cli_args->num_update_threads; ++i) { for (int i = 6; i < 6 + cli_args->num_update_threads; ++i) {
myargs[i].bounded_update_range = false; myargs[i].bounded_element_range = false;
myargs[i].lock_type = STRESS_LOCK_SHARED; myargs[i].lock_type = STRESS_LOCK_SHARED;
myargs[i].operation_extra = &uoe;
myargs[i].operation = update_op; myargs[i].operation = update_op;
} }
// make the guy that does point queries // make the guy that does point queries
for (int i = 6 + cli_args->num_update_threads; i < num_threads; i++) { for (int i = 6 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].lock_type = STRESS_LOCK_SHARED; myargs[i].lock_type = STRESS_LOCK_SHARED;
myargs[i].bounded_update_range = false; myargs[i].bounded_element_range = false;
myargs[i].operation = ptquery_op_no_check; myargs[i].operation = ptquery_op_no_check;
} }
run_workers(myargs, num_threads, cli_args->time_of_test, false); run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args);
} }
int int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = DEFAULT_ARGS; struct cli_args args = get_default_args();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); stress_test_main(&args);
return 0; return 0;
......
...@@ -29,21 +29,30 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -29,21 +29,30 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env, cli_args); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
struct scan_op_extra soe[2];
// make the forward fast scanner // make the forward fast scanner
myargs[0].fast = TRUE; soe[0].fast = TRUE;
myargs[0].fwd = TRUE; soe[0].fwd = TRUE;
soe[0].prefetch = FALSE;
myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op; myargs[0].operation = scan_op;
// make the backward slow scanner // make the forward slow scanner
myargs[1].fast = FALSE; soe[1].fast = FALSE;
myargs[1].fwd = FALSE; soe[1].fwd = TRUE;
soe[1].prefetch = FALSE;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op; myargs[1].operation = scan_op;
// make the guy that runs HOT in the background // make the guy that runs HOT in the background
myargs[2].operation = hot_op; myargs[2].operation = hot_op;
myargs[3].operation = keyrange_op; myargs[3].operation = keyrange_op;
struct update_op_args uoe = get_update_op_args(cli_args, NULL);
// make the guy that updates the db
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].operation_extra = &uoe;
myargs[i].operation = update_op; myargs[i].operation = update_op;
} }
...@@ -51,14 +60,14 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -51,14 +60,14 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) { for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op; myargs[i].operation = ptquery_op;
} }
run_workers(myargs, num_threads, cli_args->time_of_test, false); run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args);
} }
int int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = DEFAULT_ARGS; struct cli_args args = get_default_args();
// let's make default checkpointing period really slow // let's make default checkpointing period really slow
args.checkpointing_period = 1; args.env_args.checkpointing_period = 1;
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); stress_test_main(&args);
return 0; return 0;
......
...@@ -36,8 +36,11 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -36,8 +36,11 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
arg_init(&myargs[i], n, dbp, env, cli_args); arg_init(&myargs[i], n, dbp, env, cli_args);
} }
// make the forward fast scanner // make the forward fast scanner
myargs[0].fast = TRUE; struct scan_op_extra soe;
myargs[0].fwd = TRUE; soe.fast = TRUE;
soe.fwd = TRUE;
soe.prefetch = FALSE;
myargs[0].operation_extra = &soe;
myargs[0].lock_type = STRESS_LOCK_SHARED; myargs[0].lock_type = STRESS_LOCK_SHARED;
myargs[0].operation = scan_op; myargs[0].operation = scan_op;
...@@ -52,18 +55,20 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -52,18 +55,20 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
} }
// make the guy that does point queries // make the guy that does point queries
struct update_op_args uoe = get_update_op_args(cli_args, NULL);
for (int i = 2 + cli_args->num_update_threads; i < num_threads; i++) { for (int i = 2 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].lock_type = STRESS_LOCK_SHARED; myargs[i].lock_type = STRESS_LOCK_SHARED;
myargs[i].operation_extra = &uoe;
myargs[i].operation = ptquery_op; myargs[i].operation = ptquery_op;
} }
run_workers(myargs, num_threads, cli_args->time_of_test, false); run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args);
} }
int int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = DEFAULT_ARGS; struct cli_args args = get_default_args();
// let's make default checkpointing period really slow // let's make default checkpointing period really slow
args.checkpointing_period = 1; args.env_args.checkpointing_period = 1;
args.num_elements= 2000; // make default of small num elements to args.num_elements= 2000; // make default of small num elements to
args.num_ptquery_threads = 0; args.num_ptquery_threads = 0;
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
volatile bool run_test; // should be volatile since we are communicating through this variable. volatile bool run_test; // should be volatile since we are communicating through this variable.
typedef struct arg *ARG; typedef struct arg *ARG;
typedef int (*operation_t)(DB_ENV *env, DB** dbp, DB_TXN *txn, ARG arg); typedef int (*operation_t)(DB_TXN *txn, ARG arg, void* operation_extra);
typedef int (*test_update_callback_f)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra); typedef int (*test_update_callback_f)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra);
...@@ -34,108 +34,127 @@ enum stress_lock_type { ...@@ -34,108 +34,127 @@ enum stress_lock_type {
}; };
struct arg { struct arg {
int n; int num_elements; // number of elements per DB
DB **dbp; DB **dbp; // array of DBs
DB_ENV* env; int num_DBs; // number of DBs
bool fast; DB_ENV* env; // environment used
bool fwd; bool bounded_element_range; // true if elements in dictionary are bounded
bool prefetch; // by num_elements, that is, all keys in each
bool bounded_update_range; // DB are in [0, num_elements)
int sleep_ms; // false otherwise
enum stress_lock_type lock_type; int sleep_ms; // number of milliseconds to sleep between operations
u_int32_t txn_type; u_int32_t txn_type; // isolation level for txn running operation
int *update_history_buffer; operation_t operation; // function that is the operation to be run
operation_t operation; void* operation_extra; // extra parameter passed to operation
toku_pthread_mutex_t *broadcast_lock_mutex; enum stress_lock_type lock_type; // states if operation must be exclusive, shared, or does not require locking
struct rwlock *broadcast_lock; bool crash_on_operation_failure; // true if we should crash if operation returns non-zero, false otherwise
int update_pad_frequency;
bool crash_on_update_failure;
u_int32_t update_txn_size;
}; };
struct cli_args { struct env_args {
int num_elements;
int time_of_test;
int node_size; int node_size;
int basement_node_size; int basement_node_size;
u_int64_t cachetable_size;
bool only_create;
bool only_stress;
int checkpointing_period; int checkpointing_period;
int cleaner_period; int cleaner_period;
int cleaner_iterations; int cleaner_iterations;
int update_broadcast_period_ms; u_int64_t cachetable_size;
int num_ptquery_threads;
test_update_callback_f update_function;
bool do_test_and_crash;
bool do_recover;
char *envdir; char *envdir;
int num_update_threads; test_update_callback_f update_function; // update callback function
bool crash_on_update_failure; };
u_int32_t update_txn_size;
struct cli_args {
int num_elements; // number of elements per DB
int num_DBs; // number of DBs
int time_of_test; // how long test should run
bool only_create; // true if want to only create DBs but not run stress
bool only_stress; // true if DBs are already created and want to only run stress
int update_broadcast_period_ms; // specific to test_stress3
int num_ptquery_threads; // number of threads to run point queries
bool do_test_and_crash; // true if we should crash after running stress test. For recovery tests.
bool do_recover; // true if we should run recover
int num_update_threads; // number of threads running updates
bool crash_on_update_failure;
bool print_performance;
bool print_thread_performance;
int performance_period;
u_int32_t update_txn_size; // for clients that do updates, specifies number of updates per txn
u_int32_t key_size; // number of bytes in vals. Must be at least 4
u_int32_t val_size; // number of bytes in vals. Must be at least 4
struct env_args env_args; // specifies environment variables
}; };
DB_TXN * const null_txn = 0; DB_TXN * const null_txn = 0;
static void arg_init(struct arg *arg, int n, DB **dbp, DB_ENV *env, struct cli_args *cli_args) { static void arg_init(struct arg *arg, int num_elements, DB **dbp, DB_ENV *env, struct cli_args *cli_args) {
arg->n = n; arg->num_elements = num_elements;
arg->dbp = dbp; arg->dbp = dbp;
arg->num_DBs = cli_args->num_DBs;
arg->env = env; arg->env = env;
arg->fast = true; arg->bounded_element_range = true;
arg->fwd = true;
arg->prefetch = false; // setting this to TRUE causes thrashing, even with a cachetable size that is 400000. Must investigate
arg->bounded_update_range = true;
arg->sleep_ms = 0; arg->sleep_ms = 0;
arg->lock_type = STRESS_LOCK_NONE; arg->lock_type = STRESS_LOCK_NONE;
arg->txn_type = DB_TXN_SNAPSHOT; arg->txn_type = DB_TXN_SNAPSHOT;
arg->update_history_buffer = NULL; arg->crash_on_operation_failure = cli_args->crash_on_update_failure;
arg->update_pad_frequency = n/100; // bit arbitrary. Just want dictionary to grow and shrink so splits and merges occur arg->operation_extra = NULL;
arg->crash_on_update_failure = cli_args->crash_on_update_failure; }
arg->update_txn_size = cli_args->update_txn_size;
struct worker_extra {
struct arg* thread_arg;
toku_pthread_mutex_t *operation_lock_mutex;
struct rwlock *operation_lock;
int num_operations_completed;
};
static void lock_worker_op(struct worker_extra* we) {
ARG arg = we->thread_arg;
if (arg->lock_type != STRESS_LOCK_NONE) {
toku_pthread_mutex_lock(we->operation_lock_mutex);
if (arg->lock_type == STRESS_LOCK_SHARED) {
rwlock_read_lock(we->operation_lock, we->operation_lock_mutex);
} else if (arg->lock_type == STRESS_LOCK_EXCL) {
rwlock_write_lock(we->operation_lock, we->operation_lock_mutex);
} else {
assert(false);
}
toku_pthread_mutex_unlock(we->operation_lock_mutex);
}
}
static void unlock_worker_op(struct worker_extra* we) {
ARG arg = we->thread_arg;
toku_pthread_mutex_lock(we->operation_lock_mutex);
if (arg->lock_type == STRESS_LOCK_SHARED) {
rwlock_read_unlock(we->operation_lock);
} else if (arg->lock_type == STRESS_LOCK_EXCL) {
rwlock_write_unlock(we->operation_lock);
} else {
assert(arg->lock_type == STRESS_LOCK_NONE);
}
toku_pthread_mutex_unlock(we->operation_lock_mutex);
} }
static void *worker(void *arg_v) { static void *worker(void *arg_v) {
ARG arg = arg_v; struct worker_extra* we = arg_v;
ARG arg = we->thread_arg;
DB_ENV *env = arg->env; DB_ENV *env = arg->env;
DB** dbp = arg->dbp;
DB_TXN *txn = NULL; DB_TXN *txn = NULL;
if (verbose) if (verbose) {
printf("%lu starting %p\n", toku_pthread_self(), arg->operation); printf("%lu starting %p\n", toku_pthread_self(), arg->operation);
}
while (run_test) { while (run_test) {
if (arg->lock_type != STRESS_LOCK_NONE) { lock_worker_op(we);
toku_pthread_mutex_lock(arg->broadcast_lock_mutex);
if (arg->lock_type == STRESS_LOCK_SHARED) {
rwlock_read_lock(arg->broadcast_lock, arg->broadcast_lock_mutex);
} else if (arg->lock_type == STRESS_LOCK_EXCL) {
rwlock_write_lock(arg->broadcast_lock, arg->broadcast_lock_mutex);
} else {
assert(false);
}
toku_pthread_mutex_unlock(arg->broadcast_lock_mutex);
}
int r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r); int r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r);
r = arg->operation(env, dbp, txn, arg); r = arg->operation(txn, arg, arg->operation_extra);
if (r == 0) { if (r == 0) {
CHK(txn->commit(txn,0)); CHK(txn->commit(txn,0));
} else { } else {
if (arg->crash_on_update_failure) { if (arg->crash_on_operation_failure) {
CKERR(r); CKERR(r);
} else { } else {
CHK(txn->abort(txn)); CHK(txn->abort(txn));
} }
} }
unlock_worker_op(we);
toku_pthread_mutex_lock(arg->broadcast_lock_mutex); we->num_operations_completed++;
if (arg->lock_type == STRESS_LOCK_SHARED) {
rwlock_read_unlock(arg->broadcast_lock);
} else if (arg->lock_type == STRESS_LOCK_EXCL) {
rwlock_write_unlock(arg->broadcast_lock);
} else {
assert(arg->lock_type == STRESS_LOCK_NONE);
}
toku_pthread_mutex_unlock(arg->broadcast_lock_mutex);
if (arg->sleep_ms) { if (arg->sleep_ms) {
usleep(arg->sleep_ms * 1000); usleep(arg->sleep_ms * 1000);
} }
...@@ -152,6 +171,12 @@ struct scan_cb_extra { ...@@ -152,6 +171,12 @@ struct scan_cb_extra {
int64_t num_elements; int64_t num_elements;
}; };
struct scan_op_extra {
bool fast;
bool fwd;
bool prefetch;
};
static int static int
scan_cb(const DBT *a, const DBT *b, void *arg_v) { scan_cb(const DBT *a, const DBT *b, void *arg_v) {
SCAN_CB_EXTRA cb_extra = arg_v; SCAN_CB_EXTRA cb_extra = arg_v;
...@@ -164,23 +189,28 @@ scan_cb(const DBT *a, const DBT *b, void *arg_v) { ...@@ -164,23 +189,28 @@ scan_cb(const DBT *a, const DBT *b, void *arg_v) {
return cb_extra->fast ? TOKUDB_CURSOR_CONTINUE : 0; return cb_extra->fast ? TOKUDB_CURSOR_CONTINUE : 0;
} }
static int scan_op_and_maybe_check_sum(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg, bool check_sum) { static int scan_op_and_maybe_check_sum(
DB* db,
DB_TXN *txn,
struct scan_op_extra* sce,
bool check_sum
)
{
int r = 0; int r = 0;
DB* db = *dbp;
DBC* cursor = NULL; DBC* cursor = NULL;
struct scan_cb_extra e; struct scan_cb_extra e;
e.fast = arg->fast; e.fast = sce->fast;
e.curr_sum = 0; e.curr_sum = 0;
e.num_elements = 0; e.num_elements = 0;
CHK(db->cursor(db, txn, &cursor, 0)); CHK(db->cursor(db, txn, &cursor, 0));
if (arg->prefetch) { if (sce->prefetch) {
r = cursor->c_pre_acquire_range_lock(cursor, db->dbt_neg_infty(), db->dbt_pos_infty()); r = cursor->c_pre_acquire_range_lock(cursor, db->dbt_neg_infty(), db->dbt_pos_infty());
assert(r == 0); assert(r == 0);
} }
while (r != DB_NOTFOUND) { while (r != DB_NOTFOUND) {
if (arg->fwd) { if (sce->fwd) {
r = cursor->c_getf_next(cursor, 0, scan_cb, &e); r = cursor->c_getf_next(cursor, 0, scan_cb, &e);
} }
else { else {
...@@ -217,7 +247,8 @@ static int generate_row_for_put( ...@@ -217,7 +247,8 @@ static int generate_row_for_put(
return 0; return 0;
} }
static int UU() loader_op(DB_ENV *env, DB** UU(dbp), DB_TXN* txn, ARG UU(arg)) { static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra)) {
DB_ENV* env = arg->env;
int r; int r;
for (int num = 0; num < 2; num++) { for (int num = 0; num < 2; num++) {
DB *db_load; DB *db_load;
...@@ -248,12 +279,16 @@ static int UU() loader_op(DB_ENV *env, DB** UU(dbp), DB_TXN* txn, ARG UU(arg)) { ...@@ -248,12 +279,16 @@ static int UU() loader_op(DB_ENV *env, DB** UU(dbp), DB_TXN* txn, ARG UU(arg)) {
return 0; return 0;
} }
static int UU() keyrange_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
int r; int r;
DB* db = *dbp; // callback is designed to run on tests with one DB
// no particular reason why, just the way it was
// originally done
int db_index = random()%arg->num_DBs;
DB* db = arg->dbp[db_index];
int rand_key = random(); int rand_key = random();
if (arg->bounded_update_range) { if (arg->bounded_element_range) {
rand_key = rand_key % arg->n; rand_key = rand_key % arg->num_elements;
} }
DBT key; DBT key;
dbt_init(&key, &rand_key, sizeof rand_key); dbt_init(&key, &rand_key, sizeof rand_key);
...@@ -264,28 +299,39 @@ static int UU() keyrange_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { ...@@ -264,28 +299,39 @@ static int UU() keyrange_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
return r; return r;
} }
static int UU() verify_op(DB_ENV *UU(env), DB **dbp, DB_TXN* UU(txn), ARG UU(arg)) { static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra)) {
int r; int r;
DB* db = *dbp; for (int i = 0; i < arg->num_DBs; i++) {
r = db->verify_with_progress(db, NULL, NULL, 0, 0); DB* db = arg->dbp[i];
r = db->verify_with_progress(db, NULL, NULL, 0, 0);
}
CKERR(r); CKERR(r);
return r; return r;
} }
static int UU() scan_op(DB_ENV *env, DB **dbp, DB_TXN *txn, ARG arg) { static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra) {
return scan_op_and_maybe_check_sum(env, dbp, txn, arg, true); struct scan_op_extra* extra = operation_extra;
for (int i = 0; i < arg->num_DBs; i++) {
int r = scan_op_and_maybe_check_sum(arg->dbp[i], txn, extra, true);
assert_zero(r);
}
return 0;
} }
static int UU() scan_op_no_check(DB_ENV *env, DB **dbp, DB_TXN *txn, ARG arg) { static int UU() scan_op_no_check(DB_TXN *txn, ARG arg, void* operation_extra) {
return scan_op_and_maybe_check_sum(env, dbp, txn, arg, false); struct scan_op_extra* extra = operation_extra;
for (int i = 0; i < arg->num_DBs; i++) {
int r = scan_op_and_maybe_check_sum(arg->dbp[i], txn, extra, false);
assert_zero(r);
}
return 0;
} }
static int UU() ptquery_and_maybe_check_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg, BOOL check) { static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, BOOL check) {
int r; int r;
DB* db = *dbp;
int rand_key = random(); int rand_key = random();
if (arg->bounded_update_range) { if (arg->bounded_element_range) {
rand_key = rand_key % arg->n; rand_key = rand_key % arg->num_elements;
} }
DBT key, val; DBT key, val;
memset(&val, 0, sizeof val); memset(&val, 0, sizeof val);
...@@ -296,12 +342,16 @@ static int UU() ptquery_and_maybe_check_op(DB_ENV *UU(env), DB **dbp, DB_TXN *tx ...@@ -296,12 +342,16 @@ static int UU() ptquery_and_maybe_check_op(DB_ENV *UU(env), DB **dbp, DB_TXN *tx
return r; return r;
} }
static int UU() ptquery_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { static int UU() ptquery_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
return ptquery_and_maybe_check_op(env, dbp, txn, arg, TRUE); int db_index = random()%arg->num_DBs;
DB* db = arg->dbp[db_index];
return ptquery_and_maybe_check_op(db, txn, arg, TRUE);
} }
static int UU() ptquery_op_no_check(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_extra)) {
return ptquery_and_maybe_check_op(env, dbp, txn, arg, FALSE); int db_index = random()%arg->num_DBs;
DB* db = arg->dbp[db_index];
return ptquery_and_maybe_check_op(db, txn, arg, FALSE);
} }
#define MAX_RANDOM_VAL 10000 #define MAX_RANDOM_VAL 10000
...@@ -326,6 +376,20 @@ struct update_op_extra { ...@@ -326,6 +376,20 @@ struct update_op_extra {
} u; } u;
}; };
struct update_op_args {
int *update_history_buffer;
u_int32_t update_txn_size;
int update_pad_frequency;
};
static struct update_op_args get_update_op_args(struct cli_args* cli_args, int* update_history_buffer) {
struct update_op_args uoe;
uoe.update_history_buffer = update_history_buffer;
uoe.update_pad_frequency = cli_args->num_elements/100; // arbitrary
uoe.update_txn_size = cli_args->update_txn_size;
return uoe;
}
static u_int64_t update_count = 0; static u_int64_t update_count = 0;
static int update_op_callback(DB *UU(db), const DBT *UU(key), static int update_op_callback(DB *UU(db), const DBT *UU(key),
...@@ -367,24 +431,26 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key), ...@@ -367,24 +431,26 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key),
return 0; return 0;
} }
static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { static int UU()update_op2(DB_TXN* txn, ARG arg, void* operation_extra) {
int r; int r;
DB* db = *dbp; int db_index = random()%arg->num_DBs;
DB* db = arg->dbp[db_index];
int curr_val_sum = 0; int curr_val_sum = 0;
DBT key, val; DBT key, val;
int rand_key; int rand_key;
int rand_key2; int rand_key2;
struct update_op_args* op_args = operation_extra;
update_count++; update_count++;
struct update_op_extra extra; struct update_op_extra extra;
memset(&extra, 0, sizeof(extra)); memset(&extra, 0, sizeof(extra));
extra.type = UPDATE_ADD_DIFF; extra.type = UPDATE_ADD_DIFF;
extra.pad_bytes = 0; extra.pad_bytes = 0;
for (u_int32_t i = 0; i < arg->update_txn_size; i++) { for (u_int32_t i = 0; i < op_args->update_txn_size; i++) {
rand_key = random(); rand_key = random();
if (arg->bounded_update_range) { if (arg->bounded_element_range) {
rand_key = rand_key % (arg->n/2); rand_key = rand_key % (arg->num_elements/2);
} }
rand_key2 = arg->n - rand_key; rand_key2 = arg->num_elements - rand_key;
assert(rand_key != rand_key2); assert(rand_key != rand_key2);
extra.u.d.diff = 1; extra.u.d.diff = 1;
curr_val_sum += extra.u.d.diff; curr_val_sum += extra.u.d.diff;
...@@ -395,10 +461,9 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { ...@@ -395,10 +461,9 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
dbt_init(&val, &extra, sizeof extra), dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0 && !arg->crash_on_update_failure) { if (r != 0) {
return r; return r;
} }
CKERR(r);
extra.u.d.diff = -1; extra.u.d.diff = -1;
r = db->update( r = db->update(
db, db,
...@@ -407,34 +472,35 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { ...@@ -407,34 +472,35 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
dbt_init(&val, &extra, sizeof extra), dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0 && !arg->crash_on_update_failure) { if (r != 0) {
return r; return r;
} }
CKERR(r);
} }
return r; return r;
} }
static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { static int UU()update_op(DB_TXN *txn, ARG arg, void* operation_extra) {
int r; int r;
DB* db = *dbp; int db_index = random()%arg->num_DBs;
DB* db = arg->dbp[db_index];
int curr_val_sum = 0; int curr_val_sum = 0;
DBT key, val; DBT key, val;
int rand_key; int rand_key;
update_count++; update_count++;
struct update_op_args* op_args = operation_extra;
struct update_op_extra extra; struct update_op_extra extra;
memset(&extra, 0, sizeof(extra)); memset(&extra, 0, sizeof(extra));
extra.type = UPDATE_ADD_DIFF; extra.type = UPDATE_ADD_DIFF;
extra.pad_bytes = 0; extra.pad_bytes = 0;
if (arg->update_pad_frequency) { if (op_args->update_pad_frequency) {
if (update_count % (2*arg->update_pad_frequency) == update_count%arg->update_pad_frequency) { if (update_count % (2*op_args->update_pad_frequency) == update_count%op_args->update_pad_frequency) {
extra.pad_bytes = 100; extra.pad_bytes = 100;
} }
} }
for (u_int32_t i = 0; i < arg->update_txn_size; i++) { for (u_int32_t i = 0; i < op_args->update_txn_size; i++) {
rand_key = random(); rand_key = random();
if (arg->bounded_update_range) { if (arg->bounded_element_range) {
rand_key = rand_key % arg->n; rand_key = rand_key % arg->num_elements;
} }
extra.u.d.diff = random() % MAX_RANDOM_VAL; extra.u.d.diff = random() % MAX_RANDOM_VAL;
// just make every other value random // just make every other value random
...@@ -449,18 +515,17 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { ...@@ -449,18 +515,17 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
dbt_init(&val, &extra, sizeof extra), dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0 && !arg->crash_on_update_failure) { if (r != 0) {
return r; return r;
} }
CKERR(r);
} }
// //
// now put in one more to ensure that the sum stays 0 // now put in one more to ensure that the sum stays 0
// //
extra.u.d.diff = -curr_val_sum; extra.u.d.diff = -curr_val_sum;
rand_key = random(); rand_key = random();
if (arg->bounded_update_range) { if (arg->bounded_element_range) {
rand_key = rand_key % arg->n; rand_key = rand_key % arg->num_elements;
} }
r = db->update( r = db->update(
db, db,
...@@ -469,19 +534,20 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { ...@@ -469,19 +534,20 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
dbt_init(&val, &extra, sizeof extra), dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0 && !arg->crash_on_update_failure) { if (r != 0) {
return r; return r;
} }
CKERR(r);
return r; return r;
} }
static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_extra) {
assert(arg->bounded_update_range); struct update_op_args* op_args = operation_extra;
assert(arg->update_history_buffer); assert(arg->bounded_element_range);
assert(op_args->update_history_buffer);
int r; int r;
DB* db = *dbp; int db_index = random()%arg->num_DBs;
DB* db = arg->dbp[db_index];
int curr_val_sum = 0; int curr_val_sum = 0;
DBT key, val; DBT key, val;
int rand_key; int rand_key;
...@@ -490,22 +556,22 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A ...@@ -490,22 +556,22 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A
extra.type = UPDATE_WITH_HISTORY; extra.type = UPDATE_WITH_HISTORY;
update_count++; update_count++;
extra.pad_bytes = 0; extra.pad_bytes = 0;
if (arg->update_pad_frequency) { if (op_args->update_pad_frequency) {
if (update_count % (2*arg->update_pad_frequency) != update_count%arg->update_pad_frequency) { if (update_count % (2*op_args->update_pad_frequency) != update_count%op_args->update_pad_frequency) {
extra.pad_bytes = 500; extra.pad_bytes = 500;
} }
} }
for (u_int32_t i = 0; i < arg->update_txn_size; i++) { for (u_int32_t i = 0; i < op_args->update_txn_size; i++) {
rand_key = random() % arg->n; rand_key = random() % arg->num_elements;
extra.u.h.new = random() % MAX_RANDOM_VAL; extra.u.h.new = random() % MAX_RANDOM_VAL;
// just make every other value random // just make every other value random
if (i%2 == 0) { if (i%2 == 0) {
extra.u.h.new = -extra.u.h.new; extra.u.h.new = -extra.u.h.new;
} }
curr_val_sum += extra.u.h.new; curr_val_sum += extra.u.h.new;
extra.u.h.expected = arg->update_history_buffer[rand_key]; extra.u.h.expected = op_args->update_history_buffer[rand_key];
arg->update_history_buffer[rand_key] = extra.u.h.new; op_args->update_history_buffer[rand_key] = extra.u.h.new;
r = db->update( r = db->update(
db, db,
txn, txn,
...@@ -513,21 +579,20 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A ...@@ -513,21 +579,20 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A
dbt_init(&val, &extra, sizeof extra), dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0 && !arg->crash_on_update_failure) { if (r != 0) {
return r; return r;
} }
CKERR(r);
} }
// //
// now put in one more to ensure that the sum stays 0 // now put in one more to ensure that the sum stays 0
// //
extra.u.h.new = -curr_val_sum; extra.u.h.new = -curr_val_sum;
rand_key = random(); rand_key = random();
if (arg->bounded_update_range) { if (arg->bounded_element_range) {
rand_key = rand_key % arg->n; rand_key = rand_key % arg->num_elements;
} }
extra.u.h.expected = arg->update_history_buffer[rand_key]; extra.u.h.expected = op_args->update_history_buffer[rand_key];
arg->update_history_buffer[rand_key] = extra.u.h.new; op_args->update_history_buffer[rand_key] = extra.u.h.new;
r = db->update( r = db->update(
db, db,
txn, txn,
...@@ -535,18 +600,18 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A ...@@ -535,18 +600,18 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A
dbt_init(&val, &extra, sizeof extra), dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0 && !arg->crash_on_update_failure) { if (r != 0) {
return r; return r;
} }
CKERR(r);
return r; return r;
} }
static int UU() update_broadcast_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG UU(arg)) { static int UU() update_broadcast_op(DB_TXN *txn, ARG UU(arg), void* UU(operation_extra)) {
struct update_op_extra extra; struct update_op_extra extra;
memset(&extra, 0, sizeof(extra)); memset(&extra, 0, sizeof(extra));
DB* db = *dbp; int db_index = random()%arg->num_DBs;
DB* db = arg->dbp[db_index];
extra.type = UPDATE_NEGATE; extra.type = UPDATE_NEGATE;
extra.pad_bytes = 0; extra.pad_bytes = 0;
DBT val; DBT val;
...@@ -555,33 +620,43 @@ static int UU() update_broadcast_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG ...@@ -555,33 +620,43 @@ static int UU() update_broadcast_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG
return r; return r;
} }
static int UU() hot_op(DB_ENV *UU(env), DB **dbp, DB_TXN *UU(txn), ARG UU(arg)) { static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra)) {
int r; int r;
DB* db = *dbp; for (int i = 0; i < arg->num_DBs; i++) {
r = db->hot_optimize(db, NULL, NULL); DB* db = arg->dbp[i];
CKERR(r); r = db->hot_optimize(db, NULL, NULL);
return r; CKERR(r);
}
return 0;
} }
static int UU() remove_and_recreate_me(DB_ENV *env, DB **dbp, DB_TXN *UU(txn), ARG UU(arg)) { static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra)) {
int r; int r;
r = (*dbp)->close(*dbp, 0); CKERR(r); int db_index = random()%arg->num_DBs;
DB* db = arg->dbp[db_index];
r = (db)->close(db, 0); CKERR(r);
r = env->dbremove(env, null_txn, "main", NULL, 0); char name[30];
memset(name, 0, sizeof(name));
snprintf(name, sizeof(name), "main%d", db_index);
r = arg->env->dbremove(arg->env, null_txn, name, NULL, 0);
CKERR(r); CKERR(r);
r = db_create(dbp, env, 0); r = db_create(&(arg->dbp[db_index]), arg->env, 0);
assert(r == 0); assert(r == 0);
r = (*dbp)->open(*dbp, null_txn, "main", NULL, DB_BTREE, DB_CREATE, 0666); r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666);
assert(r == 0); assert(r == 0);
return 0; return 0;
} }
static int UU() truncate_me(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG UU(arg)) { static int UU() truncate_me(DB_TXN *txn, ARG UU(arg), void* UU(operation_extra)) {
int r; int r;
u_int32_t row_count = 0; for ( int i = 0; i < arg->num_DBs; i++) {
r = (*dbp)->truncate(*dbp, txn, &row_count, 0); u_int32_t row_count = 0;
assert(r == 0); r = (*arg->dbp)->truncate(*arg->dbp, txn, &row_count, 0);
assert(r == 0);
}
return 0; return 0;
} }
...@@ -590,32 +665,77 @@ static int UU() truncate_me(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG UU(arg)) ...@@ -590,32 +665,77 @@ static int UU() truncate_me(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG UU(arg))
struct test_time_extra { struct test_time_extra {
int num_seconds; int num_seconds;
bool crash_at_end; bool crash_at_end;
struct worker_extra *wes;
int num_wes;
bool print_performance;
bool print_thread_performance;
int performance_period;
}; };
static void *test_time(void *arg) { static void *test_time(void *arg) {
struct test_time_extra* tte = arg; struct test_time_extra* tte = arg;
int num_seconds = tte->num_seconds; int num_seconds = tte->num_seconds;
// //
// if num_Seconds is set to 0, run indefinitely // if num_Seconds is set to 0, run indefinitely
// //
if (num_seconds != 0) { if (num_seconds == 0) {
if (verbose) num_seconds = INT32_MAX;
printf("Sleeping for %d seconds\n", num_seconds); }
usleep(num_seconds*1000*1000); if (verbose) {
if (verbose) printf("Sleeping for %d seconds\n", num_seconds);
printf("should now end test\n"); }
__sync_bool_compare_and_swap(&run_test, true, false); // make this atomic to make valgrind --tool=drd happy. int num_operations_completed_total[tte->num_wes];
if (verbose) memset(num_operations_completed_total, 0, sizeof num_operations_completed_total);
printf("run_test %d\n", run_test); for (int i = 0; i < num_seconds; i += tte->performance_period) {
if (tte->crash_at_end) { usleep(tte->performance_period*1000*1000);
toku_hard_crash_on_purpose(); int total_operations_in_period = 0;
for (int we = 0; we < tte->num_wes; ++we) {
int last = num_operations_completed_total[we];
int current = __sync_fetch_and_add(&tte->wes[we].num_operations_completed, 0);
if (tte->print_thread_performance) {
printf("Thread %d Iteration %d Operations %d\n", we, i, current - last);
}
total_operations_in_period += (current - last);
num_operations_completed_total[we] = current;
} }
if (tte->print_performance) {
printf("Iteration %d Total_Operations %d\n", i, total_operations_in_period);
}
}
int total_operations_in_test = 0;
for (int we = 0; we < tte->num_wes; ++we) {
int current = __sync_fetch_and_add(&tte->wes[we].num_operations_completed, 0);
if (tte->print_thread_performance) {
printf("TOTAL Thread %d Operations %d\n", we, current);
}
total_operations_in_test += current;
}
if (tte->print_performance) {
printf("Total_Operations %d\n", total_operations_in_test);
}
if (verbose) {
printf("should now end test\n");
}
__sync_bool_compare_and_swap(&run_test, true, false); // make this atomic to make valgrind --tool=drd happy.
if (verbose) {
printf("run_test %d\n", run_test);
}
if (tte->crash_at_end) {
toku_hard_crash_on_purpose();
} }
return arg; return arg;
} }
static int run_workers(struct arg *thread_args, int num_threads, u_int32_t num_seconds, bool crash_at_end) { static int run_workers(
struct arg *thread_args,
int num_threads,
u_int32_t num_seconds,
bool crash_at_end,
struct cli_args* cli_args
)
{
int r; int r;
toku_pthread_mutex_t mutex; toku_pthread_mutex_t mutex;
toku_pthread_mutex_init(&mutex, NULL); toku_pthread_mutex_init(&mutex, NULL);
...@@ -623,14 +743,22 @@ static int run_workers(struct arg *thread_args, int num_threads, u_int32_t num_s ...@@ -623,14 +743,22 @@ static int run_workers(struct arg *thread_args, int num_threads, u_int32_t num_s
rwlock_init(&rwlock); rwlock_init(&rwlock);
toku_pthread_t tids[num_threads]; toku_pthread_t tids[num_threads];
toku_pthread_t time_tid; toku_pthread_t time_tid;
struct worker_extra worker_extra[num_threads];
struct test_time_extra tte; struct test_time_extra tte;
tte.num_seconds = num_seconds; tte.num_seconds = num_seconds;
tte.crash_at_end = crash_at_end; tte.crash_at_end = crash_at_end;
tte.wes = worker_extra;
tte.num_wes = num_threads;
tte.print_performance = cli_args->print_performance;
tte.print_thread_performance = cli_args->print_thread_performance;
tte.performance_period = cli_args->performance_period;
run_test = true; run_test = true;
for (int i = 0; i < num_threads; ++i) { for (int i = 0; i < num_threads; ++i) {
thread_args[i].broadcast_lock = &rwlock; worker_extra[i].thread_arg = &thread_args[i];
thread_args[i].broadcast_lock_mutex = &mutex; worker_extra[i].operation_lock = &rwlock;
CHK(toku_pthread_create(&tids[i], NULL, worker, &thread_args[i])); worker_extra[i].operation_lock_mutex = &mutex;
worker_extra[i].num_operations_completed = 0;
CHK(toku_pthread_create(&tids[i], NULL, worker, &worker_extra[i]));
if (verbose) if (verbose)
printf("%lu created\n", tids[i]); printf("%lu created\n", tids[i]);
} }
...@@ -646,88 +774,90 @@ static int run_workers(struct arg *thread_args, int num_threads, u_int32_t num_s ...@@ -646,88 +774,90 @@ static int run_workers(struct arg *thread_args, int num_threads, u_int32_t num_s
if (verbose) if (verbose)
printf("%lu joined\n", tids[i]); printf("%lu joined\n", tids[i]);
} }
rwlock_destroy(&rwlock);
if (verbose) if (verbose)
printf("ending test, pthreads have joined\n"); printf("ending test, pthreads have joined\n");
rwlock_destroy(&rwlock);
toku_pthread_mutex_destroy(&mutex); toku_pthread_mutex_destroy(&mutex);
return r; return r;
} }
static int create_table(DB_ENV **env_res, DB **db_res, static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
int (*bt_compare)(DB *, const DBT *, const DBT *), int (*bt_compare)(DB *, const DBT *, const DBT *),
u_int64_t cachesize, struct env_args env_args
u_int32_t checkpointing_period, ) {
u_int32_t cleaner_period,
u_int32_t cleaner_iterations,
u_int32_t pagesize,
u_int32_t readpagesize,
char *envdir) {
int r; int r;
char rmcmd[32 + strlen(envdir)]; sprintf(rmcmd, "rm -rf %s", envdir); char rmcmd[32 + strlen(env_args.envdir)]; sprintf(rmcmd, "rm -rf %s", env_args.envdir);
r = system(rmcmd); r = system(rmcmd);
CKERR(r); CKERR(r);
r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0); r = toku_os_mkdir(env_args.envdir, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
DB_ENV *env; DB_ENV *env;
r = db_env_create(&env, 0); assert(r == 0); r = db_env_create(&env, 0); assert(r == 0);
r = env->set_redzone(env, 0); CKERR(r); r = env->set_redzone(env, 0); CKERR(r);
r = env->set_default_bt_compare(env, bt_compare); CKERR(r); r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
r = env->set_cachesize(env, cachesize / (1 << 30), cachesize % (1 << 30), 1); CKERR(r); r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r); r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r);
r = env->open(env, envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, env_args.envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->checkpointing_set_period(env, checkpointing_period); CKERR(r); r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r);
r = env->cleaner_set_period(env, cleaner_period); CKERR(r); r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r);
r = env->cleaner_set_iterations(env, cleaner_iterations); CKERR(r); r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r);
*env_res = env;
DB *db;
r = db_create(&db, env, 0);
assert(r == 0);
r = db->set_flags(db, 0);
assert(r == 0);
r = db->set_pagesize(db, pagesize);
assert(r == 0);
r = db->set_readpagesize(db, readpagesize);
assert(r == 0);
r = db->open(db, null_txn, "main", NULL, DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
*env_res = env; for (int i = 0; i < num_DBs; i++) {
*db_res = db; DB *db;
char name[30];
memset(name, 0, sizeof(name));
snprintf(name, sizeof(name), "main%d", i);
r = db_create(&db, env, 0);
CKERR(r);
r = db->set_flags(db, 0);
CKERR(r);
r = db->set_pagesize(db, env_args.node_size);
CKERR(r);
r = db->set_readpagesize(db, env_args.basement_node_size);
CKERR(r);
r = db->open(db, null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666);
CKERR(r);
db_res[i] = db;
}
return r; return r;
} }
static int fill_table_from_fun(DB *db, int num_elements, int max_bufsz, static int fill_table_from_fun(DB *db, int num_elements, int key_bufsz, int val_bufsz,
void (*callback)(int idx, void *extra, void (*callback)(int idx, void *extra,
void *key, int *keysz, void *key, int *keysz,
void *val, int *valsz), void *val, int *valsz),
void *extra) { void *extra) {
int r = 0; int r = 0;
for (long i = 0; i < num_elements; ++i) { for (long i = 0; i < num_elements; ++i) {
char keybuf[max_bufsz], valbuf[max_bufsz]; char keybuf[key_bufsz], valbuf[val_bufsz];
memset(keybuf, 0, sizeof(keybuf));
memset(valbuf, 0, sizeof(valbuf));
int keysz, valsz; int keysz, valsz;
callback(i, extra, keybuf, &keysz, valbuf, &valsz); callback(i, extra, keybuf, &keysz, valbuf, &valsz);
// let's make sure the data stored fits in the buffers we passed in
assert(keysz <= key_bufsz);
assert(valsz <= val_bufsz);
DBT key, val; DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, keybuf, keysz), dbt_init(&val, valbuf, valsz), 0); // make size of data what is specified w/input parameters
// note that key and val have sizes of
// key_bufsz and val_bufsz, which were passed into this
// function, not what was stored by the callback
r = db->put(
db,
null_txn,
dbt_init(&key, keybuf, key_bufsz),
dbt_init(&val, valbuf, val_bufsz),
0
);
assert(r == 0); assert(r == 0);
} }
return r; return r;
} }
static void int_element_callback(int idx, void *UU(extra), void *keyv, int *keysz, void *valv, int *valsz) {
int *key = keyv, *val = valv;
*key = idx;
*val = idx;
*keysz = sizeof(int);
*valsz = sizeof(int);
}
static int fill_table_with_ints(DB *db, int num_elements) __attribute__((unused));
static int fill_table_with_ints(DB *db, int num_elements) {
return fill_table_from_fun(db, num_elements, sizeof(int), int_element_callback, NULL);
}
static void zero_element_callback(int idx, void *UU(extra), void *keyv, int *keysz, void *valv, int *valsz) { static void zero_element_callback(int idx, void *UU(extra), void *keyv, int *keysz, void *valv, int *valsz) {
int *key = keyv, *val = valv; int *key = keyv, *val = valv;
*key = idx; *key = idx;
...@@ -736,30 +866,26 @@ static void zero_element_callback(int idx, void *UU(extra), void *keyv, int *key ...@@ -736,30 +866,26 @@ static void zero_element_callback(int idx, void *UU(extra), void *keyv, int *key
*valsz = sizeof(int); *valsz = sizeof(int);
} }
static int fill_table_with_zeroes(DB *db, int num_elements) { static int fill_tables_with_zeroes(DB **dbs, int num_DBs, int num_elements, u_int32_t key_size, u_int32_t val_size) {
return fill_table_from_fun(db, num_elements, sizeof(int), zero_element_callback, NULL); for (int i = 0; i < num_DBs; i++) {
} assert(key_size >= sizeof(int));
assert(val_size >= sizeof(int));
static int fill_table_from_array(DB *db, int num_elements, void *array, size_t element_size) __attribute__((unused)); int r = fill_table_from_fun(
static int fill_table_from_array(DB *db, int num_elements, void *array, size_t element_size) { dbs[i],
int r = 0; num_elements,
char *a = array; key_size,
for (char *p = a; p < a + num_elements * element_size; p += element_size) { val_size,
DBT key, val; zero_element_callback,
r = db->put(db, null_txn, dbt_init(&key, p, element_size), dbt_init(&val, p, element_size), 0); NULL
assert(r == 0); );
CKERR(r);
} }
return r; return 0;
} }
static int open_table(DB_ENV **env_res, DB **db_res, static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
int (*bt_compare)(DB *, const DBT *, const DBT *), int (*bt_compare)(DB *, const DBT *, const DBT *),
u_int64_t cachesize, struct env_args env_args) {
u_int32_t checkpointing_period,
u_int32_t cleaner_period,
u_int32_t cleaner_iterations,
test_update_callback_f f,
char *envdir) {
int r; int r;
/* create the dup database file */ /* create the dup database file */
...@@ -767,55 +893,77 @@ static int open_table(DB_ENV **env_res, DB **db_res, ...@@ -767,55 +893,77 @@ static int open_table(DB_ENV **env_res, DB **db_res,
r = db_env_create(&env, 0); assert(r == 0); r = db_env_create(&env, 0); assert(r == 0);
r = env->set_redzone(env, 0); CKERR(r); r = env->set_redzone(env, 0); CKERR(r);
r = env->set_default_bt_compare(env, bt_compare); CKERR(r); r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
env->set_update(env, f); env->set_update(env, env_args.update_function);
// set the cache size to 10MB // set the cache size to 10MB
r = env->set_cachesize(env, cachesize / (1 << 30), cachesize % (1 << 30), 1); CKERR(r); r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r); r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r);
r = env->open(env, envdir, DB_RECOVER|DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, env_args.envdir, DB_RECOVER|DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->checkpointing_set_period(env, checkpointing_period); CKERR(r); r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r);
r = env->cleaner_set_period(env, cleaner_period); CKERR(r); r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r);
r = env->cleaner_set_iterations(env, cleaner_iterations); CKERR(r); r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r);
DB *db;
r = db_create(&db, env, 0);
assert(r == 0);
r = db->open(db, null_txn, "main", NULL, DB_BTREE, 0, 0666);
assert(r == 0);
*env_res = env; *env_res = env;
*db_res = db;
for (int i = 0; i < num_DBs; i++) {
DB *db;
char name[30];
memset(name, 0, sizeof(name));
snprintf(name, sizeof(name), "main%d", i);
r = db_create(&db, env, 0);
CKERR(r);
r = db->open(db, null_txn, name, NULL, DB_BTREE, 0, 0666);
CKERR(r);
db_res[i] = db;
}
return r; return r;
} }
static int close_table(DB_ENV *env, DB *db) { static int close_tables(DB_ENV *env, DB** dbs, int num_DBs) {
int r; int r;
r = db->close(db, 0); CKERR(r); for (int i = 0; i < num_DBs; i++) {
r = dbs[i]->close(dbs[i], 0); CKERR(r);
}
r = env->close(env, 0); CKERR(r); r = env->close(env, 0); CKERR(r);
return r; return r;
} }
static const struct cli_args DEFAULT_ARGS = { static const struct env_args DEFAULT_ENV_ARGS = {
.num_elements = 150000,
.time_of_test = 180,
.node_size = 4096, .node_size = 4096,
.basement_node_size = 1024, .basement_node_size = 1024,
.cachetable_size = 300000,
.only_create = false,
.only_stress = false,
.checkpointing_period = 10, .checkpointing_period = 10,
.cleaner_period = 1, .cleaner_period = 1,
.cleaner_iterations = 1, .cleaner_iterations = 1,
.update_broadcast_period_ms = 2000, .cachetable_size = 300000,
.num_ptquery_threads = 1,
.update_function = update_op_callback,
.do_test_and_crash = false,
.do_recover = false,
.envdir = ENVDIR, .envdir = ENVDIR,
.num_update_threads = 1, .update_function = update_op_callback,
.crash_on_update_failure = true,
.update_txn_size = 1000,
}; };
#define MIN_VAL_SIZE sizeof(int)
#define MIN_KEY_SIZE sizeof(int)
static struct cli_args get_default_args(void) {
struct cli_args DEFAULT_ARGS = {
.num_elements = 150000,
.num_DBs = 1,
.time_of_test = 180,
.only_create = false,
.only_stress = false,
.update_broadcast_period_ms = 2000,
.num_ptquery_threads = 1,
.do_test_and_crash = false,
.do_recover = false,
.num_update_threads = 1,
.crash_on_update_failure = true,
.print_performance = false,
.print_thread_performance = false,
.performance_period = 1,
.update_txn_size = 1000,
.key_size = MIN_KEY_SIZE,
.val_size = MIN_VAL_SIZE,
.env_args = DEFAULT_ENV_ARGS,
};
return DEFAULT_ARGS;
}
static inline void parse_stress_test_args (int argc, char *const argv[], struct cli_args *args) { static inline void parse_stress_test_args (int argc, char *const argv[], struct cli_args *args) {
const char *argv0=argv[0]; const char *argv0=argv[0];
while (argc>1) { while (argc>1) {
...@@ -830,52 +978,62 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -830,52 +978,62 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
do_usage: do_usage:
fprintf(stderr, "Usage:\n%s [-h|-v|-q] [OPTIONS] [--only_create|--only_stress]\n", argv0); fprintf(stderr, "Usage:\n%s [-h|-v|-q] [OPTIONS] [--only_create|--only_stress]\n", argv0);
fprintf(stderr, "OPTIONS are among:\n"); fprintf(stderr, "OPTIONS are among:\n");
fprintf(stderr, "\t--num_elements INT (default %d)\n", DEFAULT_ARGS.num_elements); fprintf(stderr, "\t--num_elements INT (default %d)\n", get_default_args().num_elements);
fprintf(stderr, "\t--num_seconds INT (default %ds)\n", DEFAULT_ARGS.time_of_test); fprintf(stderr, "\t--num_DBs INT (default %d)\n", get_default_args().num_DBs);
fprintf(stderr, "\t--node_size INT (default %d bytes)\n", DEFAULT_ARGS.node_size); fprintf(stderr, "\t--num_seconds INT (default %ds)\n", get_default_args().time_of_test);
fprintf(stderr, "\t--basement_node_size INT (default %d bytes)\n", DEFAULT_ARGS.basement_node_size); fprintf(stderr, "\t--node_size INT (default %d bytes)\n", get_default_args().env_args.node_size);
fprintf(stderr, "\t--cachetable_size INT (default %ld bytes)\n", DEFAULT_ARGS.cachetable_size); fprintf(stderr, "\t--basement_node_size INT (default %d bytes)\n", get_default_args().env_args.basement_node_size);
fprintf(stderr, "\t--checkpointing_period INT (default %ds)\n", DEFAULT_ARGS.checkpointing_period); fprintf(stderr, "\t--cachetable_size INT (default %ld bytes)\n", get_default_args().env_args.cachetable_size);
fprintf(stderr, "\t--cleaner_period INT (default %ds)\n", DEFAULT_ARGS.cleaner_period); fprintf(stderr, "\t--checkpointing_period INT (default %ds)\n", get_default_args().env_args.checkpointing_period);
fprintf(stderr, "\t--cleaner_iterations INT (default %ds)\n", DEFAULT_ARGS.cleaner_iterations); fprintf(stderr, "\t--cleaner_period INT (default %ds)\n", get_default_args().env_args.cleaner_period);
fprintf(stderr, "\t--update_broadcast_period INT (default %dms)\n", DEFAULT_ARGS.update_broadcast_period_ms); fprintf(stderr, "\t--cleaner_iterations INT (default %ds)\n", get_default_args().env_args.cleaner_iterations);
fprintf(stderr, "\t--num_ptquery_threads INT (default %d threads)\n", DEFAULT_ARGS.num_ptquery_threads); fprintf(stderr, "\t--update_broadcast_period INT (default %dms)\n", get_default_args().update_broadcast_period_ms);
fprintf(stderr, "\t--num_update_threads INT (default %d threads)\n", DEFAULT_ARGS.num_update_threads); fprintf(stderr, "\t--num_ptquery_threads INT (default %d threads)\n", get_default_args().num_ptquery_threads);
fprintf(stderr, "\t--update_txn_size INT (default %d rows)\n", DEFAULT_ARGS.update_txn_size); fprintf(stderr, "\t--num_update_threads INT (default %d threads)\n", get_default_args().num_update_threads);
fprintf(stderr, "\t--[no-]crash_on_update_failure BOOL (default %s)\n", DEFAULT_ARGS.crash_on_update_failure ? "yes" : "no"); fprintf(stderr, "\t--update_txn_size INT (default %d rows)\n", get_default_args().update_txn_size);
fprintf(stderr, "\t--key_size INT (default %d, minimum %ld)\n", get_default_args().key_size, MIN_KEY_SIZE);
fprintf(stderr, "\t--val_size INT (default %d, minimum %ld)\n", get_default_args().val_size, MIN_VAL_SIZE);
fprintf(stderr, "\t--[no-]crash_on_update_failure BOOL (default %s)\n", get_default_args().crash_on_update_failure ? "yes" : "no");
fprintf(stderr, "\t--print_performance \n");
fprintf(stderr, "\t--print_thread_performance \n");
fprintf(stderr, "\t--performance_period INT (default %d)\n", get_default_args().performance_period);
exit(resultcode); exit(resultcode);
} }
else if (strcmp(argv[1], "--num_elements") == 0) { else if (strcmp(argv[1], "--num_elements") == 0) {
argc--; argv++; argc--; argv++;
args->num_elements = atoi(argv[1]); args->num_elements = atoi(argv[1]);
} }
else if (strcmp(argv[1], "--num_DBs") == 0) {
argc--; argv++;
args->num_DBs = atoi(argv[1]);
}
else if (strcmp(argv[1], "--num_seconds") == 0) { else if (strcmp(argv[1], "--num_seconds") == 0) {
argc--; argv++; argc--; argv++;
args->time_of_test = atoi(argv[1]); args->time_of_test = atoi(argv[1]);
} }
else if (strcmp(argv[1], "--node_size") == 0) { else if (strcmp(argv[1], "--node_size") == 0) {
argc--; argv++; argc--; argv++;
args->node_size = atoi(argv[1]); args->env_args.node_size = atoi(argv[1]);
} }
else if (strcmp(argv[1], "--basement_node_size") == 0) { else if (strcmp(argv[1], "--basement_node_size") == 0) {
argc--; argv++; argc--; argv++;
args->basement_node_size = atoi(argv[1]); args->env_args.basement_node_size = atoi(argv[1]);
} }
else if (strcmp(argv[1], "--cachetable_size") == 0) { else if (strcmp(argv[1], "--cachetable_size") == 0) {
argc--; argv++; argc--; argv++;
args->cachetable_size = strtoll(argv[1], NULL, 0); args->env_args.cachetable_size = strtoll(argv[1], NULL, 0);
} }
else if (strcmp(argv[1], "--checkpointing_period") == 0) { else if (strcmp(argv[1], "--checkpointing_period") == 0) {
argc--; argv++; argc--; argv++;
args->checkpointing_period = atoi(argv[1]); args->env_args.checkpointing_period = atoi(argv[1]);
} }
else if (strcmp(argv[1], "--cleaner_period") == 0) { else if (strcmp(argv[1], "--cleaner_period") == 0) {
argc--; argv++; argc--; argv++;
args->cleaner_period = atoi(argv[1]); args->env_args.cleaner_period = atoi(argv[1]);
} }
else if (strcmp(argv[1], "--cleaner_iterations") == 0) { else if (strcmp(argv[1], "--cleaner_iterations") == 0) {
argc--; argv++; argc--; argv++;
args->cleaner_iterations = atoi(argv[1]); args->env_args.cleaner_iterations = atoi(argv[1]);
} }
else if (strcmp(argv[1], "--update_broadcast_period") == 0) { else if (strcmp(argv[1], "--update_broadcast_period") == 0) {
argc--; argv++; argc--; argv++;
...@@ -895,10 +1053,30 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -895,10 +1053,30 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
else if (strcmp(argv[1], "--no-crash_on_update_failure") == 0) { else if (strcmp(argv[1], "--no-crash_on_update_failure") == 0) {
args->crash_on_update_failure = false; args->crash_on_update_failure = false;
} }
else if (strcmp(argv[1], "--print_performance") == 0) {
args->print_performance = true;
}
else if (strcmp(argv[1], "--print_thread_performance") == 0) {
args->print_thread_performance = true;
}
else if (strcmp(argv[1], "--performance_period") == 0) {
argc--; argv++;
args->performance_period = atoi(argv[1]);
}
else if (strcmp(argv[1], "--update_txn_size") == 0) { else if (strcmp(argv[1], "--update_txn_size") == 0) {
argc--; argv++; argc--; argv++;
args->update_txn_size = atoi(argv[1]); args->update_txn_size = atoi(argv[1]);
} }
else if (strcmp(argv[1], "--key_size") == 0) {
argc--; argv++;
args->key_size = atoi(argv[1]);
assert(args->key_size >= MIN_KEY_SIZE);
}
else if (strcmp(argv[1], "--val_size") == 0) {
argc--; argv++;
args->val_size = atoi(argv[1]);
assert(args->val_size >= MIN_VAL_SIZE);
}
else if (strcmp(argv[1], "--only_create") == 0) { else if (strcmp(argv[1], "--only_create") == 0) {
args->only_create = true; args->only_create = true;
} }
...@@ -913,7 +1091,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -913,7 +1091,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
} }
else if (strcmp(argv[1], "--envdir") == 0 && argc > 1) { else if (strcmp(argv[1], "--envdir") == 0 && argc > 1) {
argc--; argv++; argc--; argv++;
args->envdir = argv[1]; args->env_args.envdir = argv[1];
} }
else { else {
resultcode=1; resultcode=1;
...@@ -930,64 +1108,73 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -930,64 +1108,73 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
static void static void
stress_table(DB_ENV *, DB **, struct cli_args *); stress_table(DB_ENV *, DB **, struct cli_args *);
static int
stress_int_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
assert(db && a && b);
assert(a->size >= sizeof(int));
assert(b->size >= sizeof(int));
int x = *(int *) a->data;
int y = *(int *) b->data;
if (x<y) return -1;
if (x>y) return 1;
return 0;
}
static void static void
stress_test_main(struct cli_args *args) stress_test_main(struct cli_args *args)
{ {
DB_ENV* env = NULL; DB_ENV* env = NULL;
DB* db = NULL; DB* dbs[args->num_DBs];
memset(dbs, 0, sizeof(dbs));
if (!args->only_stress) { if (!args->only_stress) {
create_table( create_tables(
&env, &env,
&db, dbs,
int_dbt_cmp, args->num_DBs,
args->cachetable_size, stress_int_dbt_cmp,
args->checkpointing_period, args->env_args
args->cleaner_period, );
args->cleaner_iterations, CHK(fill_tables_with_zeroes(dbs, args->num_DBs, args->num_elements, args->key_size, args->val_size));
args->node_size, CHK(close_tables(env, dbs, args->num_DBs));
args->basement_node_size,
args->envdir);
CHK(fill_table_with_zeroes(db, args->num_elements));
CHK(close_table(env, db));
} }
if (!args->only_create) { if (!args->only_create) {
CHK(open_table(&env, CHK(open_tables(&env,
&db, dbs,
int_dbt_cmp, args->num_DBs,
args->cachetable_size, //cachetable size stress_int_dbt_cmp,
args->checkpointing_period, // checkpoint period args->env_args));
args->cleaner_period, stress_table(env, dbs, args);
args->cleaner_iterations, CHK(close_tables(env, dbs, args->num_DBs));
args->update_function,
args->envdir));
stress_table(env, &db, args);
CHK(close_table(env, db));
} }
} }
static void static void
UU() stress_recover(struct cli_args *args) { UU() stress_recover(struct cli_args *args) {
DB_ENV* env = NULL; DB_ENV* env = NULL;
DB* db = NULL; DB* dbs[args->num_DBs];
CHK(open_table(&env, memset(dbs, 0, sizeof(dbs));
&db, CHK(open_tables(&env,
int_dbt_cmp, dbs,
args->cachetable_size, //cachetable size args->num_DBs,
args->checkpointing_period, // checkpoint period stress_int_dbt_cmp,
args->cleaner_period, args->env_args));
args->cleaner_iterations,
args->update_function,
args->envdir));
DB_TXN* txn = NULL; DB_TXN* txn = NULL;
struct arg recover_args; struct arg recover_args;
arg_init(&recover_args, args->num_elements, &db, env, args); arg_init(&recover_args, args->num_elements, dbs, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type); int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
CKERR(r); CKERR(r);
r = scan_op_and_maybe_check_sum(env, &db, txn, &recover_args, true); struct scan_op_extra soe;
soe.fast = TRUE;
soe.fwd = TRUE;
soe.prefetch = FALSE;
r = scan_op(txn, &recover_args, &soe);
CKERR(r); CKERR(r);
CHK(txn->commit(txn,0)); CHK(txn->commit(txn,0));
CHK(close_table(env, db)); CHK(close_tables(env, dbs, args->num_DBs));
} }
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment