Commit 1bba1e63 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

closes[t:2644] combine the brtloader panic and error callback state

git-svn-id: file:///svn/toku/tokudb@20526 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4d0f2e57
......@@ -84,14 +84,14 @@ void brt_loader_set_poll_function(brtloader_poll_callback, brt_loader_poll_func
int brt_loader_call_poll_function(brtloader_poll_callback, float progress);
struct error_callback_s {
int error;
brt_loader_error_func error_callback;
void *extra;
int did_callback;
int error;
DB *db;
int which_db;
DBT key;
DBT val;
BOOL did_callback;
toku_pthread_mutex_t mutex;
};
typedef struct error_callback_s *brtloader_error_callback;
......@@ -111,8 +111,9 @@ int brt_loader_call_error_function(brtloader_error_callback);
int brt_loader_set_error_and_callback(brtloader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val);
struct brtloader_s {
BOOL panic;
int panic_errno;
// These two are set in the close function, and used while running close
struct error_callback_s error_callback;
struct poll_callback_s poll_callback;
generate_row_for_put_func generate_row_for_put;
brt_compare_func *bt_compare_funs;
......@@ -148,11 +149,6 @@ struct brtloader_s {
int progress; // Progress runs from 0 to PROGRESS_MAX. When we call the poll function we convert to a float from 0.0 to 1.0
// We use an integer so that we can add to the progress using a fetch-and-add instruction.
// These two are set in the close function, and used while running close
struct error_callback_s error_callback;
struct poll_callback_s poll_callback;
int user_said_stop; // 0 if the poll_function always returned zero. If it ever returns nonzero, then store that value here.
LSN load_lsn; //LSN of the fsynced 'load' log entry. Write this LSN (as checkpoint_lsn) in brt headers made by this loader.
QUEUE *fractal_queues; // an array of work queues, one for each secondary index.
......@@ -234,6 +230,13 @@ void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error);
enum { disksize_row_overhead = 9 }; // how much overhead for a row in the fractal tree
// For test purposes only. (In production, the rowset size is determined by negotation with the cachetable for some memory. See #2613.)
uint64_t toku_brtloader_get_rowset_budget_for_testing (void);
int toku_brt_loader_finish_extractor(BRTLOADER bl);
int toku_brt_loader_get_error(BRTLOADER bl, int *loader_errno);
C_END
#endif
......@@ -121,18 +121,17 @@ static void cleanup_big_buffer(struct file_info *file) {
}
int brtloader_init_file_infos (struct file_infos *fi) {
int result = 0;
int r = toku_pthread_mutex_init(&fi->lock, NULL); resource_assert(r == 0);
fi->n_files = 0;
fi->n_files_limit = 1;
fi->n_files_open = 0;
fi->n_files_extant = 0;
MALLOC_N(fi->n_files_limit, fi->file_infos);
if (fi->file_infos) return 0;
else {
int result = errno;
toku_pthread_mutex_destroy(&fi->lock); // lazy no error check and maybe done elsewhere
return result;
if (fi->file_infos == NULL) {
result = errno;
}
return result;
}
void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error)
......@@ -392,9 +391,6 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
BL_TRACE(blt_calibrate_done);
#endif
bl->panic = FALSE;
bl->panic_errno = 0;
bl->generate_row_for_put = g;
bl->cachetable = cachetable;
if (bl->cachetable)
......@@ -465,8 +461,6 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
if (r != 0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
}
bl->extractor_live = TRUE;
*blp = bl;
return 0;
......@@ -504,12 +498,14 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
bt_compare_functions,
temp_file_template,
load_lsn);
if (r!=0) result = r;
if (r != 0) result = r;
}
if (result==0) {
if (result == 0) {
BRTLOADER bl = *blp;
int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl);
if (r!=0) {
if (r == 0) {
bl->extractor_live = TRUE;
} else {
result = r;
toku_pthread_mutex_destroy(&bl->mutex);
toku_brtloader_internal_destroy(bl, TRUE);
......@@ -519,17 +515,10 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
return result;
}
static void brt_loader_set_panic(BRTLOADER bl, int error) {
int r = toku_pthread_mutex_lock(&bl->mutex); resource_assert(r == 0);
BOOL is_panic = bl->panic;
if (!is_panic) {
bl->panic = TRUE;
bl->panic_errno = error;
}
r = toku_pthread_mutex_unlock(&bl->mutex); resource_assert(r == 0);
if (!is_panic) {
brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
}
static void brt_loader_set_panic(BRTLOADER bl, int error, BOOL callback) {
int r = brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
if (r == 0 && callback)
brt_loader_call_error_function(&bl->error_callback);
}
// One of the tests uses this.
......@@ -877,7 +866,7 @@ static void* extractor_thread (void *blv) {
{
r = process_primary_rows(bl, primary_rowset);
if (r)
brt_loader_set_panic(bl, r);
brt_loader_set_panic(bl, r, FALSE);
}
}
......@@ -885,7 +874,7 @@ static void* extractor_thread (void *blv) {
if (r == 0) {
r = finish_primary_rows(bl);
if (r)
brt_loader_set_panic(bl, r);
brt_loader_set_panic(bl, r, FALSE);
}
BL_TRACE(blt_extractor);
......@@ -1095,7 +1084,7 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val)
* Return value: 0 on success, an error number otherwise.
*/
{
if (bl->panic || brt_loader_get_error(&bl->error_callback))
if (brt_loader_get_error(&bl->error_callback))
return EINVAL; // previous panic
bl->n_rows++;
// return loader_write_row(key, val, bl->fprimary_rows, &bl->fprimary_offset, bl);
......@@ -1784,7 +1773,8 @@ int merge_files (struct merge_fileset *fs,
if (result!=0) break;
}
if (result) brt_loader_set_panic(bl, result);
if (result)
brt_loader_set_panic(bl, result, TRUE);
{
int r = queue_eof(output_q);
if (r!=0 && result==0) result = r;
......@@ -2153,7 +2143,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
seek_align(&out);
int64_t lblock;
result = allocate_block(&out, &lblock);
lazy_assert(result == 0); // can not fail since translations reserved above
invariant(result == 0); // can not fail since the first block is reserved above
struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock);
struct subtree_estimates est = zero_estimates;
est.exact = TRUE;
......@@ -2170,7 +2160,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
BL_TRACE(blt_fractal_deq);
if (rr == EOF) break;
if (rr != 0) {
brt_loader_set_panic(bl, rr); // error after cilk sync
brt_loader_set_panic(bl, rr, TRUE); // error after cilk sync
break;
}
}
......@@ -2206,8 +2196,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
n_pivots++;
if ((r = bl_write_dbt(&key, pivots_stream, NULL, bl))) {
brt_loader_set_panic(bl, r); // error after cilk sync
if (result == 0) result = r;
brt_loader_set_panic(bl, r, TRUE); // error after cilk sync
if (result == 0)
result = r;
break;
}
......@@ -2216,8 +2207,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
r = allocate_block(&out, &lblock);
if (r != 0) {
brt_loader_set_panic(bl, r);
if (result == 0) result = r;
brt_loader_set_panic(bl, r, TRUE);
if (result == 0)
result = r;
break;
}
lbuf = start_leaf(&out, descriptor, lblock);
......@@ -2245,8 +2237,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
cilk_sync;
if (bl->panic) { // if there were any prior errors then exit
result = bl->panic_errno; goto error;
if (result == 0) {
result = brt_loader_get_error(&bl->error_callback); // if there were any prior errors then exit
if (result) goto error;
}
// We haven't paniced, so the sum should add up.
......@@ -2320,8 +2313,6 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
result = errno; goto error;
}
// Do we need to pay attention to user_said_stop? Or should the guy at the other end of the queue pay attention and send in an EOF.
error:
{
int rr = toku_os_close(fd);
......@@ -2558,11 +2549,7 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error)
}
int toku_brt_loader_get_error(BRTLOADER bl, int *error) {
*error = 0;
if (bl->panic)
*error = bl->panic_errno;
else if (bl->error_callback.error)
*error = bl->error_callback.error;
*error = brt_loader_get_error(&bl->error_callback);
return 0;
}
......@@ -2709,12 +2696,10 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
//printf("Nodewrite %d (%.1f%%):", progress_allocation, 100.0*progress_allocation/PROGRESS_MAX);
if (result == 0) {
result = update_progress(progress_allocation, bl, "wrote node");
if (result != 0)
bl->user_said_stop = result;
}
if (result)
brt_loader_set_panic(bl, result);
brt_loader_set_panic(bl, result, TRUE);
}
CILK_END
......@@ -2967,7 +2952,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
blocknum_of_new_node = blocknum_of_new_node;
if (result != 0)
brt_loader_set_panic(bl, result);
brt_loader_set_panic(bl, result, TRUE);
}
static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const struct descriptor *descriptor) {
......@@ -3069,8 +3054,8 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
cilk_sync;
if (result == 0 && bl->panic) // pick up write_nonleaf_node errors
result = bl->panic_errno;
if (result == 0) // pick up write_nonleaf_node errors
result = brt_loader_get_error(&bl->error_callback);
// Now set things up for the next iteration.
int r = brtloader_fi_close(&bl->file_infos, pivots_fidx); if (r != 0 && result == 0) result = r;
......
......@@ -39,17 +39,10 @@ int toku_brt_loader_close (BRTLOADER bl,
int toku_brt_loader_abort(BRTLOADER bl,
BOOL is_error);
void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*));
// For test purposes only
void toku_brtloader_set_size_factor (uint32_t factor);
// For test purposes only. (In production, the rowset size is determined by negotation with the cachetable for some memory. See #2613.)
uint64_t toku_brtloader_get_rowset_budget_for_testing (void);
int toku_brt_loader_finish_extractor(BRTLOADER bl);
int toku_brt_loader_get_error(BRTLOADER bl, int *loader_errno);
void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*));
C_END
......
......@@ -163,7 +163,6 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
DB *dest_db = NULL;
struct brtloader_s bl = {
.panic = 0,
.temp_file_template = template,
.reserved_memory = 512*1024*1024,
};
......@@ -244,6 +243,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
brtloader_set_os_fwrite(bad_fwrite);
toku_set_func_write(bad_write);
toku_set_func_pwrite(bad_pwrite);
brt_loader_set_error_function(&bl.error_callback, NULL, NULL);
brt_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est);
......
......@@ -83,7 +83,6 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
DB *dest_db = NULL;
struct brtloader_s bl = {
.panic = 0,
.temp_file_template = template,
.reserved_memory = 512*1024*1024,
};
......
......@@ -169,8 +169,7 @@ static void test_mergesort_row_array (void) {
}
static void test_read_write_rows (char *template) {
struct brtloader_s bl = {.panic = 0,
.temp_file_template = template};
struct brtloader_s bl = { .temp_file_template = template};
int r = brtloader_init_file_infos(&bl.file_infos);
CKERR(r);
FIDX file;
......@@ -280,7 +279,6 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
static void test_merge_files (const char *template, const char *output_name) {
DB *dest_db = NULL;
struct brtloader_s bl = {
.panic = 0,
.temp_file_template = template,
.reserved_memory = 512*1024*1024,
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment