Commit c4405b71 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Make the ...dbufio tests better. Refs #2633. [t:2633]

git-svn-id: file:///svn/toku/tokudb@20534 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2ef2752c
......@@ -392,9 +392,6 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
BL_TRACE(blt_calibrate_done);
#endif
bl->panic = FALSE;
bl->panic_errno = 0;
bl->generate_row_for_put = g;
bl->cachetable = cachetable;
if (bl->cachetable)
......@@ -465,8 +462,6 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
if (r != 0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
}
bl->extractor_live = TRUE;
*blp = bl;
return 0;
......@@ -509,7 +504,9 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
if (result==0) {
BRTLOADER bl = *blp;
int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl);
if (r!=0) {
if (r==0) {
bl->extractor_live = TRUE;
} else {
result = r;
toku_pthread_mutex_destroy(&bl->mutex);
toku_brtloader_internal_destroy(bl, TRUE);
......@@ -519,17 +516,10 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
return result;
}
static void brt_loader_set_panic(BRTLOADER bl, int error) {
int r = toku_pthread_mutex_lock(&bl->mutex); resource_assert(r == 0);
BOOL is_panic = bl->panic;
if (!is_panic) {
bl->panic = TRUE;
bl->panic_errno = error;
}
r = toku_pthread_mutex_unlock(&bl->mutex); resource_assert(r == 0);
if (!is_panic) {
brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
}
static void brt_loader_set_panic(BRTLOADER bl, int error, BOOL callback) {
int r = brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
if (r == 0 && callback)
brt_loader_call_error_function(&bl->error_callback);
}
// One of the tests uses this.
......@@ -877,7 +867,7 @@ static void* extractor_thread (void *blv) {
{
r = process_primary_rows(bl, primary_rowset);
if (r)
brt_loader_set_panic(bl, r);
brt_loader_set_panic(bl, r, FALSE);
}
}
......@@ -885,7 +875,7 @@ static void* extractor_thread (void *blv) {
if (r == 0) {
r = finish_primary_rows(bl);
if (r)
brt_loader_set_panic(bl, r);
brt_loader_set_panic(bl, r, FALSE);
}
BL_TRACE(blt_extractor);
......@@ -1095,7 +1085,7 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val)
* Return value: 0 on success, an error number otherwise.
*/
{
if (bl->panic || brt_loader_get_error(&bl->error_callback))
if (brt_loader_get_error(&bl->error_callback))
return EINVAL; // previous panic
bl->n_rows++;
// return loader_write_row(key, val, bl->fprimary_rows, &bl->fprimary_offset, bl);
......@@ -1470,25 +1460,23 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
DBT keys[n_sources];
DBT vals[n_sources];
u_int64_t dataoff[n_sources];
DBT zero; memset(&zero, 0, sizeof zero); zero.data=0; zero.flags=DB_DBT_REALLOC; zero.size=0; zero.ulen=0;
DBT zero = zero_dbt; zero.flags=DB_DBT_REALLOC;
for (int i=0; i<n_sources; i++) {
keys[i] = vals[i] = zero; // fill these all in with zero so we can delete stuff more reliably.
}
pqueue_t *pq;
pqueue_t *pq = NULL;
pqueue_node_t *pq_nodes = (pqueue_node_t *)toku_malloc(n_sources * sizeof(pqueue_node_t)); // freed in cleanup
invariant(pq_nodes != NULL);
if (pq_nodes == NULL) { result = errno; }
{
if (result==0) {
int r = pqueue_init(&pq, n_sources, which_db, dest_db, compare, &bl->error_callback);
lazy_assert(r == 0);
result = r;
if (r!=0) result = r;
}
u_int64_t n_rows = 0;
if ( result == 0 ) {
if (result==0) {
// load pqueue with first value from each source
for (int i=0; i<n_sources; i++) {
BL_TRACE_QUIET(blt_do_i);
......@@ -1560,7 +1548,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
r = add_row(output_rowset, &keys[mini], &vals[mini]);
lazy_assert(r == 0);
if (r!=0) {
return = r;
result = r;
break;
}
} else {
......@@ -1568,7 +1556,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl);
lazy_assert(r==0);
if (r!=0) {
return = r;
result = r;
break;
}
}
......@@ -1640,7 +1628,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
destroy_rowset(output_rowset);
toku_free(output_rowset);
}
pqueue_free(pq);
if (pq) { pqueue_free(pq); pq=NULL; }
toku_free(pq_nodes);
{
int r = update_progress(progress_allocation, bl, "end of merge_some_files");
......@@ -1758,7 +1746,7 @@ int merge_files (struct merge_fileset *fs,
}
if (result==0 && !to_queue) {
result = extend_fileset(bl, &next_file_set, &merged_data);
if (result!=0) { printf("%s:%d r=%d\n", __FILE__, __LINE__, result); break; }
if (result!=0) { break; }
}
if (result==0) {
......@@ -1805,7 +1793,7 @@ int merge_files (struct merge_fileset *fs,
if (result!=0) break;
}
if (result) brt_loader_set_panic(bl, result);
if (result) brt_loader_set_panic(bl, result, TRUE);
{
int r = queue_eof(output_q);
if (r!=0 && result==0) result = r;
......@@ -2191,7 +2179,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
BL_TRACE(blt_fractal_deq);
if (rr == EOF) break;
if (rr != 0) {
brt_loader_set_panic(bl, rr); // error after cilk sync
brt_loader_set_panic(bl, rr, TRUE); // error after cilk sync
break;
}
}
......@@ -2227,7 +2215,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
n_pivots++;
if ((r = bl_write_dbt(&key, pivots_stream, NULL, bl))) {
brt_loader_set_panic(bl, r); // error after cilk sync
brt_loader_set_panic(bl, r, TRUE); // error after cilk sync
if (result == 0) result = r;
break;
}
......@@ -2237,7 +2225,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
r = allocate_block(&out, &lblock);
if (r != 0) {
brt_loader_set_panic(bl, r);
brt_loader_set_panic(bl, r, TRUE);
if (result == 0) result = r;
break;
}
......@@ -2266,8 +2254,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
cilk_sync;
if (bl->panic) { // if there were any prior errors then exit
result = bl->panic_errno; goto error;
if (result == 0) {
result = brt_loader_get_error(&bl->error_callback); // if there were any prior errors then exit
if (result) goto error;
}
// We haven't paniced, so the sum should add up.
......@@ -2579,11 +2568,7 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error)
}
int toku_brt_loader_get_error(BRTLOADER bl, int *error) {
*error = 0;
if (bl->panic)
*error = bl->panic_errno;
else if (bl->error_callback.error)
*error = bl->error_callback.error;
*error = brt_loader_get_error(&bl->error_callback);
return 0;
}
......@@ -2730,12 +2715,10 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
//printf("Nodewrite %d (%.1f%%):", progress_allocation, 100.0*progress_allocation/PROGRESS_MAX);
if (result == 0) {
result = update_progress(progress_allocation, bl, "wrote node");
if (result != 0)
bl->user_said_stop = result;
}
if (result)
brt_loader_set_panic(bl, result);
brt_loader_set_panic(bl, result, TRUE);
}
CILK_END
......@@ -2988,7 +2971,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
blocknum_of_new_node = blocknum_of_new_node;
if (result != 0)
brt_loader_set_panic(bl, result);
brt_loader_set_panic(bl, result, TRUE);
}
static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const struct descriptor *descriptor) {
......@@ -3090,8 +3073,8 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
cilk_sync;
if (result == 0 && bl->panic) // pick up write_nonleaf_node errors
result = bl->panic_errno;
if (result == 0) // pick up write_nonleaf_node errors
result = brt_loader_get_error(&bl->error_callback);
// Now set things up for the next iteration.
int r = brtloader_fi_close(&bl->file_infos, pivots_fidx); if (r != 0 && result == 0) result = r;
......
......@@ -15,6 +15,10 @@ C_BEGIN
static int event_count, event_count_trigger;
static void my_assert_hook (void) {
fprintf(stderr, "event_count=%d\n", event_count);
}
static void reset_event_counts(void) {
event_count = event_count_trigger = 0;
}
......@@ -76,19 +80,83 @@ static ssize_t bad_pwrite(int fd, const void * bp, size_t len, toku_off_t off) {
return r;
}
static int my_malloc_event = 0;
static FILE *
bad_fdopen(int fd, const char * mode) {
FILE * rval;
event_count++;
if (event_count_trigger == event_count) {
event_hit();
errno = EINVAL;
rval = NULL;
} else {
rval = fdopen(fd, mode);
}
return rval;
}
static FILE *
bad_fopen(const char *filename, const char *mode) {
FILE * rval;
event_count++;
if (event_count_trigger == event_count) {
event_hit();
errno = EINVAL;
rval = NULL;
} else {
rval = fopen(filename, mode);
}
return rval;
}
static int
bad_open(const char *path, int oflag, int mode) {
int rval;
event_count++;
if (event_count_trigger == event_count) {
event_hit();
errno = EINVAL;
rval = -1;
} else {
rval = open(path, oflag, mode);
}
return rval;
}
static int
bad_fclose(FILE * stream) {
int rval;
event_count++;
// Must close the stream even in the "error case" because otherwise there is no way to get the memory back.
rval = fclose(stream);
if (rval==0) {
if (event_count_trigger == event_count) {
errno = ENOSPC;
rval = -1;
}
}
return rval;
}
static int my_malloc_event = 1;
static int my_malloc_count = 0, my_big_malloc_count = 0;
static void reset_my_malloc_counts(void) {
my_malloc_count = my_big_malloc_count = 0;
}
size_t min_malloc_error_size = 0;
static void *my_malloc(size_t n) {
void *caller = __builtin_return_address(0);
if (!((void*)toku_malloc <= caller && caller <= (void*)toku_free))
goto skip;
my_malloc_count++;
if (n >= 64*1024) {
if (n >= min_malloc_error_size) {
my_big_malloc_count++;
if (my_malloc_event) {
caller = __builtin_return_address(1);
......@@ -194,11 +262,6 @@ static void test (const char *directory, BOOL is_error) {
assert(r==0);
}
toku_set_func_malloc(my_malloc);
brtloader_set_os_fwrite(bad_fwrite);
toku_set_func_write(bad_write);
toku_set_func_pwrite(bad_pwrite);
BRTLOADER bl;
DB **XMALLOC_N(N_DEST_DBS, dbs);
const struct descriptor **XMALLOC_N(N_DEST_DBS, descriptors);
......@@ -238,6 +301,7 @@ static void test (const char *directory, BOOL is_error) {
brt_loader_set_poll_function(&bl->poll_callback, loader_poll_callback, NULL);
QUEUE q;
{ int r = queue_create(&q, 1000); assert(r==0); }
DBUFIO_FILESET bfs;
......@@ -265,6 +329,17 @@ static void test (const char *directory, BOOL is_error) {
int r = toku_pthread_create(&consumer, NULL, consumer_thread, (void*)&cthunk);
assert(r==0);
}
toku_set_func_malloc(my_malloc);
brtloader_set_os_fwrite(bad_fwrite);
toku_set_func_write(bad_write);
toku_set_func_pwrite(bad_pwrite);
toku_set_func_fdopen(bad_fdopen);
toku_set_func_fopen(bad_fopen);
toku_set_func_open(bad_open);
toku_set_func_fclose(bad_fclose);
int result = 0;
{
int r = toku_merge_some_files_using_dbufio(TRUE, FIDX_NULL, q, N_SOURCES, bfs, src_fidxs, bl, 0, (DB*)NULL, compare_ints, 10000);
......@@ -279,6 +354,17 @@ static void test (const char *directory, BOOL is_error) {
int r = queue_eof(q);
assert(r==0);
}
toku_set_func_malloc(NULL);
brtloader_set_os_fwrite(NULL);
toku_set_func_write(NULL);
toku_set_func_pwrite(NULL);
toku_set_func_fdopen(NULL);
toku_set_func_fopen(NULL);
toku_set_func_open(NULL);
toku_set_func_fclose(NULL);
do_assert_hook = my_assert_hook;
{
void *vresult;
int r = toku_pthread_join(consumer, &vresult);
......@@ -323,16 +409,18 @@ static void test (const char *directory, BOOL is_error) {
static int usage(const char *progname, int n) {
fprintf(stderr, "Usage:\n %s [-v] [-q] [-r %d] [-s] [-m] directory\n", progname, n);
fprintf(stderr, "Usage:\n %s [-v] [-q] [-r %d] [-s] [-m] [-tend NEVENTS] directory\n", progname, n);
fprintf(stderr, "[-v] turn on verbose\n");
fprintf(stderr, "[-q] turn off verbose\n");
fprintf(stderr, "[-r %d] set the number of rows\n", n);
fprintf(stderr, "[-s] set the small loader size factor\n");
fprintf(stderr, "[-m] inject big malloc failures\n");
fprintf(stderr, "[-tend NEVENTS] stop testing after N events\n");
return 1;
}
int test_main (int argc, const char *argv[]) {
int tend = -1;
const char *progname=argv[0];
argc--; argv++;
while (argc>0) {
......@@ -349,6 +437,9 @@ int test_main (int argc, const char *argv[]) {
toku_brtloader_set_size_factor(1);
} else if (strcmp(argv[0],"-m") == 0) {
my_malloc_event = 1;
} else if (strcmp(argv[0],"-tend") == 0) {
argc--; argv++;
tend = atoi(argv[0]);
} else if (argc!=1) {
return usage(progname, N_RECORDS);
}
......@@ -381,6 +472,7 @@ int test_main (int argc, const char *argv[]) {
{
int event_limit = event_count;
if (tend>0 && tend<event_limit) event_limit=tend;
if (verbose) printf("event_limit=%d\n", event_limit);
for (int i = 1; i <= event_limit; i++) {
......
......@@ -681,4 +681,4 @@ loader-cleanup-test.tdbrun: $(patsubst %,loader-cleanup-test%.tdbrun, 1)
# ./loader-cleanup-test.tdb $(SUMMARIZE_CMD)
loader-cleanup-test1.tdbrun: loader-cleanup-test.tdb$(BINSUF)
$(VGRIND) ./loader-cleanup-test.tdb -r 1000 $(SUMMARIZE_CMD)
$(VGRIND) ./loader-cleanup-test.tdb -r 4000 -s $(SUMMARIZE_CMD)
......@@ -62,14 +62,15 @@ int abort_on_poll = 0; // set when test_loader() called with test_type of abort
DB_ENV *env;
enum {MAX_NAME=128};
enum {MAX_DBS=256};
int NUM_DBS=5;
int NUM_ROWS=100000;
#define default_NUM_DBS 5
int NUM_DBS=default_NUM_DBS;
#define default_NUM_ROWS 100000
int NUM_ROWS=default_NUM_ROWS;
//static int NUM_ROWS=50000000;
int CHECK_RESULTS=0;
int USE_PUTS=0;
int event_trigger_lo=0; // what event triggers to use?
int event_trigger_hi =0; // 0 and 0 mean none.
int assert_temp_files = 0;
enum {MAGIC=311};
......@@ -208,6 +209,7 @@ bad_fopen(const char *filename, const char *mode) {
fopen_count++;
event_count++;
if (fopen_count_trigger == fopen_count || event_count == event_count_trigger) {
printf("Doing fopen_count=%d event_count=%" PRId64 "\n", fopen_count, event_count);
errno = EINVAL;
rval = NULL;
} else {
......@@ -666,6 +668,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
printf(" done\n");
if (t == commit) {
event_count_nominal = event_count;
fwrite_count_nominal = fwrite_count; // capture how many fwrites were required for normal operation
write_count_nominal = write_count; // capture how many writes were required for normal operation
pwrite_count_nominal = pwrite_count; // capture how many pwrites were required for normal operation
......@@ -676,6 +679,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
if (verbose) {
printf("Nominal calls: function calls (number of calls for normal operation)\n");
printf(" events %" PRId64 "\n", event_count_nominal);
printf(" fwrite %d\n", fwrite_count_nominal);
printf(" write %d\n", write_count_nominal);
printf(" pwrite %d\n", pwrite_count_nominal);
......@@ -707,15 +711,20 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
}
int run_test_count = 0;
static void run_test(enum test_type t, int trigger)
{
run_test_count++;
int r;
if (verbose == 0) { // if no other ouput, give indication of progress
printf(".");
if (verbose>0) { // Don't print anything if verbose is 0. Use "+" to indicate progress if verbose is positive
printf("+");
fflush(stdout);
}
r = system("rm -rf " ENVDIR); CKERR(r);
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
......@@ -826,11 +835,11 @@ static void run_all_tests(void) {
run_test(abort_txn, 0);
if (event_trigger_lo || event_trigger_hi) {
if (verbose) printf("\n\nDoing events %d-%d\n", event_trigger_lo, event_trigger_hi);
for (int i=event_trigger_lo; i<=event_trigger_hi; i++)
printf("\n\nDoing events %d-%d\n", event_trigger_lo, event_trigger_hi);
for (int i=event_trigger_lo; i<=event_trigger_hi; i++) {
run_test(event, i);
return;
}
} else {
enum test_type et[NUM_ERR_TYPES] = {enospc_f, enospc_w, enospc_p, einval_fdo, einval_fo, einval_o, enospc_fc};
int * nomp[NUM_ERR_TYPES] = {&fwrite_count_nominal, &write_count_nominal, &pwrite_count_nominal,
......@@ -841,6 +850,7 @@ static void run_all_tests(void) {
enum test_type t = et[j];
const char * write_type = err_type_str(t);
int nominal = *(nomp[j]);
printf("Test %s %d\n", write_type, nominal);
if (verbose)
printf("\nNow test with induced ENOSPC/EINVAL errors returned from %s, nominal = %d\n", write_type, nominal);
int i;
......@@ -866,20 +876,15 @@ static void run_all_tests(void) {
}
}
}
}
}
int test_main(int argc, char * const *argv) {
do_args(argc, argv);
printf("\nTesting loader with default size_factor\n");
assert_temp_files = 0;
run_all_tests();
printf("\nTesting loader with size_factor=1\n");
db_env_set_loader_size_factor(1);
assert_temp_files = 1;
run_all_tests();
printf("\nTotal event_trigger count is %" PRId64 ". (Use with -t N M, where 1<=N<=M<=%" PRId64 "\n", event_count, event_count);
printf("run_test_count=%d\n", run_test_count);
return 0;
}
......@@ -897,7 +902,14 @@ static void do_args(int argc, char * const argv[]) {
} else if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage: -h -c -s -p -d <num_dbs> -r <num_rows>\n%s\n", cmd);
fprintf(stderr, "Usage: -h -c -s -p -d <num_dbs> -r <num_rows> -t <elow> <ehi> \n%s\n", cmd);
fprintf(stderr, " where -h print this message.\n");
fprintf(stderr, " -c check the results.\n");
fprintf(stderr, " -p LOADER_USE_PUTS.\n");
fprintf(stderr, " -s size_factor=1.\n");
fprintf(stderr, " -d <num_dbs> Number of indexes to create (default=%d).\n", default_NUM_DBS);
fprintf(stderr, " -r <num_rows> Number of rows to put (default=%d).\n", default_NUM_ROWS);
fprintf(stderr, " -t <elo> <ehi> Instrument only events <elo> to <ehi> (default: instrument all).\n");
exit(resultcode);
} else if (strcmp(argv[0], "-d")==0) {
argc--; argv++;
......@@ -921,6 +933,8 @@ static void do_args(int argc, char * const argv[]) {
event_trigger_lo = atoi(argv[0]);
argc--; argv++;
event_trigger_hi = atoi(argv[0]);
} else if (strcmp(argv[0], "-s")==0) {
db_env_set_loader_size_factor(1);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
......
......@@ -4,10 +4,9 @@
/* This version will complain if NDEBUG is set. */
/* It evaluates the argument and then calls a function toku_do_assert() which takes all the hits for the branches not taken. */
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
#include "c_dialects.h"
C_BEGIN
#ifdef NDEBUG
#error NDEBUG should not be set
......@@ -19,6 +18,8 @@ void toku_do_assert(int,const char*/*expr_as_string*/,const char */*fun*/,const
// Define GCOV if you want to get test-coverage information that ignores the assert statements.
// #define GCOV
extern void (*do_assert_hook)(void); // Set this to a function you want called after printing the assertion failure message but before calling abort(). By default this is NULL.
#if defined(GCOV) || TOKU_WINDOWS
#define assert(expr) toku_do_assert((expr) != 0, #expr, __FUNCTION__, __FILE__, __LINE__)
#else
......@@ -37,8 +38,6 @@ void toku_do_assert(int,const char*/*expr_as_string*/,const char */*fun*/,const
#define invariant(a) assert(a) // indicates a code invariant that must be true
#define resource_assert(a) assert(a) // indicates resource must be available, otherwise unrecoverable
#if defined(__cplusplus) || defined(__cilkplusplus)
}
#endif
C_END
#endif
......@@ -16,6 +16,8 @@
static void *backtrace_pointers[N_POINTERS];
#endif
void (*do_assert_hook)(void) = NULL;
void toku_do_assert_fail (const char* expr_as_string,const char *function,const char*file,int line)
{
fprintf(stderr, "%s:%d %s: Assertion `%s' failed\n", file,line,function,expr_as_string);
......@@ -44,6 +46,9 @@ void toku_do_assert_fail (const char* expr_as_string,const char *function,const
TerminateProcess(GetCurrentProcess(), 134); //Only way found so far to unconditionally
//Terminate the process
#endif
if (do_assert_hook) do_assert_hook();
abort();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment