Commit 6807a2cb authored by Dave Wells's avatar Dave Wells Committed by Yoni Fogel

cleanup merge_some_files error path [t:2642]

git-svn-id: file:///svn/toku/tokudb@20533 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6819006c
...@@ -121,17 +121,18 @@ static void cleanup_big_buffer(struct file_info *file) { ...@@ -121,17 +121,18 @@ static void cleanup_big_buffer(struct file_info *file) {
} }
int brtloader_init_file_infos (struct file_infos *fi) { int brtloader_init_file_infos (struct file_infos *fi) {
int result = 0;
int r = toku_pthread_mutex_init(&fi->lock, NULL); resource_assert(r == 0); int r = toku_pthread_mutex_init(&fi->lock, NULL); resource_assert(r == 0);
fi->n_files = 0; fi->n_files = 0;
fi->n_files_limit = 1; fi->n_files_limit = 1;
fi->n_files_open = 0; fi->n_files_open = 0;
fi->n_files_extant = 0; fi->n_files_extant = 0;
MALLOC_N(fi->n_files_limit, fi->file_infos); MALLOC_N(fi->n_files_limit, fi->file_infos);
if (fi->file_infos == NULL) { if (fi->file_infos) return 0;
result = errno; else {
} int result = errno;
toku_pthread_mutex_destroy(&fi->lock); // lazy no error check and maybe done elsewhere
return result; return result;
}
} }
void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error) void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error)
...@@ -391,6 +392,9 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, ...@@ -391,6 +392,9 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
BL_TRACE(blt_calibrate_done); BL_TRACE(blt_calibrate_done);
#endif #endif
bl->panic = FALSE;
bl->panic_errno = 0;
bl->generate_row_for_put = g; bl->generate_row_for_put = g;
bl->cachetable = cachetable; bl->cachetable = cachetable;
if (bl->cachetable) if (bl->cachetable)
...@@ -461,6 +465,8 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, ...@@ -461,6 +465,8 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
if (r != 0) { toku_brtloader_internal_destroy(bl, TRUE); return r; } if (r != 0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
} }
bl->extractor_live = TRUE;
*blp = bl; *blp = bl;
return 0; return 0;
...@@ -498,14 +504,12 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -498,14 +504,12 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
bt_compare_functions, bt_compare_functions,
temp_file_template, temp_file_template,
load_lsn); load_lsn);
if (r != 0) result = r; if (r!=0) result = r;
} }
if (result == 0) { if (result==0) {
BRTLOADER bl = *blp; BRTLOADER bl = *blp;
int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl); int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl);
if (r == 0) { if (r!=0) {
bl->extractor_live = TRUE;
} else {
result = r; result = r;
toku_pthread_mutex_destroy(&bl->mutex); toku_pthread_mutex_destroy(&bl->mutex);
toku_brtloader_internal_destroy(bl, TRUE); toku_brtloader_internal_destroy(bl, TRUE);
...@@ -515,10 +519,17 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -515,10 +519,17 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
return result; return result;
} }
static void brt_loader_set_panic(BRTLOADER bl, int error, BOOL callback) { static void brt_loader_set_panic(BRTLOADER bl, int error) {
int r = brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL); int r = toku_pthread_mutex_lock(&bl->mutex); resource_assert(r == 0);
if (r == 0 && callback) BOOL is_panic = bl->panic;
brt_loader_call_error_function(&bl->error_callback); if (!is_panic) {
bl->panic = TRUE;
bl->panic_errno = error;
}
r = toku_pthread_mutex_unlock(&bl->mutex); resource_assert(r == 0);
if (!is_panic) {
brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
}
} }
// One of the tests uses this. // One of the tests uses this.
...@@ -866,7 +877,7 @@ static void* extractor_thread (void *blv) { ...@@ -866,7 +877,7 @@ static void* extractor_thread (void *blv) {
{ {
r = process_primary_rows(bl, primary_rowset); r = process_primary_rows(bl, primary_rowset);
if (r) if (r)
brt_loader_set_panic(bl, r, FALSE); brt_loader_set_panic(bl, r);
} }
} }
...@@ -874,7 +885,7 @@ static void* extractor_thread (void *blv) { ...@@ -874,7 +885,7 @@ static void* extractor_thread (void *blv) {
if (r == 0) { if (r == 0) {
r = finish_primary_rows(bl); r = finish_primary_rows(bl);
if (r) if (r)
brt_loader_set_panic(bl, r, FALSE); brt_loader_set_panic(bl, r);
} }
BL_TRACE(blt_extractor); BL_TRACE(blt_extractor);
...@@ -1084,7 +1095,7 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val) ...@@ -1084,7 +1095,7 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val)
* Return value: 0 on success, an error number otherwise. * Return value: 0 on success, an error number otherwise.
*/ */
{ {
if (brt_loader_get_error(&bl->error_callback)) if (bl->panic || brt_loader_get_error(&bl->error_callback))
return EINVAL; // previous panic return EINVAL; // previous panic
bl->n_rows++; bl->n_rows++;
// return loader_write_row(key, val, bl->fprimary_rows, &bl->fprimary_offset, bl); // return loader_write_row(key, val, bl->fprimary_rows, &bl->fprimary_offset, bl);
...@@ -1466,16 +1477,18 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1466,16 +1477,18 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
} }
pqueue_t *pq; pqueue_t *pq;
pqueue_node_t *pq_nodes = (pqueue_node_t *)toku_malloc(n_sources * sizeof(pqueue_node_t)); pqueue_node_t *pq_nodes = (pqueue_node_t *)toku_malloc(n_sources * sizeof(pqueue_node_t)); // freed in cleanup
invariant(pq_nodes != NULL); invariant(pq_nodes != NULL);
{ {
int r = pqueue_init(&pq, n_sources, which_db, dest_db, compare, &bl->error_callback); int r = pqueue_init(&pq, n_sources, which_db, dest_db, compare, &bl->error_callback);
lazy_assert(r == 0); lazy_assert(r == 0);
if (r) return r; result = r;
} }
u_int64_t n_rows = 0; u_int64_t n_rows = 0;
if ( result == 0 ) {
// load pqueue with first value from each source // load pqueue with first value from each source
for (int i=0; i<n_sources; i++) { for (int i=0; i<n_sources; i++) {
BL_TRACE_QUIET(blt_do_i); BL_TRACE_QUIET(blt_do_i);
...@@ -1483,7 +1496,10 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1483,7 +1496,10 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
BL_TRACE_QUIET(blt_read_row); BL_TRACE_QUIET(blt_read_row);
if (r==EOF) continue; // if the file is empty, don't initialize the pqueue. if (r==EOF) continue; // if the file is empty, don't initialize the pqueue.
lazy_assert(r == 0); lazy_assert(r == 0);
if (r!=0) return r; if (r!=0) {
result = r;
break;
}
pq_nodes[i].key = &keys[i]; pq_nodes[i].key = &keys[i];
pq_nodes[i].val = &vals[i]; pq_nodes[i].val = &vals[i];
...@@ -1501,13 +1517,15 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1501,13 +1517,15 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
n_rows += bl->file_infos.file_infos[srcs_fidxs[i].idx].n_rows; n_rows += bl->file_infos.file_infos[srcs_fidxs[i].idx].n_rows;
{ int r2 = toku_pthread_mutex_unlock(&bl->file_infos.lock); resource_assert(r2==0); } { int r2 = toku_pthread_mutex_unlock(&bl->file_infos.lock); resource_assert(r2==0); }
} }
}
u_int64_t n_rows_done = 0; u_int64_t n_rows_done = 0;
struct rowset *output_rowset = NULL; struct rowset *output_rowset = NULL;
if (result==0 && to_q) { if (result==0 && to_q) {
XMALLOC(output_rowset); XMALLOC(output_rowset); // freed in cleanup
int r = init_rowset(output_rowset, memory_per_rowset(bl)); int r = init_rowset(output_rowset, memory_per_rowset(bl));
lazy_assert(r==0); lazy_assert(r==0);
if (r!=0) result = r;
} }
//printf(" n_rows=%ld\n", n_rows); //printf(" n_rows=%ld\n", n_rows);
...@@ -1531,18 +1549,28 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1531,18 +1549,28 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
r = queue_enq(q, (void*)output_rowset, 1, NULL); r = queue_enq(q, (void*)output_rowset, 1, NULL);
BL_TRACE(blt_fractal_enq); BL_TRACE(blt_fractal_enq);
lazy_assert(r==0); lazy_assert(r==0);
MALLOC(output_rowset); XMALLOC(output_rowset); // freed in cleanup
assert(output_rowset);
r = init_rowset(output_rowset, memory_per_rowset(bl)); r = init_rowset(output_rowset, memory_per_rowset(bl));
lazy_assert(r==0); lazy_assert(r==0);
if (r!=0) {
result = r;
break;
}
} }
r = add_row(output_rowset, &keys[mini], &vals[mini]); r = add_row(output_rowset, &keys[mini], &vals[mini]);
lazy_assert(r == 0); lazy_assert(r == 0);
if (r!=0) {
return = r;
break;
}
} else { } else {
// write it to the dest file // write it to the dest file
r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl); r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl);
lazy_assert(r==0); lazy_assert(r==0);
if (r!=0) return r; if (r!=0) {
return = r;
break;
}
} }
{ {
...@@ -1558,7 +1586,8 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1558,7 +1586,8 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
} else { } else {
printf("%s:%d returning\n", __FILE__, __LINE__); printf("%s:%d returning\n", __FILE__, __LINE__);
lazy_assert(0); lazy_assert(0);
return r; result = r;
break;
} }
} }
else { else {
...@@ -1595,7 +1624,10 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1595,7 +1624,10 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
BL_TRACE(blt_do_i); BL_TRACE(blt_do_i);
int r = queue_enq(q, (void*)output_rowset, 1, NULL); int r = queue_enq(q, (void*)output_rowset, 1, NULL);
BL_TRACE(blt_fractal_enq); BL_TRACE(blt_fractal_enq);
assert(r==0); // if (r!=0) result = r; lazy_assert(r==0); //
if (r!=0)
result = r;
else
output_rowset = NULL; output_rowset = NULL;
} }
...@@ -1773,8 +1805,7 @@ int merge_files (struct merge_fileset *fs, ...@@ -1773,8 +1805,7 @@ int merge_files (struct merge_fileset *fs,
if (result!=0) break; if (result!=0) break;
} }
if (result) if (result) brt_loader_set_panic(bl, result);
brt_loader_set_panic(bl, result, TRUE);
{ {
int r = queue_eof(output_q); int r = queue_eof(output_q);
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
...@@ -2143,7 +2174,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2143,7 +2174,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
seek_align(&out); seek_align(&out);
int64_t lblock; int64_t lblock;
result = allocate_block(&out, &lblock); result = allocate_block(&out, &lblock);
invariant(result == 0); // can not fail since the first block is reserved above lazy_assert(result == 0); // can not fail since translations reserved above
struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock); struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock);
struct subtree_estimates est = zero_estimates; struct subtree_estimates est = zero_estimates;
est.exact = TRUE; est.exact = TRUE;
...@@ -2160,7 +2191,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2160,7 +2191,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
BL_TRACE(blt_fractal_deq); BL_TRACE(blt_fractal_deq);
if (rr == EOF) break; if (rr == EOF) break;
if (rr != 0) { if (rr != 0) {
brt_loader_set_panic(bl, rr, TRUE); // error after cilk sync brt_loader_set_panic(bl, rr); // error after cilk sync
break; break;
} }
} }
...@@ -2196,9 +2227,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2196,9 +2227,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
n_pivots++; n_pivots++;
if ((r = bl_write_dbt(&key, pivots_stream, NULL, bl))) { if ((r = bl_write_dbt(&key, pivots_stream, NULL, bl))) {
brt_loader_set_panic(bl, r, TRUE); // error after cilk sync brt_loader_set_panic(bl, r); // error after cilk sync
if (result == 0) if (result == 0) result = r;
result = r;
break; break;
} }
...@@ -2207,9 +2237,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2207,9 +2237,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
r = allocate_block(&out, &lblock); r = allocate_block(&out, &lblock);
if (r != 0) { if (r != 0) {
brt_loader_set_panic(bl, r, TRUE); brt_loader_set_panic(bl, r);
if (result == 0) if (result == 0) result = r;
result = r;
break; break;
} }
lbuf = start_leaf(&out, descriptor, lblock); lbuf = start_leaf(&out, descriptor, lblock);
...@@ -2237,9 +2266,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2237,9 +2266,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
cilk_sync; cilk_sync;
if (result == 0) { if (bl->panic) { // if there were any prior errors then exit
result = brt_loader_get_error(&bl->error_callback); // if there were any prior errors then exit result = bl->panic_errno; goto error;
if (result) goto error;
} }
// We haven't paniced, so the sum should add up. // We haven't paniced, so the sum should add up.
...@@ -2313,6 +2341,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2313,6 +2341,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
result = errno; goto error; result = errno; goto error;
} }
// Do we need to pay attention to user_said_stop? Or should the guy at the other end of the queue pay attention and send in an EOF.
error: error:
{ {
int rr = toku_os_close(fd); int rr = toku_os_close(fd);
...@@ -2549,7 +2579,11 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error) ...@@ -2549,7 +2579,11 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error)
} }
int toku_brt_loader_get_error(BRTLOADER bl, int *error) { int toku_brt_loader_get_error(BRTLOADER bl, int *error) {
*error = brt_loader_get_error(&bl->error_callback); *error = 0;
if (bl->panic)
*error = bl->panic_errno;
else if (bl->error_callback.error)
*error = bl->error_callback.error;
return 0; return 0;
} }
...@@ -2696,10 +2730,12 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr ...@@ -2696,10 +2730,12 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
//printf("Nodewrite %d (%.1f%%):", progress_allocation, 100.0*progress_allocation/PROGRESS_MAX); //printf("Nodewrite %d (%.1f%%):", progress_allocation, 100.0*progress_allocation/PROGRESS_MAX);
if (result == 0) { if (result == 0) {
result = update_progress(progress_allocation, bl, "wrote node"); result = update_progress(progress_allocation, bl, "wrote node");
if (result != 0)
bl->user_said_stop = result;
} }
if (result) if (result)
brt_loader_set_panic(bl, result, TRUE); brt_loader_set_panic(bl, result);
} }
CILK_END CILK_END
...@@ -2952,7 +2988,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu ...@@ -2952,7 +2988,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
blocknum_of_new_node = blocknum_of_new_node; blocknum_of_new_node = blocknum_of_new_node;
if (result != 0) if (result != 0)
brt_loader_set_panic(bl, result, TRUE); brt_loader_set_panic(bl, result);
} }
static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const struct descriptor *descriptor) { static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const struct descriptor *descriptor) {
...@@ -3054,8 +3090,8 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s ...@@ -3054,8 +3090,8 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
cilk_sync; cilk_sync;
if (result == 0) // pick up write_nonleaf_node errors if (result == 0 && bl->panic) // pick up write_nonleaf_node errors
result = brt_loader_get_error(&bl->error_callback); result = bl->panic_errno;
// Now set things up for the next iteration. // Now set things up for the next iteration.
int r = brtloader_fi_close(&bl->file_infos, pivots_fidx); if (r != 0 && result == 0) result = r; int r = brtloader_fi_close(&bl->file_infos, pivots_fidx); if (r != 0 && result == 0) result = r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment