Commit b611db04 authored by Dave Wells's avatar Dave Wells Committed by Yoni Fogel

cleanup error handling in toku_brt_loader_open, toku_loader_open

git-svn-id: file:///svn/toku/tokudb@20331 c7de825b-a66e-492c-adef-691d508d4ae1
parent 27cbb4c7
...@@ -134,7 +134,10 @@ int brtloader_init_file_infos (struct file_infos *fi) { ...@@ -134,7 +134,10 @@ int brtloader_init_file_infos (struct file_infos *fi) {
fi->n_files_extant = 0; fi->n_files_extant = 0;
MALLOC_N(fi->n_files_limit, fi->file_infos); MALLOC_N(fi->n_files_limit, fi->file_infos);
if (fi->file_infos) return 0; if (fi->file_infos) return 0;
else return errno; else {
toku_pthread_mutex_destroy(&fi->lock);
return errno;
}
} }
void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error) void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error)
...@@ -335,7 +338,7 @@ static uint64_t memory_per_rowset (BRTLOADER bl) { ...@@ -335,7 +338,7 @@ static uint64_t memory_per_rowset (BRTLOADER bl) {
} }
// LAZY cleanup on error paths, ticket #2591 // lazy cleanup on error paths, ticket #2591
int toku_brt_loader_open (/* out */ BRTLOADER *blp, int toku_brt_loader_open (/* out */ BRTLOADER *blp,
CACHETABLE cachetable, CACHETABLE cachetable,
generate_row_for_put_func g, generate_row_for_put_func g,
...@@ -400,29 +403,52 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -400,29 +403,52 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
MY_CALLOC_N(N, bl->fractal_threads_live); MY_CALLOC_N(N, bl->fractal_threads_live);
for (int i=0; i<N; i++) bl->fractal_threads_live[i] = FALSE; for (int i=0; i<N; i++) bl->fractal_threads_live[i] = FALSE;
{int r = brtloader_init_file_infos(&bl->file_infos); lazy_assert(r==0);} {
int r = brtloader_init_file_infos(&bl->file_infos);
if (r!=0) { brtloader_destroy(bl, TRUE); return r; }
}
SET_TO_MY_STRDUP(bl->temp_file_template, temp_file_template); SET_TO_MY_STRDUP(bl->temp_file_template, temp_file_template);
bl->n_rows = 0; bl->n_rows = 0;
bl->progress = 0; bl->progress = 0;
bl->rows = (struct rowset *) toku_malloc(N*sizeof(struct rowset)); if (bl->rows == NULL) return errno; MY_CALLOC_N(N, bl->rows);
bl->fs = (struct merge_fileset *) toku_malloc(N*sizeof(struct merge_fileset)); if (bl->rows == NULL) return errno; MY_CALLOC_N(N, bl->fs);
for(int i=0;i<N;i++) { for(int i=0;i<N;i++) {
{ int r = init_rowset(&bl->rows[i], memory_per_rowset(bl)); if (r!=0) return r; } {
int r = init_rowset(&bl->rows[i], memory_per_rowset(bl));
if (r!=0) {brtloader_destroy(bl, TRUE); return r; }
}
init_merge_fileset(&bl->fs[i]); init_merge_fileset(&bl->fs[i]);
} }
{ // note : currently brt_loader_init_error_callback always returns 0
brt_loader_init_error_callback(&bl->error_callback); int r = brt_loader_init_error_callback(&bl->error_callback);
if (r!=0) { brtloader_destroy(bl, TRUE); return r; }
}
brt_loader_init_poll_callback(&bl->poll_callback); brt_loader_init_poll_callback(&bl->poll_callback);
{ int r = init_rowset(&bl->primary_rowset, memory_per_rowset(bl)); if (r!=0) return r; } {
{ int r = queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH); if (r!=0) return r; } int r = init_rowset(&bl->primary_rowset, memory_per_rowset(bl));
if (r!=0) { brtloader_destroy(bl, TRUE); return r; }
}
{ int r = queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH);
if (r!=0) { brtloader_destroy(bl, TRUE); return r; }
}
//printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__); //printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__);
{ int r = toku_pthread_mutex_init(&bl->mutex, NULL); if (r != 0) return r; } {
{ int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl); if (r!=0) return r; } int r = toku_pthread_mutex_init(&bl->mutex, NULL);
if (r != 0) { brtloader_destroy(bl, TRUE); return r; }
}
{
int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl);
if (r!=0) {
toku_pthread_mutex_destroy(&bl->mutex);
brtloader_destroy(bl, TRUE);
return r;
}
}
bl->extractor_live = TRUE; bl->extractor_live = TRUE;
*blp = bl; *blp = bl;
......
...@@ -237,7 +237,12 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -237,7 +237,12 @@ int toku_loader_create_loader(DB_ENV *env,
compare_functions, compare_functions,
loader->i->temp_file_template, loader->i->temp_file_template,
load_lsn); load_lsn);
lazy_assert(r == 0); if ( r!=0 ) {
toku_free(new_inames_in_env);
toku_free(descriptors);
rval = r;
goto create_exit;
}
loader->i->inames_in_env = new_inames_in_env; loader->i->inames_in_env = new_inames_in_env;
toku_free(descriptors); toku_free(descriptors);
rval = 0; rval = 0;
...@@ -262,6 +267,7 @@ int toku_loader_set_poll_function(DB_LOADER *loader, ...@@ -262,6 +267,7 @@ int toku_loader_set_poll_function(DB_LOADER *loader,
int (*poll_func)(void *extra, float progress), int (*poll_func)(void *extra, float progress),
void *poll_extra) void *poll_extra)
{ {
invariant(loader != NULL);
loader->i->poll_func = poll_func; loader->i->poll_func = poll_func;
loader->i->poll_extra = poll_extra; loader->i->poll_extra = poll_extra;
return 0; return 0;
...@@ -271,6 +277,7 @@ int toku_loader_set_error_callback(DB_LOADER *loader, ...@@ -271,6 +277,7 @@ int toku_loader_set_error_callback(DB_LOADER *loader,
void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra), void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra),
void *error_extra) void *error_extra)
{ {
invariant(loader != NULL);
loader->i->error_callback = error_cb; loader->i->error_callback = error_cb;
loader->i->error_extra = error_extra; loader->i->error_extra = error_extra;
return 0; return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment