Commit 9cab06f7 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge the instrumentation from 2499d+2571 onto the main line (since 2499d has...

Merge the instrumentation from 2499d+2571 onto the main line (since 2499d has been merged.)  Refs #2571.  [t:2571].
{{{
svn merge -r 19857:19905 https://svn.tokutek.com/tokudb/toku/tokudb.2499d+2571
}}}
.


git-svn-id: file:///svn/toku/tokudb@19906 c7de825b-a66e-492c-adef-691d508d4ae1
parent 450d693d
......@@ -591,11 +591,15 @@ static int finish_primary_rows (BRTLOADER bl) {
}
static void* extractor_thread (void *blv) {
BL_TRACE("extractor_init");
BRTLOADER bl = (BRTLOADER)blv;
while (1) {
void *item;
{
BL_TRACE("extractor");
int r = queue_deq(bl->primary_rowset_queue, &item, NULL, NULL);
printf("deq\n");
BL_TRACE("extractor_deq");
if (r==EOF) break;
assert(r==0); // other errors are arbitrarily bad.
}
......@@ -621,6 +625,7 @@ static void* extractor_thread (void *blv) {
for (int i=0; i<bl->N; i++) {
destroy_rowset(&bl->rows[i]); // destroy the rowset here so that if we spawn the sort_and_write, there won't be a race on destroying the rows (thanks to the cilk_sync 2 lines up)
}
BL_TRACE("extractor");
return 0;
}
......@@ -642,7 +647,9 @@ static int loader_do_put(BRTLOADER bl,
if (row_wont_fit(&bl->primary_rowset, 0)) {
// queue the rows for further processing by the extractor thread.
//printf("%s:%d please extract %ld\n", __FILE__, __LINE__, bl->primary_rowset.n_rows);
BL_TRACE("do_put");
enqueue_for_extraction(bl);
BL_TRACE("extract_enq");
init_rowset(&bl->primary_rowset);
}
return 0;
......@@ -651,8 +658,10 @@ static int loader_do_put(BRTLOADER bl,
static int finish_extractor (BRTLOADER bl) {
//printf("%s:%d now finishing extraction\n", __FILE__, __LINE__);
BL_TRACE("do_put");
if (bl->primary_rowset.n_rows>0) {
enqueue_for_extraction(bl);
BL_TRACE("extract_enqeue");
} else {
destroy_rowset(&bl->primary_rowset);
}
......@@ -666,6 +675,7 @@ static int finish_extractor (BRTLOADER bl) {
void *toku_pthread_retval;
int r = toku_pthread_join(bl->extractor_thread, &toku_pthread_retval);
assert(r==0 && toku_pthread_retval==NULL);
BL_TRACE("join_on_extractor");
}
{
int r = queue_destroy(bl->primary_rowset_queue);
......@@ -690,6 +700,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
// if FLUSH is true then write all the buffered rows out.
// if primary_rowset is NULL then treat it as empty.
{
BL_TRACE("cilk_call");
int error_count = 0;
int *MALLOC_N(bl->N, error_codes);
......@@ -776,13 +787,17 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
assert(0); // could not find the error code. This is an error in the program if we get here.
}
toku_free(error_codes);
BL_TRACE("extractor");
return r;
}
CILK_END
static int process_primary_rows (BRTLOADER bl, struct rowset *primary_rowset) {
BL_TRACE("extractor");
#if defined(__cilkplusplus)
return cilk::run(process_primary_rows_internal, bl, primary_rowset);
int r = cilk::run(process_primary_rows_internal, bl, primary_rowset);
BL_TRACE("cilk_return");
return r;
#else
return process_primary_rows_internal (bl, primary_rowset);
#endif
......@@ -1809,6 +1824,7 @@ int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
static void* fractal_thread (void *ftav) {
BL_TRACE("start_fractal_thread");
struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav;
#if defined(__cilkplusplus)
int r = cilk::run(toku_loader_write_brt_from_q, fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q);
......@@ -1816,6 +1832,7 @@ static void* fractal_thread (void *ftav) {
int r = toku_loader_write_brt_from_q(fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q);
#endif
fta->errno_result = r;
BL_TRACE("fractal_thread");
return NULL;
}
......@@ -1870,7 +1887,9 @@ static int loader_do_i (BRTLOADER bl,
{
void *toku_pthread_retval;
BL_TRACE("do_i");
int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval);
BL_TRACE("join_on_fractal");
assert(r2==0 && toku_pthread_retval==NULL);
assert(bl->fractal_threads_live[which_db]);
bl->fractal_threads_live[which_db] = FALSE;
......@@ -1887,6 +1906,7 @@ static int loader_do_i (BRTLOADER bl,
toku_free(rows->data); rows->data = NULL;
toku_free(rows->rows); rows->rows = NULL;
toku_free(fs->data_fidxs); fs->data_fidxs = NULL;
BL_TRACE("do_i");
return r;
}
......@@ -1905,6 +1925,7 @@ static int toku_brt_loader_close_internal (BRTLOADER bl)
int allocate_here = remaining_progress/(bl->N - i);
remaining_progress -= allocate_here;
//printf("%s:%d do_i(%d)\n", __FILE__, __LINE__, i);
BL_TRACE("close");
result = loader_do_i(bl, i, bl->dbs[i], bl->bt_compare_funs[i], bl->descriptors[i], fname_in_cwd,
allocate_here
);
......
......@@ -65,8 +65,8 @@ static FILE* bltrace_fp = NULL;
#endif
void bl_trace(const char *func __attribute__((unused)),
int line __attribute__ ((unused)),
char *str __attribute__((unused)))
int line __attribute__((unused)),
const char *str __attribute__((unused)))
{
#if BL_DO_TRACE
if ( bl_next_trace < BL_TRACE_MAX_ENTRIES ) {
......@@ -97,6 +97,7 @@ void bl_trace_end(void)
#if BL_DO_TRACE
char bltracefile[128];
sprintf(bltracefile, "brtloader_%d.trace", toku_os_getpid());;
printf("brtloader_%d.trace", toku_os_getpid());
bltrace_fp = fopen(bltracefile, "w");
assert(bltrace_fp != NULL);
for (int i=0;i<bl_next_trace;i++) {
......
......@@ -27,8 +27,9 @@ void toku_print_trace_mem(void) __attribute__((__visibility__("default")));
// some trace functions added for the bulk loader
void bl_trace(const char *func __attribute__((unused)),
int line __attribute__ ((unused)),
char *str __attribute__((unused))) __attribute__((unused));
int line __attribute__ ((unused)),
const char *str __attribute__((unused)))
__attribute__((unused));
void bl_trace_end(void) __attribute__((unused));
#define BL_DO_TRACE 0
......
#!/usr/local/bin/python
#!/usr/bin/env python
import sys
try:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment