/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: #ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved." #ident "$Id$" #include <db.h> #include <portability/toku_atomic.h> #include "test.h" #include "threaded_stress_test_helpers.h" // // This test tries to emulate iibench at the ydb layer. // // The schema is simple: // 8 byte primary key // 8 byte key A // 8 byte key B // 8 byte key C // // There's one primary DB for the pk and three secondary DBs. // // The primary key stores the other columns as the value. // The secondary keys have the primary key appended to them. // static const int iibench_num_dbs = 4; static const size_t iibench_secondary_key_size = sizeof(uint64_t) * 2; struct iibench_row { int64_t pk; int64_t a; int64_t b; int64_t c; }; static int64_t hash(int64_t key) { int64_t hash = 0; char *buf = (char *) &key; for (int i = 0; i < 8; i++) { hash += (((buf[i] + 1) * 17) & 0xFF) << (i * 8); } return hash; } static void iibench_generate_secondary_keys(int64_t pk, struct iibench_row *row) { row->a = hash(pk); row->b = hash(pk * 2); row->c = hash(pk * 3); } static void iibench_verify_row(struct iibench_row *row) { (void) iibench_verify_row; struct iibench_row expected_row; iibench_generate_secondary_keys(row->pk, &expected_row); invariant(row->a == expected_row.a); invariant(row->b == expected_row.b); invariant(row->c == expected_row.c); } static void iibench_fill_key_buf(int64_t pk, int64_t *buf) { memcpy(&buf[0], &pk, sizeof(int64_t)); } static void iibench_fill_val_buf(int64_t pk, int64_t *buf) { struct iibench_row row; iibench_generate_secondary_keys(pk, &row); memcpy(&buf[0], &row.a, sizeof(row.a)); memcpy(&buf[1], &row.b, sizeof(row.b)); memcpy(&buf[2], &row.c, sizeof(row.c)); } struct iibench_op_extra { uint64_t autoincrement; }; static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra) { DB **dbs = arg->dbp; DB_ENV *env = arg->env; DBT mult_key_dbt[iibench_num_dbs]; DBT mult_val_dbt[iibench_num_dbs]; uint32_t mult_put_flags[iibench_num_dbs]; memset(mult_key_dbt, 0, sizeof(mult_key_dbt)); memset(mult_val_dbt, 0, sizeof(mult_val_dbt)); // The first index is unique with serial autoincrement keys. // The rest are have keys generated with this thread's random data. mult_put_flags[0] = get_put_flags(arg->cli) | DB_NOOVERWRITE; for (int i = 1; i < iibench_num_dbs; i++) { // Secondary keys have the primary key appended to them. mult_key_dbt[i].size = iibench_secondary_key_size; mult_key_dbt[i].data = toku_xmalloc(iibench_secondary_key_size); mult_key_dbt[i].flags = DB_DBT_REALLOC; mult_put_flags[i] = get_put_flags(arg->cli); } int r = 0; uint64_t puts_to_increment = 0; for (uint32_t i = 0; i < arg->cli->txn_size; ++i) { struct iibench_op_extra *CAST_FROM_VOIDP(info, operation_extra); // Get a random primary key, generate secondary key columns in valbuf uint64_t pk = toku_sync_fetch_and_add(&info->autoincrement, 1); int64_t keybuf[1]; int64_t valbuf[3]; iibench_fill_key_buf(pk, keybuf); iibench_fill_val_buf(pk, valbuf); dbt_init(&mult_key_dbt[0], keybuf, sizeof keybuf); dbt_init(&mult_val_dbt[0], valbuf, sizeof valbuf); r = env->put_multiple( env, dbs[0], // source db. txn, &mult_key_dbt[0], // source db key &mult_val_dbt[0], // source db value iibench_num_dbs, // total number of dbs dbs, // array of dbs mult_key_dbt, // array of keys mult_val_dbt, // array of values mult_put_flags // array of flags ); if (r != 0) { goto cleanup; } puts_to_increment++; if (puts_to_increment == 100) { increment_counter(stats_extra, PUTS, puts_to_increment); puts_to_increment = 0; } } cleanup: for (int i = 1; i < iibench_num_dbs; i++) { toku_free(mult_key_dbt[i].data); } return r; } static void stress_table(DB_ENV* env, DB **dbs, struct cli_args *cli_args) { if (verbose) printf("starting creation of pthreads\n"); const int num_threads = cli_args->num_put_threads; struct iibench_op_extra iib_extra = { .autoincrement = 0 }; struct arg myargs[num_threads]; for (int i = 0; i < num_threads; i++) { arg_init(&myargs[i], dbs, env, cli_args); myargs[i].operation = iibench_put_op; myargs[i].operation_extra = &iib_extra; } const bool crash_at_end = false; run_workers(myargs, num_threads, cli_args->num_seconds, crash_at_end, cli_args); } static int iibench_generate_row_for_put(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *UU(src_key), const DBT *src_val) { DESCRIPTOR desc = dest_db->cmp_descriptor; invariant(src_db != dest_db); invariant_notnull(src_key->data); invariant(src_key->size == sizeof(int64_t)); invariant(dest_key->size == iibench_secondary_key_size); invariant(dest_key->flags == DB_DBT_REALLOC); invariant_notnull(desc->dbt.data); invariant(desc->dbt.size == sizeof(int)); // Get the column index from the descriptor. This is a secondary index // so it has to be greater than zero (which would be the pk). Then // grab the appropriate secondary key from the source val, which is // an array of the 3 columns, so we have to subtract 1 from the index. int column_index; memcpy(&column_index, desc->dbt.data, desc->dbt.size); invariant(column_index > 0 && column_index < 4); int64_t *CAST_FROM_VOIDP(columns, src_val->data); int64_t secondary_key = columns[column_index - 1]; // First write down the secondary key, then the primary key (in src_key) int64_t *CAST_FROM_VOIDP(dest_key_buf, dest_key->data); memcpy(&dest_key_buf[0], &secondary_key, sizeof(secondary_key)); memcpy(&dest_key_buf[1], src_key->data, src_key->size); dest_val->data = nullptr; dest_val->size = 0; return 0; } // After each DB opens, set the descriptor to store the DB idx value. // Close and reopen the DB so we can use db->cmp_descriptor during comparisons. static DB *iibench_set_descriptor_after_db_opens(DB_ENV *env, DB *db, int idx, reopen_db_fn reopen, struct cli_args *cli_args) { int r; DBT desc_dbt; desc_dbt.data = &idx; desc_dbt.size = sizeof(idx); desc_dbt.ulen = 0; desc_dbt.flags = 0; r = db->change_descriptor(db, nullptr, &desc_dbt, 0); CKERR(r); r = db->close(db, 0); CKERR(r); r = db_create(&db, env, 0); CKERR(r); reopen(db, idx, cli_args); return db; } int test_main(int argc, char *const argv[]) { struct cli_args args = get_default_args_for_perf(); args.num_elements = 0; // want to start with empty DBs // Puts per transaction is configurable. It defaults to 1k. args.txn_size = 1000; parse_stress_test_args(argc, argv, &args); // The index count and schema are not configurable. Silently ignore whatever was passed in. args.num_DBs = 4; args.key_size = 8; args.val_size = 32; // when there are multiple threads, its valid for two of them to // generate the same key and one of them fail with DB_LOCK_NOTGRANTED if (args.num_put_threads > 1) { args.crash_on_operation_failure = false; } args.env_args.generate_put_callback = iibench_generate_row_for_put; after_db_open_hook = iibench_set_descriptor_after_db_opens; perf_test_main(&args); return 0; }