Commit 7a3f6089 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#2821 refs[t:2821] fixup the loader tests

git-svn-id: file:///svn/toku/tokudb@22416 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6f67fad6
...@@ -249,6 +249,10 @@ int toku_brt_loader_finish_extractor(BRTLOADER bl); ...@@ -249,6 +249,10 @@ int toku_brt_loader_finish_extractor(BRTLOADER bl);
int toku_brt_loader_get_error(BRTLOADER bl, int *loader_errno); int toku_brt_loader_get_error(BRTLOADER bl, int *loader_errno);
int brt_loader_lock_init(BRTLOADER bl);
void brt_loader_lock_destroy(BRTLOADER bl);
void brt_loader_set_fractal_workers_count_from_c(BRTLOADER bl);
C_END C_END
#endif #endif
...@@ -95,11 +95,28 @@ toku_brtloader_get_rowset_budget_for_testing (void) ...@@ -95,11 +95,28 @@ toku_brtloader_get_rowset_budget_for_testing (void)
return 16ULL*size_factor*1024ULL; return 16ULL*size_factor*1024ULL;
} }
int brt_loader_lock_init(BRTLOADER bl) {
invariant(!bl->mutex_init);
int r = toku_pthread_mutex_init(&bl->mutex, NULL);
if (r == 0)
bl->mutex_init = TRUE;
return r;
}
void brt_loader_lock_destroy(BRTLOADER bl) {
if (bl->mutex_init) {
int r = toku_pthread_mutex_destroy(&bl->mutex); resource_assert(r == 0);
bl->mutex_init = FALSE;
}
}
static void brt_loader_lock(BRTLOADER bl) { static void brt_loader_lock(BRTLOADER bl) {
invariant(bl->mutex_init);
int r = toku_pthread_mutex_lock(&bl->mutex); resource_assert(r == 0); int r = toku_pthread_mutex_lock(&bl->mutex); resource_assert(r == 0);
} }
static void brt_loader_unlock(BRTLOADER bl) { static void brt_loader_unlock(BRTLOADER bl) {
invariant(bl->mutex_init);
int r = toku_pthread_mutex_unlock(&bl->mutex); resource_assert(r == 0); int r = toku_pthread_mutex_unlock(&bl->mutex); resource_assert(r == 0);
} }
...@@ -312,10 +329,7 @@ int brtloader_open_temp_file (BRTLOADER bl, FIDX *file_idx) ...@@ -312,10 +329,7 @@ int brtloader_open_temp_file (BRTLOADER bl, FIDX *file_idx)
} }
void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error) { void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error) {
if (bl->mutex_init) { brt_loader_lock_destroy(bl);
int r = toku_pthread_mutex_destroy(&bl->mutex); resource_assert(r == 0);
bl->mutex_init = FALSE;
}
// These frees rely on the fact that if you free a NULL pointer then nothing bad happens. // These frees rely on the fact that if you free a NULL pointer then nothing bad happens.
toku_free(bl->dbs); toku_free(bl->dbs);
...@@ -406,12 +420,14 @@ static unsigned brt_loader_get_fractal_workers_count(BRTLOADER bl) { ...@@ -406,12 +420,14 @@ static unsigned brt_loader_get_fractal_workers_count(BRTLOADER bl) {
return w; return w;
} }
CILK_BEGIN
static void brt_loader_set_fractal_workers_count(BRTLOADER bl) { static void brt_loader_set_fractal_workers_count(BRTLOADER bl) {
brt_loader_lock(bl); brt_loader_lock(bl);
if (bl->fractal_workers == 0) if (bl->fractal_workers == 0)
bl->fractal_workers = cilk_worker_count; bl->fractal_workers = cilk_worker_count;
brt_loader_unlock(bl); brt_loader_unlock(bl);
} }
CILK_END
// To compute a merge, we have a certain amount of memory to work with. // To compute a merge, we have a certain amount of memory to work with.
// We perform only one fanin at a time. // We perform only one fanin at a time.
...@@ -539,10 +555,9 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, ...@@ -539,10 +555,9 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
if (r!=0) { toku_brtloader_internal_destroy(bl, TRUE); return r; } if (r!=0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
} }
//printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__); //printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__);
{ {
int r = toku_pthread_mutex_init(&bl->mutex, NULL); int r = brt_loader_lock_init(bl);
if (r != 0) { toku_brtloader_internal_destroy(bl, TRUE); return r; } if (r != 0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
bl->mutex_init = TRUE;
} }
*blp = bl; *blp = bl;
...@@ -590,7 +605,6 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -590,7 +605,6 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
bl->extractor_live = TRUE; bl->extractor_live = TRUE;
} else { } else {
result = r; result = r;
(void) toku_pthread_mutex_destroy(&bl->mutex);
(void) toku_brtloader_internal_destroy(bl, TRUE); (void) toku_brtloader_internal_destroy(bl, TRUE);
} }
} }
...@@ -3221,16 +3235,13 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s ...@@ -3221,16 +3235,13 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
CILK_END CILK_END
#if 0 void brt_loader_set_fractal_workers_count_from_c(BRTLOADER bl) {
// C function for testing write_file_to_dbfile
int brt_loader_write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation) {
#if defined(__cilkplusplus) #if defined(__cilkplusplus)
return cilk::run(write_file_to_dbfile, outfile, infile, bl, descriptor, progress_allocation); cilk::run(brt_loader_set_fractal_workers_count, bl);
#else #else
return write_file_to_dbfile (outfile, infile, bl, descriptor, progress_allocation); brt_loader_set_fractal_workers_count (bl);
#endif #endif
} }
#endif
C_END C_END
...@@ -48,8 +48,10 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -48,8 +48,10 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
.temp_file_template = template, .temp_file_template = template,
.reserved_memory = 512*1024*1024, .reserved_memory = 512*1024*1024,
}; };
int r = brtloader_init_file_infos(&bl.file_infos); int r = brtloader_init_file_infos(&bl.file_infos); CKERR(r);
CKERR(r); r = brt_loader_lock_init(&bl); CKERR(r);
brt_loader_set_fractal_workers_count_from_c(&bl);
struct merge_fileset fs; struct merge_fileset fs;
init_merge_fileset(&fs); init_merge_fileset(&fs);
...@@ -142,6 +144,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -142,6 +144,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
brt_loader_destroy_error_callback(&bl.error_callback); brt_loader_destroy_error_callback(&bl.error_callback);
brt_loader_destroy_poll_callback(&bl.poll_callback); brt_loader_destroy_poll_callback(&bl.poll_callback);
brt_loader_lock_destroy(&bl);
r = queue_destroy(q2); r = queue_destroy(q2);
//if (r != 0) printf("WARNING%d r=%d\n", __LINE__, r); //if (r != 0) printf("WARNING%d r=%d\n", __LINE__, r);
......
...@@ -86,8 +86,10 @@ static void test_write_dbfile (char *template, int n, char *output_name) { ...@@ -86,8 +86,10 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
.temp_file_template = template, .temp_file_template = template,
.reserved_memory = 512*1024*1024, .reserved_memory = 512*1024*1024,
}; };
int r = brtloader_init_file_infos(&bl.file_infos); int r = brtloader_init_file_infos(&bl.file_infos); CKERR(r);
CKERR(r); r = brt_loader_lock_init(&bl); CKERR(r);
brt_loader_set_fractal_workers_count_from_c(&bl);
struct merge_fileset fs; struct merge_fileset fs;
init_merge_fileset(&fs); init_merge_fileset(&fs);
...@@ -173,6 +175,7 @@ static void test_write_dbfile (char *template, int n, char *output_name) { ...@@ -173,6 +175,7 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
verify_dbfile(n, output_name); verify_dbfile(n, output_name);
brt_loader_destroy_error_callback(&bl.error_callback); brt_loader_destroy_error_callback(&bl.error_callback);
brt_loader_lock_destroy(&bl);
} }
static int nrows = 1; static int nrows = 1;
......
...@@ -282,8 +282,10 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c ...@@ -282,8 +282,10 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
.temp_file_template = template, .temp_file_template = template,
.reserved_memory = 512*1024*1024, .reserved_memory = 512*1024*1024,
}; };
int r = brtloader_init_file_infos(&bl.file_infos); int r = brtloader_init_file_infos(&bl.file_infos); CKERR(r);
CKERR(r); r = brt_loader_lock_init(&bl); CKERR(r);
brt_loader_set_fractal_workers_count_from_c(&bl);
struct merge_fileset fs; struct merge_fileset fs;
init_merge_fileset(&fs); init_merge_fileset(&fs);
...@@ -328,6 +330,7 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c ...@@ -328,6 +330,7 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
destroy_merge_fileset(&fs); destroy_merge_fileset(&fs);
brtloader_fi_destroy(&bl.file_infos, FALSE); brtloader_fi_destroy(&bl.file_infos, FALSE);
brt_loader_lock_destroy(&bl);
// verify the dbfile // verify the dbfile
verify_dbfile(10, sorted_keys, sorted_vals, output_name); verify_dbfile(10, sorted_keys, sorted_vals, output_name);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment