Commit e2ebb90b authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3010,#3023 merge *multiple changes to main, i mean it this time refs[t:3010] refs[t:3023]

git-svn-id: file:///svn/toku/tokudb@25308 c7de825b-a66e-492c-adef-691d508d4ae1
parent ab996be6
......@@ -276,7 +276,8 @@ struct __toku_db_env {
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
uint32_t num_dbts, DBT *keys, DBT *vals,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
......
......@@ -278,7 +278,8 @@ struct __toku_db_env {
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
uint32_t num_dbts, DBT *keys, DBT *vals,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
......
......@@ -278,7 +278,8 @@ struct __toku_db_env {
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
uint32_t num_dbts, DBT *keys, DBT *vals,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
......
......@@ -278,7 +278,8 @@ struct __toku_db_env {
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
uint32_t num_dbts, DBT *keys, DBT *vals,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
......
......@@ -279,7 +279,8 @@ struct __toku_db_env {
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
uint32_t num_dbts, DBT *keys, DBT *vals,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
......
......@@ -577,7 +577,8 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__
" const DBT *old_src_key, const DBT *old_src_data,\n"
" const DBT *new_src_key, const DBT *new_src_data,\n"
" uint32_t num_dbs, DB **db_array,\n"
" uint32_t num_dbts, DBT *keys, DBT *vals,\n"
" uint32_t num_keys, DBT *keys,\n"
" uint32_t num_vals, DBT *vals,\n"
" void *extra) /* update multiple DBs */",
"int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */",
"int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */",
......
......@@ -279,7 +279,8 @@ struct __toku_db_env {
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
uint32_t num_dbts, DBT *keys, DBT *vals,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
......
......@@ -279,7 +279,8 @@ struct __toku_db_env {
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
uint32_t num_dbts, DBT *keys, DBT *vals,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
int (*get_redzone) (DB_ENV *env, int *redzone) /* get the redzone limit */;
int (*set_redzone) (DB_ENV *env, int redzone) /* set the redzone limit in percent of total space */;
......
......@@ -89,6 +89,7 @@ BDB_DONTRUN_TESTS = \
cursor-isolation \
del-simple \
del-multiple \
del-multiple-huge-primary-row \
diskfull \
env-put-multiple \
env_startup \
......
#include "test.h"
// verify that del_multiple logs individual delete log entries in the recovery log when
// the sum of the log sizes of the individual deletes.
static int
get_key(int i, int dbnum) {
return htonl(i + dbnum);
}
static void
get_data(int *v, int i, int ndbs) {
for (int dbnum = 0; dbnum < ndbs; dbnum++) {
v[dbnum] = get_key(i, dbnum);
}
}
static int
del_callback(DB *dest_db, DB *src_db, DBT *dest_key, const DBT *src_key, const DBT *src_data, void *extra) {
dest_db = dest_db; src_db = src_db; dest_key = dest_key; src_key = src_key; src_data = src_data;
assert(src_db == NULL);
assert(extra == NULL);
unsigned int dbnum;
assert(dest_db->descriptor->dbt.size == sizeof dbnum);
memcpy(&dbnum, dest_db->descriptor->dbt.data, sizeof dbnum);
assert(dbnum < src_data->size / sizeof (int));
int *pri_data = (int *) src_data->data;
assert(dest_key->flags == 0);
dest_key->size = sizeof (int);
dest_key->data = &pri_data[dbnum];
return 0;
}
static void
verify_locked(DB_ENV *env, DB *db, int k) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
DBT key; dbt_init(&key, &k, sizeof k);
r = db->del(db, txn, &key, DB_DELETE_ANY); assert(r == DB_LOCK_NOTGRANTED);
r = txn->abort(txn); assert_zero(r);
}
static void
verify_empty(DB_ENV *env, DB *db) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
DBC *cursor = NULL;
r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
int i;
for (i = 0; ; i++) {
DBT key; memset(&key, 0, sizeof key);
DBT val; memset(&val, 0, sizeof val);
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (r != 0)
break;
}
assert_zero(i);
r = cursor->c_close(cursor); assert_zero(r);
r = txn->commit(txn, 0); assert_zero(r);
}
static int
max(int a, int b) {
return a < b ? b : a;
}
static void
verify_del_multiple(DB_ENV *env, DB *db[], int ndbs, int nrows) {
int r;
DB_TXN *deltxn = NULL;
r = env->txn_begin(env, NULL, &deltxn, 0); assert_zero(r);
for (int i = 0; i < nrows; i++) {
int k = get_key(i, 0);
DBT pri_key; dbt_init(&pri_key, &k, sizeof k);
int v[max(ndbs,1024)]; get_data(v, i, ndbs);
DBT pri_data; dbt_init(&pri_data, &v[0], sizeof v);
DBT keys[ndbs]; memset(keys, 0, sizeof keys);
uint32_t flags[ndbs]; memset(flags, 0, sizeof flags);
r = env->del_multiple(env, NULL, deltxn, &pri_key, &pri_data, ndbs, db, keys, flags, NULL); assert_zero(r);
for (int dbnum = 0; dbnum < ndbs; dbnum++)
verify_locked(env, db[dbnum], get_key(i, dbnum));
}
r = deltxn->commit(deltxn, 0); assert_zero(r);
for (int dbnum = 0; dbnum < ndbs; dbnum++)
verify_empty(env, db[dbnum]);
}
static void
populate_primary(DB_ENV *env, DB *db, int ndbs, int nrows) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
// populate
for (int i = 0; i < nrows; i++) {
int k = get_key(i, 0);
int v[max(ndbs, 1024)]; memset(v, 0, sizeof v); get_data(v, i, ndbs);
DBT key; dbt_init(&key, &k, sizeof k);
DBT val; dbt_init(&val, &v[0], sizeof v);
r = db->put(db, txn, &key, &val, DB_YESOVERWRITE); assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
}
static void
populate_secondary(DB_ENV *env, DB *db, int dbnum, int nrows) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
// populate
for (int i = 0; i < nrows; i++) {
int k = get_key(i, dbnum);
DBT key; dbt_init(&key, &k, sizeof k);
DBT val; dbt_init(&val, NULL, 0);
r = db->put(db, txn, &key, &val, DB_YESOVERWRITE); assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
}
static void
run_test(int ndbs, int nrows) {
int r;
DB_ENV *env = NULL;
r = db_env_create(&env, 0); assert_zero(r);
r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r);
r = env->open(env, ENVDIR, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
DB *db[ndbs];
for (int dbnum = 0; dbnum < ndbs; dbnum++) {
r = db_create(&db[dbnum], env, 0); assert_zero(r);
DBT dbt_dbnum; dbt_init(&dbt_dbnum, &dbnum, sizeof dbnum);
r = db[dbnum]->set_descriptor(db[dbnum], 1, &dbt_dbnum); assert_zero(r);
char dbname[32]; sprintf(dbname, "%d.tdb", dbnum);
r = db[dbnum]->open(db[dbnum], NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
}
for (int dbnum = 0; dbnum < ndbs; dbnum++) {
if (dbnum == 0)
populate_primary(env, db[dbnum], ndbs, nrows);
else
populate_secondary(env, db[dbnum], dbnum, nrows);
}
verify_del_multiple(env, db, ndbs, nrows);
for (int dbnum = 0; dbnum < ndbs; dbnum++)
r = db[dbnum]->close(db[dbnum], 0); assert_zero(r);
r = env->close(env, 0); assert_zero(r);
}
int
test_main(int argc, char * const argv[]) {
int r;
int ndbs = 2;
int nrows = 2;
// parse_args(argc, argv);
for (int i = 1; i < argc; i++) {
char * const arg = argv[i];
if (strcmp(arg, "-v") == 0) {
verbose++;
continue;
}
if (strcmp(arg, "-q") == 0) {
verbose = 0;
continue;
}
if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) {
ndbs = atoi(argv[++i]);
continue;
}
if (strcmp(arg, "--nrows") == 0 && i+1 < argc) {
nrows = atoi(argv[++i]);
continue;
}
}
r = system("rm -rf " ENVDIR); assert_zero(r);
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
run_test(ndbs, nrows);
return 0;
}
......@@ -111,7 +111,7 @@ update_diagonal(DB_ENV *env, DB_TXN *txn, DB *db[], int ndbs, int nrows) {
DBT keys[ndbts]; memset(keys, 0, sizeof keys);
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, vals, NULL);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, ndbts, vals, NULL);
assert_zero(r);
}
}
......
......@@ -111,7 +111,7 @@ update_diagonal(DB_ENV *env, DB_TXN *txn, DB *db[], int ndbs, int nrows) {
DBT keys[ndbts]; memset(keys, 0, sizeof keys);
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, vals, NULL);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, ndbts, vals, NULL);
assert_zero(r);
}
}
......
......@@ -182,7 +182,7 @@ update_diagonal(DB_ENV *env, DB *db[], int ndbs, int nrows) {
DBT keys[ndbts]; memset(keys, 0, sizeof keys);
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, vals, NULL);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, ndbts, vals, NULL);
assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
......
......@@ -161,7 +161,7 @@ update_key0(DB_ENV *env, DB *db[], int ndbs, int nrows) {
DBT keys[ndbts]; memset(keys, 0, sizeof keys);
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, vals, NULL);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, ndbts, vals, NULL);
assert_zero(r);
verify_locked(env, db[0], k);
......
......@@ -156,7 +156,7 @@ verify(DB_ENV *env, DB *db[], int ndbs, int nrows) {
DBT keys[ndbts]; memset(keys, 0, sizeof keys);
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, vals, NULL);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, ndbts, vals, NULL);
assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
......
......@@ -1544,7 +1544,8 @@ static int env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
uint32_t num_dbts, DBT *keys, DBT *vals,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra);
static int
......@@ -1571,10 +1572,11 @@ locked_env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
uint32_t num_dbts, DBT *keys, DBT *vals,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) {
toku_ydb_lock();
int r = env_update_multiple(env, src_db, txn, old_src_key, old_src_data, new_src_key, new_src_data, num_dbs, db_array, num_dbts, keys, vals, extra);
int r = env_update_multiple(env, src_db, txn, old_src_key, old_src_data, new_src_key, new_src_data, num_dbs, db_array, num_keys, keys, num_vals, vals, extra);
toku_ydb_unlock();
return r;
}
......@@ -3644,14 +3646,30 @@ log_del_single(DB_TXN *txn, BRT brt, const DBT *key) {
return r;
}
static uint32_t
sum_size(uint32_t num_keys, DBT keys[], uint32_t overhead) {
uint32_t sum = 0;
for (uint32_t i = 0; i < num_keys; i++)
sum += keys[i].size + overhead;
return sum;
}
static int
log_del_multiple(DB_TXN *txn, DB *src_db, const DBT *key, const DBT *val, uint32_t num_dbs, BRT brts[]) {
log_del_multiple(DB_TXN *txn, DB *src_db, const DBT *key, const DBT *val, uint32_t num_dbs, BRT brts[], DBT keys[]) {
int r = 0;
if (num_dbs > 0) {
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
BRT src_brt = src_db ? src_db->i->brt : NULL;
const uint32_t log_entry_overhead = 24; // rough approximation of the log entry overhead for deletes
uint32_t del_multiple_size = key->size + val->size + log_entry_overhead;
uint32_t del_single_sizes = sum_size(num_dbs, keys, log_entry_overhead);
if (del_single_sizes < del_multiple_size) {
for (uint32_t i = 0; r == 0 && i < num_dbs; i++)
r = log_del_single(txn, brts[i], &keys[i]);
} else {
r = toku_brt_log_del_multiple(ttxn, src_brt, brts, num_dbs, key, val);
}
}
return r;
}
......@@ -3731,7 +3749,7 @@ env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT
if (num_dbs == 1)
r = log_del_single(txn, brts[0], &keys[0]);
else
r = log_del_multiple(txn, src_db, key, val, num_dbs, brts);
r = log_del_multiple(txn, src_db, key, val, num_dbs, brts, keys);
if (r == 0)
r = do_del_multiple(txn, num_dbs, db_array, keys);
......@@ -4444,7 +4462,8 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
uint32_t num_dbts, DBT keys[], DBT vals[],
uint32_t num_keys, DBT keys[],
uint32_t num_vals, DBT vals[],
void *extra) {
int r = 0;
......@@ -4468,7 +4487,6 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
{
// RFP malloc this stuff?
uint32_t n_del_dbs = 0;
DB *del_dbs[num_dbs];
BRT del_brts[num_dbs];
......@@ -4480,40 +4498,52 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
DBT put_keys[num_dbs];
DBT put_vals[num_dbs];
int (*cmpfun)(DB *db, const DBT *a, const DBT *b) = toku_builtin_compare_fun;
if (env->i->bt_compare)
cmpfun = env->i->bt_compare;
for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
DB *db = db_array[which_db];
// Generate the old key and val
r = env->i->generate_row_for_put(db, src_db, &keys[which_db], &vals[which_db], old_src_key, old_src_data, extra);
if (r != 0) goto cleanup;
// keys[0..num_dbs-1] are the new keys
// keys[num_dbs..2*num_dbs-1] are the old keys
// vals[0..num_dbs-1] are the new vals
if (which_db + num_dbs >= num_dbts) {
// Generate the old key and val
if (which_db + num_dbs >= num_keys) {
r = ENOMEM; goto cleanup;
}
r = env->i->generate_row_for_put(db, src_db, &keys[which_db + num_dbs], NULL, old_src_key, old_src_data, extra);
if (r != 0) goto cleanup;
// Generate the new key and val
r = env->i->generate_row_for_put(db, src_db, &keys[which_db + num_dbs], &vals[which_db + num_dbs], new_src_key, new_src_data, extra);
if (which_db >= num_keys || which_db >= num_vals) {
r = ENOMEM; goto cleanup;
}
r = env->i->generate_row_for_put(db, src_db, &keys[which_db], &vals[which_db], new_src_key, new_src_data, extra);
if (r != 0) goto cleanup;
// RFP can i just memcmp the keys?
BOOL key_eq = dbt_cmp(&keys[which_db], &keys[which_db + num_dbs]) == 0;
BOOL key_eq = cmpfun(db, &keys[which_db + num_dbs], &keys[which_db]) == 0;
if (!key_eq) {
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) goto cleanup;
// lock old key
if (db->i->lt) {
r = get_point_lock(db, txn, &keys[which_db]);
r = get_point_lock(db, txn, &keys[which_db + num_dbs]);
if (r != 0) goto cleanup;
}
del_dbs[n_del_dbs] = db;
del_brts[n_del_dbs] = db->i->brt;
del_keys[n_del_dbs] = keys[which_db];
del_keys[n_del_dbs] = keys[which_db + num_dbs];
n_del_dbs++;
}
if (!key_eq || !(dbt_cmp(&vals[which_db], &vals[which_db + num_dbs]) == 0)) {
r = db_put_check_size_constraints(db, &keys[which_db + num_dbs], &vals[which_db + num_dbs]);
// we take a shortcut and avoid generating the old val
// we assume that any new vals with size > 0 are different than the old val
// if (!key_eq || !(dbt_cmp(&vals[which_db], &vals[which_db + num_dbs]) == 0)) {
if (!key_eq || vals[which_db].size > 0) {
r = db_put_check_size_constraints(db, &keys[which_db], &vals[which_db]);
if (r != 0) goto cleanup;
r = toku_grab_read_lock_on_directory(db, txn);
......@@ -4521,13 +4551,13 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
// lock new key
if (db->i->lt) {
r = get_point_lock(db, txn, &keys[which_db + num_dbs]);
r = get_point_lock(db, txn, &keys[which_db]);
if (r != 0) goto cleanup;
}
put_dbs[n_put_dbs] = db;
put_brts[n_put_dbs] = db->i->brt;
put_keys[n_put_dbs] = keys[which_db + num_dbs];
put_vals[n_put_dbs] = vals[which_db + num_dbs];
put_keys[n_put_dbs] = keys[which_db];
put_vals[n_put_dbs] = vals[which_db];
n_put_dbs++;
}
}
......@@ -4536,7 +4566,7 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
if (n_del_dbs == 1)
r = log_del_single(txn, del_brts[0], &del_keys[0]);
else
r = log_del_multiple(txn, src_db, old_src_key, old_src_data, n_del_dbs, del_brts);
r = log_del_multiple(txn, src_db, old_src_key, old_src_data, n_del_dbs, del_brts, del_keys);
if (r == 0)
r = do_del_multiple(txn, n_del_dbs, del_dbs, del_keys);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment