Commit 98f50f88 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#2654 refs[t:2654] merge loader persort speedup to main tokudb

git-svn-id: file:///svn/toku/tokudb@22354 c7de825b-a66e-492c-adef-691d508d4ae1
parent 86130597
...@@ -62,6 +62,10 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, u_int64_t *dataoff, B ...@@ -62,6 +62,10 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, u_int64_t *dataoff, B
int loader_read_row (FILE *f, DBT *key, DBT *val); int loader_read_row (FILE *f, DBT *key, DBT *val);
struct merge_fileset { struct merge_fileset {
BOOL have_sorted_output; // Is there an previous key?
FIDX sorted_output; // this points to one of the data_fidxs. If output_is_sorted then this is the file containing sorted data. It's still open
DBT prev_key; // What is it? If it's here, its the last output in the merge fileset
int n_temp_files, n_temp_files_limit; int n_temp_files, n_temp_files_limit;
FIDX *data_fidxs; FIDX *data_fidxs;
}; };
...@@ -217,6 +221,7 @@ int brt_loader_write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, con ...@@ -217,6 +221,7 @@ int brt_loader_write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, con
int brtloader_init_file_infos (struct file_infos *fi); int brtloader_init_file_infos (struct file_infos *fi);
void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error); void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error);
int brtloader_fi_close (struct file_infos *fi, FIDX idx); int brtloader_fi_close (struct file_infos *fi, FIDX idx);
void brtloader_fi_close_all (struct file_infos *fi);
int brtloader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode); int brtloader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode);
int brtloader_fi_unlink (struct file_infos *fi, FIDX idx); int brtloader_fi_unlink (struct file_infos *fi, FIDX idx);
......
...@@ -257,6 +257,13 @@ int brtloader_fi_unlink (struct file_infos *fi, FIDX idx) { ...@@ -257,6 +257,13 @@ int brtloader_fi_unlink (struct file_infos *fi, FIDX idx) {
return result; return result;
} }
void brtloader_fi_close_all(struct file_infos *fi) {
for (int i = 0; i < fi->n_files; i++) {
FIDX idx = { i };
(void ) brtloader_fi_close(fi, idx);
}
}
int brtloader_open_temp_file (BRTLOADER bl, FIDX *file_idx) int brtloader_open_temp_file (BRTLOADER bl, FIDX *file_idx)
/* Effect: Open a temporary file in read-write mode. Save enough information to close and delete the file later. /* Effect: Open a temporary file in read-write mode. Save enough information to close and delete the file later.
* Return value: 0 on success, an error number otherwise. * Return value: 0 on success, an error number otherwise.
...@@ -982,11 +989,15 @@ static int finish_extractor (BRTLOADER bl) { ...@@ -982,11 +989,15 @@ static int finish_extractor (BRTLOADER bl) {
int r = queue_destroy(bl->primary_rowset_queue); int r = queue_destroy(bl->primary_rowset_queue);
invariant(r==0); invariant(r==0);
} }
brtloader_fi_close_all(&bl->file_infos);
//printf("%s:%d joined\n", __FILE__, __LINE__); //printf("%s:%d joined\n", __FILE__, __LINE__);
return 0; return 0;
} }
static const DBT zero_dbt = {0,0,0,0}; static const DBT zero_dbt = {0,0,0,0};
static DBT make_dbt (void *data, u_int32_t size) { static DBT make_dbt (void *data, u_int32_t size) {
DBT result = zero_dbt; DBT result = zero_dbt;
result.data = data; result.data = data;
...@@ -1348,6 +1359,11 @@ CILK_END ...@@ -1348,6 +1359,11 @@ CILK_END
void init_merge_fileset (struct merge_fileset *fs) void init_merge_fileset (struct merge_fileset *fs)
/* Effect: Initialize a fileset */ /* Effect: Initialize a fileset */
{ {
fs->have_sorted_output = FALSE;
fs->sorted_output = FIDX_NULL;
fs->prev_key = zero_dbt;
fs->prev_key.flags = DB_DBT_REALLOC;
fs->n_temp_files = 0; fs->n_temp_files = 0;
fs->n_temp_files_limit = 0; fs->n_temp_files_limit = 0;
fs->data_fidxs = NULL; fs->data_fidxs = NULL;
...@@ -1357,6 +1373,7 @@ void destroy_merge_fileset (struct merge_fileset *fs) ...@@ -1357,6 +1373,7 @@ void destroy_merge_fileset (struct merge_fileset *fs)
/* Effect: Destroy a fileset. */ /* Effect: Destroy a fileset. */
{ {
if ( fs ) { if ( fs ) {
toku_destroy_dbt(&fs->prev_key);
fs->n_temp_files = 0; fs->n_temp_files = 0;
fs->n_temp_files_limit = 0; fs->n_temp_files_limit = 0;
toku_free(fs->data_fidxs); toku_free(fs->data_fidxs);
...@@ -1416,6 +1433,21 @@ static int update_progress (int N, ...@@ -1416,6 +1433,21 @@ static int update_progress (int N,
return result; return result;
} }
static int write_rowset_to_file (BRTLOADER bl, FIDX sfile, const struct rowset rows) {
FILE *sstream = toku_bl_fidx2file(bl, sfile);
for (size_t i=0; i<rows.n_rows; i++) {
DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen);
DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, rows.rows[i].vlen);
u_int64_t soffset=0; // don't really need this.
int r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, bl);
if (r != 0) return r;
}
return 0;
}
CILK_BEGIN CILK_BEGIN
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare) int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare)
/* Effect: Given a rowset, sort it and write it to a temporary file. /* Effect: Given a rowset, sort it and write it to a temporary file.
...@@ -1433,39 +1465,41 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER ...@@ -1433,39 +1465,41 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER
*/ */
{ {
//printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation); //printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
FIDX sfile = FIDX_NULL;
u_int64_t soffset=0;
// TODO: erase the files, and deal with all the cleanup on error paths // TODO: erase the files, and deal with all the cleanup on error paths
//printf("%s:%d sort_rows n_rows=%ld\n", __FILE__, __LINE__, rows->n_rows); //printf("%s:%d sort_rows n_rows=%ld\n", __FILE__, __LINE__, rows->n_rows);
//bl_time_t before_sort = bl_time_now(); //bl_time_t before_sort = bl_time_now();
int result = 0; int result;
int r = sort_rows(&rows, which_db, dest_db, compare, bl); if (rows.n_rows == 0) {
if (r != 0) result = r; result = 0;
} else {
result = sort_rows(&rows, which_db, dest_db, compare, bl);
//bl_time_t after_sort = bl_time_now(); //bl_time_t after_sort = bl_time_now();
if (result == 0) { if (result == 0) {
r = extend_fileset(bl, fs, &sfile); DBT akey = make_dbt(rows.data+rows.rows[0].off, rows.rows[0].klen);
if (r != 0) if (fs->have_sorted_output && compare(dest_db, &fs->prev_key, &akey)<0) {
result = r; // write everything to the same output.
else { result = write_rowset_to_file(bl, fs->sorted_output, rows);
} else {
FILE *sstream = toku_bl_fidx2file(bl, sfile); FIDX sfile = FIDX_NULL;
for (size_t i=0; i<rows.n_rows; i++) { result = extend_fileset(bl, fs, &sfile);
DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen); if (result == 0) {
DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, rows.rows[i].vlen); if (fs->have_sorted_output) {
fs->have_sorted_output = FALSE;
r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, bl); result = brtloader_fi_close(&bl->file_infos, fs->sorted_output);
if (r != 0) { }
result = r; if (result == 0) {
break; result = write_rowset_to_file(bl, sfile, rows);
if (result == 0) {
fs->have_sorted_output = TRUE; fs->sorted_output = sfile;
result = toku_dbt_set(rows.rows[rows.n_rows-1].klen, rows.data + rows.rows[rows.n_rows-1].off, &fs->prev_key, NULL);
}
}
} }
} }
r = brtloader_fi_close(&bl->file_infos, sfile);
if (r != 0) result = r;
} }
} }
...@@ -1834,8 +1868,7 @@ int merge_files (struct merge_fileset *fs, ...@@ -1834,8 +1868,7 @@ int merge_files (struct merge_fileset *fs,
if (result!=0) break; if (result!=0) break;
} }
destroy_merge_fileset(fs);
toku_free(fs->data_fidxs);
*fs = next_file_set; *fs = next_file_set;
// Update the progress // Update the progress
......
...@@ -74,6 +74,8 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -74,6 +74,8 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r); r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r);
// destroy_rowset(&aset); // destroy_rowset(&aset);
brtloader_fi_close_all(&bl.file_infos);
QUEUE q; QUEUE q;
r = queue_create(&q, 0xFFFFFFFF); // infinite queue. r = queue_create(&q, 0xFFFFFFFF); // infinite queue.
assert(r==0); assert(r==0);
......
...@@ -111,6 +111,8 @@ static void test_write_dbfile (char *template, int n, char *output_name) { ...@@ -111,6 +111,8 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r); r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r);
// destroy_rowset(&aset); // destroy_rowset(&aset);
brtloader_fi_close_all(&bl.file_infos);
QUEUE q; QUEUE q;
r = queue_create(&q, 0xFFFFFFFF); // infinite queue. r = queue_create(&q, 0xFFFFFFFF); // infinite queue.
assert(r==0); assert(r==0);
......
...@@ -308,6 +308,8 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c ...@@ -308,6 +308,8 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
// destroy_rowset(&bset); // destroy_rowset(&bset);
for (int i=0; i<2; i++) assert(fs.data_fidxs[i].idx != -1); for (int i=0; i<2; i++) assert(fs.data_fidxs[i].idx != -1);
brtloader_fi_close_all(&bl.file_infos);
QUEUE q; QUEUE q;
r = queue_create(&q, 0xFFFFFFFF); // infinite queue. r = queue_create(&q, 0xFFFFFFFF); // infinite queue.
assert(r==0); assert(r==0);
......
...@@ -10,6 +10,17 @@ toku_init_dbt (DBT *ybt) { ...@@ -10,6 +10,17 @@ toku_init_dbt (DBT *ybt) {
return ybt; return ybt;
} }
void
toku_destroy_dbt(DBT *dbt) {
switch (dbt->flags) {
case DB_DBT_MALLOC:
case DB_DBT_REALLOC:
toku_free(dbt->data);
dbt->data = NULL;
break;
}
}
DBT* DBT*
toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) { toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) {
toku_init_dbt(dbt); toku_init_dbt(dbt);
......
...@@ -14,6 +14,7 @@ extern "C" { ...@@ -14,6 +14,7 @@ extern "C" {
#endif #endif
DBT* toku_init_dbt (DBT *); DBT* toku_init_dbt (DBT *);
void toku_destroy_dbt (DBT *);
DBT *toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len); DBT *toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len);
int toku_dbt_set (ITEMLEN len, bytevec val, DBT *d, struct simple_dbt *sdbt); int toku_dbt_set (ITEMLEN len, bytevec val, DBT *d, struct simple_dbt *sdbt);
int toku_dbt_set_value (DBT *, bytevec *val, ITEMLEN vallen, void **staticptrp, BOOL ybt1_disposable); int toku_dbt_set_value (DBT *, bytevec *val, ITEMLEN vallen, void **staticptrp, BOOL ybt1_disposable);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment