Commit 8457efcd authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4896], [t:4876], fix bug in indexer-undo-do.c, remove db->i->refs

git-svn-id: file:///svn/toku/tokudb@44045 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9919eb8d
...@@ -192,9 +192,17 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) { ...@@ -192,9 +192,17 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
// scan the provisional stack from the outermost to the innermost transaction record // scan the provisional stack from the outermost to the innermost transaction record
uint32_t num_committed = ule_get_num_committed(ule); uint32_t num_committed = ule_get_num_committed(ule);
uint32_t num_provisional = ule_get_num_provisional(ule); uint32_t num_provisional = ule_get_num_provisional(ule);
TXNID outermost_xid = uxr_get_txnid(ule_get_uxr(ule, num_committed)); BOOL outermost_retired = FALSE;
TOKUTXN_STATE outermost_xid_state = indexer_xid_state(indexer, outermost_xid); TXNID outermost_xid = TXNID_NONE;
BOOL outermost_retired = outermost_xid_state == TOKUTXN_RETIRED; TOKUTXN_STATE outermost_xid_state = TOKUTXN_RETIRED;
if (num_provisional) {
outermost_xid = uxr_get_txnid(ule_get_uxr(ule, num_committed));
outermost_xid_state = indexer_xid_state(indexer, outermost_xid);
outermost_retired = outermost_xid_state == TOKUTXN_RETIRED;
}
else {
outermost_retired = TRUE;
}
if (outermost_retired) { if (outermost_retired) {
toku_txn_manager_resume(txn_manager); toku_txn_manager_resume(txn_manager);
txn_manager_suspended = FALSE; txn_manager_suspended = FALSE;
......
...@@ -104,20 +104,6 @@ disassociate_indexer_from_hot_dbs(DB_INDEXER *indexer) { ...@@ -104,20 +104,6 @@ disassociate_indexer_from_hot_dbs(DB_INDEXER *indexer) {
} }
} }
static void
indexer_add_refs(DB_INDEXER *indexer) {
toku_db_add_ref(indexer->i->src_db);
for (int i = 0; i < indexer->i->N; i++)
toku_db_add_ref(indexer->i->dest_dbs[i]);
}
static void
indexer_release_refs(DB_INDEXER *indexer) {
toku_db_release_ref(indexer->i->src_db);
for (int i = 0; i < indexer->i->N; i++)
toku_db_release_ref(indexer->i->dest_dbs[i]);
}
/* /*
* free_indexer_resources() frees all of the resources associated with * free_indexer_resources() frees all of the resources associated with
* struct __toku_indexer_internal * struct __toku_indexer_internal
...@@ -133,7 +119,6 @@ free_indexer_resources(DB_INDEXER *indexer) { ...@@ -133,7 +119,6 @@ free_indexer_resources(DB_INDEXER *indexer) {
indexer->i->fnums = NULL; indexer->i->fnums = NULL;
} }
indexer_undo_do_destroy(indexer); indexer_undo_do_destroy(indexer);
indexer_release_refs(indexer);
// indexer->i // indexer->i
toku_free(indexer->i); toku_free(indexer->i);
indexer->i = NULL; indexer->i = NULL;
...@@ -231,7 +216,6 @@ create_exit: ...@@ -231,7 +216,6 @@ create_exit:
if ( rval == 0 ) { if ( rval == 0 ) {
indexer_undo_do_init(indexer); indexer_undo_do_init(indexer);
indexer_add_refs(indexer);
*indexerp = indexer; *indexerp = indexer;
......
...@@ -102,22 +102,6 @@ struct __toku_loader_internal { ...@@ -102,22 +102,6 @@ struct __toku_loader_internal {
char **inames_in_env; /* [N] inames of new files to be created */ char **inames_in_env; /* [N] inames of new files to be created */
}; };
static void
loader_add_refs(DB_LOADER *loader) {
if (loader->i->src_db)
toku_db_add_ref(loader->i->src_db);
for (int i = 0; i < loader->i->N; i++)
toku_db_add_ref(loader->i->dbs[i]);
}
static void
loader_release_refs(DB_LOADER *loader) {
if (loader->i->src_db)
toku_db_release_ref(loader->i->src_db);
for (int i = 0; i < loader->i->N; i++)
toku_db_release_ref(loader->i->dbs[i]);
}
/* /*
* free_loader_resources() frees all of the resources associated with * free_loader_resources() frees all of the resources associated with
* struct __toku_loader_internal * struct __toku_loader_internal
...@@ -127,7 +111,6 @@ loader_release_refs(DB_LOADER *loader) { ...@@ -127,7 +111,6 @@ loader_release_refs(DB_LOADER *loader) {
static void free_loader_resources(DB_LOADER *loader) static void free_loader_resources(DB_LOADER *loader)
{ {
if ( loader->i ) { if ( loader->i ) {
loader_release_refs(loader);
for (int i=0; i<loader->i->N; i++) { for (int i=0; i<loader->i->N; i++) {
if (loader->i->ekeys && if (loader->i->ekeys &&
loader->i->ekeys[i].data && loader->i->ekeys[i].data &&
...@@ -322,7 +305,6 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -322,7 +305,6 @@ int toku_loader_create_loader(DB_ENV *env,
} }
*blp = loader; *blp = loader;
create_exit: create_exit:
loader_add_refs(loader);
if (rval == 0) { if (rval == 0) {
(void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE), 1); (void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE), 1);
(void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CURRENT), 1); (void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CURRENT), 1);
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "Copyright (c) 2009 Tokutek Inc. All rights reserved."
#ident "$Id$"
// Verify that the indexer grabs references on the DBs
#include "test.h"
#include "toku_pthread.h"
#include <db.h>
#include <sys/stat.h>
DB_ENV *env;
enum {NUM_DBS=1};
static int put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val) {
dest_db = dest_db; src_db = src_db; dest_key = dest_key; dest_val = dest_val; src_key = src_key; src_val = src_val;
assert(0);
return 0;
}
char *src_name="src.db";
static void run_test(void)
{
int r;
r = system("rm -rf " ENVDIR); CKERR(r);
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = toku_os_mkdir(ENVDIR "/log", S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = db_env_create(&env, 0); CKERR(r);
r = env->set_lg_dir(env, "log"); CKERR(r);
r = env->set_default_bt_compare(env, int64_dbt_cmp); CKERR(r);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate); CKERR(r);
int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE | DB_INIT_LOG;
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
env->set_errfile(env, stderr);
//Disable auto-checkpointing
r = env->checkpointing_set_period(env, 0); CKERR(r);
DB *src_db = NULL;
r = db_create(&src_db, env, 0); CKERR(r);
r = src_db->open(src_db, NULL, src_name, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
DB *dbs[NUM_DBS];
for (int i = 0; i < NUM_DBS; i++) {
r = db_create(&dbs[i], env, 0); CKERR(r);
char key_name[32];
sprintf(key_name, "key%d", i);
r = dbs[i]->open(dbs[i], NULL, key_name, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
dbs[i]->app_private = (void *) (intptr_t) i;
}
DB_TXN *hottxn;
r = env->txn_begin(env, NULL, &hottxn, 0); CKERR(r);
DB_INDEXER *indexer;
r = env->create_indexer(env, hottxn, &indexer, src_db, NUM_DBS, dbs, NULL, 0); CKERR(r);
r = src_db->close(src_db, 0);
assert(r == EBUSY); // close the src_db with an active indexer, should not succeed
for(int i = 0; i < NUM_DBS; i++) {
r = dbs[i]->close(dbs[i], 0);
assert(r == EBUSY); // close a dest_db, should not succeed
}
r = indexer->abort(indexer); CKERR(r);
r = src_db->close(src_db, 0); CKERR(r);
r = hottxn->commit(hottxn, DB_TXN_SYNC); CKERR(r);
for(int i = 0;i < NUM_DBS; i++) {
r = dbs[i]->close(dbs[i], 0); CKERR(r);
}
r = env->close(env, 0); CKERR(r);
}
// ------------ infrastructure ----------
static void do_args(int argc, char * const argv[]);
int test_main(int argc, char * const *argv) {
do_args(argc, argv);
run_test();
return 0;
}
static void do_args(int argc, char * const argv[]) {
int resultcode;
char *cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-v")==0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage:\n%s\n", cmd);
exit(resultcode);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
}
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#ident "$Id$"
// Verify that the loader grabs references on the DB's
#include "test.h"
#include <db.h>
static int loader_flags = 0;
static char *envdir = ENVDIR;
static int put_multiple_generate(DB *UU(dest_db), DB *UU(src_db), DBT *UU(dest_key), DBT *UU(dest_val), const DBT *UU(src_key), const DBT *UU(src_val)) {
return ENOMEM;
}
static void loader_open_abort(int ndb) {
int r;
char rmcmd[32 + strlen(envdir)];
snprintf(rmcmd, sizeof rmcmd, "rm -rf %s", envdir);
r = system(rmcmd); CKERR(r);
r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE;
r = env->open(env, envdir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
env->set_errfile(env, stderr);
DB *dbs[ndb];
uint32_t db_flags[ndb];
uint32_t dbt_flags[ndb];
for (int i = 0; i < ndb; i++) {
db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0;
r = db_create(&dbs[i], env, 0); CKERR(r);
char name[32];
sprintf(name, "db%d", i);
r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
}
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DB_LOADER *loader;
r = env->create_loader(env, txn, &loader, ndb > 0 ? dbs[0] : NULL, ndb, dbs, db_flags, dbt_flags, loader_flags); CKERR(r);
for (int i = 0; i < ndb; i++) {
r = dbs[i]->close(dbs[i], 0);
assert(r == EBUSY);
}
r = loader->close(loader); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
for (int i = 0; i < ndb; i++) {
r = dbs[i]->close(dbs[i], 0); CKERR(r);
}
r = env->close(env, 0); CKERR(r);
}
static void do_args(int argc, char * const argv[]) {
int resultcode;
char *cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage: %s -h -v -q -p\n", cmd);
exit(resultcode);
} else if (strcmp(argv[0], "-v")==0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "-p") == 0) {
loader_flags = LOADER_USE_PUTS;
} else if (strcmp(argv[0], "-e") == 0) {
argc--; argv++;
if (argc > 0)
envdir = argv[0];
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
}
int test_main(int argc, char * const *argv) {
do_args(argc, argv);
loader_open_abort(0);
loader_open_abort(1);
loader_open_abort(2);
return 0;
}
...@@ -33,15 +33,11 @@ struct __toku_db_internal { ...@@ -33,15 +33,11 @@ struct __toku_db_internal {
char *dname; // dname is constant for this handle (handle must be closed before file is renamed) char *dname; // dname is constant for this handle (handle must be closed before file is renamed)
struct toku_list dbs_that_must_close_before_abort; struct toku_list dbs_that_must_close_before_abort;
DB_INDEXER *indexer; DB_INDEXER *indexer;
int refs; // reference count including indexers and loaders
}; };
int toku_db_set_indexer(DB *db, DB_INDEXER *indexer); int toku_db_set_indexer(DB *db, DB_INDEXER *indexer);
DB_INDEXER *toku_db_get_indexer(DB *db); DB_INDEXER *toku_db_get_indexer(DB *db);
void toku_db_add_ref(DB *db);
void toku_db_release_ref(DB *db);
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR == 1 #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR == 1
typedef void (*toku_env_errcall_t)(const char *, char *); typedef void (*toku_env_errcall_t)(const char *, char *);
#elif DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3 #elif DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
......
...@@ -114,28 +114,10 @@ create_iname(DB_ENV *env, u_int64_t id, char *hint, char *mark, int n) { ...@@ -114,28 +114,10 @@ create_iname(DB_ENV *env, u_int64_t id, char *hint, char *mark, int n) {
static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode); static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode);
void
toku_db_add_ref(DB *db) {
db->i->refs++;
}
void
toku_db_release_ref(DB *db){
db->i->refs--;
}
//DB->close() //DB->close()
int int
toku_db_close(DB * db) { toku_db_close(DB * db) {
int r = 0; int r = 0;
// the magic number one comes from the fact that only one loader
// or hot indexer may reference a DB at a time. when that changes,
// this will break.
if (db->i->refs != 1) {
r = EBUSY;
} else {
// TODO: assert(db->i->refs == 0) because we're screwed otherwise
db->i->refs = 0;
if (db_opened(db) && db->i->dname) { if (db_opened(db) && db->i->dname) {
// internal (non-user) dictionary has no dname // internal (non-user) dictionary has no dname
env_note_db_closed(db->dbenv, db); // tell env that this db is no longer in use by the user of this api (user-closed, may still be in use by fractal tree internals) env_note_db_closed(db->dbenv, db); // tell env that this db is no longer in use by the user of this api (user-closed, may still be in use by fractal tree internals)
...@@ -158,7 +140,6 @@ toku_db_close(DB * db) { ...@@ -158,7 +140,6 @@ toku_db_close(DB * db) {
toku_free(db->i); toku_free(db->i);
toku_free(db); toku_free(db);
} }
}
return r; return r;
} }
...@@ -815,7 +796,6 @@ int toku_setup_db_internal (DB **dbp, DB_ENV *env, u_int32_t flags, FT_HANDLE br ...@@ -815,7 +796,6 @@ int toku_setup_db_internal (DB **dbp, DB_ENV *env, u_int32_t flags, FT_HANDLE br
memset(result->i, 0, sizeof *result->i); memset(result->i, 0, sizeof *result->i);
toku_list_init(&result->i->dbs_that_must_close_before_abort); toku_list_init(&result->i->dbs_that_must_close_before_abort);
result->i->ft_handle = brt; result->i->ft_handle = brt;
result->i->refs = 1;
result->i->opened = is_open; result->i->opened = is_open;
*dbp = result; *dbp = result;
return 0; return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment