Commit 47f2199d authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge in

 * bradley's benchmarking changes, and
 * the big speedup in the sort-and-write-rows (slowness caused the lock in fidx2file).
Refs #2571. [t:2571]
{{{
svn merge -r 19909:20027 https://svn.tokutek.com/tokudb/toku/tokudb.2571
}}}
.


git-svn-id: file:///svn/toku/tokudb@20029 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4b42c354
......@@ -103,6 +103,7 @@ endif
NEWBRT_O_FILES += brtloader.$(OEXT)
brtloader.$(OEXT): $(DEPEND_COMPILE)
ifeq ($(BRTLOADER),cilk)
brtloader.$(OEXT): brtloader.c
$(CILKPP) -DTOKU_ALLOW_DEPRECATED $(CILKFLAGS) -c $<
......
......@@ -37,6 +37,7 @@ typedef struct fidx { int idx; } FIDX;
static const FIDX FIDX_NULL __attribute__((__unused__)) = {-1};
static int fidx_is_null (const FIDX f) __attribute__((__unused__));
static int fidx_is_null (const FIDX f) { return f.idx==-1; }
FILE *toku_bl_fidx2file (BRTLOADER bl, FIDX i);
int brtloader_open_temp_file (BRTLOADER bl, FIDX*file_idx);
......@@ -56,7 +57,7 @@ int init_rowset (struct rowset *rows);
void destroy_rowset (struct rowset *rows);
void add_row (struct rowset *rows, DBT *key, DBT *val);
int loader_write_row(DBT *key, DBT *val, FIDX data, u_int64_t *dataoff, BRTLOADER bl);
int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, u_int64_t *dataoff, BRTLOADER bl);
int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl);
struct merge_fileset {
......@@ -183,7 +184,7 @@ int merge_files (struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_
#if defined(__cilkplusplus)
extern "Cilk++" {
#endif
int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func,
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func,
int progress_allocation);
int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *);
......
......@@ -278,11 +278,11 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
BRTLOADER CALLOC(bl); // initialized to all zeros (hence CALLOC)
if (!bl) return errno;
BL_TRACE("calibrate begin");
#if BL_DO_TRACE
BL_TRACE(blt_calibrate_begin);
sleep(1);
BL_TRACE(blt_calibrate_done);
#endif
BL_TRACE("calibrate done");
bl->panic = 0;
bl->panic_errno = 0;
......@@ -336,7 +336,7 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
bl->extractor_live = TRUE;
*blp = bl;
BL_TRACE("open");
BL_TRACE(blt_open);
return 0;
}
......@@ -346,7 +346,8 @@ static void brt_loader_set_panic(BRTLOADER bl, int error) {
brt_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
}
static FILE *bl_fidx2file (BRTLOADER bl, FIDX i) {
// One of the tests uses this.
FILE *toku_bl_fidx2file (BRTLOADER bl, FIDX i) {
{ int r2 = toku_pthread_mutex_lock(&bl->file_infos.lock); assert(r2==0); }
assert(i.idx >=0 && i.idx < bl->file_infos.n_files);
assert(bl->file_infos.file_infos[i.idx].is_open);
......@@ -355,7 +356,7 @@ static FILE *bl_fidx2file (BRTLOADER bl, FIDX i) {
return result;
}
static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOADER bl)
static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER bl)
/* Effect: this is a wrapper for fwrite that returns 0 on success, otherwise returns an error number.
* Arguments:
* ptr the data to be writen.
......@@ -366,7 +367,6 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD
* Return value: 0 on success, an error number otherwise.
*/
{
FILE *stream = bl_fidx2file(bl, streami);
size_t r = do_fwrite(ptr, size, nmemb, stream);
if (r!=nmemb) {
int e;
......@@ -392,7 +392,7 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD
* Return value: 0 on success, an error number otherwise.
*/
{
FILE *stream = bl_fidx2file(bl, streami);
FILE *stream = toku_bl_fidx2file(bl, streami);
size_t r = fread(ptr, size, nmemb, stream);
if (r==0) {
if (feof(stream)) return EOF;
......@@ -409,7 +409,7 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD
}
}
static int bl_write_dbt (DBT *dbt, FIDX datafile, uint64_t *dataoff, BRTLOADER bl)
static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, BRTLOADER bl)
{
int r;
int dlen = dbt->size;
......@@ -437,7 +437,7 @@ static int bl_read_dbt (/*in*/DBT *dbt, FIDX datafile, BRTLOADER bl)
return 0;
}
int loader_write_row(DBT *key, DBT *val, FIDX data, u_int64_t *dataoff, BRTLOADER bl)
int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, u_int64_t *dataoff, BRTLOADER bl)
/* Effect: Given a key and a val (both DBTs), write them to a file. Increment *dataoff so that it's up to date.
* Arguments:
* key, val write these.
......@@ -451,8 +451,8 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, u_int64_t *dataoff, BRTLOADE
//int vlen = val->size;
int r;
// we have a chance to handle the errors because when we close we can delete all the files.
if ((r=bl_write_dbt(key, data, dataoff, bl))) return r;
if ((r=bl_write_dbt(val, data, dataoff, bl))) return r;
if ((r=bl_write_dbt(key, dataf, dataoff, bl))) return r;
if ((r=bl_write_dbt(val, dataf, dataoff, bl))) return r;
{ int r2 = toku_pthread_mutex_lock(&bl->file_infos.lock); assert(r2==0); }
bl->file_infos.file_infos[data.idx].n_rows++;
{ int r2 = toku_pthread_mutex_unlock(&bl->file_infos.lock); assert(r2==0); }
......@@ -527,13 +527,6 @@ static int row_wont_fit (struct rowset *rows, size_t size)
return (data_buffer_limit < rows->n_bytes + size);
}
static void reset_rows (struct rowset *rows)
/* Effect: Reset the rows to an empty collection (but reuse any allocated space) */
{
rows->n_bytes = 0;
rows->n_rows = 0;
}
void add_row (struct rowset *rows, DBT *key, DBT *val)
/* Effect: add a row to a collection. */
{
......@@ -563,17 +556,20 @@ void add_row (struct rowset *rows, DBT *key, DBT *val)
static int process_primary_rows (BRTLOADER bl, struct rowset *primary_rowset);
CILK_BEGIN
static int finish_primary_rows_internal (BRTLOADER bl) {
// now we have been asked to finish up.
static int finish_primary_rows_internal (BRTLOADER bl)
// now we have been asked to finish up.
// Be sure to destroy the rowsets.
{
int ra[bl->N];
cilk_for (int i = 0; i < bl->N; i++) {
struct rowset *rows = &(bl->rows[i]);
//printf("%s:%d extractor finishing index %d with %ld rows\n", __FILE__, __LINE__, i, rows->n_rows);
int progress_this_sort = 0; // fix?
ra[i] = /*cilk_spawn*/sort_and_write_rows(rows, &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i], progress_this_sort);
ra[i] = cilk_spawn sort_and_write_rows(*rows, &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i], progress_this_sort);
zero_rowset(rows);
}
// cilk_sync;
// Implicit cilk_sync after that cilk_for loop.
int r = 0;
for (int i = 0; i < bl->N; i++)
......@@ -593,14 +589,14 @@ static int finish_primary_rows (BRTLOADER bl) {
}
static void* extractor_thread (void *blv) {
BL_TRACE("extractor_init");
BL_TRACE(blt_extractor_init);
BRTLOADER bl = (BRTLOADER)blv;
while (1) {
void *item;
{
BL_TRACE("extractor");
BL_TRACE(blt_extractor);
int r = queue_deq(bl->primary_rowset_queue, &item, NULL, NULL);
BL_TRACE("extractor_deq");
BL_TRACE(blt_extract_deq);
if (r==EOF) break;
assert(r==0); // other errors are arbitrarily bad.
}
......@@ -623,10 +619,7 @@ static void* extractor_thread (void *blv) {
int r = finish_primary_rows(bl);
r = r; // RFP 2578 assert(r==0); // !!! should deal with this.
for (int i=0; i<bl->N; i++) {
destroy_rowset(&bl->rows[i]); // destroy the rowset here so that if we spawn the sort_and_write, there won't be a race on destroying the rows (thanks to the cilk_sync 2 lines up)
}
BL_TRACE("extractor");
BL_TRACE(blt_extractor);
return 0;
}
......@@ -648,9 +641,9 @@ static int loader_do_put(BRTLOADER bl,
if (row_wont_fit(&bl->primary_rowset, 0)) {
// queue the rows for further processing by the extractor thread.
//printf("%s:%d please extract %ld\n", __FILE__, __LINE__, bl->primary_rowset.n_rows);
BL_TRACE("do_put");
BL_TRACE(blt_do_put);
enqueue_for_extraction(bl);
BL_TRACE("extract_enq");
BL_TRACE(blt_extract_enq);
init_rowset(&bl->primary_rowset);
}
return 0;
......@@ -659,10 +652,10 @@ static int loader_do_put(BRTLOADER bl,
static int finish_extractor (BRTLOADER bl) {
//printf("%s:%d now finishing extraction\n", __FILE__, __LINE__);
BL_TRACE("do_put");
BL_TRACE(blt_do_put);
if (bl->primary_rowset.n_rows>0) {
enqueue_for_extraction(bl);
BL_TRACE("extract_enqeue");
BL_TRACE(blt_extract_enq);
} else {
destroy_rowset(&bl->primary_rowset);
}
......@@ -677,7 +670,7 @@ static int finish_extractor (BRTLOADER bl) {
int r = toku_pthread_join(bl->extractor_thread, &toku_pthread_retval);
assert(r==0 && toku_pthread_retval==NULL);
bl->extractor_live = FALSE;
BL_TRACE("join_on_extractor");
BL_TRACE(blt_join_on_extractor);
}
{
int r = queue_destroy(bl->primary_rowset_queue);
......@@ -702,7 +695,6 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
// if FLUSH is true then write all the buffered rows out.
// if primary_rowset is NULL then treat it as empty.
{
BL_TRACE("cilk_call");
int error_count = 0;
int *MALLOC_N(bl->N, error_codes);
......@@ -741,10 +733,10 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
if (row_wont_fit(rows, skey.size + sval.size)) {
//printf("rows.n_rows=%ld\n", rows.n_rows);
BL_TRACE("puts");
BL_TRACE(blt_extractor);
int progress_this_sort = 0; // fix?
int r = sort_and_write_rows(rows, fs, bl, i, bl->dbs[i], compare, progress_this_sort); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
BL_TRACE("sort_and_write_rows");
int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare, progress_this_sort); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
BL_TRACE(blt_sort_and_write_rows);
if (r!=0) {
error_codes[i] = r;
#if defined(__cilkplusplus)
......@@ -754,7 +746,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
#endif
break;
}
reset_rows(rows);
init_rowset(rows); // we passed the contents of rows to sort_and_write_rows.
}
add_row(rows, &skey, &sval);
......@@ -789,20 +781,20 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
assert(0); // could not find the error code. This is an error in the program if we get here.
}
toku_free(error_codes);
BL_TRACE("extractor");
BL_TRACE(blt_extractor);
return r;
}
CILK_END
static int process_primary_rows (BRTLOADER bl, struct rowset *primary_rowset) {
BL_TRACE("extractor");
BL_TRACE(blt_extractor);
#if defined(__cilkplusplus)
int r = cilk::run(process_primary_rows_internal, bl, primary_rowset);
BL_TRACE("cilk_return");
return r;
#else
return process_primary_rows_internal (bl, primary_rowset);
int r = process_primary_rows_internal (bl, primary_rowset);
#endif
BL_TRACE(blt_extractor);
return r;
}
......@@ -1090,7 +1082,7 @@ static int update_progress (int N,
}
CILK_BEGIN
int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare,
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare,
int progress_allocation)
/* Effect: Given a rowset, sort it and write it to a temporary file.
* Arguments:
......@@ -1100,32 +1092,38 @@ int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADE
* dest_db the DB, needed for the comparison function.
* compare The comparison function.
* Returns 0 on success, otherwise an error number.
* Destroy the rowset after finishing it.
*/
{
//printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
FIDX sfile;
FIDX sfile = FIDX_NULL;
u_int64_t soffset=0;
// TODO: erase the files, and deal with all the cleanup on error paths
//printf("%s:%d sort_rows n_rows=%ld\n", __FILE__, __LINE__, rows->n_rows);
int r = sort_rows(rows, which_db, dest_db, compare, bl);
//bl_time_t before_sort = bl_time_now();
int r = sort_rows(&rows, which_db, dest_db, compare, bl);
if (r!=0) {
return r;
}
//bl_time_t after_sort = bl_time_now();
r = update_progress(progress_allocation/2, bl, "sorted rows");
progress_allocation -= progress_allocation/2;
if (r!=0) return r;
r = extend_fileset(bl, fs, &sfile);
FILE *sstream = toku_bl_fidx2file(bl, sfile);
if (r!=0) return r;
for (size_t i=0; i<rows->n_rows; i++) {
DBT skey; memset(&skey, 0, sizeof skey); skey.data = rows->data + rows->rows[i].off; skey.size=rows->rows[i].klen;
DBT sval; memset(&sval, 0, sizeof sval); sval.data = rows->data + rows->rows[i].off + rows->rows[i].klen; sval.size=rows->rows[i].vlen;
for (size_t i=0; i<rows.n_rows; i++) {
DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen);
DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, rows.rows[i].vlen);
r = loader_write_row(&skey, &sval, sfile, &soffset, bl);
r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, bl);
if (r!=0) return r;
}
r = brtloader_fi_close(&bl->file_infos, sfile); if (r!=0) return r;
destroy_rowset(&rows);
//bl_time_t after_write = bl_time_now();
return update_progress(progress_allocation, bl, "wrote sorted");
}
CILK_END
......@@ -1134,9 +1132,9 @@ CILK_END
int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare,
int progress_allocation) {
#if defined(__cilkplusplus)
return cilk::run(sort_and_write_rows, rows, fs, bl, which_db, dest_db, compare, progress_allocation);
return cilk::run(sort_and_write_rows, *rows, fs, bl, which_db, dest_db, compare, progress_allocation);
#else
return sort_and_write_rows (rows, fs, bl, which_db, dest_db, compare, progress_allocation);
return sort_and_write_rows (*rows, fs, bl, which_db, dest_db, compare, progress_allocation);
#endif
}
......@@ -1158,6 +1156,8 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
* Return value: 0 on success, otherwise an error number.
*/
{
FILE *dest_stream = to_q ? NULL : toku_bl_fidx2file(bl, dest_data);
//printf(" merge_some_files progress=%d fin at %d\n", bl->progress, bl->progress+progress_allocation);
DBT keys[n_sources];
DBT vals[n_sources];
......@@ -1179,7 +1179,9 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
u_int64_t n_rows = 0;
// load pqueue with first value from each source
for (int i=0; i<n_sources; i++) {
BL_TRACE_QUIET(blt_do_i);
int r = loader_read_row(srcs_data[i], &keys[i], &vals[i], bl);
BL_TRACE_QUIET(blt_read_row);
if (r==EOF) continue; // if the file is empty, don't initialize the pqueue.
if (r!=0) return r;
......@@ -1233,7 +1235,9 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
}
if (to_q) {
if (row_wont_fit(output_rowset, keys[mini].size + vals[mini].size)) {
BL_TRACE(blt_do_i);
r = queue_enq(q, (void*)output_rowset, 1, NULL);
BL_TRACE(blt_fractal_enq);
assert(r==0);
MALLOC(output_rowset);
assert(output_rowset);
......@@ -1243,20 +1247,22 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
add_row(output_rowset, &keys[mini], &vals[mini]);
} else {
// write it to the dest file
r = loader_write_row(&keys[mini], &vals[mini], dest_data, &dataoff[mini], bl);
r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl);
if (r!=0) return r;
}
{
// read next row from file that just sourced min value
BL_TRACE_QUIET(blt_do_i);
r = loader_read_row(srcs_data[mini], &keys[mini], &vals[mini], bl);
BL_TRACE_QUIET(blt_read_row);
if (r!=0) {
if (feof(bl_fidx2file(bl, srcs_data[mini]))) {
if (feof(toku_bl_fidx2file(bl, srcs_data[mini]))) {
// on feof, queue size permanently smaller
toku_free(keys[mini].data);
toku_free(vals[mini].data);
} else {
r = ferror(bl_fidx2file(bl, srcs_data[mini]));
r = ferror(toku_bl_fidx2file(bl, srcs_data[mini]));
assert(r!=0);
return r;
}
......@@ -1290,8 +1296,10 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
}
}
if (to_q) {
int r = queue_enq(q, (void*)output_rowset, 1, NULL);
assert(r==0);
BL_TRACE(blt_do_i);
int r = queue_enq(q, (void*)output_rowset, 1, NULL);
BL_TRACE(blt_fractal_enq);
assert(r==0);
}
pqueue_free(pq);
toku_free(pq_nodes);
......@@ -1687,6 +1695,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
int64_t n_pivots=0; // number of pivots in pivots_file
FIDX pivots_file; // the file
brtloader_open_temp_file (bl, &pivots_file);
FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);
// The blocks_array will contain all the block numbers that correspond to the pivots. Generally there should be one more block than pivot.
struct subtrees_info sts;
......@@ -1731,7 +1740,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
while (1) {
void *item;
{
BL_TRACE(blt_fractal_thread);
int rr = queue_deq(q, &item, NULL, NULL);
BL_TRACE(blt_fractal_deq);
if (rr == EOF) break;
if (rr != 0) {
r=rr; goto error;
......@@ -1752,7 +1763,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
allocate_node(&sts, lblock, est, lbuf->local_fingerprint);
n_pivots++;
if ((r=bl_write_dbt(&key, pivots_file, NULL, bl))) {
if ((r=bl_write_dbt(&key, pivots_stream, NULL, bl))) {
// how to handle errors in fractal thread?
assert(r==0); // this always fails.
return r;
......@@ -1787,7 +1798,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
{
DBT key=make_dbt(0,0); // must write an extra DBT into the pivots file.
r = bl_write_dbt(&key, pivots_file, NULL, bl);
r=bl_write_dbt(&key, pivots_stream, NULL, bl);
if (r) goto error; // RFP2578
}
......@@ -1840,6 +1851,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
error:
subtrees_info_destroy(&sts);
dbout_destroy(&out);
BL_TRACE(blt_fractal_thread);
return r; // RFP2578
}
......@@ -1861,7 +1873,7 @@ int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
static void* fractal_thread (void *ftav) {
BL_TRACE("start_fractal_thread");
BL_TRACE(blt_start_fractal_thread);
struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav;
#if defined(__cilkplusplus)
int r = cilk::run(toku_loader_write_brt_from_q, fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q);
......@@ -1869,7 +1881,6 @@ static void* fractal_thread (void *ftav) {
int r = toku_loader_write_brt_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q);
#endif
fta->errno_result = r;
BL_TRACE("fractal_thread");
return NULL;
}
......@@ -1886,7 +1897,7 @@ static int loader_do_i (BRTLOADER bl,
//printf("doing i use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
struct merge_fileset *fs = &(bl->fs[which_db]);
struct rowset *rows = &(bl->rows[which_db]);
assert(rows->data==NULL); // the rows should be all cleaned up alrea
assert(rows->data==NULL); // the rows should be all cleaned up already
// a better allocation would be to figure out roughly how many merge passes we'll need.
int allocation_for_merge = (2*progress_allocation)/3;
......@@ -1901,6 +1912,7 @@ static int loader_do_i (BRTLOADER bl,
int fd = open(new_fname, O_RDWR| O_CREAT | O_BINARY, mode);
assert(fd>=0);
// This structure must stay live until the join below.
struct fractal_thread_args fta = {bl,
descriptor,
fd,
......@@ -1918,22 +1930,25 @@ static int loader_do_i (BRTLOADER bl,
assert(bl->fractal_threads_live[which_db]==FALSE);
bl->fractal_threads_live[which_db] = TRUE;
r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]);
r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]);
void *toku_pthread_retval;
BL_TRACE("do_i");
int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval);
BL_TRACE("join_on_fractal");
assert(r2==0 && toku_pthread_retval==NULL);
assert(bl->fractal_threads_live[which_db]);
bl->fractal_threads_live[which_db] = FALSE;
if (r == 0) r = fta.errno_result;
}
{
int r2 = queue_destroy(bl->fractal_queues[which_db]);
assert(r2==0);
bl->fractal_queues[which_db]=NULL;
{
void *toku_pthread_retval;
BL_TRACE(blt_do_i);
int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval);
assert(fta.bl==bl); // this is a gratuitous assertion to make sure that the fta struct is still live here. A previous bug but that struct into a C block statement.
BL_TRACE(blt_join_on_fractal);
assert(r2==0 && toku_pthread_retval==NULL);
assert(bl->fractal_threads_live[which_db]);
bl->fractal_threads_live[which_db] = FALSE;
if (r == 0) r = fta.errno_result;
}
{
int r2 = queue_destroy(bl->fractal_queues[which_db]);
assert(r2==0);
bl->fractal_queues[which_db]=NULL;
}
}
error: // this is the cleanup code. Even if r==0 (no error) we fall through to here.
......@@ -1941,7 +1956,7 @@ static int loader_do_i (BRTLOADER bl,
toku_free(rows->data); rows->data = NULL;
toku_free(rows->rows); rows->rows = NULL;
toku_free(fs->data_fidxs); fs->data_fidxs = NULL;
BL_TRACE("do_i");
BL_TRACE(blt_do_i);
return r;
}
......@@ -1949,7 +1964,7 @@ static int toku_brt_loader_close_internal (BRTLOADER bl)
/* Effect: Close the bulk loader.
* Return all the file descriptors in the array fds. */
{
BL_TRACE("puts");
BL_TRACE(blt_do_put);
int result = 0;
int remaining_progress = PROGRESS_MAX;
// RFP cilk_for, no breaks in the loop
......@@ -1960,7 +1975,7 @@ static int toku_brt_loader_close_internal (BRTLOADER bl)
int allocate_here = remaining_progress/(bl->N - i);
remaining_progress -= allocate_here;
//printf("%s:%d do_i(%d)\n", __FILE__, __LINE__, i);
BL_TRACE("close");
BL_TRACE(blt_close);
result = loader_do_i(bl, i, bl->dbs[i], bl->bt_compare_funs[i], bl->descriptors[i], fname_in_cwd,
allocate_here
);
......@@ -1977,7 +1992,7 @@ static int toku_brt_loader_close_internal (BRTLOADER bl)
assert(bl->progress == PROGRESS_MAX);
error:
brtloader_destroy(bl, (BOOL)(result!=0));
BL_TRACE("close");
BL_TRACE(blt_close);
BL_TRACE_END;
return result;
}
......@@ -2241,7 +2256,9 @@ static int setup_nonleaf_block (int n_children,
int r = read_some_pivots(pivots_file, n_children, bl, pivots);
assert(r==0);
if ((r=bl_write_dbt(&pivots[n_children-1], next_pivots_file, NULL, bl))) return r;
FILE *next_pivots_stream = toku_bl_fidx2file(bl, next_pivots_file);
if ((r=bl_write_dbt(&pivots[n_children-1], next_pivots_stream, NULL, bl))) return r;
// The last pivot was written to the next_pivots file, so we free it now instead of returning it.
toku_free(pivots[n_children-1].data);
pivots[n_children-1] = zero_dbt;
......@@ -2354,7 +2371,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
// 2) We put the 15 pivots and 16 blocks into an non-leaf node.
// 3) We put the 16th pivot into the next pivots file.
{
int r = fseek(bl_fidx2file(bl, pivots_fidx), 0, SEEK_SET);
int r = fseek(toku_bl_fidx2file(bl, pivots_fidx), 0, SEEK_SET);
if (r!=0) { assert(errno!=0); return errno; }
}
......
......@@ -185,7 +185,7 @@ static void test_read_write_rows (char *template) {
for (int i=0; i<3; i++) {
DBT key = {.size=strlen(keystrings[i]), .data=keystrings[i]};
DBT val = {.size=strlen(valstrings[i]), .data=valstrings[i]};
r = loader_write_row(&key, &val, file, &dataoff, &bl);
r = loader_write_row(&key, &val, file, toku_bl_fidx2file(&bl, file), &dataoff, &bl);
CKERR(r);
actual_size+=key.size + val.size + 8;
}
......
......@@ -9,6 +9,54 @@
#include "rdtsc.h"
#include "trace_mem.h"
#if BL_DO_TRACE && BL_SIMPLE_TRACE
static double saved_scale_factor = 1e-9; // approximate until we get the actual scale factor
static unsigned long long first_time = 0;
static __thread unsigned long long prev_time = 0;
static unsigned long long trace_hist[BLT_LIMIT];
unsigned long long bl_trace (const BL_TRACE_ENUM l, const int quiet) {
assert(l<BLT_LIMIT);
unsigned long long t = rdtsc();
if (first_time==0) {
first_time = t;
}
if (prev_time != 0) {
unsigned long long diff = t-prev_time;
if (l==blt_calibrate_done) {
saved_scale_factor = 1/(double)diff;
}
if (!quiet)
printf("-> %30s %21llu %21llu %13.6fs\n", blt_to_string(l), trace_hist[l], t, (t-first_time)*saved_scale_factor);
trace_hist[l] += diff;
}
prev_time = t;
return t;
}
bl_time_t bl_time_now(void) {
return rdtsc();
}
double bl_time_diff(const bl_time_t a, const bl_time_t b) {
return (a-b)*saved_scale_factor;
}
void bl_trace_end(void) {
double scale_factor = trace_hist[blt_calibrate_done];
unsigned long long total = 0;
for (BL_TRACE_ENUM i=0; i<BLT_LIMIT; i++) {
total+=trace_hist[i];
}
for (BL_TRACE_ENUM i=0; i<BLT_LIMIT; i++) {
printf("%25s %20lld %8.3fs %5.1f%%\n", blt_to_string(i), trace_hist[i], trace_hist[i]/scale_factor, 100.0*trace_hist[i]/(double)total);
}
}
#elif BL_DO_TRACE && !BL_SIMPLE_TRACE
// customize this as required
#define NTRACE 0
#if NTRACE
......@@ -94,6 +142,7 @@ void bl_trace(const char *func __attribute__((unused)),
void bl_trace_end(void)
{
#error
#if BL_DO_TRACE
char bltracefile[128];
sprintf(bltracefile, "brtloader_%d.trace", toku_os_getpid());;
......@@ -113,3 +162,4 @@ void bl_trace_end(void)
#endif
}
#endif
......@@ -9,6 +9,17 @@
extern "C" {
#endif
// Define this even when traces are compiled out so we don't have to recompile things like scanscan.c
// print the trace
void toku_print_trace_mem(void) __attribute__((__visibility__("default")));
#define BL_DO_TRACE 1
// BL_SIMPLE_TRACE 1 is Bradley's in-memory trace analysis.
// BL_SIMPLE_TRACE 0 is Dave's post-processing analysis.
#define BL_SIMPLE_TRACE 1
#define BL_TRACE_PRINT 0
#if BL_DO_TRACE && !BL_SIMPLE_TRACE
// a circular log of trace entries is maintained in memory. the trace
// entry consists of a string pointer, an integer, and the processor
// timestamp. there are functions to add an entry to the end of the
......@@ -22,9 +33,6 @@ extern "C" {
// pointer, a number, and the processor timestamp
void toku_add_trace_mem(const char *str, int n) __attribute__((__visibility__("default")));
// print the trace
void toku_print_trace_mem(void) __attribute__((__visibility__("default")));
// some trace functions added for the bulk loader
void bl_trace(const char *func __attribute__((unused)),
int line __attribute__ ((unused)),
......@@ -32,15 +40,94 @@ void bl_trace(const char *func __attribute__((unused)),
__attribute__((unused));
void bl_trace_end(void) __attribute__((unused));
#define BL_DO_TRACE 0
#define BL_TRACE_PRINT 0
#if BL_DO_TRACE
#define BL_TRACE(str) bl_trace(__FUNCTION__, __LINE__, str)
#define BL_TRACE(sym) bl_trace(__FUNCTION__, __LINE__, #sym)
#define BL_TRACE_END bl_trace_end()
#elif BL_DO_TRACE && BL_SIMPLE_TRACE
typedef enum bl_trace_enum {BLT_START,
blt_calibrate_begin,
blt_calibrate_done,
blt_open,
// Time spent in the extractor
blt_extractor_init,
blt_extractor,
blt_extract_deq,
blt_sort_and_write_rows,
// Time spent by the main thread in parallel to the extractor
blt_do_put,
blt_extract_enq,
blt_join_on_extractor,
// Time spent in the fractal thread
blt_fractal_thread,
blt_fractal_deq,
// Time spent by the main thread in parallel to the fractal thread
blt_start_fractal_thread,
blt_do_i,
blt_read_row,
blt_fractal_enq,
blt_join_on_fractal,
blt_close,
//
BLT_LIMIT}
BL_TRACE_ENUM;
static const char * blt_to_string (BL_TRACE_ENUM) __attribute__((__unused__));
#define BLSCASE(s) case s: return #s
static const char * blt_to_string (BL_TRACE_ENUM i) {
switch(i) {
BLSCASE(BLT_START);
BLSCASE(blt_calibrate_begin);
BLSCASE(blt_calibrate_done);
BLSCASE(blt_close);
BLSCASE(blt_do_i);
BLSCASE(blt_do_put);
BLSCASE(blt_extract_deq);
BLSCASE(blt_extract_enq);
BLSCASE(blt_extractor);
BLSCASE(blt_extractor_init);
BLSCASE(blt_fractal_deq);
BLSCASE(blt_fractal_enq);
BLSCASE(blt_fractal_thread);
BLSCASE(blt_join_on_extractor);
BLSCASE(blt_join_on_fractal);
BLSCASE(blt_open);
BLSCASE(blt_read_row);
BLSCASE(blt_sort_and_write_rows);
BLSCASE(blt_start_fractal_thread);
BLSCASE(BLT_LIMIT);
}
return NULL;
}
typedef unsigned long long bl_time_t;
bl_time_t bl_trace(const BL_TRACE_ENUM, const int quiet);
bl_time_t bl_time_now(void);
double bl_time_diff(const bl_time_t a, const bl_time_t b);
void bl_trace_end(void);
#define BL_TRACE(sym) bl_trace(sym, 0)
#if 0
#define BL_TRACE_QUIET(sym) bl_trace(sym, 1)
#else
#define BL_TRACE_QUIET(sym)
#endif
#define BL_TRACE_END bl_trace_end()
#else
#define BL_TRACE(str)
#define BL_TRACE(sym)
#define BL_TRACE_FROM(sym,q,t)
#define BL_TRACE_QUIET(sym)
#define BL_TRACE_FROM_QUIET(sym,q,t)
#define BL_TRACE_END
#endif
#if defined(__cplusplus) || defined(__cilkplusplus)
......
......@@ -257,9 +257,9 @@ static void test_loader(DB **dbs)
poll_count=0;
// close the loader
printf("%9.6fs closing", elapsed_time()); fflush(stdout);
printf("%9.6fs closing\n", elapsed_time());
r = loader->close(loader);
printf(" done\n");
printf("%9.6fs done\n", elapsed_time());
CKERR2s(r,0,TOKUDB_CANCELED);
if (r==0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment