Commit 2788c919 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#4685 get the hot indexer to work with txn's in the preparing state closes[t:4685]

git-svn-id: file:///svn/toku/tokudb@41614 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7db30802
......@@ -127,14 +127,7 @@ int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum);
int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum);
int toku_txn_ignore_contains(TOKUTXN txn, FILENUM filenum);
enum tokutxn_state {
TOKUTXN_LIVE, // initial txn state
TOKUTXN_PREPARING, // txn is preparing (or prepared)
TOKUTXN_COMMITTING, // txn in the process of committing
TOKUTXN_ABORTING, // txn in the process of aborting
TOKUTXN_RETIRED, // txn no longer exists
};
typedef enum tokutxn_state TOKUTXN_STATE;
#include "txn_state.h"
TOKUTXN_STATE toku_txn_get_state(TOKUTXN txn);
......
#if !defined(TOKUTXN_STATE_H)
#define TOKUTXN_STATE_H
// this is a separate file so that the hotindexing tests can see the txn states
enum tokutxn_state {
TOKUTXN_LIVE, // initial txn state
TOKUTXN_PREPARING, // txn is preparing (or prepared)
TOKUTXN_COMMITTING, // txn in the process of committing
TOKUTXN_ABORTING, // txn in the process of aborting
TOKUTXN_RETIRED, // txn no longer exists
};
typedef enum tokutxn_state TOKUTXN_STATE;
#endif
......@@ -10,6 +10,8 @@
#ifndef TOKU_INDEXER_INTERNAL_H
#define TOKU_INDEXER_INTERNAL_H
#include "txn_state.h"
// the indexer_commit_keys is an ordered set of keys described by a DBT in the keys array.
// the array is a resizeable array with max size "max_keys" and current size "current_keys".
// the ordered set is used by the hotindex undo function to collect the commit keys.
......@@ -42,7 +44,7 @@ struct __toku_indexer_internal {
// test functions
int (*undo_do)(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule);
int (*test_xid_state)(DB_INDEXER *indexer, TXNID xid);
TOKUTXN_STATE (*test_xid_state)(DB_INDEXER *indexer, TXNID xid);
int (*test_lock_key)(DB_INDEXER *indexer, TXNID xid, DB *hotdb, DBT *key);
int (*test_delete_provisional)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
int (*test_delete_committed)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
......
/* -*- mode: C; c-basic-offset: 4 -*- */
/*
* Copyright (c) 2010-2011 Tokutek Inc. All rights reserved.
* Copyright (c) 2010-2012 Tokutek Inc. All rights reserved.
* The technology is licensed by the Massachusetts Institute of Technology,
* Rutgers State University of New Jersey, and the Research Foundation of
* State University of New York at Stony Brook under United States of America
......@@ -52,8 +52,7 @@ static void
indexer_commit_keys_add(struct indexer_commit_keys *keys, size_t length, void *ptr) {
if (keys->current_keys >= keys->max_keys) {
int new_max_keys = keys->max_keys == 0 ? 256 : keys->max_keys * 2;
keys->keys = (DBT *) toku_realloc(keys->keys, new_max_keys * sizeof (DBT));
resource_assert(keys->keys);
keys->keys = (DBT *) toku_xrealloc(keys->keys, new_max_keys * sizeof (DBT));
for (int i = keys->current_keys; i < new_max_keys; i++)
toku_init_dbt_flags(&keys->keys[i], DB_DBT_REALLOC);
keys->max_keys = new_max_keys;
......@@ -125,7 +124,7 @@ indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
// placeholders in the committed stack are not allowed
if (uxr_is_placeholder(uxr))
invariant(0);
assert(0);
// undo
if (xrindex > 0) {
......@@ -143,7 +142,7 @@ indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
}
} else
invariant(0);
assert(0);
}
if (result != 0)
break;
......@@ -161,7 +160,7 @@ indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
}
} else
invariant(0);
assert(0);
// send commit messages if needed
for (int i = 0; result == 0 && i < indexer_commit_keys_valid(&indexer->i->commit_keys); i++)
......@@ -206,13 +205,26 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
outermost_xid = this_xid;
outermost_xid_state = this_xid_state;
result = indexer_set_xid(indexer, this_xid, &xids); // always add the outermost xid to the XIDS list
} else if (this_xid_state == TOKUTXN_LIVE)
result = indexer_append_xid(indexer, this_xid, &xids); // append a live xid to the XIDS list
} else {
switch (this_xid_state) {
case TOKUTXN_LIVE:
result = indexer_append_xid(indexer, this_xid, &xids); // append a live xid to the XIDS list
break;
case TOKUTXN_PREPARING:
assert(0); // not allowed
case TOKUTXN_COMMITTING:
case TOKUTXN_ABORTING:
case TOKUTXN_RETIRED:
break; // nothing to do
}
}
if (result != 0)
break;
if (outermost_xid_state != TOKUTXN_LIVE && xrindex > num_committed)
invariant(this_xid_state == TOKUTXN_RETIRED);
if (outermost_xid_state != TOKUTXN_LIVE && xrindex > num_committed) {
// if the outermost is not live, then the inner state must be retired. thats the way that the txn API works.
assert(this_xid_state == TOKUTXN_RETIRED);
}
if (uxr_is_placeholder(uxr))
continue; // skip placeholders
......@@ -229,20 +241,26 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
result = indexer_generate_hot_key_val(indexer, hotdb, ule, prevuxr, &indexer->i->hotkey, NULL);
if (result == 0) {
// send the delete message
if (outermost_xid_state == TOKUTXN_LIVE) {
invariant(this_xid_state != TOKUTXN_ABORTING);
switch (outermost_xid_state) {
case TOKUTXN_LIVE:
case TOKUTXN_PREPARING:
assert(this_xid_state != TOKUTXN_ABORTING);
result = indexer_brt_delete_provisional(indexer, hotdb, &indexer->i->hotkey, xids);
if (result == 0)
result = indexer_lock_key(indexer, hotdb, &indexer->i->hotkey, outermost_xid);
} else {
invariant(outermost_xid_state == TOKUTXN_RETIRED || outermost_xid_state == TOKUTXN_COMMITTING);
break;
case TOKUTXN_COMMITTING:
case TOKUTXN_RETIRED:
result = indexer_brt_delete_committed(indexer, hotdb, &indexer->i->hotkey, xids);
if (result == 0)
indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
break;
case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting
assert(0);
}
}
} else
invariant(0);
assert(0);
}
if (result != 0)
break;
......@@ -255,23 +273,27 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
result = indexer_generate_hot_key_val(indexer, hotdb, ule, uxr, &indexer->i->hotkey, &indexer->i->hotval);
if (result == 0) {
// send the insert message
if (outermost_xid_state == TOKUTXN_LIVE) {
invariant(this_xid_state != TOKUTXN_ABORTING);
switch (outermost_xid_state) {
case TOKUTXN_LIVE:
case TOKUTXN_PREPARING:
assert(this_xid_state != TOKUTXN_ABORTING);
result = indexer_brt_insert_provisional(indexer, hotdb, &indexer->i->hotkey, &indexer->i->hotval, xids);
if (result == 0)
result = indexer_lock_key(indexer, hotdb, &indexer->i->hotkey, outermost_xid);
} else {
invariant(outermost_xid_state == TOKUTXN_RETIRED || outermost_xid_state == TOKUTXN_COMMITTING);
break;
case TOKUTXN_COMMITTING:
case TOKUTXN_RETIRED:
result = indexer_brt_insert_committed(indexer, hotdb, &indexer->i->hotkey, &indexer->i->hotval, xids);
#if 0
// no need to do this because we do implicit commits on inserts
if (result == 0)
if (0 && result == 0)
indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
#endif
break;
case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting
assert(0);
}
}
} else
invariant(0);
assert(0);
if (result != 0)
break;
......@@ -360,24 +382,19 @@ indexer_generate_hot_key_val(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, UXRH
// then return TOKUTXN_RETIRED.
static TOKUTXN_STATE
indexer_xid_state(DB_INDEXER *indexer, TXNID xid) {
TOKUTXN_STATE result = TOKUTXN_RETIRED;
TOKUTXN_STATE result;
// TEST
if (indexer->i->test_xid_state) {
int r = indexer->i->test_xid_state(indexer, xid);
switch (r) {
case 0: result = TOKUTXN_LIVE; break;
case 1: result = TOKUTXN_COMMITTING; break;
case 2: result = TOKUTXN_ABORTING; break;
case 3: result = TOKUTXN_RETIRED; break;
default: assert(0); break;
}
result = indexer->i->test_xid_state(indexer, xid);
} else {
DB_ENV *env = indexer->i->env;
TOKUTXN txn = NULL;
int r = toku_txnid2txn(env->i->logger, xid, &txn);
invariant(r == 0);
assert(r == 0);
if (txn)
result = toku_txn_get_state(txn);
else
result = TOKUTXN_RETIRED;
}
return result;
}
......@@ -393,7 +410,7 @@ indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_
DB_ENV *env = indexer->i->env;
TOKUTXN txn = NULL;
result = toku_txnid2txn(env->i->logger, outermost_live_xid, &txn);
invariant(result == 0 && txn != NULL);
assert(result == 0 && txn != NULL);
result = toku_grab_write_lock(hotdb, key, txn);
}
return result;
......@@ -404,7 +421,7 @@ indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_
// return FALSE.
static BOOL
indexer_find_prev_xr(DB_INDEXER *UU(indexer), ULEHANDLE ule, uint64_t xrindex, uint64_t *prev_xrindex) {
invariant(xrindex < ule_num_uxrs(ule));
assert(xrindex < ule_num_uxrs(ule));
BOOL prev_found = FALSE;
while (xrindex > 0) {
xrindex -= 1;
......@@ -427,7 +444,7 @@ indexer_get_innermost_live_txn(DB_INDEXER *indexer, XIDS xids) {
TXNID xid = xids_get_xid(xids, (u_int8_t)(num_xids-1));
TOKUTXN txn = NULL;
int result = toku_txnid2txn(env->i->logger, xid, &txn);
invariant(result == 0);
assert(result == 0);
return txn;
}
......@@ -443,7 +460,7 @@ indexer_brt_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS
result = toku_ydb_check_avail_fs_space(indexer->i->env);
if (result == 0) {
TOKUTXN txn = indexer_get_innermost_live_txn(indexer, xids);
invariant(txn != NULL);
assert(txn != NULL);
result = toku_brt_maybe_delete (hotdb->i->brt, hotkey, txn, FALSE, ZERO_LSN, TRUE);
}
}
......@@ -477,7 +494,7 @@ indexer_brt_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT
result = toku_ydb_check_avail_fs_space(indexer->i->env);
if (result == 0) {
TOKUTXN txn = indexer_get_innermost_live_txn(indexer, xids);
invariant(txn != NULL);
assert(txn != NULL);
result = toku_brt_maybe_insert (hotdb->i->brt, hotkey, hotval, txn, FALSE, ZERO_LSN, TRUE, BRT_INSERT);
}
}
......
/* -*- mode: C; c-basic-offset: 4 -*- */
/*
* Copyright (c) 2010-2012 Tokutek Inc. All rights reserved.
* The technology is licensed by the Massachusetts Institute of Technology,
* Rutgers State University of New Jersey, and the Research Foundation of
* State University of New York at Stony Brook under United States of America
* Serial No. 11/760379 and to the patents and/or patent applications resulting from it.
*/
#ident "$Id$"
// test the hotindexer undo do function
// read a description of the live transactions and a leafentry from a test file, run the undo do function,
// and print out the actions taken by the undo do function while processing the leafentry
......@@ -15,10 +25,6 @@
#include "indexer-internal.h"
#include "xids-internal.h"
typedef enum {
TOKUTXN_LIVE, TOKUTXN_COMMITTING, TOKUTXN_ABORTING, TOKUTXN_RETIRED,
} TOKUTXN_STATE; // see txn.h
struct txn {
TXNID xid;
TOKUTXN_STATE state;
......@@ -53,7 +59,7 @@ live_add(struct live *live, TXNID xid, TOKUTXN_STATE state) {
}
static int
txn_state(struct live *live, TXNID xid) {
lookup_txn_state(struct live *live, TXNID xid) {
int r = TOKUTXN_RETIRED;
for (int i = 0; i < live->o; i++) {
if (live->txns[i].xid == xid) {
......@@ -196,10 +202,10 @@ put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *
static DB_INDEXER *test_indexer = NULL;
static DB *test_hotdb = NULL;
static int
static TOKUTXN_STATE
test_xid_state(DB_INDEXER *indexer, TXNID xid) {
invariant(indexer == test_indexer);
int r = txn_state(&live_xids, xid);
TOKUTXN_STATE r = lookup_txn_state(&live_xids, xid);
return r;
}
......@@ -207,7 +213,8 @@ static int
test_lock_key(DB_INDEXER *indexer, TXNID xid, DB *hotdb, DBT *key) {
invariant(indexer == test_indexer);
invariant(hotdb == test_hotdb);
invariant(test_xid_state(indexer, xid) == TOKUTXN_LIVE);
TOKUTXN_STATE txn_state = test_xid_state(indexer, xid);
invariant(txn_state == TOKUTXN_LIVE || txn_state == TOKUTXN_PREPARING);
printf("lock [%lu] ", xid);
print_dbt(key);
printf("\n");
......@@ -342,6 +349,8 @@ read_test(char *testname, ULE ule) {
TOKUTXN_STATE state = TOKUTXN_RETIRED;
if (strcmp(fields[2], "live") == 0)
state = TOKUTXN_LIVE;
else if (strcmp(fields[2], "preparing") == 0)
state = TOKUTXN_PREPARING;
else if (strcmp(fields[2], "committing") == 0)
state = TOKUTXN_COMMITTING;
else if (strcmp(fields[2], "aborting") == 0)
......
......@@ -37,7 +37,7 @@ live <XIDLIST>
the live transaction set is initially empty
xid <XID> [live|committing|aborting]
xid <XID> [live|preparing|committing|aborting]
== push a delete transaction record onto the leaf entry stack ==
delete [committed|provisional] <XID>
......
insert_provisional [100] v100 k1
lock [100] v100
xid 100 preparing
key k1
delete committed 0
insert provisional 100 v100
insert_provisional [200] v200 k1
lock [200] v200
delete_provisional [200] v200
lock [200] v200
insert_provisional [200] v201 k1
lock [200] v201
xid 200 preparing
key k1
delete committed 0
insert provisional 200 v200
insert provisional 201 v201
......@@ -5,6 +5,7 @@
tests=""
verbose=0
valgrind=""
exitcode=0
for arg in $* ; do
if [[ $arg =~ --(.*)=(.*) ]] ; then
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment