Commit 9c891e55 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

fix some error paths in the brtloader extractor.

fix an uninitialized var problem in the ydbloader.
refs[t:2597] refs[t:2598]
merge -r 20145:20165 tokudb.2597 to tokudb



git-svn-id: file:///svn/toku/tokudb@20166 c7de825b-a66e-492c-adef-691d508d4ae1
parent e0fd8d4f
...@@ -508,11 +508,12 @@ int init_rowset (struct rowset *rows) ...@@ -508,11 +508,12 @@ int init_rowset (struct rowset *rows)
rows->n_rows = 0; rows->n_rows = 0;
rows->n_rows_limit = 100; rows->n_rows_limit = 100;
MALLOC_N(rows->n_rows_limit, rows->rows); MALLOC_N(rows->n_rows_limit, rows->rows);
int err = errno;
rows->n_bytes = 0; rows->n_bytes = 0;
rows->n_bytes_limit = 1024*size_factor*16; rows->n_bytes_limit = 1024*size_factor*16;
rows->data = (char *) toku_malloc(rows->n_bytes_limit); rows->data = (char *) toku_malloc(rows->n_bytes_limit);
if (rows->rows==NULL || rows->data==NULL) { if (rows->rows==NULL || rows->data==NULL) {
int r = errno; int r = errno ? errno : err;
toku_free(rows->rows); toku_free(rows->rows);
toku_free(rows->data); toku_free(rows->data);
rows->rows = NULL; rows->rows = NULL;
...@@ -608,14 +609,15 @@ static int finish_primary_rows (BRTLOADER bl) { ...@@ -608,14 +609,15 @@ static int finish_primary_rows (BRTLOADER bl) {
static void* extractor_thread (void *blv) { static void* extractor_thread (void *blv) {
BL_TRACE(blt_extractor_init); BL_TRACE(blt_extractor_init);
BRTLOADER bl = (BRTLOADER)blv; BRTLOADER bl = (BRTLOADER)blv;
int r = 0;
while (1) { while (1) {
void *item; void *item;
{ {
BL_TRACE(blt_extractor); BL_TRACE(blt_extractor);
int r = queue_deq(bl->primary_rowset_queue, &item, NULL, NULL); int rq = queue_deq(bl->primary_rowset_queue, &item, NULL, NULL);
BL_TRACE(blt_extract_deq); BL_TRACE(blt_extract_deq);
if (r==EOF) break; if (rq==EOF) break;
assert(r==0); // other errors are arbitrarily bad. assert(rq==0); // other errors are arbitrarily bad.
} }
struct rowset *primary_rowset = (struct rowset *)item; struct rowset *primary_rowset = (struct rowset *)item;
...@@ -625,21 +627,21 @@ static void* extractor_thread (void *blv) { ...@@ -625,21 +627,21 @@ static void* extractor_thread (void *blv) {
// Now we have some rows to output // Now we have some rows to output
{ {
int r = process_primary_rows(bl, primary_rowset); r = process_primary_rows(bl, primary_rowset);
if (r) if (r)
brt_loader_set_panic(bl, r); brt_loader_set_panic(bl, r);
} }
} }
//printf("%s:%d extractor finishing\n", __FILE__, __LINE__); //printf("%s:%d extractor finishing\n", __FILE__, __LINE__);
if (r == 0) {
int r = finish_primary_rows(bl); r = finish_primary_rows(bl);
if (r) if (r)
brt_loader_set_panic(bl, r); brt_loader_set_panic(bl, r);
}
BL_TRACE(blt_extractor); BL_TRACE(blt_extractor);
return NULL;
return 0;
} }
static void enqueue_for_extraction (BRTLOADER bl) { static void enqueue_for_extraction (BRTLOADER bl) {
...@@ -743,10 +745,9 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r ...@@ -743,10 +745,9 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
pval.data = primary_rowset->data + prow->off + prow->klen; pval.data = primary_rowset->data + prow->off + prow->klen;
pval.size = prow->vlen; pval.size = prow->vlen;
{ {
int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &skey, &sval, &pkey, &pval, NULL); int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &skey, &sval, &pkey, &pval, NULL);
assert(r==0); assert(r==0); // LAZY
} }
if (row_wont_fit(rows, skey.size + sval.size)) { if (row_wont_fit(rows, skey.size + sval.size)) {
...@@ -755,6 +756,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r ...@@ -755,6 +756,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
int progress_this_sort = 0; // fix? int progress_this_sort = 0; // fix?
int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare, progress_this_sort); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however. int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare, progress_this_sort); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
BL_TRACE(blt_sort_and_write_rows); BL_TRACE(blt_sort_and_write_rows);
init_rowset(rows); // we passed the contents of rows to sort_and_write_rows.
if (r!=0) { if (r!=0) {
error_codes[i] = r; error_codes[i] = r;
#if defined(__cilkplusplus) #if defined(__cilkplusplus)
...@@ -764,7 +766,6 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r ...@@ -764,7 +766,6 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
#endif #endif
break; break;
} }
init_rowset(rows); // we passed the contents of rows to sort_and_write_rows.
} }
add_row(rows, &skey, &sval); add_row(rows, &skey, &sval);
...@@ -783,20 +784,23 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r ...@@ -783,20 +784,23 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
} }
{ {
toku_free(skey.data); if (skey.flags) {
toku_free(sval.data); toku_free(skey.data); skey.data = NULL;
skey.data = sval.data = NULL; // set to NULL so that the final cleanup won't free them again. }
if (sval.flags) {
toku_free(sval.data); sval.data = NULL;
}
} }
} }
destroy_rowset(primary_rowset); destroy_rowset(primary_rowset);
toku_free(primary_rowset); toku_free(primary_rowset);
int r = 0; int r = 0;
if (error_count>0) { if (error_count > 0) {
for (int i=0; i<bl->N; i++) { for (int i=0; i<bl->N; i++) {
if (error_codes[i]) r = error_codes[i]; if (error_codes[i]) r = error_codes[i];
} }
assert(0); // could not find the error code. This is an error in the program if we get here. assert(r); // could not find the error code. This is an error in the program if we get here.
} }
toku_free(error_codes); toku_free(error_codes);
BL_TRACE(blt_extractor); BL_TRACE(blt_extractor);
......
...@@ -206,6 +206,7 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -206,6 +206,7 @@ int toku_loader_create_loader(DB_ENV *env,
loader->i->evals[i].flags = DB_DBT_REALLOC; loader->i->evals[i].flags = DB_DBT_REALLOC;
} }
loader->i->brt_loader = NULL; loader->i->brt_loader = NULL;
rval = 0;
} }
else { else {
char **XMALLOC_N(N, new_inames_in_env); char **XMALLOC_N(N, new_inames_in_env);
......
...@@ -18,10 +18,12 @@ static free_fun_t t_free = 0; ...@@ -18,10 +18,12 @@ static free_fun_t t_free = 0;
static realloc_fun_t t_realloc = 0; static realloc_fun_t t_realloc = 0;
void *toku_malloc(size_t size) { void *toku_malloc(size_t size) {
void *p;
if (t_malloc) if (t_malloc)
return t_malloc(size); p = t_malloc(size);
else else
return os_malloc(size); p = os_malloc(size);
return p;
} }
void * void *
...@@ -60,7 +62,6 @@ toku_xrealloc(void *v, size_t size) ...@@ -60,7 +62,6 @@ toku_xrealloc(void *v, size_t size)
void * void *
toku_tagmalloc(size_t size, enum typ_tag typtag) toku_tagmalloc(size_t size, enum typ_tag typtag)
{ {
//printf("%s:%d tagmalloc\n", __FILE__, __LINE__);
void *r = toku_malloc(size); void *r = toku_malloc(size);
if (!r) return 0; if (!r) return 0;
assert(size>sizeof(int)); assert(size>sizeof(int));
...@@ -71,14 +72,16 @@ toku_tagmalloc(size_t size, enum typ_tag typtag) ...@@ -71,14 +72,16 @@ toku_tagmalloc(size_t size, enum typ_tag typtag)
void * void *
toku_realloc(void *p, size_t size) toku_realloc(void *p, size_t size)
{ {
void *q;
if (t_realloc) if (t_realloc)
return t_realloc(p, size); q = t_realloc(p, size);
else else
return os_realloc(p, size); q = os_realloc(p, size);
return q;
} }
void void
toku_free(void* p) toku_free(void *p)
{ {
if (t_free) if (t_free)
t_free(p); t_free(p);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment