Commit d5da2448 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3258 fix the hot indexer when txn is committing or aborting refs[t:3258]

git-svn-id: file:///svn/toku/tokudb@28442 c7de825b-a66e-492c-adef-691d508d4ae1
parent 68f33321
......@@ -155,6 +155,7 @@ struct tokutxn {
ROLLBACK_LOG_NODE pinned_inprogress_rollback_log;
struct toku_list checkpoint_before_commit;
TXN_IGNORE_S ignore_errors; // 2954
TOKUTXN_STATE state;
};
struct txninfo {
......
......@@ -779,6 +779,7 @@ int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLO
}
int toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *result) {
assert(txn->state == TOKUTXN_LIVE);
int r;
ROLLBACK_LOG_NODE log;
if (txn_has_inprogress_rollback_log(txn)) {
......
......@@ -253,6 +253,8 @@ int toku_txn_begin_with_xid (
result->force_fsync_on_commit = FALSE;
result->recovered_from_checkpoint = FALSE;
toku_list_init(&result->checkpoint_before_commit);
result->state = TOKUTXN_LIVE;
// 2954
r = toku_txn_ignore_init(result);
if (r != 0) goto died;
......@@ -352,6 +354,7 @@ local_checkpoints_and_log_xcommit(void *thunk) {
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) {
txn->state = TOKUTXN_COMMIT;
if (garbage_collection_debug) {
verify_snapshot_system(txn->logger);
}
......@@ -388,6 +391,7 @@ int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv,
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) {
txn->state = TOKUTXN_ABORT;
if (garbage_collection_debug) {
verify_snapshot_system(txn->logger);
}
......@@ -685,3 +689,8 @@ int toku_txn_ignore_contains(TOKUTXN txn, FILENUM filenum)
}
return ENOENT;
}
TOKUTXN_STATE
toku_txn_get_state(TOKUTXN txn) {
return txn->state;
}
......@@ -86,8 +86,18 @@ int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum);
int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum);
int toku_txn_ignore_contains(TOKUTXN txn, FILENUM filenum);
#if defined(__cplusplus) || defined(__cilkplusplus)
enum tokutxn_state {
TOKUTXN_NOT_LIVE,
TOKUTXN_LIVE,
TOKUTXN_COMMIT,
TOKUTXN_ABORT,
};
typedef enum tokutxn_state TOKUTXN_STATE;
TOKUTXN_STATE toku_txn_get_state(TOKUTXN txn);
#if defined(__cplusplus) || defined(__cilkplusplus)
}
#endif
#endif //TOKUTXN_H
......@@ -42,7 +42,7 @@ struct __toku_indexer_internal {
// test functions
int (*undo_do)(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule);
int (*test_is_xid_live)(DB_INDEXER *indexer, TXNID xid);
int (*test_xid_state)(DB_INDEXER *indexer, TXNID xid);
int (*test_lock_key)(DB_INDEXER *indexer, TXNID xid, DB *hotdb, DBT *key);
int (*test_delete_provisional)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
int (*test_delete_committed)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
......
/* -*- mode: C; c-basic-offset: 4 -*- */
/*
* Copyright (c) 2010 Tokutek Inc. All rights reserved."
* Copyright (c) 2010-2011 Tokutek Inc. All rights reserved.
* The technology is licensed by the Massachusetts Institute of Technology,
* Rutgers State University of New Jersey, and the Research Foundation of
* State University of New York at Stony Brook under United States of America
* Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
* Serial No. 11/760379 and to the patents and/or patent applications resulting from it.
*/
#include <stdio.h>
......@@ -70,7 +70,7 @@ indexer_commit_keys_set_empty(struct indexer_commit_keys *keys) {
// internal functions
static int indexer_set_xid(DB_INDEXER *indexer, TXNID xid, XIDS *xids_result);
static int indexer_append_xid(DB_INDEXER *indexer, TXNID xid, BOOL xid_is_live, XIDS *xids_result);
static int indexer_append_xid(DB_INDEXER *indexer, TXNID xid, XIDS *xids_result);
static BOOL indexer_find_prev_xr(DB_INDEXER *indexer, ULEHANDLE ule, uint64_t xrindex, uint64_t *prev_xrindex);
......@@ -81,7 +81,8 @@ static int indexer_brt_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *h
static int indexer_brt_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids);
static int indexer_brt_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
static int indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_xid);
static int indexer_is_xid_live(DB_INDEXER *indexer, TXNID xid);
static TOKUTXN_STATE indexer_xid_state(DB_INDEXER *indexer, TXNID xid);
// initialize undo globals located in the indexer private object
void
......@@ -184,7 +185,7 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
XIDS xids = xids_get_root_xids();
TXNID outermost_xid = TXNID_NONE;
BOOL outermost_is_live = FALSE;
TOKUTXN_STATE outermost_xid_state = TOKUTXN_NOT_LIVE;
// scan the provisional stack from bottom to top
uint32_t num_committed = ule_get_num_committed(ule);
......@@ -195,20 +196,25 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
UXRHANDLE uxr = ule_get_uxr(ule, xrindex);
TXNID this_xid = uxr_get_txnid(uxr);
BOOL this_xid_is_live = indexer_is_xid_live(indexer, this_xid);
TOKUTXN_STATE this_xid_state = indexer_xid_state(indexer, this_xid);
if (this_xid_state == TOKUTXN_ABORT)
break; // nothing to do once we reach a transaction that is aborting
if (xrindex == num_committed) {
outermost_xid = this_xid;
outermost_is_live = this_xid_is_live;
}
// setup up the xids
result = indexer_append_xid(indexer, this_xid, this_xid_is_live, &xids);
outermost_xid_state = this_xid_state;
result = indexer_set_xid(indexer, this_xid, &xids); // always add the outermost xid to the XIDS list
} else if (this_xid_state == TOKUTXN_LIVE)
result = indexer_append_xid(indexer, this_xid, &xids); // append a live xid to the XIDS list
if (result != 0)
break;
// skip placeholders
if (outermost_xid_state != TOKUTXN_LIVE && xrindex > num_committed)
invariant(this_xid_state == TOKUTXN_NOT_LIVE);
if (uxr_is_placeholder(uxr))
continue;
continue; // skip placeholders
// undo
uint64_t prev_xrindex;
......@@ -222,14 +228,16 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
result = indexer_generate_hot_key_val(indexer, hotdb, ule, prevuxr, &indexer->i->hotkey, NULL);
if (result == 0) {
// send the delete message
if (!outermost_is_live) {
result = indexer_brt_delete_committed(indexer, hotdb, &indexer->i->hotkey, xids);
if (result == 0)
indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
} else {
if (outermost_xid_state == TOKUTXN_LIVE) {
invariant(this_xid_state != TOKUTXN_ABORT);
result = indexer_brt_delete_provisional(indexer, hotdb, &indexer->i->hotkey, xids);
if (result == 0)
result = indexer_lock_key(indexer, hotdb, &indexer->i->hotkey, outermost_xid);
} else {
invariant(outermost_xid_state == TOKUTXN_NOT_LIVE || outermost_xid_state == TOKUTXN_COMMIT);
result = indexer_brt_delete_committed(indexer, hotdb, &indexer->i->hotkey, xids);
if (result == 0)
indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
}
}
} else
......@@ -246,17 +254,19 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
result = indexer_generate_hot_key_val(indexer, hotdb, ule, uxr, &indexer->i->hotkey, &indexer->i->hotval);
if (result == 0) {
// send the insert message
if (!outermost_is_live) {
if (outermost_xid_state == TOKUTXN_LIVE) {
invariant(this_xid_state != TOKUTXN_ABORT);
result = indexer_brt_insert_provisional(indexer, hotdb, &indexer->i->hotkey, &indexer->i->hotval, xids);
if (result == 0)
result = indexer_lock_key(indexer, hotdb, &indexer->i->hotkey, outermost_xid);
} else {
invariant(outermost_xid_state == TOKUTXN_NOT_LIVE || outermost_xid_state == TOKUTXN_COMMIT);
result = indexer_brt_insert_committed(indexer, hotdb, &indexer->i->hotkey, &indexer->i->hotval, xids);
#if 0
// no need to do this
if (result == 0)
indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
#endif
} else {
result = indexer_brt_insert_provisional(indexer, hotdb, &indexer->i->hotkey, &indexer->i->hotval, xids);
if (result == 0)
result = indexer_lock_key(indexer, hotdb, &indexer->i->hotkey, outermost_xid);
}
}
} else
......@@ -287,7 +297,7 @@ indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
return result;
}
// the committed XIDS always = [this_xid]
// set xids_result = [root_xid, this_xid]
// Note that this could be sped up by adding a new xids constructor that constructs the stack with
// exactly one xid.
static int
......@@ -310,34 +320,16 @@ indexer_set_xid(DB_INDEXER *UU(indexer), TXNID this_xid, XIDS *xids_result) {
return result;
}
// the provisional XIDS = XIDS . [this_xid] when this_xid is live or when XIDS is empty
// append xid to xids_result
static int
indexer_append_xid(DB_INDEXER *UU(indexer), TXNID xid, BOOL xid_is_live, XIDS *xids_result) {
int result = 0;
indexer_append_xid(DB_INDEXER *UU(indexer), TXNID xid, XIDS *xids_result) {
XIDS old_xids = *xids_result;
XIDS new_xids;
if (xids_get_num_xids(old_xids) == 0) {
// setup xids = [ root xid, xid ]
new_xids = xids_get_root_xids();
if (xid > 0) {
XIDS child_xids;
result = xids_create_child(new_xids, &child_xids, xid);
xids_destroy(&new_xids);
if (result == 0) {
new_xids = child_xids;
xids_destroy(&old_xids);
*xids_result = new_xids;
}
}
} else if (xid_is_live) {
// append xid to xids
result = xids_create_child(old_xids, &new_xids, xid);
if (result == 0) {
xids_destroy(&old_xids);
*xids_result = new_xids;
}
int result = xids_create_child(old_xids, &new_xids, xid);
if (result == 0) {
xids_destroy(&old_xids);
*xids_result = new_xids;
}
return result;
}
......@@ -365,18 +357,19 @@ indexer_generate_hot_key_val(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, UXRH
// looks up the TOKUTXN by TXNID. if it does not exist then the transaction is committed.
// returns TRUE if the xid is committed. otherwise returns FALSE.
static int
indexer_is_xid_live(DB_INDEXER *indexer, TXNID xid) {
int result = 0;
static TOKUTXN_STATE
indexer_xid_state(DB_INDEXER *indexer, TXNID xid) {
TOKUTXN_STATE result = TOKUTXN_NOT_LIVE;
// TEST
if (indexer->i->test_is_xid_live) {
result = indexer->i->test_is_xid_live(indexer, xid);
if (indexer->i->test_xid_state) {
result = indexer->i->test_xid_state(indexer, xid);
} else {
DB_ENV *env = indexer->i->env;
TOKUTXN txn = NULL;
int r = toku_txnid2txn(env->i->logger, xid, &txn);
invariant(r == 0);
result = txn != NULL;
if (txn)
result = toku_txn_get_state(txn);
}
return result;
}
......
......@@ -114,6 +114,8 @@ BDB_DONTRUN_TESTS = \
hotindexer-lock-test \
hotindexer-multiclient \
hotindexer-nested-insert-committed \
hotindexer-put-abort \
hotindexer-put-commit \
hotindexer-put-multiple \
hotindexer-simple-abort \
hotindexer-undo-do-test \
......
#include "test.h"
#include "ydb.h"
#include "toku_pthread.h"
// this test reproduces the rollback log corruption that occurs when hot indexing runs concurrent with a long abort
// the concurrent operation occurs when the abort periodically releases the ydb lock which allows the hot indexer
// to run. the hot indexer erroneously append to the rollback log that is in the process of being aborted.
static int
put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *src_key, const DBT *src_data) {
dest_db = dest_db; src_db = src_db; dest_key = dest_key; dest_data = dest_data; src_key = src_key; src_data = src_data;
lazy_assert(src_db != NULL && dest_db != NULL);
if (dest_key->flags == DB_DBT_REALLOC) {
dest_key->data = toku_realloc(dest_key->data, src_data->size);
memcpy(dest_key->data, src_data->data, src_data->size);
dest_key->size = src_data->size;
}
dest_data->size = 0;
return 0;
}
struct indexer_arg {
DB_ENV *env;
DB *src_db;
int n_dest_db;
DB **dest_db;
};
static void *
indexer_thread(void *arg) {
struct indexer_arg *indexer_arg = (struct indexer_arg *) arg;
DB_ENV *env = indexer_arg->env;
int r;
DB_TXN *indexer_txn = NULL;
r = env->txn_begin(env, NULL, &indexer_txn, 0); assert_zero(r);
DB_INDEXER *indexer = NULL;
r = env->create_indexer(env, indexer_txn, &indexer, indexer_arg->src_db, indexer_arg->n_dest_db, indexer_arg->dest_db, NULL, 0); assert_zero(r);
r = indexer->build(indexer); assert_zero(r);
r = indexer->close(indexer); assert_zero(r);
r = indexer_txn->commit(indexer_txn, 0); assert_zero(r);
return arg;
}
static void
verify_empty(DB_ENV *env, DB *db) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
DBC *cursor = NULL;
r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
DBT key, val;
r = cursor->c_get(cursor, dbt_init(&key, 0, 0), dbt_init(&val, 0, 0), DB_NEXT);
assert(r == DB_NOTFOUND);
r = cursor->c_close(cursor); assert_zero(r);
r = txn->commit(txn, 0); assert_zero(r);
}
static void
run_test(void) {
int r;
DB_ENV *env = NULL;
r = db_env_create(&env, 0); assert_zero(r);
r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r);
r = env->open(env, ENVDIR, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
DB *src_db = NULL;
r = db_create(&src_db, env, 0); assert_zero(r);
r = src_db->open(src_db, NULL, "0.tdb", NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
DB *dest_db = NULL;
r = db_create(&dest_db, env, 0); assert_zero(r);
r = dest_db->open(dest_db, NULL, "1.tdb", NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
// insert some
for (int i = 0; i < 246723; i++) {
int k = htonl(i);
int v = i;
DBT key; dbt_init(&key, &k, sizeof k);
DBT val; dbt_init(&val, &v, sizeof v);
r = src_db->put(src_db, txn, &key, &val, 0); assert_zero(r);
}
// run the indexer
struct indexer_arg indexer_arg = { env, src_db, 1, &dest_db };
toku_pthread_t pid;
r = toku_pthread_create(&pid, NULL, indexer_thread, &indexer_arg); assert_zero(r);
r = txn->abort(txn); assert_zero(r);
void *ret;
r = toku_pthread_join(pid, &ret); assert_zero(r);
verify_empty(env, src_db);
verify_empty(env, dest_db);
r = src_db->close(src_db, 0); assert_zero(r);
r = dest_db->close(dest_db, 0); assert_zero(r);
r = env->close(env, 0); assert_zero(r);
}
int
test_main(int argc, char * const argv[]) {
int r;
// parse_args(argc, argv);
for (int i = 1; i < argc; i++) {
char * const arg = argv[i];
if (strcmp(arg, "-v") == 0) {
verbose++;
continue;
}
if (strcmp(arg, "-q") == 0) {
verbose = 0;
continue;
}
}
r = system("rm -rf " ENVDIR); assert_zero(r);
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
run_test();
return 0;
}
#include "test.h"
#include "ydb.h"
#include "toku_pthread.h"
// this test reproduces the rollback log corruption that occurs when hot indexing runs concurrent with a long commit.
// the concurrent operation occurs when the commit periodically releases the ydb lock which allows the hot indexer
// to run. the hot indexer erroneously append to the rollback log that is in the process of being committed.
static int
put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *src_key, const DBT *src_data) {
dest_db = dest_db; src_db = src_db; dest_key = dest_key; dest_data = dest_data; src_key = src_key; src_data = src_data;
lazy_assert(src_db != NULL && dest_db != NULL);
if (dest_key->flags == DB_DBT_REALLOC) {
dest_key->data = toku_realloc(dest_key->data, src_key->size);
memcpy(dest_key->data, src_key->data, src_key->size);
dest_key->size = src_key->size;
}
if (dest_data->flags == DB_DBT_REALLOC) {
dest_data->data = toku_realloc(dest_data->data, src_data->size);
memcpy(dest_data->data, src_data->data, src_data->size);
dest_data->size = src_data->size;
}
return 0;
}
struct indexer_arg {
DB_ENV *env;
DB *src_db;
int n_dest_db;
DB **dest_db;
};
static void *
indexer_thread(void *arg) {
struct indexer_arg *indexer_arg = (struct indexer_arg *) arg;
DB_ENV *env = indexer_arg->env;
int r;
DB_TXN *indexer_txn = NULL;
r = env->txn_begin(env, NULL, &indexer_txn, 0); assert_zero(r);
DB_INDEXER *indexer = NULL;
r = env->create_indexer(env, indexer_txn, &indexer, indexer_arg->src_db, indexer_arg->n_dest_db, indexer_arg->dest_db, NULL, 0); assert_zero(r);
if (verbose) fprintf(stderr, "build start\n");
r = indexer->build(indexer); assert_zero(r);
if (verbose) fprintf(stderr, "build end\n");
r = indexer->close(indexer); assert_zero(r);
r = indexer_txn->commit(indexer_txn, 0); assert_zero(r);
return arg;
}
static void
verify_full(DB_ENV *env, DB *db, int n) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
DBC *cursor = NULL;
r = db->cursor(db, txn, &cursor, 0); assert_zero(r);
int i = 0;
DBT key; dbt_init_realloc(&key);
DBT val; dbt_init_realloc(&val);
while (1) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (r == DB_NOTFOUND)
break;
int k;
assert(key.size == sizeof k);
memcpy(&k, key.data, key.size);
assert(k == (int) htonl(i));
int v;
assert(val.size == sizeof v);
memcpy(&v, val.data, val.size);
assert(v == i);
i++;
}
assert(i == n);
toku_free(key.data);
toku_free(val.data);
r = cursor->c_close(cursor); assert_zero(r);
r = txn->commit(txn, 0); assert_zero(r);
}
static void
run_test(void) {
int r;
DB_ENV *env = NULL;
r = db_env_create(&env, 0); assert_zero(r);
r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r);
r = env->open(env, ENVDIR, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
DB *src_db = NULL;
r = db_create(&src_db, env, 0); assert_zero(r);
r = src_db->open(src_db, NULL, "0.tdb", NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
DB *dest_db = NULL;
r = db_create(&dest_db, env, 0); assert_zero(r);
r = dest_db->open(dest_db, NULL, "1.tdb", NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r);
// insert some
int n = 246723;
for (int i = 0; i < n; i++) {
int k = htonl(i);
int v = i;
DBT key; dbt_init(&key, &k, sizeof k);
DBT val; dbt_init(&val, &v, sizeof v);
r = src_db->put(src_db, txn, &key, &val, 0); assert_zero(r);
}
// run the indexer
struct indexer_arg indexer_arg = { env, src_db, 1, &dest_db };
toku_pthread_t pid;
r = toku_pthread_create(&pid, NULL, indexer_thread, &indexer_arg); assert_zero(r);
if (verbose) fprintf(stderr, "commit start\n");
r = txn->commit(txn, 0); assert_zero(r);
if (verbose) fprintf(stderr, "commit end\n");
void *ret;
r = toku_pthread_join(pid, &ret); assert_zero(r);
verify_full(env, src_db, n);
verify_full(env, dest_db, n);
r = src_db->close(src_db, 0); assert_zero(r);
r = dest_db->close(dest_db, 0); assert_zero(r);
r = env->close(env, 0); assert_zero(r);
}
int
test_main(int argc, char * const argv[]) {
int r;
// parse_args(argc, argv);
for (int i = 1; i < argc; i++) {
char * const arg = argv[i];
if (strcmp(arg, "-v") == 0) {
verbose++;
continue;
}
if (strcmp(arg, "-q") == 0) {
verbose = 0;
continue;
}
}
r = system("rm -rf " ENVDIR); assert_zero(r);
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
run_test();
return 0;
}
......@@ -16,40 +16,49 @@
#include "indexer-internal.h"
#include "xids-internal.h"
typedef enum {
TOKUTXN_NOT_LIVE, TOKUTXN_LIVE, TOKUTXN_COMMIT, TOKUTXN_ABORT,
} TOKUTXN_STATE;
struct txn {
TXNID xid;
TOKUTXN_STATE state;
};
struct live {
int n;
int o;
TXNID *xids;
struct txn *txns;
};
static void
live_init(struct live *live) {
live->n = live->o = 0;
live->xids = NULL;
live->txns = NULL;
}
static void
live_destroy(struct live *live) {
toku_free(live->xids);
toku_free(live->txns);
}
static void
live_add(struct live *live, TXNID xid) {
live_add(struct live *live, TXNID xid, TOKUTXN_STATE state) {
if (live->o >= live->n) {
int newn = live->n == 0 ? 1 : live->n * 2;
live->xids = (TXNID *) toku_realloc(live->xids, newn * sizeof (TXNID));
resource_assert(live->xids);
live->txns = (struct txn *) toku_realloc(live->txns, newn * sizeof (struct txn));
resource_assert(live->txns);
live->n = newn;
}
live->xids[live->o++] = xid;
live->txns[live->o++] = (struct txn ) { xid, state };
}
static int
is_live(struct live *live, TXNID xid) {
int r = 0;
txn_state(struct live *live, TXNID xid) {
int r = TOKUTXN_NOT_LIVE;
for (int i = 0; i < live->o; i++) {
if (live->xids[i] == xid) {
r = 1;
if (live->txns[i].xid == xid) {
r = live->txns[i].state;
break;
}
}
......@@ -189,9 +198,9 @@ static DB_INDEXER *test_indexer = NULL;
static DB *test_hotdb = NULL;
static int
test_is_xid_live(DB_INDEXER *indexer, TXNID xid) {
test_xid_state(DB_INDEXER *indexer, TXNID xid) {
invariant(indexer == test_indexer);
int r = is_live(&live_xids, xid);
int r = txn_state(&live_xids, xid);
return r;
}
......@@ -199,7 +208,7 @@ static int
test_lock_key(DB_INDEXER *indexer, TXNID xid, DB *hotdb, DBT *key) {
invariant(indexer == test_indexer);
invariant(hotdb == test_hotdb);
invariant(test_is_xid_live(indexer, xid));
invariant(test_xid_state(indexer, xid) == TOKUTXN_LIVE);
printf("lock [%lu] ", xid);
print_dbt(key);
printf("\n");
......@@ -325,7 +334,22 @@ read_test(char *testname, ULE ule) {
// live xid...
if (strcmp(fields[0], "live") == 0) {
for (int i = 1; i < nfields; i++)
live_add(&live_xids, atoll(fields[i]));
live_add(&live_xids, atoll(fields[i]), TOKUTXN_LIVE);
continue;
}
// xid <XID> [live|committing|aborting]
if (strcmp(fields[0], "xid") == 0 && nfields == 3) {
TXNID xid = atoll(fields[1]);
TOKUTXN_STATE state = TOKUTXN_NOT_LIVE;
if (strcmp(fields[2], "live") == 0)
state = TOKUTXN_LIVE;
else if (strcmp(fields[2], "committing") == 0)
state = TOKUTXN_COMMIT;
else if (strcmp(fields[2], "aborting") == 0)
state = TOKUTXN_ABORT;
else
assert(0);
live_add(&live_xids, xid, state);
continue;
}
// key KEY
......@@ -410,7 +434,7 @@ run_test(char *envdir, char *testname) {
r = env->create_indexer(env, txn, &indexer, src_db, 1, &dest_db, NULL, 0); assert_zero(r);
// set test callbacks
indexer->i->test_is_xid_live = test_is_xid_live;
indexer->i->test_xid_state = test_xid_state;
indexer->i->test_lock_key = test_lock_key;
indexer->i->test_delete_provisional = test_delete_provisional;
indexer->i->test_delete_committed = test_delete_committed;
......
......@@ -37,11 +37,13 @@ live <XIDLIST>
the live transaction set is initially empty
xid <XID> [live|committing|aborting]
== push a delete transaction record onto the leaf entry stack ==
delete <committed|provisional> <XID>
delete [committed|provisional] <XID>
== push an insert transaction records onto the leaf entry stack ==
insert <committed|provisional> <XID> <VALUE>
insert [committed|provisional] <XID> <VALUE>
== push a placeholder onto the leaf entry stack ==
placeholder <XID>
......
xid 100 aborting
key k1
delete committed 0
insert provisional 100 v100
xid 100 aborting
key k1
insert committed 0 v100
insert provisional 100 v200
xid 100 committing
key k1
delete committed 0
insert provisional 100 v100
insert_committed [0] v100 k1
delete_committed [100] v100
insert_committed [100] v200 k1
commit_any [100] v100
xid 100 committing
key k1
insert committed 0 v100
insert provisional 100 v200
insert_committed [0] v0 k1
delete_provisional [200] v0
lock [200] v0
insert_provisional [200] v200 k1
lock [200] v200
xid 200 live
xid 201 aborting
key k1
insert committed 0 v0
insert provisional 200 v200
insert provisional 201 v201
insert provisional 202 v202
insert_committed [0] v0 k1
delete_provisional [200] v0
lock [200] v0
insert_provisional [200] v200 k1
lock [200] v200
xid 200 live
xid 201 aborting
key k1
insert committed 0 v0
insert provisional 200 v200
insert provisional 201 v201
insert_committed [0] v0 k1
delete_provisional [200] v0
lock [200] v0
insert_provisional [200] v200 k1
lock [200] v200
delete_provisional [200] v200
lock [200] v200
insert_provisional [200] v201 k1
lock [200] v201
xid 200 live
xid 201 committing
key k1
insert committed 0 v0
insert provisional 200 v200
insert provisional 201 v201
......@@ -2345,7 +2345,8 @@ toku_txn_release_locks(DB_TXN* txn) {
static void
ydb_yield (voidfp f, void *fv, void *UU(v)) {
toku_ydb_unlock();
if (f) f(fv);
if (f)
f(fv);
toku_ydb_lock();
}
......
......@@ -31,6 +31,8 @@ uint64_t toku_test_get_latest_lsn(DB_ENV *env) __attribute__((__visibility__("de
// test-only function
extern int toku_test_get_checkpointing_user_data_status(void) __attribute__((__visibility__("default")));
void toku_test_set_yield_callback(void (*yield)(void *extra), void *extra) __attribute__((__visibility__("default")));
#if defined(__cplusplus)
}
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment