Commit d009c784 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

closes[t:2626] fix the brtloader add_row realloc error recovery

git-svn-id: file:///svn/toku/tokudb@20385 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9b577076
......@@ -56,7 +56,7 @@ struct rowset {
int init_rowset (struct rowset *rows, uint64_t memory_budget);
void destroy_rowset (struct rowset *rows);
void add_row (struct rowset *rows, DBT *key, DBT *val);
int add_row (struct rowset *rows, DBT *key, DBT *val);
int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, u_int64_t *dataoff, BRTLOADER bl);
int loader_read_row (FILE *f, DBT *key, DBT *val);
......
......@@ -753,12 +753,21 @@ static int row_wont_fit (struct rowset *rows, size_t size)
return (rows->memory_budget < memory_in_use + size);
}
void add_row (struct rowset *rows, DBT *key, DBT *val)
int add_row (struct rowset *rows, DBT *key, DBT *val)
/* Effect: add a row to a collection. */
{
int result = 0;
if (rows->n_rows >= rows->n_rows_limit) {
struct row *old_rows = rows->rows;
size_t old_n_rows_limit = rows->n_rows_limit;
rows->n_rows_limit *= 2;
REALLOC_N(rows->n_rows_limit, rows->rows);
if (rows->rows == NULL) {
result = errno;
rows->rows = old_rows;
rows->n_rows_limit = old_n_rows_limit;
return result;
}
}
size_t off = rows->n_bytes;
size_t next_off = off + key->size + val->size;
......@@ -768,15 +777,24 @@ void add_row (struct rowset *rows, DBT *key, DBT *val)
rows->rows[rows->n_rows++] = newrow;
if (next_off > rows->n_bytes_limit) {
size_t old_n_bytes_limit = rows->n_bytes_limit;
while (next_off > rows->n_bytes_limit) {
rows->n_bytes_limit = rows->n_bytes_limit*2;
}
invariant(next_off <= rows->n_bytes_limit);
char *old_data = rows->data;
REALLOC_N(rows->n_bytes_limit, rows->data);
if (rows->data == NULL) {
result = errno;
rows->data = old_data;
rows->n_bytes_limit =- old_n_bytes_limit;
return result;
}
}
memcpy(rows->data+off, key->data, key->size);
memcpy(rows->data+off+key->size, val->data, val->size);
rows->n_bytes = next_off;
return result;
}
static int process_primary_rows (BRTLOADER bl, struct rowset *primary_rowset);
......@@ -866,8 +884,9 @@ static int loader_do_put(BRTLOADER bl,
DBT *pkey,
DBT *pval)
{
add_row(&bl->primary_rowset, pkey, pval);
if (row_wont_fit(&bl->primary_rowset, 0)) {
int result;
result = add_row(&bl->primary_rowset, pkey, pval);
if (result == 0 && row_wont_fit(&bl->primary_rowset, 0)) {
// queue the rows for further processing by the extractor thread.
//printf("%s:%d please extract %ld\n", __FILE__, __LINE__, bl->primary_rowset.n_rows);
BL_TRACE(blt_do_put);
......@@ -876,10 +895,11 @@ static int loader_do_put(BRTLOADER bl,
{
int r = init_rowset(&bl->primary_rowset, memory_per_rowset(bl));
// bl->primary_rowset will get destroyed by toku_brt_loader_abort
if ( r != 0 ) return r;
if (r != 0)
result = r;
}
}
return 0;
return result;
}
static int finish_extractor (BRTLOADER bl) {
......@@ -921,6 +941,13 @@ static DBT make_dbt (void *data, u_int32_t size) {
return result;
}
// gcc 4.1 does not like f&a
#if defined(__cilkplusplus)
#define inc_error_count __sync_fetch_and_add(&error_count, 1)
#else
#define inc_error_count error_count++
#endif
CILK_BEGIN
static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_rowset)
......@@ -963,11 +990,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &skey, &sval, &pkey, &pval, NULL);
if (r != 0) {
error_codes[i] = r;
#if defined(__cilkplusplus)
__sync_fetch_and_add(&error_count, 1);
#else
error_count++;
#endif
inc_error_count;
break;
}
}
......@@ -980,17 +1003,18 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
// If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function.
BL_TRACE(blt_sort_and_write_rows);
init_rowset(rows, memory_per_rowset(bl)); // we passed the contents of rows to sort_and_write_rows.
if (r!=0) {
if (r != 0) {
error_codes[i] = r;
#if defined(__cilkplusplus)
__sync_fetch_and_add(&error_count, 1);
#else
error_count++;
#endif
inc_error_count;
break;
}
}
add_row(rows, &skey, &sval);
int r = add_row(rows, &skey, &sval);
if (r != 0) {
error_codes[i] = r;
inc_error_count;
break;
}
//flags==0 means generate_row_for_put callback changed it
//(and freed any memory necessary to do so) so that values are now stored
......@@ -1504,13 +1528,14 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
BL_TRACE(blt_do_i);
r = queue_enq(q, (void*)output_rowset, 1, NULL);
BL_TRACE(blt_fractal_enq);
assert(r==0);
lazy_assert(r==0);
MALLOC(output_rowset);
assert(output_rowset);
r = init_rowset(output_rowset, memory_per_rowset(bl));
assert(r==0);
lazy_assert(r==0);
}
add_row(output_rowset, &keys[mini], &vals[mini]);
r = add_row(output_rowset, &keys[mini], &vals[mini]);
lazy_assert(r == 0);
} else {
// write it to the dest file
r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl);
......
......@@ -98,13 +98,13 @@ check_brtloader-test$(BINSUF): EXTRA_ARGS=dir.brtloader-test
check_brtloader-test-bad-generate$(BINSUF): EXTRA_ARGS=dir.brtloader-test-bad-generate
check_brtloader-test-extractor-errors$(BINSUF): EXTRA_ARGS=-r 1 dir.brtloader-test-extractor-errors
check_brtloader-test-extractor-errors$(BINSUF): EXTRA_ARGS=-s -f -m -r 1 dir.brtloader-test-extractor-errors
check_brtloader-test-open$(BINSUF): EXTRA_ARGS=dir.brtloader-test-open
check_brtloader-test-writer$(BINSUF): EXTRA_ARGS=-r 1000000 dir.brtloader-test-writer
check_brtloader-test-writer-errors$(BINSUF): EXTRA_ARGS=-s -r 10000 dir.brtloader-test-writer-errors
check_brtloader-test-writer-errors$(BINSUF): EXTRA_ARGS=-s -f -m -u -r 10000 dir.brtloader-test-writer-errors
brtloader-%$(BINSUF): brtloader-%.$(OEXT)
ifeq ($(BRTLOADER),cilk)
......
......@@ -25,10 +25,11 @@ static void reset_event_counts(void) {
static void event_hit(void) {
}
static int do_write_errors = 0;
static size_t bad_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) {
event_count++;
size_t r;
if (event_count_trigger == event_count) {
if (do_write_errors && event_count_trigger == ++event_count) {
event_hit();
errno = ENOSPC;
r = -1;
......@@ -43,8 +44,7 @@ static size_t bad_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stre
static ssize_t bad_write(int fd, const void * bp, size_t len) {
ssize_t r;
event_count++;
if (event_count_trigger == event_count) {
if (do_write_errors && event_count_trigger == ++event_count) {
event_hit();
errno = ENOSPC;
r = -1;
......@@ -56,8 +56,7 @@ static ssize_t bad_write(int fd, const void * bp, size_t len) {
static ssize_t bad_pwrite(int fd, const void * bp, size_t len, toku_off_t off) {
ssize_t r;
event_count++;
if (event_count_trigger == event_count) {
if (do_write_errors && event_count_trigger == ++event_count) {
event_hit();
errno = ENOSPC;
r = -1;
......@@ -67,11 +66,13 @@ static ssize_t bad_pwrite(int fd, const void * bp, size_t len, toku_off_t off) {
return r;
}
static int my_malloc_event = 0;
static int do_malloc_errors = 0;
static int my_malloc_count = 0, my_big_malloc_count = 0;
static int my_realloc_count = 0, my_big_realloc_count = 0;
static void reset_my_malloc_counts(void) {
my_malloc_count = my_big_malloc_count = 0;
my_realloc_count = my_big_realloc_count = 0;
}
static void *my_malloc(size_t n) {
......@@ -81,7 +82,7 @@ static void *my_malloc(size_t n) {
my_malloc_count++;
if (n >= 64*1024) {
my_big_malloc_count++;
if (my_malloc_event) {
if (do_malloc_errors) {
caller = __builtin_return_address(1);
if ((void*)toku_xmalloc <= caller && caller <= (void*)toku_malloc_report)
goto skip;
......@@ -97,6 +98,29 @@ static void *my_malloc(size_t n) {
return malloc(n);
}
static void *my_realloc(void *p, size_t n) {
void *caller = __builtin_return_address(0);
if (!((void*)toku_realloc <= caller && caller <= (void*)toku_free))
goto skip;
my_realloc_count++;
if (n >= 64*1024) {
my_big_realloc_count++;
if (do_malloc_errors) {
caller = __builtin_return_address(1);
if ((void*)toku_xrealloc <= caller && caller <= (void*)toku_malloc_report)
goto skip;
event_count++;
if (event_count == event_count_trigger) {
event_hit();
errno = ENOMEM;
return NULL;
}
}
}
skip:
return realloc(p, n);
}
static void copy_dbt(DBT *dest, const DBT *src) {
assert(dest->flags & DB_DBT_REALLOC);
dest->data = toku_realloc(dest->data, src->size);
......@@ -170,6 +194,7 @@ static void test_extractor(int nrows, int nrowsets, BOOL expect_fail) {
// setup error injection
toku_set_func_malloc(my_malloc);
toku_set_func_realloc(my_realloc);
brtloader_set_os_fwrite(bad_fwrite);
toku_set_func_write(bad_write);
toku_set_func_pwrite(bad_pwrite);
......@@ -184,6 +209,7 @@ static void test_extractor(int nrows, int nrowsets, BOOL expect_fail) {
assert(r == 0);
toku_set_func_malloc(NULL);
toku_set_func_realloc(NULL);
brtloader_set_os_fwrite(NULL);
toku_set_func_write(NULL);
toku_set_func_pwrite(NULL);
......@@ -206,7 +232,15 @@ static int nrows = 1;
static int nrowsets = 2;
static int usage(const char *progname) {
fprintf(stderr, "Usage:\n %s [-h] [-v] [-q] [-s] [-r %d] [--rowsets %d]\n", progname, nrows, nrowsets);
fprintf(stderr, "Usage: %s [options] directory\n", progname);
fprintf(stderr, "[-v] turn on verbose\n");
fprintf(stderr, "[-q] turn off verbose\n");
fprintf(stderr, "[-r %d] set the number of rows\n", nrows);
fprintf(stderr, "[--rowsets %d] set the number of rowsets\n", nrowsets);
fprintf(stderr, "[-s] set the small loader size factor\n");
fprintf(stderr, "[-m] inject big malloc and realloc errors\n");
fprintf(stderr, "[-f] inject write errors\n");
fprintf(stderr, "[-u] inject user errors\n");
return 1;
}
......@@ -229,8 +263,10 @@ int test_main (int argc, const char *argv[]) {
nrowsets = atoi(argv[0]);
} else if (strcmp(argv[0],"-s") == 0) {
toku_brtloader_set_size_factor(1);
} else if (strcmp(argv[0],"-f") == 0) {
do_write_errors = 1;
} else if (strcmp(argv[0],"-m") == 0) {
my_malloc_event = 1;
do_malloc_errors = 1;
} else if (strcmp(argv[0],"--max_error_limit") == 0 && argc >= 1) {
argc--; argv++;
max_error_limit = atoi(argv[0]);
......
......@@ -25,10 +25,11 @@ static void reset_event_counts(void) {
static void event_hit(void) {
}
static int do_user_errors = 0;
static int loader_poll_callback(void *UU(extra), float UU(progress)) {
int r;
event_count++;
if (event_count_trigger == event_count) {
if (do_user_errors && event_count_trigger == ++event_count) {
event_hit();
r = 1;
} else {
......@@ -37,10 +38,11 @@ static int loader_poll_callback(void *UU(extra), float UU(progress)) {
return r;
}
static int do_write_errors = 0;
static size_t bad_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) {
event_count++;
size_t r;
if (event_count_trigger == event_count) {
if (do_write_errors && event_count_trigger == ++event_count) {
event_hit();
errno = ENOSPC;
r = -1;
......@@ -55,8 +57,7 @@ static size_t bad_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stre
static ssize_t bad_write(int fd, const void * bp, size_t len) {
ssize_t r;
event_count++;
if (event_count_trigger == event_count) {
if (do_write_errors && event_count_trigger == ++event_count) {
event_hit();
errno = ENOSPC;
r = -1;
......@@ -68,8 +69,7 @@ static ssize_t bad_write(int fd, const void * bp, size_t len) {
static ssize_t bad_pwrite(int fd, const void * bp, size_t len, toku_off_t off) {
ssize_t r;
event_count++;
if (event_count_trigger == event_count) {
if (do_write_errors && event_count_trigger == ++event_count) {
event_hit();
errno = ENOSPC;
r = -1;
......@@ -79,7 +79,7 @@ static ssize_t bad_pwrite(int fd, const void * bp, size_t len, toku_off_t off) {
return r;
}
static int my_malloc_event = 0;
static int do_malloc_errors = 0;
static int my_malloc_count = 0, my_big_malloc_count = 0;
static int my_realloc_count = 0, my_big_realloc_count = 0;
......@@ -95,7 +95,7 @@ static void *my_malloc(size_t n) {
my_malloc_count++;
if (n >= 64*1024) {
my_big_malloc_count++;
if (my_malloc_event) {
if (do_malloc_errors) {
caller = __builtin_return_address(1);
if ((void*)toku_xmalloc <= caller && caller <= (void*)toku_malloc_report)
goto skip;
......@@ -118,7 +118,7 @@ static void *my_realloc(void *p, size_t n) {
my_realloc_count++;
if (n >= 64*1024) {
my_big_realloc_count++;
if (my_malloc_event) {
if (do_malloc_errors) {
caller = __builtin_return_address(1);
if ((void*)toku_xrealloc <= caller && caller <= (void*)toku_malloc_report)
goto skip;
......@@ -260,12 +260,14 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
}
static int usage(const char *progname, int n) {
fprintf(stderr, "Usage:\n %s [-v] [-q] [-r %d] [-s] [-m] directory\n", progname, n);
fprintf(stderr, "Usage: %s [options] directory\n", progname);
fprintf(stderr, "[-v] turn on verbose\n");
fprintf(stderr, "[-q] turn off verbose\n");
fprintf(stderr, "[-r %d] set the number of rows\n", n);
fprintf(stderr, "[-s] set the small loader size factor\n");
fprintf(stderr, "[-m] inject big malloc failures\n");
fprintf(stderr, "[-m] inject big malloc and realloc errors\n");
fprintf(stderr, "[-f] inject write errors\n");
fprintf(stderr, "[-u] inject user errors\n");
return 1;
}
......@@ -285,8 +287,12 @@ int test_main (int argc, const char *argv[]) {
n = atoi(argv[0]);
} else if (strcmp(argv[0],"-s") == 0) {
toku_brtloader_set_size_factor(1);
} else if (strcmp(argv[0],"-f") == 0) {
do_write_errors = 1;
} else if (strcmp(argv[0],"-m") == 0) {
my_malloc_event = 1;
do_malloc_errors = 1;
} else if (strcmp(argv[0],"-u") == 0) {
do_user_errors = 1;
} else if (argc!=1) {
return usage(progname, n);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment