Commit 7faf5d48 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:5029], make fix

git-svn-id: file:///svn/toku/tokudb@44327 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4d74fb5c
...@@ -197,8 +197,15 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -197,8 +197,15 @@ int toku_loader_create_loader(DB_ENV *env,
XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func) XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func)
XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL) XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL)
DB_TXN* child_txn = NULL;
int using_txns = env->i->open_flags & DB_INIT_TXN;
if (using_txns) {
int r = env->txn_begin(env, txn, &child_txn, 0);
assert_zero(r);
}
loader->i->env = env; loader->i->env = env;
loader->i->txn = txn; loader->i->txn = child_txn;
loader->i->N = N; loader->i->N = N;
loader->i->dbs = dbs; loader->i->dbs = dbs;
loader->i->src_db = src_db; loader->i->src_db = src_db;
...@@ -228,7 +235,7 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -228,7 +235,7 @@ int toku_loader_create_loader(DB_ENV *env,
// lock tables and check empty // lock tables and check empty
for(int i=0;i<N;i++) { for(int i=0;i<N;i++) {
if (!(loader_flags&DB_PRELOCKED_WRITE)) { if (!(loader_flags&DB_PRELOCKED_WRITE)) {
r = toku_db_pre_acquire_table_lock(dbs[i], txn); r = toku_db_pre_acquire_table_lock(dbs[i], child_txn);
if (r!=0) break; if (r!=0) break;
} }
r = !toku_ft_is_empty_fast(dbs[i]->i->ft_handle); r = !toku_ft_is_empty_fast(dbs[i]->i->ft_handle);
...@@ -254,14 +261,14 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -254,14 +261,14 @@ int toku_loader_create_loader(DB_ENV *env,
loader->i->ekeys = NULL; loader->i->ekeys = NULL;
loader->i->evals = NULL; loader->i->evals = NULL;
LSN load_lsn; LSN load_lsn;
r = locked_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, use_ft_loader); r = locked_load_inames(env, child_txn, N, dbs, new_inames_in_env, &load_lsn, use_ft_loader);
if ( r!=0 ) { if ( r!=0 ) {
toku_free(new_inames_in_env); toku_free(new_inames_in_env);
toku_free(brts); toku_free(brts);
rval = r; rval = r;
goto create_exit; goto create_exit;
} }
TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL; TOKUTXN ttxn = child_txn ? db_txn_struct_i(child_txn)->tokutxn : NULL;
r = toku_ft_loader_open(&loader->i->ft_loader, r = toku_ft_loader_open(&loader->i->ft_loader,
loader->i->env->i->cachetable, loader->i->env->i->cachetable,
loader->i->env->i->generate_row_for_put, loader->i->env->i->generate_row_for_put,
...@@ -292,7 +299,7 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -292,7 +299,7 @@ int toku_loader_create_loader(DB_ENV *env,
for (int i=0; i<N; i++) { for (int i=0; i<N; i++) {
loader->i->ekeys[i].flags = DB_DBT_REALLOC; loader->i->ekeys[i].flags = DB_DBT_REALLOC;
loader->i->evals[i].flags = DB_DBT_REALLOC; loader->i->evals[i].flags = DB_DBT_REALLOC;
toku_ft_suppress_recovery_logs(dbs[i]->i->ft_handle, db_txn_struct_i(txn)->tokutxn); toku_ft_suppress_recovery_logs(dbs[i]->i->ft_handle, db_txn_struct_i(child_txn)->tokutxn);
} }
loader->i->ft_loader = NULL; loader->i->ft_loader = NULL;
// close the ft_loader and skip to the redirection // close the ft_loader and skip to the redirection
...@@ -311,6 +318,9 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -311,6 +318,9 @@ int toku_loader_create_loader(DB_ENV *env,
} }
else { else {
(void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE_FAIL), 1); (void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE_FAIL), 1);
if (child_txn) {
child_txn->abort(child_txn);
}
free_loader(loader); free_loader(loader);
} }
return rval; return rval;
...@@ -397,6 +407,7 @@ int toku_loader_close(DB_LOADER *loader) ...@@ -397,6 +407,7 @@ int toku_loader_close(DB_LOADER *loader)
{ {
(void) __sync_fetch_and_sub(&STATUS_VALUE(LOADER_CURRENT), 1); (void) __sync_fetch_and_sub(&STATUS_VALUE(LOADER_CURRENT), 1);
int r=0; int r=0;
DB_TXN* txn = loader->i->txn;
if ( loader->i->err_errno != 0 ) { if ( loader->i->err_errno != 0 ) {
if ( loader->i->error_callback != NULL ) { if ( loader->i->error_callback != NULL ) {
loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra); loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra);
...@@ -413,11 +424,21 @@ int toku_loader_close(DB_LOADER *loader) ...@@ -413,11 +424,21 @@ int toku_loader_close(DB_LOADER *loader)
r = ft_loader_close_and_redirect(loader); r = ft_loader_close_and_redirect(loader);
} }
} }
free_loader(loader); if (r==0) {
if (r==0) if (txn) {
int rr = txn->commit(txn, 0);
assert_zero(rr);
}
(void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CLOSE), 1); (void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CLOSE), 1);
else }
else {
if (txn) {
int rr = txn->abort(txn);
assert_zero(rr);
}
(void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CLOSE_FAIL), 1); (void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_CLOSE_FAIL), 1);
}
free_loader(loader);
return r; return r;
} }
...@@ -426,6 +447,7 @@ int toku_loader_abort(DB_LOADER *loader) ...@@ -426,6 +447,7 @@ int toku_loader_abort(DB_LOADER *loader)
(void) __sync_fetch_and_sub(&STATUS_VALUE(LOADER_CURRENT), 1); (void) __sync_fetch_and_sub(&STATUS_VALUE(LOADER_CURRENT), 1);
(void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_ABORT), 1); (void) __sync_fetch_and_add(&STATUS_VALUE(LOADER_ABORT), 1);
int r=0; int r=0;
DB_TXN* txn = loader->i->txn;
if ( loader->i->err_errno != 0 ) { if ( loader->i->err_errno != 0 ) {
if ( loader->i->error_callback != NULL ) { if ( loader->i->error_callback != NULL ) {
loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra); loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra);
...@@ -435,6 +457,10 @@ int toku_loader_abort(DB_LOADER *loader) ...@@ -435,6 +457,10 @@ int toku_loader_abort(DB_LOADER *loader)
if (!(loader->i->loader_flags & LOADER_USE_PUTS) ) { if (!(loader->i->loader_flags & LOADER_USE_PUTS) ) {
r = toku_ft_loader_abort(loader->i->ft_loader, TRUE); r = toku_ft_loader_abort(loader->i->ft_loader, TRUE);
} }
if (txn) {
int rr = txn->abort(txn);
assert_zero(rr);
}
free_loader(loader); free_loader(loader);
return r; return r;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment