Commit 8cd9b9e7 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

abort live txns when the env closes [t:1962]

git-svn-id: file:///svn/toku/tokudb@14725 c7de825b-a66e-492c-adef-691d508d4ae1
parent a00c0094
......@@ -277,7 +277,8 @@ struct __toku_db_txn {
struct __toku_db_txn_internal *i;
#define db_txn_struct_i(x) ((x)->i)
int (*txn_stat)(DB_TXN *, struct txn_stat **);
void* __toku_dummy0[7];
struct { void *next, *prev; } open_txns;
void* __toku_dummy0[6];
char __toku_dummy1[24];
void *api_internal; /* 32-bit offset=68 size=4, 64=bit offset=112 size=8 */
void* __toku_dummy2[1];
......
......@@ -294,7 +294,8 @@ struct __toku_db_txn {
struct __toku_db_txn_internal *i;
#define db_txn_struct_i(x) ((x)->i)
int (*txn_stat)(DB_TXN *, struct txn_stat **);
void* __toku_dummy0[12];
struct { void *next, *prev; } open_txns;
void* __toku_dummy0[11];
char __toku_dummy1[24];
void *api_internal; /* 32-bit offset=88 size=4, 64=bit offset=152 size=8 */
void* __toku_dummy2[2];
......
......@@ -301,7 +301,8 @@ struct __toku_db_txn {
struct __toku_db_txn_internal *i;
#define db_txn_struct_i(x) ((x)->i)
int (*txn_stat)(DB_TXN *, struct txn_stat **);
void* __toku_dummy0[15];
struct { void *next, *prev; } open_txns;
void* __toku_dummy0[14];
char __toku_dummy1[8];
void *api_internal; /* 32-bit offset=84 size=4, 64=bit offset=160 size=8 */
void* __toku_dummy2[2];
......
......@@ -301,7 +301,8 @@ struct __toku_db_txn {
struct __toku_db_txn_internal *i;
#define db_txn_struct_i(x) ((x)->i)
int (*txn_stat)(DB_TXN *, struct txn_stat **);
void* __toku_dummy0[15];
struct { void *next, *prev; } open_txns;
void* __toku_dummy0[14];
char __toku_dummy1[8];
void *api_internal; /* 32-bit offset=84 size=4, 64=bit offset=160 size=8 */
void* __toku_dummy2[2];
......
......@@ -305,7 +305,8 @@ struct __toku_db_txn {
struct __toku_db_txn_internal *i;
#define db_txn_struct_i(x) ((x)->i)
int (*txn_stat)(DB_TXN *, struct txn_stat **);
void* __toku_dummy0[16];
struct { void *next, *prev; } open_txns;
void* __toku_dummy0[15];
char __toku_dummy1[8];
void *api_internal; /* 32-bit offset=88 size=4, 64=bit offset=168 size=8 */
void* __toku_dummy2[2];
......
......@@ -419,7 +419,11 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
assert(sizeof(db_txn_fields32)==sizeof(db_txn_fields64));
{
printf("struct txn_stat {\n u_int64_t rolltmp_raw_count;\n};\n");
const char *extra[] = {"int (*txn_stat)(DB_TXN *, struct txn_stat **)"};
const char *extra[] = {
"int (*txn_stat)(DB_TXN *, struct txn_stat **)",
"struct { void *next, *prev; } open_txns",
NULL,
};
print_struct("db_txn", 1, db_txn_fields32, db_txn_fields64, sizeof(db_txn_fields32)/sizeof(db_txn_fields32[0]), extra);
}
......
......@@ -250,6 +250,7 @@ struct __toku_db_txn {
struct __toku_db_txn_internal ii;
#define db_txn_struct_i(x) (&(x)->ii)
int (*txn_stat)(DB_TXN *, struct txn_stat **);
struct { void *next, *prev; } open_txns;
void *api_internal;
int (*abort) (DB_TXN *);
int (*commit) (DB_TXN*, u_int32_t);
......
......@@ -250,6 +250,7 @@ struct __toku_db_txn {
struct __toku_db_txn_internal ii;
#define db_txn_struct_i(x) (&(x)->ii)
int (*txn_stat)(DB_TXN *, struct txn_stat **);
struct { void *next, *prev; } open_txns;
void *api_internal;
int (*abort) (DB_TXN *);
int (*commit) (DB_TXN*, u_int32_t);
......
// this test makes sure the LSN filtering is used during recovery
#include <sys/stat.h>
#include <fcntl.h>
#include "test.h"
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN;
static void run_test (void) {
int r;
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DB_TXN *txnb;
r = env->txn_begin(env, txn, &txnb, 0); CKERR(r);
r = env->close(env, 0);
assert(r == EINVAL);
#if 0
r = txn->abort(txn); CKERR(r);
r = env->close(env, 0); CKERR(r);
#endif
}
static void run_recover (void) {
DB_ENV *env;
int r;
// run recovery
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->close(env, 0); CKERR(r);
exit(0);
}
static void run_no_recover (void) {
DB_ENV *env;
int r;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags & ~DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->close(env, 0); CKERR(r);
exit(0);
}
const char *cmd;
BOOL do_test=FALSE, do_recover=FALSE, do_recover_only=FALSE, do_no_recover = FALSE;
static void test_parse_args (int argc, char *argv[]) {
int resultcode;
cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-v") == 0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "--test")==0) {
do_test=TRUE;
} else if (strcmp(argv[0], "--recover") == 0) {
do_recover=TRUE;
} else if (strcmp(argv[0], "--recover-only") == 0) {
do_recover_only=TRUE;
} else if (strcmp(argv[0], "--no-recover") == 0) {
do_no_recover=TRUE;
} else if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage:\n%s [-v|-q]* [-h] {--test | --recover } \n", cmd);
exit(resultcode);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
}
int test_main (int argc, char *argv[]) {
test_parse_args(argc, argv);
if (do_test) {
run_test();
} else if (do_recover) {
run_recover();
} else if (do_recover_only) {
run_recover();
} else if (do_no_recover) {
run_no_recover();
}
return 0;
}
......@@ -40,7 +40,6 @@ typedef void (*toku_env_errcall_t)(const DB_ENV *, const char *, const char *);
struct __toku_db_env_internal {
int is_panicked; // if nonzero, then its an error number
char *panic_string;
int ref_count;
u_int32_t open_flags;
int open_mode;
toku_env_errcall_t errcall;
......@@ -58,6 +57,7 @@ struct __toku_db_env_internal {
CACHETABLE cachetable;
TOKULOGGER logger;
toku_ltm* ltm;
struct list open_txns;
};
/* *********************************************************
......
......@@ -98,20 +98,36 @@ static int toku_env_set_data_dir(DB_ENV * env, const char *dir);
static int toku_env_set_lg_dir(DB_ENV * env, const char *dir);
static int toku_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir);
static inline void env_add_ref(DB_ENV *env) {
++env->i->ref_count;
static inline int env_opened(DB_ENV *env) {
return env->i->cachetable != 0;
}
static inline void env_unref(DB_ENV *env) {
assert(env->i->ref_count > 0);
if (--env->i->ref_count == 0)
toku_env_close(env, 0);
static void env_init_open_txn(DB_ENV *env) {
list_init(&env->i->open_txns);
}
static inline int env_opened(DB_ENV *env) {
return env->i->cachetable != 0;
// add a txn to the list of open txn's
static void env_add_open_txn(DB_ENV *env, DB_TXN *txn) {
list_push(&env->i->open_txns, (struct list *) (void *) &txn->open_txns);
}
// remove a txn from the list of open txn's
static void env_remove_open_txn(DB_ENV *UU(env), DB_TXN *txn) {
list_remove((struct list *) (void *) &txn->open_txns);
}
static int toku_txn_abort(DB_TXN * txn);
// abort all of the open txn's
static int env_abort_all_open_txns(DB_ENV *env) {
int r = list_empty(&env->i->open_txns) ? 0 : EINVAL;
while (!list_empty(&env->i->open_txns)) {
struct list *list = list_head(&env->i->open_txns);
DB_TXN *txn = list_struct(list, DB_TXN, open_txns);
toku_txn_abort(txn);
}
return r;
}
/* db methods */
static inline int db_opened(DB *db) {
......@@ -463,6 +479,8 @@ static int toku_env_close(DB_ENV * env, u_int32_t flags) {
char *panic_string = env->i->panic_string;
env->i->panic_string = 0;
int open_txns_on_close = env_abort_all_open_txns(env);
// Even if the env is panicked, try to close as much as we can.
int r0=0,r1=0;
if (env->i->cachetable) {
......@@ -517,9 +535,10 @@ static int toku_env_close(DB_ENV * env, u_int32_t flags) {
toku_free(env->i);
toku_free(env);
ydb_unref();
if ( (flags!=0) && !(flags==DB_CLOSE_DONT_TRIM_LOG) ) return EINVAL;
if (r0) return r0;
if (r1) return r1;
if ((flags!=0) && !(flags==DB_CLOSE_DONT_TRIM_LOG)) return EINVAL;
if (open_txns_on_close) return EINVAL;
return is_panicked;
}
......@@ -1010,10 +1029,10 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
result->i->dup_compare = toku_default_compare_fun;
result->i->is_panicked=0;
result->i->panic_string = 0;
result->i->ref_count = 1;
result->i->errcall = 0;
result->i->errpfx = 0;
result->i->errfile = 0;
env_init_open_txn(result);
r = toku_ltm_create(&result->i->ltm, __toku_env_default_max_locks,
toku_db_lt_panic,
......@@ -1101,6 +1120,7 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
assert(db_txn_struct_i(txn->parent)->child == txn);
db_txn_struct_i(txn->parent)->child=NULL;
}
env_remove_open_txn(txn->mgrp, txn);
//toku_ydb_notef("flags=%d\n", flags);
int nosync = (flags & DB_TXN_NOSYNC)!=0 || (db_txn_struct_i(txn)->flags&DB_TXN_NOSYNC);
flags &= ~DB_TXN_NOSYNC;
......@@ -1156,6 +1176,7 @@ static int toku_txn_abort(DB_TXN * txn) {
assert(db_txn_struct_i(txn->parent)->child == txn);
db_txn_struct_i(txn->parent)->child=NULL;
}
env_remove_open_txn(txn->mgrp, txn);
//int r = toku_logger_abort(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL);
int r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL);
int r2 = toku_txn_release_locks(txn);
......@@ -1266,6 +1287,7 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f
assert(!db_txn_struct_i(result->parent)->child);
db_txn_struct_i(result->parent)->child = result;
}
env_add_open_txn(env, result);
*txn = result;
return 0;
}
......@@ -1312,7 +1334,6 @@ db_close_before_brt(DB *db, u_int32_t UU(flags)) {
// printf("%s:%d %d=__toku_db_close(%p)\n", __FILE__, __LINE__, r, db);
// Even if panicked, let's close as much as we can.
int is_panicked = toku_env_is_panicked(db->dbenv);
env_unref(db->dbenv);
toku_free(db->i->fname);
toku_free(db->i->full_fname);
toku_sdbt_cleanup(&db->i->skey);
......@@ -3910,11 +3931,9 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
if (!env_opened(env))
return EINVAL;
env_add_ref(env);
DB *MALLOC(result);
if (result == 0) {
env_unref(env);
return ENOMEM;
}
memset(result, 0, sizeof *result);
......@@ -3953,7 +3972,6 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
MALLOC(result->i);
if (result->i == 0) {
toku_free(result);
env_unref(env);
return ENOMEM;
}
memset(result->i, 0, sizeof *result->i);
......@@ -3967,7 +3985,6 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
if (r != 0) {
toku_free(result->i);
toku_free(result);
env_unref(env);
return r;
}
r = toku_brt_set_bt_compare(result->i->brt, env->i->bt_compare);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment