Commit a0994a17 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:3015], merge update_multiple changes to main

git-svn-id: file:///svn/toku/tokudb@25473 c7de825b-a66e-492c-adef-691d508d4ae1
parent b05d16ce
......@@ -278,9 +278,9 @@ struct __toku_db_env {
const DBT *src_key, const DBT *src_val,
void *extra));
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
......
......@@ -280,9 +280,9 @@ struct __toku_db_env {
const DBT *src_key, const DBT *src_val,
void *extra));
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
......
......@@ -280,9 +280,9 @@ struct __toku_db_env {
const DBT *src_key, const DBT *src_val,
void *extra));
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
......
......@@ -280,9 +280,9 @@ struct __toku_db_env {
const DBT *src_key, const DBT *src_val,
void *extra));
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
......
......@@ -281,9 +281,9 @@ struct __toku_db_env {
const DBT *src_key, const DBT *src_val,
void *extra));
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
......
......@@ -579,9 +579,9 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__
" const DBT *src_key, const DBT *src_val,\n"
" void *extra))",
"int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,\n"
" const DBT *old_src_key, const DBT *old_src_data,\n"
" const DBT *new_src_key, const DBT *new_src_data,\n"
" uint32_t num_dbs, DB **db_array,\n"
" DBT *old_src_key, DBT *old_src_data,\n"
" DBT *new_src_key, DBT *new_src_data,\n"
" uint32_t num_dbs, DB **db_array, uint32_t *flags_array,\n"
" uint32_t num_keys, DBT *keys,\n"
" uint32_t num_vals, DBT *vals,\n"
" void *extra) /* update multiple DBs */",
......
......@@ -281,9 +281,9 @@ struct __toku_db_env {
const DBT *src_key, const DBT *src_val,
void *extra));
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
......
......@@ -281,9 +281,9 @@ struct __toku_db_env {
const DBT *src_key, const DBT *src_val,
void *extra));
int (*update_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t *flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) /* update multiple DBs */;
......
......@@ -110,8 +110,9 @@ update_diagonal(DB_ENV *env, DB_TXN *txn, DB *db[], int ndbs, int nrows) {
int ndbts = 2 * ndbs;
DBT keys[ndbts]; memset(keys, 0, sizeof keys);
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
uint32_t flags_array[ndbs]; memset(flags_array, 0, sizeof(flags_array));
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, ndbts, vals, NULL);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals, NULL);
assert_zero(r);
}
}
......
......@@ -110,8 +110,9 @@ update_diagonal(DB_ENV *env, DB_TXN *txn, DB *db[], int ndbs, int nrows) {
int ndbts = 2 * ndbs;
DBT keys[ndbts]; memset(keys, 0, sizeof keys);
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
uint32_t flags_array[ndbs]; memset(flags_array, 0, sizeof(flags_array));
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, ndbts, vals, NULL);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals, NULL);
assert_zero(r);
}
}
......
......@@ -181,8 +181,9 @@ update_diagonal(DB_ENV *env, DB *db[], int ndbs, int nrows) {
int ndbts = 2 * ndbs;
DBT keys[ndbts]; memset(keys, 0, sizeof keys);
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
uint32_t flags_array[ndbs]; memset(flags_array, 0, sizeof(flags_array));
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, ndbts, vals, NULL);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals, NULL);
assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
......
......@@ -160,8 +160,9 @@ update_key0(DB_ENV *env, DB *db[], int ndbs, int nrows) {
int ndbts = 2 * ndbs;
DBT keys[ndbts]; memset(keys, 0, sizeof keys);
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
uint32_t flags_array[ndbs]; memset(flags_array, 0, sizeof(flags_array));
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, ndbts, vals, NULL);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals, NULL);
assert_zero(r);
verify_locked(env, db[0], k);
......
......@@ -155,8 +155,9 @@ verify(DB_ENV *env, DB *db[], int ndbs, int nrows) {
int ndbts = 2 * ndbs;
DBT keys[ndbts]; memset(keys, 0, sizeof keys);
DBT vals[ndbts]; memset(vals, 0, sizeof vals);
uint32_t flags_array[ndbs]; memset(flags_array, 0, sizeof(flags_array));
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, ndbts, keys, ndbts, vals, NULL);
r = env->update_multiple(env, NULL, txn, &old_key, &old_data, &new_key, &new_data, ndbs, db, flags_array, ndbts, keys, ndbts, vals, NULL);
assert_zero(r);
}
r = txn->commit(txn, 0); assert_zero(r);
......
......@@ -1557,9 +1557,9 @@ static int env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
void *extra);
static int env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t* flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra);
......@@ -1585,14 +1585,14 @@ locked_env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, co
static int
locked_env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t* flags_array,
uint32_t num_keys, DBT *keys,
uint32_t num_vals, DBT *vals,
void *extra) {
toku_ydb_lock();
int r = env_update_multiple(env, src_db, txn, old_src_key, old_src_data, new_src_key, new_src_data, num_dbs, db_array, num_keys, keys, num_vals, vals, extra);
int r = env_update_multiple(env, src_db, txn, old_src_key, old_src_data, new_src_key, new_src_data, num_dbs, db_array, flags_array, num_keys, keys, num_vals, vals, extra);
toku_ydb_unlock();
return r;
}
......@@ -4336,7 +4336,7 @@ locked_db_row_size_supported(DB *db, u_int32_t size) {
//Return 0 if insert is legal
static int
db_put_check_overwrite_constraint(DB *db, DB_TXN *txn, DBT *key, DBT *UU(val),
db_put_check_overwrite_constraint(DB *db, DB_TXN *txn, DBT *key,
u_int32_t lock_flags, u_int32_t overwrite_flag) {
int r;
......@@ -4381,7 +4381,7 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags) {
r = db_put_check_size_constraints(db, key, val);
if (r == 0) {
//Do any checking required by the flags.
r = db_put_check_overwrite_constraint(db, txn, key, val, lock_flags, flags);
r = db_put_check_overwrite_constraint(db, txn, key, lock_flags, flags);
}
BOOL do_locking = (BOOL)(db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE));
if (r == 0 && do_locking) {
......@@ -4500,7 +4500,7 @@ env_put_multiple(
//Check overwrite constraints
r = db_put_check_overwrite_constraint(db, txn,
&put_keys[which_db], &put_vals[which_db],
&put_keys[which_db],
lock_flags[which_db], remaining_flags[which_db]);
if (r != 0) goto cleanup;
if (remaining_flags[which_db] == DB_NOOVERWRITE_NO_ERROR) {
......@@ -4545,32 +4545,63 @@ dbt_cmp(const DBT *a, const DBT *b) {
}
static int
update_single(DB *db, DB_TXN *txn, const DBT *old_key, const DBT *old_data, const DBT *new_key, const DBT *new_data) {
update_single(
DB *db,
uint32_t flags,
DB_TXN *txn,
DBT *old_key,
DBT *old_data,
DBT *new_key,
DBT *new_data
)
{
int r = 0;
uint32_t lock_flags;
uint32_t remaining_flags;
lock_flags = get_prelocked_flags(flags);
remaining_flags = flags & ~lock_flags;
BOOL key_eq = dbt_cmp(old_key, new_key) == 0;
if (!key_eq)
r = toku_db_del(db, txn, (DBT *) old_key, DB_DELETE_ANY);
if (!key_eq) {
//Check overwrite constraints only in the case where
// the keys are not equal.
// If the keys are equal, then we do not care of the flag is DB_NOOVERWRITE or DB_YESOVERWRITE
r = db_put_check_overwrite_constraint(db, txn,
new_key,
lock_flags, remaining_flags);
if (r != 0) goto cleanup;
if (r == 0 && (!key_eq || !(dbt_cmp(old_data, new_data) == 0)))
r = toku_db_put(db, txn, (DBT *) new_key, (DBT *) new_data, DB_YESOVERWRITE);
r = toku_db_del(db, txn, (DBT *) old_key, DB_DELETE_ANY);
}
if (r == 0 && (!key_eq || !(dbt_cmp(old_data, new_data) == 0))) {
r = toku_db_put(db, txn, (DBT *) new_key, (DBT *) new_data, DB_YESOVERWRITE);
}
cleanup:
return r;
}
static int
env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *old_src_key, const DBT *old_src_data,
const DBT *new_src_key, const DBT *new_src_data,
uint32_t num_dbs, DB **db_array,
DBT *old_src_key, DBT *old_src_data,
DBT *new_src_key, DBT *new_src_data,
uint32_t num_dbs, DB **db_array, uint32_t* flags_array,
uint32_t num_keys, DBT keys[],
uint32_t num_vals, DBT vals[],
void *extra) {
int r = 0;
// special case for a single DB
if (num_dbs == 1 && src_db == db_array[0]) {
r = update_single(db_array[0], txn, old_src_key, old_src_data, new_src_key, new_src_data);
if (FALSE && num_dbs == 1 && src_db == db_array[0]) {
r = update_single(
db_array[0],
flags_array[0],
txn,
old_src_key,
old_src_data,
new_src_key,
new_src_data
);
goto cleanup;
}
......@@ -4599,12 +4630,20 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
DBT put_keys[num_dbs];
DBT put_vals[num_dbs];
uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs];
int (*cmpfun)(DB *db, const DBT *a, const DBT *b) = toku_builtin_compare_fun;
if (env->i->bt_compare)
cmpfun = env->i->bt_compare;
for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
DB *db = db_array[which_db];
DBT curr_old_key, curr_new_key, curr_new_val;
lock_flags[which_db] = get_prelocked_flags(flags_array[which_db]);
remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];
// keys[0..num_dbs-1] are the new keys
// keys[num_dbs..2*num_dbs-1] are the old keys
......@@ -4614,37 +4653,63 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
if (which_db + num_dbs >= num_keys) {
r = ENOMEM; goto cleanup;
}
r = env->i->generate_row_for_put(db, src_db, &keys[which_db + num_dbs], NULL, old_src_key, old_src_data, extra);
if (r != 0) goto cleanup;
if (db == src_db) {
curr_old_key = *old_src_key;
}
else {
r = env->i->generate_row_for_put(db, src_db, &keys[which_db + num_dbs], NULL, old_src_key, old_src_data, extra);
if (r != 0) goto cleanup;
curr_old_key = keys[which_db + num_dbs];
}
// Generate the new key and val
if (which_db >= num_keys || which_db >= num_vals) {
r = ENOMEM; goto cleanup;
}
r = env->i->generate_row_for_put(db, src_db, &keys[which_db], &vals[which_db], new_src_key, new_src_data, extra);
if (r != 0) goto cleanup;
BOOL key_eq = cmpfun(db, &keys[which_db + num_dbs], &keys[which_db]) == 0;
if (db == src_db) {
curr_new_key = *new_src_key;
curr_new_val = *new_src_data;
}
else {
r = env->i->generate_row_for_put(db, src_db, &keys[which_db], &vals[which_db], new_src_key, new_src_data, extra);
if (r != 0) goto cleanup;
curr_new_key = keys[which_db];
curr_new_val = vals[which_db];
}
BOOL key_eq = cmpfun(db, &curr_old_key, &curr_new_key) == 0;
if (!key_eq) {
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) goto cleanup;
//Check overwrite constraints only in the case where
// the keys are not equal.
// If the keys are equal, then we do not care of the flag is DB_NOOVERWRITE or DB_YESOVERWRITE
r = db_put_check_overwrite_constraint(db, txn,
&curr_new_key,
lock_flags[which_db], remaining_flags[which_db]);
if (r != 0) goto cleanup;
if (remaining_flags[which_db] == DB_NOOVERWRITE_NO_ERROR) {
//update_multiple does not support delaying the no error, since we would
//have to log the flag in the put_multiple.
r = EINVAL; goto cleanup;
}
// lock old key
if (db->i->lt) {
r = get_point_lock(db, txn, &keys[which_db + num_dbs]);
if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
r = get_point_lock(db, txn, &curr_old_key);
if (r != 0) goto cleanup;
}
del_dbs[n_del_dbs] = db;
del_brts[n_del_dbs] = db->i->brt;
del_keys[n_del_dbs] = keys[which_db + num_dbs];
del_keys[n_del_dbs] = curr_old_key;
n_del_dbs++;
}
// we take a shortcut and avoid generating the old val
// we assume that any new vals with size > 0 are different than the old val
// if (!key_eq || !(dbt_cmp(&vals[which_db], &vals[which_db + num_dbs]) == 0)) {
if (!key_eq || vals[which_db].size > 0) {
r = db_put_check_size_constraints(db, &keys[which_db], &vals[which_db]);
if (!key_eq || curr_new_val.size > 0) {
r = db_put_check_size_constraints(db, &curr_new_key, &curr_new_val);
if (r != 0) goto cleanup;
r = toku_grab_read_lock_on_directory(db, txn);
......@@ -4652,13 +4717,13 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
// lock new key
if (db->i->lt) {
r = get_point_lock(db, txn, &keys[which_db]);
r = get_point_lock(db, txn, &curr_new_key);
if (r != 0) goto cleanup;
}
put_dbs[n_put_dbs] = db;
put_brts[n_put_dbs] = db->i->brt;
put_keys[n_put_dbs] = keys[which_db];
put_vals[n_put_dbs] = vals[which_db];
put_keys[n_put_dbs] = curr_new_key;
put_vals[n_put_dbs] = curr_new_val;
n_put_dbs++;
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment