Commit 9c08070a authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

[t:5027] clean up a little, add serial inserts and interleaving


git-svn-id: file:///svn/toku/tokudb@44375 c7de825b-a66e-492c-adef-691d508d4ae1
parent 8f9b7b83
...@@ -24,9 +24,12 @@ stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) { ...@@ -24,9 +24,12 @@ stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_put_threads; const int num_threads = cli_args->num_put_threads;
struct arg myargs[num_threads]; struct arg myargs[num_threads];
operation_t put_op = (cli_args->serial_insert
? serial_put_op
: random_put_op_singledb);
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args); arg_init(&myargs[i], dbp, env, cli_args);
myargs[i].operation = random_put_op_singledb; myargs[i].operation = put_op;
} }
run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args); run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args);
} }
......
...@@ -129,6 +129,9 @@ struct cli_args { ...@@ -129,6 +129,9 @@ struct cli_args {
bool do_recover; // true if we should run recover bool do_recover; // true if we should run recover
int num_update_threads; // number of threads running updates int num_update_threads; // number of threads running updates
int num_put_threads; // number of threads running puts int num_put_threads; // number of threads running puts
bool serial_insert;
bool interleave; // for insert benchmarks, whether to interleave
// separate threads' puts (or segregate them)
bool crash_on_operation_failure; bool crash_on_operation_failure;
bool print_performance; bool print_performance;
bool print_thread_performance; bool print_thread_performance;
...@@ -553,9 +556,6 @@ fill_zeroed_array(uint8_t *data, uint32_t size, struct random_data *random_data, ...@@ -553,9 +556,6 @@ fill_zeroed_array(uint8_t *data, uint32_t size, struct random_data *random_data,
static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, void *stats_extra) { static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, void *stats_extra) {
int r = 0; int r = 0;
char buf[100];
ZERO_ARRAY(buf);
uint64_t i;
union { union {
uint64_t key; uint64_t key;
uint16_t i[4]; uint16_t i[4];
...@@ -564,9 +564,15 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, void *stats_extra) { ...@@ -564,9 +564,15 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, void *stats_extra) {
ZERO_STRUCT(rand_key); ZERO_STRUCT(rand_key);
uint8_t valbuf[arg->cli->val_size]; uint8_t valbuf[arg->cli->val_size];
ZERO_ARRAY(valbuf); ZERO_ARRAY(valbuf);
uint64_t i;
for (i = 0; i < arg->cli->txn_size; ++i) { for (i = 0; i < arg->cli->txn_size; ++i) {
rand_key.key = randu64(arg->random_data); rand_key.key = randu64(arg->random_data);
if (arg->cli->interleave) {
rand_key.i[4] = arg->thread_idx;
} else {
rand_key.i[0] = arg->thread_idx; rand_key.i[0] = arg->thread_idx;
}
fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility); fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility);
DBT key, val; DBT key, val;
dbt_init(&key, &rand_key, sizeof rand_key); dbt_init(&key, &rand_key, sizeof rand_key);
...@@ -593,6 +599,48 @@ static int UU() random_put_op_singledb(DB_TXN *txn, ARG arg, void *UU(operation_ ...@@ -593,6 +599,48 @@ static int UU() random_put_op_singledb(DB_TXN *txn, ARG arg, void *UU(operation_
return random_put_in_db(db, txn, arg, stats_extra); return random_put_in_db(db, txn, arg, stats_extra);
} }
struct serial_put_extra {
uint64_t current;
};
static int serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra) {
struct serial_put_extra *extra = operation_extra;
int db_index = arg->thread_idx % arg->cli->num_DBs;
DB* db = arg->dbp[db_index];
int r = 0;
union {
uint64_t key;
uint16_t i[4];
uint8_t b[arg->cli->key_size];
} rand_key;
ZERO_STRUCT(rand_key);
uint8_t valbuf[arg->cli->val_size];
ZERO_ARRAY(valbuf);
uint64_t i;
for (i = 0; i < arg->cli->txn_size; ++i) {
rand_key.key = extra->current++;
if (arg->cli->interleave) {
rand_key.i[4] = arg->thread_idx;
} else {
rand_key.i[0] = arg->thread_idx;
}
fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility);
DBT key, val;
dbt_init(&key, &rand_key, sizeof rand_key);
dbt_init(&val, valbuf, sizeof valbuf);
r = db->put(db, txn, &key, &val, 0);
if (r != 0) {
goto cleanup;
}
}
cleanup:
increment_counter(stats_extra, PUTS, i);
return r;
}
static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) { static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
DB_ENV* env = arg->env; DB_ENV* env = arg->env;
int r; int r;
...@@ -1309,6 +1357,8 @@ static struct cli_args UU() get_default_args(void) { ...@@ -1309,6 +1357,8 @@ static struct cli_args UU() get_default_args(void) {
.do_recover = false, .do_recover = false,
.num_update_threads = 1, .num_update_threads = 1,
.num_put_threads = 1, .num_put_threads = 1,
.serial_insert = false,
.interleave = false,
.crash_on_operation_failure = true, .crash_on_operation_failure = true,
.print_performance = false, .print_performance = false,
.print_thread_performance = false, .print_thread_performance = false,
...@@ -1663,6 +1713,8 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -1663,6 +1713,8 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
UINT32_ARG_R("--key_size", key_size, " bytes", MIN_KEY_SIZE, UINT32_MAX), UINT32_ARG_R("--key_size", key_size, " bytes", MIN_KEY_SIZE, UINT32_MAX),
UINT32_ARG_R("--val_size", val_size, " bytes", MIN_VAL_SIZE, UINT32_MAX), UINT32_ARG_R("--val_size", val_size, " bytes", MIN_VAL_SIZE, UINT32_MAX),
BOOL_ARG("serial_insert", serial_insert),
BOOL_ARG("interleave", interleave),
BOOL_ARG("crash_on_operation_failure", crash_on_operation_failure), BOOL_ARG("crash_on_operation_failure", crash_on_operation_failure),
BOOL_ARG("single_txn", single_txn), BOOL_ARG("single_txn", single_txn),
BOOL_ARG("warm_cache", warm_cache), BOOL_ARG("warm_cache", warm_cache),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment