Commit 3b586f92 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge the finer-grained progress report for the loader. Refs #2462. [t:2462]

{{{
svn merge -r 18775:18791 https://svn.tokutek.com/tokudb/toku/tokudb.2462
}}}
.


git-svn-id: file:///svn/toku/tokudb@18792 c7de825b-a66e-492c-adef-691d508d4ae1
parent 04074c9f
...@@ -10,6 +10,7 @@ struct file_info { ...@@ -10,6 +10,7 @@ struct file_info {
BOOL is_extant; // if true, the file must be unlinked. BOOL is_extant; // if true, the file must be unlinked.
char *fname; char *fname;
FILE *file; FILE *file;
u_int64_t n_rows; // how many rows were written into that file
}; };
struct file_infos { struct file_infos {
int n_files; int n_files;
...@@ -36,6 +37,8 @@ struct brtloader_s { ...@@ -36,6 +37,8 @@ struct brtloader_s {
const struct descriptor **descriptors; // N of these const struct descriptor **descriptors; // N of these
const char **new_fnames_in_env; // the file names that the final data will be written to (relative to env). const char **new_fnames_in_env; // the file names that the final data will be written to (relative to env).
u_int64_t n_rows; // how many rows have been put?
const char *temp_file_template; const char *temp_file_template;
FIDX fprimary_rows; // the file index (in the file_infos) for the data FIDX fprimary_rows; // the file index (in the file_infos) for the data
FIDX fprimary_idx; // the file index for the index FIDX fprimary_idx; // the file index for the index
...@@ -43,6 +46,16 @@ struct brtloader_s { ...@@ -43,6 +46,16 @@ struct brtloader_s {
CACHETABLE cachetable; CACHETABLE cachetable;
/* To make it easier to recover from errors, we don't use FILE*, instead we use an index into the file_infos. */ /* To make it easier to recover from errors, we don't use FILE*, instead we use an index into the file_infos. */
struct file_infos file_infos; struct file_infos file_infos;
#define PROGRESS_MAX (1<<16)
int progress; // Progress runs from 0 to PROGRESS_MAX. When we call the poll function we convert to a float from 0.0 to 1.0
// We use an integer so that we can add to the progress using a fetch-and-add instruction.
// These two are set in the close function, and used while running close
int (*poll_function)(void *extra, float progress);
void *poll_extra;
int user_said_stop; // 0 if the poll_function always returned zero. If it ever returns nonzero, then store that value here.
}; };
/* These data structures are used for manipulating a collection of rows in main memory. */ /* These data structures are used for manipulating a collection of rows in main memory. */
...@@ -87,9 +100,9 @@ void init_merge_fileset (struct merge_fileset *fs); ...@@ -87,9 +100,9 @@ void init_merge_fileset (struct merge_fileset *fs);
void destroy_merge_fileset (struct merge_fileset *fs); void destroy_merge_fileset (struct merge_fileset *fs);
int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func, int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func,
struct error_callback_s *error_callback); struct error_callback_s *error_callback, int progress_allocation);
int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func, struct error_callback_s *); int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func, struct error_callback_s *, int progress_allocation);
int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct descriptor *descriptor); int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct descriptor *descriptor, int progress_allocation);
int brtloader_init_file_infos (struct file_infos *fi); int brtloader_init_file_infos (struct file_infos *fi);
void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error); void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error);
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h> #include <toku_portability.h>
#if !TOKU_WINDOWS #if !TOKU_WINDOWS
...@@ -80,6 +85,7 @@ static void open_file_add (struct file_infos *fi, ...@@ -80,6 +85,7 @@ static void open_file_add (struct file_infos *fi,
fi->file_infos[fi->n_files].is_extant = TRUE; fi->file_infos[fi->n_files].is_extant = TRUE;
fi->file_infos[fi->n_files].fname = fname; fi->file_infos[fi->n_files].fname = fname;
fi->file_infos[fi->n_files].file = file; fi->file_infos[fi->n_files].file = file;
fi->file_infos[fi->n_files].n_rows = 0;
idx->idx = fi->n_files; idx->idx = fi->n_files;
fi->n_files++; fi->n_files++;
fi->n_files_extant++; fi->n_files_extant++;
...@@ -209,6 +215,10 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -209,6 +215,10 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
{ int r = brtloader_open_temp_file(bl, &bl->fprimary_rows); if (r!=0) return r; } { int r = brtloader_open_temp_file(bl, &bl->fprimary_rows); if (r!=0) return r; }
{ int r = brtloader_open_temp_file(bl, &bl->fprimary_idx); if (r!=0) return r; } { int r = brtloader_open_temp_file(bl, &bl->fprimary_idx); if (r!=0) return r; }
bl->fprimary_offset = 0; bl->fprimary_offset = 0;
bl->n_rows = 0;
bl->progress = 0;
*blp = bl; *blp = bl;
return 0; return 0;
...@@ -319,6 +329,8 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FIDX idx, u_int64_t *dataoff ...@@ -319,6 +329,8 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FIDX idx, u_int64_t *dataoff
// we have a chance to handle the errors because when we close we can delete all the files. // we have a chance to handle the errors because when we close we can delete all the files.
if ((r=bl_write_dbt(key, data, dataoff, bl))) return r; if ((r=bl_write_dbt(key, data, dataoff, bl))) return r;
if ((r=bl_write_dbt(val, data, dataoff, bl))) return r; if ((r=bl_write_dbt(val, data, dataoff, bl))) return r;
bl->file_infos.file_infos[data.idx].n_rows++;
bl->file_infos.file_infos[idx .idx].n_rows++;
return 0; return 0;
} }
...@@ -328,6 +340,7 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val) ...@@ -328,6 +340,7 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val)
*/ */
{ {
if (bl->panic) return EINVAL; // previous panic if (bl->panic) return EINVAL; // previous panic
bl->n_rows++;
return loader_write_row(key, val, bl->fprimary_rows, bl->fprimary_idx, &bl->fprimary_offset, bl); return loader_write_row(key, val, bl->fprimary_rows, bl->fprimary_idx, &bl->fprimary_offset, bl);
} }
...@@ -357,7 +370,6 @@ int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl) ...@@ -357,7 +370,6 @@ int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl)
//#define SIZE_FACTOR 1 //#define SIZE_FACTOR 1
#define SIZE_FACTOR 1024 #define SIZE_FACTOR 1024
int init_rowset (struct rowset *rows) int init_rowset (struct rowset *rows)
/* Effect: Initialize a collection of rows to be empty. */ /* Effect: Initialize a collection of rows to be empty. */
{ {
...@@ -567,8 +579,20 @@ static int extend_fileset (BRTLOADER bl, struct merge_fileset *fs, FIDX*ffile, F ...@@ -567,8 +579,20 @@ static int extend_fileset (BRTLOADER bl, struct merge_fileset *fs, FIDX*ffile, F
return 0; return 0;
} }
static int update_progress (int N,
BRTLOADER bl,
const char *UU(message))
{
// Need a cilk lock here if we call this function from inside cilk.
bl->progress+=N;
//printf(" %20s: %d ", message, bl->progress);
if (bl->poll_function)
return bl->poll_function(bl->poll_extra, (float)bl->progress/(float)PROGRESS_MAX);
else return 0;
}
int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func compare, int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func compare,
struct error_callback_s *error_callback) struct error_callback_s *error_callback, int progress_allocation)
/* Effect: Given a rowset, sort it and write it to a temporary file. /* Effect: Given a rowset, sort it and write it to a temporary file.
* Arguments: * Arguments:
* rows the rowset * rows the rowset
...@@ -579,6 +603,7 @@ int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADE ...@@ -579,6 +603,7 @@ int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADE
* Returns 0 on success, otherwise an error number. * Returns 0 on success, otherwise an error number.
*/ */
{ {
//printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
FIDX sfile, sidx; FIDX sfile, sidx;
u_int64_t soffset=0; u_int64_t soffset=0;
// TODO: erase the files, and deal with all the cleanup on error paths // TODO: erase the files, and deal with all the cleanup on error paths
...@@ -586,6 +611,10 @@ int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADE ...@@ -586,6 +611,10 @@ int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADE
if (r!=0) { if (r!=0) {
return r; return r;
} }
r = update_progress(progress_allocation/2, bl, "sorted rows");
progress_allocation -= progress_allocation/2;
if (r!=0) return r;
r = extend_fileset(bl, fs, &sfile, &sidx); r = extend_fileset(bl, fs, &sfile, &sidx);
if (r!=0) return r; if (r!=0) return r;
for (size_t i=0; i<rows->n_rows; i++) { for (size_t i=0; i<rows->n_rows; i++) {
...@@ -597,10 +626,10 @@ int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADE ...@@ -597,10 +626,10 @@ int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADE
r = brtloader_fi_close(&bl->file_infos, sfile); if (r!=0) return r; r = brtloader_fi_close(&bl->file_infos, sfile); if (r!=0) return r;
r = brtloader_fi_close(&bl->file_infos, sidx); if (r!=0) return r; r = brtloader_fi_close(&bl->file_infos, sidx); if (r!=0) return r;
return 0; return update_progress(progress_allocation, bl, "wrote sorted");
} }
static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX srcs_data[/*n_sources*/], FIDX srcs_idx[/*n_sources*/], BRTLOADER bl, DB *dest_db, brt_compare_func compare, struct error_callback_s *error_callback) static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX srcs_data[/*n_sources*/], FIDX srcs_idx[/*n_sources*/], BRTLOADER bl, DB *dest_db, brt_compare_func compare, struct error_callback_s *error_callback, int progress_allocation)
/* Effect: Given an array of FILE*'s each containing sorted, merge the data and write it to dest. All the files remain open after the merge. /* Effect: Given an array of FILE*'s each containing sorted, merge the data and write it to dest. All the files remain open after the merge.
* This merge is performed in one pass, so don't pass too many files in. If you need a tree of merges do it elsewhere. * This merge is performed in one pass, so don't pass too many files in. If you need a tree of merges do it elsewhere.
* Modifies: May modify the arrays of files (but if modified, it must be a permutation so the caller can use that array to close everything.) * Modifies: May modify the arrays of files (but if modified, it must be a permutation so the caller can use that array to close everything.)
...@@ -617,6 +646,7 @@ static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX ...@@ -617,6 +646,7 @@ static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX
* Return value: 0 on success, otherwise an error number. * Return value: 0 on success, otherwise an error number.
*/ */
{ {
//printf(" merge_some_files progress=%d fin at %d\n", bl->progress, bl->progress+progress_allocation);
// We'll use a really stupid heap: O(n) time per pop instead of O(log n), because we need to get this working soon. ??? // We'll use a really stupid heap: O(n) time per pop instead of O(log n), because we need to get this working soon. ???
FIDX datas[n_sources]; FIDX datas[n_sources];
FIDX idxs [n_sources]; FIDX idxs [n_sources];
...@@ -628,13 +658,17 @@ static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX ...@@ -628,13 +658,17 @@ static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX
keys[i] = vals[i] = zero; // fill these all in with zero so we can delete stuff more reliably. keys[i] = vals[i] = zero; // fill these all in with zero so we can delete stuff more reliably.
datas[i] = idxs[i] = FIDX_NULL; datas[i] = idxs[i] = FIDX_NULL;
} }
u_int64_t n_rows = 0;
for (int i=0; i<n_sources; i++) { for (int i=0; i<n_sources; i++) {
datas[i] = srcs_data[i]; datas[i] = srcs_data[i];
idxs [i] = srcs_idx[i]; idxs [i] = srcs_idx[i];
int r = loader_read_row(datas[i], &keys[i], &vals[i], bl); int r = loader_read_row(datas[i], &keys[i], &vals[i], bl);
if (r!=0) return r; if (r!=0) return r;
dataoff[i] = 0; dataoff[i] = 0;
n_rows += bl->file_infos.file_infos[datas[i].idx].n_rows;
} }
u_int64_t n_rows_done = 0;
//printf(" n_rows=%ld\n", n_rows);
while (n_sources>0) { while (n_sources>0) {
int mini=0; int mini=0;
for (int j=1; j<n_sources; j++) { for (int j=1; j<n_sources; j++) {
...@@ -679,8 +713,20 @@ static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX ...@@ -679,8 +713,20 @@ static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX
} }
} }
} }
n_rows_done++;
const u_int64_t rows_per_report = SIZE_FACTOR*1024;
if (n_rows_done%rows_per_report==0) {
// need to update the progress.
double fraction_of_remaining_we_just_did = (double)rows_per_report / (double)(n_rows - n_rows_done + rows_per_report);
assert(0<= fraction_of_remaining_we_just_did && fraction_of_remaining_we_just_did<=1);
int progress_just_done = fraction_of_remaining_we_just_did * progress_allocation;
progress_allocation -= progress_just_done;
int r = update_progress(progress_just_done, bl, "in file merge");
if (r!=0) return r;
} }
return 0; }
return update_progress(progress_allocation, bl, "end of merge_some_files");
} }
static int int_min (int a, int b) static int int_min (int a, int b)
...@@ -689,7 +735,16 @@ static int int_min (int a, int b) ...@@ -689,7 +735,16 @@ static int int_min (int a, int b)
else return b; else return b;
} }
int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func compare, struct error_callback_s *error_callback) static int n_passes (int N, int B) {
int result = 0;
while (N>1) {
N = (N+B-1)/B;
result++;
}
return result;
}
int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func compare, struct error_callback_s *error_callback, int progress_allocation)
/* Effect: Given a fileset, merge all the files into one file. At the end the fileset will have one file in it. /* Effect: Given a fileset, merge all the files into one file. At the end the fileset will have one file in it.
* All the other files will be closed and unlinked. * All the other files will be closed and unlinked.
* Return value: 0 on success, otherwise an error number. * Return value: 0 on success, otherwise an error number.
...@@ -697,15 +752,28 @@ int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compar ...@@ -697,15 +752,28 @@ int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compar
* (however the fs will still need to be deallocated.) * (however the fs will still need to be deallocated.)
*/ */
{ {
//printf(" merge_files use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
const int mergelimit = (SIZE_FACTOR == 1) ? 4 : 256;
int n_passes_left = n_passes(fs->n_temp_files, mergelimit);
//printf("%d files, %d per pass, %d passes\n", fs->n_temp_files, mergelimit, n_passes_left);
int r = 0; int r = 0;
while (fs->n_temp_files!=1) { while (fs->n_temp_files!=1) {
assert(n_passes_left>0);
int progress_allocation_for_this_pass = progress_allocation/n_passes_left;
progress_allocation -= progress_allocation_for_this_pass;
assert(fs->n_temp_files>0); assert(fs->n_temp_files>0);
struct merge_fileset next_file_set; struct merge_fileset next_file_set;
init_merge_fileset(&next_file_set); init_merge_fileset(&next_file_set);
while (fs->n_temp_files>0) { while (fs->n_temp_files>0) {
// grab some files and merge them. // grab some files and merge them.
const int mergelimit = 256;
int n_to_merge = int_min(mergelimit, fs->n_temp_files); int n_to_merge = int_min(mergelimit, fs->n_temp_files);
// We are about to do n_to_merge/n_temp_files of the remaining for this pass.
int progress_allocation_for_this_subpass = progress_allocation_for_this_pass * (double)n_to_merge / (double)fs->n_temp_files;
progress_allocation_for_this_pass -= progress_allocation_for_this_subpass;
FIDX *MALLOC_N(n_to_merge, datafiles); FIDX *MALLOC_N(n_to_merge, datafiles);
FIDX *MALLOC_N(n_to_merge, idxfiles); FIDX *MALLOC_N(n_to_merge, idxfiles);
for (int i=0; i<n_to_merge; i++) datafiles[i] = idxfiles[i] = FIDX_NULL; for (int i=0; i<n_to_merge; i++) datafiles[i] = idxfiles[i] = FIDX_NULL;
...@@ -717,14 +785,19 @@ int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compar ...@@ -717,14 +785,19 @@ int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compar
r = brtloader_fi_reopen(&bl->file_infos, idxfiles[i], "r"); if (r) goto error; r = brtloader_fi_reopen(&bl->file_infos, idxfiles[i], "r"); if (r) goto error;
} }
FIDX merged_data, merged_idx; FIDX merged_data, merged_idx;
r = extend_fileset(bl, &next_file_set, &merged_data, &merged_idx); if (r!=0) goto error; r = extend_fileset(bl, &next_file_set, &merged_data, &merged_idx);
r = merge_some_files(merged_data, merged_idx, n_to_merge, datafiles, idxfiles, bl, dest_db, compare, error_callback); if (r!=0) goto error; if (r!=0) goto error;
r = merge_some_files(merged_data, merged_idx, n_to_merge, datafiles, idxfiles, bl, dest_db, compare, error_callback, progress_allocation_for_this_subpass);
if (r!=0) goto error;
for (int i=0; i<n_to_merge; i++) { for (int i=0; i<n_to_merge; i++) {
r = brtloader_fi_close(&bl->file_infos, datafiles[i]); if (r!=0) goto error; r = brtloader_fi_close(&bl->file_infos, datafiles[i]); if (r!=0) goto error;
r = brtloader_fi_close(&bl->file_infos, idxfiles[i]); if (r!=0) goto error; r = brtloader_fi_close(&bl->file_infos, idxfiles[i]); if (r!=0) goto error;
r = brtloader_fi_unlink(&bl->file_infos, datafiles[i]); if (r!=0) goto error; r = brtloader_fi_unlink(&bl->file_infos, datafiles[i]); if (r!=0) goto error;
r = brtloader_fi_unlink(&bl->file_infos, idxfiles[i]); if (r!=0) goto error; r = brtloader_fi_unlink(&bl->file_infos, idxfiles[i]); if (r!=0) goto error;
} }
fs->n_temp_files -= n_to_merge; fs->n_temp_files -= n_to_merge;
r = brtloader_fi_close(&bl->file_infos, merged_data); assert(r==0); r = brtloader_fi_close(&bl->file_infos, merged_data); assert(r==0);
r = brtloader_fi_close(&bl->file_infos, merged_idx); assert(r==0); r = brtloader_fi_close(&bl->file_infos, merged_idx); assert(r==0);
...@@ -743,8 +816,14 @@ int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compar ...@@ -743,8 +816,14 @@ int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compar
toku_free(fs->data_fidxs); toku_free(fs->data_fidxs);
toku_free(fs->idx_fidxs); toku_free(fs->idx_fidxs);
*fs = next_file_set; *fs = next_file_set;
// Update the progress
n_passes_left--;
r = update_progress(progress_allocation_for_this_pass, bl, "merging files");
if (r!=0) return r;
} }
return 0; assert(n_passes_left == 0);
return update_progress(progress_allocation, bl, "did merge_files");
} }
static int loader_do_i (BRTLOADER bl, static int loader_do_i (BRTLOADER bl,
...@@ -753,11 +832,13 @@ static int loader_do_i (BRTLOADER bl, ...@@ -753,11 +832,13 @@ static int loader_do_i (BRTLOADER bl,
const struct descriptor *descriptor, const struct descriptor *descriptor,
const char *new_fname, const char *new_fname,
int which_db, int which_db,
void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra), void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra), void *error_callback_extra,
void *error_callback_extra int progress_allocation // how much progress do I need to add into bl->progress by the end..
) )
/* Effect: Handle the file creating for one particular DB in the bulk loader. */ /* Effect: Handle the file creating for one particular DB in the bulk loader. */
{ {
int expect_progress_at_end = bl->progress+progress_allocation;
//printf("doing i use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
int r = fseek(bl_fidx2file(bl, bl->fprimary_rows), 0, SEEK_SET); int r = fseek(bl_fidx2file(bl, bl->fprimary_rows), 0, SEEK_SET);
if (r!=0) return errno; if (r!=0) return errno;
DBT pkey={.data=0, .flags=DB_DBT_REALLOC, .size=0, .ulen=0}; DBT pkey={.data=0, .flags=DB_DBT_REALLOC, .size=0, .ulen=0};
...@@ -773,14 +854,32 @@ static int loader_do_i (BRTLOADER bl, ...@@ -773,14 +854,32 @@ static int loader_do_i (BRTLOADER bl,
.db = dest_db, .db = dest_db,
.which_db = which_db, .which_db = which_db,
.extra = error_callback_extra}; .extra = error_callback_extra};
int allocation_for_read_pass = progress_allocation/4;
progress_allocation -= allocation_for_read_pass;
u_int64_t previous_n_rows_remaining = bl->n_rows;
u_int64_t n_rows_remaining = bl->n_rows;
while (0==(r=loader_read_row(bl->fprimary_rows, &pkey, &pval, bl))) { while (0==(r=loader_read_row(bl->fprimary_rows, &pkey, &pval, bl))) {
r = bl->generate_row_for_put(dest_db, bl->src_db, &skey, &sval, &pkey, &pval, NULL); r = bl->generate_row_for_put(dest_db, bl->src_db, &skey, &sval, &pkey, &pval, NULL);
assert(r==0); assert(r==0);
if (row_wont_fit(&rows, skey.size + sval.size)) { if (row_wont_fit(&rows, skey.size + sval.size)) {
r = sort_and_write_rows(&rows, &fs, bl, dest_db, compare, &ec);
// divide the progress into a piece for sort_and_write_rows
u_int64_t n_rows_handled_now = previous_n_rows_remaining - n_rows_remaining;
// we had allocation_for_read_pass left over to do previous_n_rows_remaining
// We did n_rows_handled_now.
int progress_this_sort = allocation_for_read_pass * (double)n_rows_handled_now / (double)previous_n_rows_remaining;
allocation_for_read_pass -= progress_this_sort;
previous_n_rows_remaining = n_rows_remaining;
//printf("rows.n_rows=%ld\n", rows.n_rows);
r = sort_and_write_rows(&rows, &fs, bl, dest_db, compare, &ec, progress_this_sort);
if (r!=0) goto error; if (r!=0) goto error;
reset_rows(&rows); reset_rows(&rows);
} }
add_row(&rows, &skey, &sval); add_row(&rows, &skey, &sval);
...@@ -796,7 +895,10 @@ static int loader_do_i (BRTLOADER bl, ...@@ -796,7 +895,10 @@ static int loader_do_i (BRTLOADER bl,
toku_init_dbt(&sval); toku_init_dbt(&sval);
sval.flags = DB_DBT_REALLOC; sval.flags = DB_DBT_REALLOC;
} }
n_rows_remaining--;
} }
{ // clean up this stuff early, to save memory { // clean up this stuff early, to save memory
toku_free(skey.data); toku_free(skey.data);
toku_free(sval.data); toku_free(sval.data);
...@@ -806,7 +908,7 @@ static int loader_do_i (BRTLOADER bl, ...@@ -806,7 +908,7 @@ static int loader_do_i (BRTLOADER bl,
} }
if (rows.n_rows > 0) { if (rows.n_rows > 0) {
r = sort_and_write_rows(&rows, &fs, bl, dest_db, compare, &ec); r = sort_and_write_rows(&rows, &fs, bl, dest_db, compare, &ec, allocation_for_read_pass);
if (r!=0) goto error; if (r!=0) goto error;
} }
{ {
...@@ -816,7 +918,10 @@ static int loader_do_i (BRTLOADER bl, ...@@ -816,7 +918,10 @@ static int loader_do_i (BRTLOADER bl,
rows.data = NULL; //set to NULL so the final cleanup won't free them again. rows.data = NULL; //set to NULL so the final cleanup won't free them again.
rows.rows = NULL; rows.rows = NULL;
} }
r = merge_files(&fs, bl, dest_db, compare, &ec);
int allocation_for_merge = (2*progress_allocation)/3;
progress_allocation -= allocation_for_merge;
r = merge_files(&fs, bl, dest_db, compare, &ec, allocation_for_merge);
if (r!=0) goto error; if (r!=0) goto error;
// Now it's down to one file. Need to write the data out. The file is in fs. // Now it's down to one file. Need to write the data out. The file is in fs.
...@@ -826,7 +931,7 @@ static int loader_do_i (BRTLOADER bl, ...@@ -826,7 +931,7 @@ static int loader_do_i (BRTLOADER bl,
assert(fs.n_temp_files==1); assert(fs.n_temp_files==1);
r = brtloader_fi_reopen(&bl->file_infos, fs.data_fidxs[0], "r"); r = brtloader_fi_reopen(&bl->file_infos, fs.data_fidxs[0], "r");
if (r) goto error; if (r) goto error;
r = write_file_to_dbfile(fd, fs.data_fidxs[0], bl, descriptor); r = write_file_to_dbfile(fd, fs.data_fidxs[0], bl, descriptor, progress_allocation);
if (r) goto error; if (r) goto error;
r = fsync(fd); r = fsync(fd);
if (r) { r=errno; goto error; } if (r) { r=errno; goto error; }
...@@ -838,6 +943,8 @@ static int loader_do_i (BRTLOADER bl, ...@@ -838,6 +943,8 @@ static int loader_do_i (BRTLOADER bl,
if (r) goto error; if (r) goto error;
r = brtloader_fi_unlink(&bl->file_infos, fs.idx_fidxs[0]); r = brtloader_fi_unlink(&bl->file_infos, fs.idx_fidxs[0]);
assert(expect_progress_at_end == bl->progress);
error: // this is the cleanup code. Even if r==0 (no error) we fall through to here. error: // this is the cleanup code. Even if r==0 (no error) we fall through to here.
// if we get here we need to free up the merge_fileset and the rowset, as well as the keys // if we get here we need to free up the merge_fileset and the rowset, as well as the keys
toku_free(rows.data); toku_free(rows.data);
...@@ -859,16 +966,25 @@ int toku_brt_loader_close (BRTLOADER bl, ...@@ -859,16 +966,25 @@ int toku_brt_loader_close (BRTLOADER bl,
* Return all the file descriptors in the array fds. */ * Return all the file descriptors in the array fds. */
{ {
int result = 0; int result = 0;
int remaining_progress = PROGRESS_MAX;
bl->poll_function = poll_function;
bl->poll_extra = poll_extra;
for (int i=0; i<bl->N; i++) { for (int i=0; i<bl->N; i++) {
char * fname_in_cwd = toku_cachetable_get_fname_in_cwd(bl->cachetable, bl->new_fnames_in_env[i]); char * fname_in_cwd = toku_cachetable_get_fname_in_cwd(bl->cachetable, bl->new_fnames_in_env[i]);
result = loader_do_i(bl, bl->dbs[i], bl->bt_compare_funs[i], bl->descriptors[i], fname_in_cwd, i, error_callback, error_callback_extra); // Take the unallocated progress and divide it among the unfinished jobs.
// This calculation allocates all of the PROGRESS_MAX bits of progress to some job.
int allocate_here = remaining_progress/(bl->N - i);
remaining_progress -= allocate_here;
result = loader_do_i(bl, bl->dbs[i], bl->bt_compare_funs[i], bl->descriptors[i], fname_in_cwd, i, error_callback, error_callback_extra,
allocate_here
);
toku_free(fname_in_cwd); toku_free(fname_in_cwd);
if (result!=0) goto error; if (result!=0) goto error;
toku_free((void*)bl->new_fnames_in_env[i]); toku_free((void*)bl->new_fnames_in_env[i]);
bl->new_fnames_in_env[i] = NULL; bl->new_fnames_in_env[i] = NULL;
if (poll_function && poll_function(poll_extra, (float)i/(float)bl->N)) { assert(0<=bl->progress && bl->progress <= PROGRESS_MAX);
goto error; result = update_progress(0, bl, "did index");
} if (result) goto error;
} }
result = brtloader_fi_close (&bl->file_infos, bl->fprimary_rows); if (result) goto error; result = brtloader_fi_close (&bl->file_infos, bl->fprimary_rows); if (result) goto error;
result = brtloader_fi_unlink(&bl->file_infos, bl->fprimary_rows); if (result) goto error; result = brtloader_fi_unlink(&bl->file_infos, bl->fprimary_rows); if (result) goto error;
...@@ -876,6 +992,7 @@ int toku_brt_loader_close (BRTLOADER bl, ...@@ -876,6 +992,7 @@ int toku_brt_loader_close (BRTLOADER bl,
result = brtloader_fi_unlink(&bl->file_infos, bl->fprimary_idx); if (result) goto error; result = brtloader_fi_unlink(&bl->file_infos, bl->fprimary_idx); if (result) goto error;
assert(bl->file_infos.n_files_open == 0); assert(bl->file_infos.n_files_open == 0);
assert(bl->file_infos.n_files_extant == 0); assert(bl->file_infos.n_files_extant == 0);
assert(bl->progress == PROGRESS_MAX);
error: error:
brtloader_destroy(bl, result!=0); brtloader_destroy(bl, result!=0);
return result; return result;
...@@ -939,7 +1056,7 @@ struct leaf_buf { ...@@ -939,7 +1056,7 @@ struct leaf_buf {
int nkeys_p, ndata_p, dsize_p, n_in_buf_p; int nkeys_p, ndata_p, dsize_p, n_in_buf_p;
}; };
const int nodesize = 1<<15; const int nodesize = (SIZE_FACTOR==1) ? (1<<15) : (1<<22);
struct translation { struct translation {
int64_t off, size; int64_t off, size;
...@@ -1062,7 +1179,8 @@ static void seek_align(struct dbout *out) { ...@@ -1062,7 +1179,8 @@ static void seek_align(struct dbout *out) {
assert(out->current_off % alignment == 0); assert(out->current_off % alignment == 0);
} }
static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf) { static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, BRTLOADER bl) {
//printf(" finishing leaf node progress=%d fin at %d\n", bl->progress, bl->progress+progress_allocation);
//printf("local_fingerprint=%8x\n", lbuf->local_fingerprint); //printf("local_fingerprint=%8x\n", lbuf->local_fingerprint);
putbuf_int32_at(&lbuf->dbuf, lbuf->local_fingerprint_p, lbuf->local_fingerprint); putbuf_int32_at(&lbuf->dbuf, lbuf->local_fingerprint_p, lbuf->local_fingerprint);
putbuf_int64_at(&lbuf->dbuf, lbuf->nkeys_p, lbuf->nkeys); putbuf_int64_at(&lbuf->dbuf, lbuf->nkeys_p, lbuf->nkeys);
...@@ -1129,6 +1247,10 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf) { ...@@ -1129,6 +1247,10 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf) {
toku_free(compressed_buf); toku_free(compressed_buf);
dbuf_destroy(&lbuf->dbuf); dbuf_destroy(&lbuf->dbuf);
toku_free(lbuf); toku_free(lbuf);
//printf("Nodewrite %d (%.1f%%):", progress_allocation, 100.0*progress_allocation/PROGRESS_MAX);
int r = update_progress(progress_allocation, bl, "wrote node");
if (r!=0) bl->user_said_stop = r;
} }
static int write_translation_table (struct dbout *out, long long *off_of_translation_p) { static int write_translation_table (struct dbout *out, long long *off_of_translation_p) {
...@@ -1427,7 +1549,8 @@ int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct s ...@@ -1427,7 +1549,8 @@ int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct s
return 0; return 0;
} }
int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct descriptor *descriptor) { int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct descriptor *descriptor, int progress_allocation) {
//printf(" write_file_to_dbfile use %d at %d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
// The pivots file will contain all the pivot strings (in the form <size(32bits)> <data>) // The pivots file will contain all the pivot strings (in the form <size(32bits)> <data>)
// The pivots_fname is the name of the pivots file. // The pivots_fname is the name of the pivots file.
// Note that the pivots file will have one extra pivot in it (the last key in the dictionary) which will not appear in the tree. // Note that the pivots file will have one extra pivot in it (the last key in the dictionary) which will not appear in the tree.
...@@ -1458,9 +1581,17 @@ int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct d ...@@ -1458,9 +1581,17 @@ int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct d
struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock); struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock);
struct subtree_estimates est = zero_estimates; struct subtree_estimates est = zero_estimates;
est.exact = TRUE; est.exact = TRUE;
u_int64_t n_rows_remaining = bl->n_rows;
u_int64_t old_n_rows_remaining = bl->n_rows;
while (0==loader_read_row(infile, &key, &val, bl)) { while (0==loader_read_row(infile, &key, &val, bl)) {
if (bl->user_said_stop) return bl->user_said_stop; // stops all those cilk subjobs if one of them got a "quit" from the poll.
if (lbuf->dbuf.off >= nodesize) { if (lbuf->dbuf.off >= nodesize) {
int progress_this_node = progress_allocation * (double)(old_n_rows_remaining - n_rows_remaining)/(double)old_n_rows_remaining;
progress_allocation -= progress_this_node;
old_n_rows_remaining = n_rows_remaining;
allocate_node(&sts, lblock, est, lbuf->local_fingerprint); allocate_node(&sts, lblock, est, lbuf->local_fingerprint);
n_pivots++; n_pivots++;
...@@ -1468,7 +1599,7 @@ int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct d ...@@ -1468,7 +1599,7 @@ int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct d
if ((r=bl_write_dbt(&key, pivots_file, NULL, bl))) return r; if ((r=bl_write_dbt(&key, pivots_file, NULL, bl))) return r;
struct leaf_buf *writeit = lbuf; struct leaf_buf *writeit = lbuf;
/*cilk_spawn*/ finish_leafnode(&out, writeit); /*cilk_spawn*/ finish_leafnode(&out, writeit, progress_this_node, bl);
lblock = allocate_block(&out); lblock = allocate_block(&out);
lbuf = start_leaf(&out, descriptor, lblock); lbuf = start_leaf(&out, descriptor, lblock);
...@@ -1477,7 +1608,9 @@ int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct d ...@@ -1477,7 +1608,9 @@ int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct d
est.nkeys++; est.nkeys++;
est.ndata++; est.ndata++;
est.dsize+=key.size + val.size; est.dsize+=key.size + val.size;
n_rows_remaining--;
} }
if (bl->user_said_stop) return bl->user_said_stop; // stops all those cilk subjobs if one of them got a "quit" from the poll.
allocate_node(&sts, lblock, est, lbuf->local_fingerprint); allocate_node(&sts, lblock, est, lbuf->local_fingerprint);
...@@ -1487,7 +1620,8 @@ int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct d ...@@ -1487,7 +1620,8 @@ int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct d
if ((r=bl_write_dbt(&key, pivots_file, NULL, bl))) return r; if ((r=bl_write_dbt(&key, pivots_file, NULL, bl))) return r;
} }
finish_leafnode(&out, lbuf); finish_leafnode(&out, lbuf, progress_allocation/2, bl);
progress_allocation -= progress_allocation/2;
{ {
int r = write_nonleaves(bl, pivots_file, &out, &sts, descriptor); int r = write_nonleaves(bl, pivots_file, &out, &sts, descriptor);
...@@ -1527,5 +1661,5 @@ int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct d ...@@ -1527,5 +1661,5 @@ int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct d
if (key.data) toku_free(key.data); if (key.data) toku_free(key.data);
if (val.data) toku_free(val.data); if (val.data) toku_free(val.data);
if (out.translation) toku_free(out.translation); if (out.translation) toku_free(out.translation);
return 0; return update_progress(progress_allocation, bl, "wrote tdb file");
} }
...@@ -259,14 +259,14 @@ static void test_merge_files (char *template) { ...@@ -259,14 +259,14 @@ static void test_merge_files (char *template) {
struct error_callback_s cb; struct error_callback_s cb;
cb.error_callback = err_cb; cb.error_callback = err_cb;
r = sort_and_write_rows(&aset, &fs, &bl, dest_db, compare_ints, &cb); CKERR(r); r = sort_and_write_rows(&aset, &fs, &bl, dest_db, compare_ints, &cb, 0); CKERR(r);
r = sort_and_write_rows(&bset, &fs, &bl, dest_db, compare_ints, &cb); CKERR(r); r = sort_and_write_rows(&bset, &fs, &bl, dest_db, compare_ints, &cb, 0); CKERR(r);
assert(fs.n_temp_files==2 && fs.n_temp_files_limit >= fs.n_temp_files); assert(fs.n_temp_files==2 && fs.n_temp_files_limit >= fs.n_temp_files);
destroy_rowset(&aset); destroy_rowset(&aset);
destroy_rowset(&bset); destroy_rowset(&bset);
for (int i=0; i<2; i++) assert(fs.data_fidxs[i].idx != -1 && fs.idx_fidxs[i].idx != -1); for (int i=0; i<2; i++) assert(fs.data_fidxs[i].idx != -1 && fs.idx_fidxs[i].idx != -1);
r = merge_files(&fs, &bl, dest_db, compare_ints, &cb); CKERR(r); r = merge_files(&fs, &bl, dest_db, compare_ints, &cb, 0); CKERR(r);
assert(fs.n_temp_files==1); assert(fs.n_temp_files==1);
...@@ -278,7 +278,7 @@ static void test_merge_files (char *template) { ...@@ -278,7 +278,7 @@ static void test_merge_files (char *template) {
fprintf(stderr, "Final data in %s\n", name); fprintf(stderr, "Final data in %s\n", name);
assert(r>=0); assert(r>=0);
struct descriptor desc = {.version = 1, .dbt = (DBT){.size = 4, .data="abcd"}}; struct descriptor desc = {.version = 1, .dbt = (DBT){.size = 4, .data="abcd"}};
r = write_file_to_dbfile(fd, inf, &bl, &desc); r = write_file_to_dbfile(fd, inf, &bl, &desc, 1);
CKERR(r); CKERR(r);
r = brtloader_fi_close(&bl.file_infos, inf); r = brtloader_fi_close(&bl.file_infos, inf);
CKERR(r); CKERR(r);
......
...@@ -180,9 +180,19 @@ static void check_results(DB **dbs) ...@@ -180,9 +180,19 @@ static void check_results(DB **dbs)
static void *expect_poll_void = &expect_poll_void; static void *expect_poll_void = &expect_poll_void;
static int poll_count=0; static int poll_count=0;
static int poll_function (void *extra, float progress) { static int poll_function (void *extra, float progress) {
//printf("progress: %5.1f%%\n", progress*100); if (0) {
static int did_one=0;
static struct timeval start;
struct timeval now;
gettimeofday(&now, 0);
if (!did_one) {
start=now;
did_one=1;
}
printf("%6.6f %5.1f%%\n", now.tv_sec - start.tv_sec + 1e-6*(now.tv_usec - start.tv_usec), progress*100);
}
assert(extra==expect_poll_void); assert(extra==expect_poll_void);
assert(0.0<=progress && progress<1.0); assert(0.0<=progress && progress<=1.0);
poll_count++; poll_count++;
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment