Commit c9e32bbe authored by Dave Wells's avatar Dave Wells Committed by Yoni Fogel

add toku_brt_loader_abort, fix up error handling in toku_loader_*

git-svn-id: file:///svn/toku/tokudb@18804 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3919e855
...@@ -998,6 +998,14 @@ int toku_brt_loader_close (BRTLOADER bl, ...@@ -998,6 +998,14 @@ int toku_brt_loader_close (BRTLOADER bl,
return result; return result;
} }
int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error)
/* Effect : Abort the bulk loader, free brtloader resources */
{
int result = 0;
brtloader_destroy(bl, is_error);
return result;
}
struct dbuf { struct dbuf {
unsigned char *buf; unsigned char *buf;
int buflen; int buflen;
......
...@@ -19,6 +19,8 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val); ...@@ -19,6 +19,8 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val);
int toku_brt_loader_close (BRTLOADER bl, int toku_brt_loader_close (BRTLOADER bl,
void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra), void *error_callback_extra, void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra), void *error_callback_extra,
int (*poll_callback)(void *extra, float progress), void *poll_callback_extra); int (*poll_callback)(void *extra, float progress), void *poll_callback_extra);
int toku_brt_loader_abort(BRTLOADER bl,
BOOL is_error);
void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)); void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*));
......
...@@ -54,21 +54,68 @@ struct __toku_loader_internal { ...@@ -54,21 +54,68 @@ struct __toku_loader_internal {
char **inames_in_env; /* [N] inames of new files to be created */ char **inames_in_env; /* [N] inames of new files to be created */
}; };
/*
* free_loader_resources() frees all of the resources associated with
* struct __toku_loader_internal
* assumes any previously freed items set the field pointer to NULL
*/
static void free_loader_resources(DB_LOADER *loader)
{
if ( loader->i ) {
for (int i=0; i<loader->i->N; i++) {
if (loader->i->ekeys &&
loader->i->ekeys[i].data) {
toku_free(loader->i->ekeys[i].data);
}
if (loader->i->evals &&
loader->i->evals[i].data) {
toku_free(loader->i->evals[i].data);
}
}
if (loader->i->ekeys) toku_free(loader->i->ekeys);
if (loader->i->evals) toku_free(loader->i->evals);
for (int i=0; i<loader->i->N; i++) {
if (loader->i->inames_in_env[i]) toku_free(loader->i->inames_in_env[i]);
}
if (loader->i->err_key.data) toku_free(loader->i->err_key.data);
if (loader->i->err_val.data) toku_free(loader->i->err_val.data);
if (loader->i->inames_in_env) toku_free(loader->i->inames_in_env);
if (loader->i->temp_file_template) toku_free(loader->i->temp_file_template);
if (loader->i->brt_loader) toku_free(loader->i->brt_loader);
// loader->i
toku_free(loader->i);
loader->i = NULL;
}
}
static void free_loader(DB_LOADER *loader)
{
if ( loader ) free_loader_resources(loader);
toku_free(loader);
}
// excuse the convolution of error messages - in the end returns
// 0 if empty
// DB_KEYEXIST if not empty
// DB_NOTFOUND if problem with DB
static int verify_empty(DB *db, DB_TXN *txn) static int verify_empty(DB *db, DB_TXN *txn)
{ {
int r; int r, r2;
DBC *cursor; DBC *cursor;
DBT k, v; DBT k, v;
toku_init_dbt(&k); toku_init_dbt(&k);
toku_init_dbt(&v); toku_init_dbt(&v);
r = db->cursor(db, txn, &cursor, 0); assert(r == 0); r = db->cursor(db, txn, &cursor, 0);
r = cursor->c_get(cursor, &k, &v, DB_NEXT); if ( r!=0 ) return DB_NOTFOUND;
{ r = cursor->c_get(cursor, &k, &v, DB_NEXT);
int r2 = cursor->c_close(cursor); r2 = cursor->c_close(cursor);
assert(r2==0); if ( r2!=0 ) return DB_NOTFOUND;
} if (r==DB_NOTFOUND) r = 0; // this is correct
if (r==DB_NOTFOUND) r = 0;
else if (r==0) r = DB_KEYEXIST; else if (r==0) r = DB_KEYEXIST;
return r; return r;
} }
...@@ -83,30 +130,35 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -83,30 +130,35 @@ int toku_loader_create_loader(DB_ENV *env,
uint32_t dbt_flags[N], uint32_t dbt_flags[N],
uint32_t loader_flags) uint32_t loader_flags)
{ {
*blp = NULL; // set later when created
DB_LOADER *loader; DB_LOADER *loader;
XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func) XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func)
XCALLOC(loader->i); XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL)
loader->i->env = env; loader->i->env = env;
loader->i->txn = txn; loader->i->txn = txn;
loader->i->N = N; loader->i->N = N;
loader->i->src_db = src_db;
loader->i->dbs = dbs; loader->i->dbs = dbs;
loader->i->src_db = src_db;
loader->i->db_flags = db_flags; loader->i->db_flags = db_flags;
loader->i->dbt_flags = dbt_flags; loader->i->dbt_flags = dbt_flags;
loader->i->loader_flags = loader_flags; loader->i->loader_flags = loader_flags;
loader->i->temp_file_template = (char *)toku_malloc(MAX_FILE_SIZE); loader->i->temp_file_template = (char *)toku_malloc(MAX_FILE_SIZE);
int n = snprintf(loader->i->temp_file_template, MAX_FILE_SIZE, "%s/tempXXXXXX", env->i->dir); int n = snprintf(loader->i->temp_file_template, MAX_FILE_SIZE, "%s/tempXXXXXX", env->i->dir);
assert(n>0 && n<MAX_FILE_SIZE); if ( !(n>0 && n<MAX_FILE_SIZE) ) {
free_loader(loader);
return -1;
}
memset(&loader->i->err_key, 0, sizeof(loader->i->err_key)); memset(&loader->i->err_key, 0, sizeof(loader->i->err_key));
memset(&loader->i->err_val, 0, sizeof(loader->i->err_val)); memset(&loader->i->err_val, 0, sizeof(loader->i->err_val));
loader->i->err_i = 0; loader->i->err_i = 0;
loader->i->err_errno = 0; loader->i->err_errno = 0;
loader->set_poll_function = toku_loader_set_poll_function;
loader->set_error_callback = toku_loader_set_error_callback; loader->set_error_callback = toku_loader_set_error_callback;
loader->set_poll_function = toku_loader_set_poll_function;
loader->put = toku_loader_put; loader->put = toku_loader_put;
loader->close = toku_loader_close; loader->close = toku_loader_close;
loader->abort = toku_loader_abort; loader->abort = toku_loader_abort;
...@@ -115,9 +167,15 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -115,9 +167,15 @@ int toku_loader_create_loader(DB_ENV *env,
// lock tables and check empty // lock tables and check empty
for(int i=0;i<N;i++) { for(int i=0;i<N;i++) {
r = toku_db_pre_acquire_table_lock(dbs[i], txn); r = toku_db_pre_acquire_table_lock(dbs[i], txn);
assert(r==0); if ( r!=0 ) {
free_loader(loader);
return -1;
}
r = verify_empty(dbs[i], txn); r = verify_empty(dbs[i], txn);
assert(r==0); if ( r!=0 ) {
free_loader(loader);
return -1;
}
} }
brt_compare_func compare_functions[N]; brt_compare_func compare_functions[N];
...@@ -147,7 +205,11 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -147,7 +205,11 @@ int toku_loader_create_loader(DB_ENV *env,
loader->i->ekeys = NULL; loader->i->ekeys = NULL;
loader->i->evals = NULL; loader->i->evals = NULL;
r = locked_ydb_load_inames (env, txn, N, dbs, new_inames_in_env); r = locked_ydb_load_inames (env, txn, N, dbs, new_inames_in_env);
assert(r==0); if ( r!=0 ) {
toku_free(descriptors);
free_loader(loader);
return -1;
}
toku_brt_loader_open(&loader->i->brt_loader, toku_brt_loader_open(&loader->i->brt_loader,
loader->i->env->i->cachetable, loader->i->env->i->cachetable,
loader->i->env->i->generate_row_for_put, loader->i->env->i->generate_row_for_put,
...@@ -225,8 +287,10 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val) ...@@ -225,8 +287,10 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val)
loader->i->err_i = i; loader->i->err_i = i;
loader->i->err_errno = r; loader->i->err_errno = r;
return -1; // deliberately return content free value
// - must call error_callback to get error info
return -1;
} }
return 0; return 0;
} }
...@@ -238,63 +302,48 @@ int toku_loader_close(DB_LOADER *loader) ...@@ -238,63 +302,48 @@ int toku_loader_close(DB_LOADER *loader)
if ( loader->i->error_callback != NULL ) { if ( loader->i->error_callback != NULL ) {
loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra); loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra);
} }
toku_free(loader->i->err_key.data); r = toku_brt_loader_abort(loader->i->brt_loader, TRUE);
toku_free(loader->i->err_val.data); }
} else { // no error outstanding
if ( !(loader->i->loader_flags & LOADER_USE_PUTS ) ) {
if ( !(loader->i->loader_flags & LOADER_USE_PUTS ) ) { // use the bulk loader
r = toku_brt_loader_close(loader->i->brt_loader, // in case you've been looking - here is where the real work is done!
loader->i->error_callback, loader->i->error_extra, r = toku_brt_loader_close(loader->i->brt_loader,
loader->i->poll_func, loader->i->poll_extra); loader->i->error_callback, loader->i->error_extra,
if (r!=0) goto cleanup_and_return_r; loader->i->poll_func, loader->i->poll_extra);
if ( r==0 ) {
for (int i=0; i<loader->i->N; i++) { for (int i=0; i<loader->i->N; i++) {
toku_ydb_lock(); //Must hold ydb lock for dictionary_redirect. toku_ydb_lock(); //Must hold ydb lock for dictionary_redirect.
r = toku_dictionary_redirect(loader->i->inames_in_env[i], r = toku_dictionary_redirect(loader->i->inames_in_env[i],
loader->i->dbs[i]->i->brt, loader->i->dbs[i]->i->brt,
db_txn_struct_i(loader->i->txn)->tokutxn); db_txn_struct_i(loader->i->txn)->tokutxn);
assert(r==0); toku_ydb_unlock();
toku_ydb_unlock(); if ( r!=0 ) {
} printf("NEED TO DO CLEANUP HERE\n");
cleanup_and_return_r: assert(0);
for (int i=0; i<loader->i->N; i++) { // int rr = toku_brt_loader_abort(loader->i->brt_loader, TRUE);
toku_free(loader->i->inames_in_env[i]); break;
} }
toku_free(loader->i->inames_in_env); }
toku_free(loader->i->brt_loader);
// TODO: release table locks
} else {
// (loader->i->loader_flags & LOADER_USE_PUTS);
int num_dbts = loader->i->N;
for (int i=0; i<num_dbts; i++) {
if (loader->i->ekeys &&
loader->i->ekeys[i].flags == DB_DBT_REALLOC &&
loader->i->ekeys[i].data) {
toku_free(loader->i->ekeys[i].data);
}
if (loader->i->evals &&
loader->i->evals[i].flags == DB_DBT_REALLOC &&
loader->i->evals[i].data) {
toku_free(loader->i->evals[i].data);
} }
} }
if (loader->i->ekeys) toku_free(loader->i->ekeys);
if (loader->i->evals) toku_free(loader->i->evals);
} }
toku_free(loader->i->temp_file_template); free_loader(loader);
toku_free(loader->i);
toku_free(loader);
return r; return r;
} }
int toku_loader_abort(DB_LOADER *loader) int toku_loader_abort(DB_LOADER *loader)
{ {
if ( loader->i->loader_flags & LOADER_USE_PUTS ) { int r=0;
return toku_loader_close(loader); if ( loader->i->err_errno != 0 ) {
if ( loader->i->error_callback != NULL ) {
loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra);
}
} }
else {
loader = loader; if ( !(loader->i->loader_flags & LOADER_USE_PUTS) ) {
assert(1); // not implemented r = toku_brt_loader_abort(loader->i->brt_loader, TRUE);
return 0;
} }
free_loader(loader);
return r;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment