Commit 663acdae authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#2654 refs[t:2654] merge loader persort speedup to main tokudb

git-svn-id: file:///svn/toku/tokudb@22354 c7de825b-a66e-492c-adef-691d508d4ae1
parent 07f79708
......@@ -62,6 +62,10 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, u_int64_t *dataoff, B
int loader_read_row (FILE *f, DBT *key, DBT *val);
struct merge_fileset {
BOOL have_sorted_output; // Is there an previous key?
FIDX sorted_output; // this points to one of the data_fidxs. If output_is_sorted then this is the file containing sorted data. It's still open
DBT prev_key; // What is it? If it's here, its the last output in the merge fileset
int n_temp_files, n_temp_files_limit;
FIDX *data_fidxs;
};
......@@ -217,6 +221,7 @@ int brt_loader_write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, con
int brtloader_init_file_infos (struct file_infos *fi);
void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error);
int brtloader_fi_close (struct file_infos *fi, FIDX idx);
void brtloader_fi_close_all (struct file_infos *fi);
int brtloader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode);
int brtloader_fi_unlink (struct file_infos *fi, FIDX idx);
......
......@@ -257,6 +257,13 @@ int brtloader_fi_unlink (struct file_infos *fi, FIDX idx) {
return result;
}
void brtloader_fi_close_all(struct file_infos *fi) {
for (int i = 0; i < fi->n_files; i++) {
FIDX idx = { i };
(void ) brtloader_fi_close(fi, idx);
}
}
int brtloader_open_temp_file (BRTLOADER bl, FIDX *file_idx)
/* Effect: Open a temporary file in read-write mode. Save enough information to close and delete the file later.
* Return value: 0 on success, an error number otherwise.
......@@ -982,11 +989,15 @@ static int finish_extractor (BRTLOADER bl) {
int r = queue_destroy(bl->primary_rowset_queue);
invariant(r==0);
}
brtloader_fi_close_all(&bl->file_infos);
//printf("%s:%d joined\n", __FILE__, __LINE__);
return 0;
}
static const DBT zero_dbt = {0,0,0,0};
static DBT make_dbt (void *data, u_int32_t size) {
DBT result = zero_dbt;
result.data = data;
......@@ -1348,6 +1359,11 @@ CILK_END
void init_merge_fileset (struct merge_fileset *fs)
/* Effect: Initialize a fileset */
{
fs->have_sorted_output = FALSE;
fs->sorted_output = FIDX_NULL;
fs->prev_key = zero_dbt;
fs->prev_key.flags = DB_DBT_REALLOC;
fs->n_temp_files = 0;
fs->n_temp_files_limit = 0;
fs->data_fidxs = NULL;
......@@ -1357,6 +1373,7 @@ void destroy_merge_fileset (struct merge_fileset *fs)
/* Effect: Destroy a fileset. */
{
if ( fs ) {
toku_destroy_dbt(&fs->prev_key);
fs->n_temp_files = 0;
fs->n_temp_files_limit = 0;
toku_free(fs->data_fidxs);
......@@ -1416,6 +1433,21 @@ static int update_progress (int N,
return result;
}
static int write_rowset_to_file (BRTLOADER bl, FIDX sfile, const struct rowset rows) {
FILE *sstream = toku_bl_fidx2file(bl, sfile);
for (size_t i=0; i<rows.n_rows; i++) {
DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen);
DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, rows.rows[i].vlen);
u_int64_t soffset=0; // don't really need this.
int r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, bl);
if (r != 0) return r;
}
return 0;
}
CILK_BEGIN
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare)
/* Effect: Given a rowset, sort it and write it to a temporary file.
......@@ -1433,40 +1465,42 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER
*/
{
//printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
FIDX sfile = FIDX_NULL;
u_int64_t soffset=0;
// TODO: erase the files, and deal with all the cleanup on error paths
//printf("%s:%d sort_rows n_rows=%ld\n", __FILE__, __LINE__, rows->n_rows);
//bl_time_t before_sort = bl_time_now();
int result = 0;
int r = sort_rows(&rows, which_db, dest_db, compare, bl);
if (r != 0) result = r;
//bl_time_t after_sort = bl_time_now();
if (result == 0) {
r = extend_fileset(bl, fs, &sfile);
if (r != 0)
result = r;
else {
int result;
if (rows.n_rows == 0) {
result = 0;
} else {
result = sort_rows(&rows, which_db, dest_db, compare, bl);
FILE *sstream = toku_bl_fidx2file(bl, sfile);
for (size_t i=0; i<rows.n_rows; i++) {
DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen);
DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, rows.rows[i].vlen);
//bl_time_t after_sort = bl_time_now();
r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, bl);
if (r != 0) {
result = r;
break;
if (result == 0) {
DBT akey = make_dbt(rows.data+rows.rows[0].off, rows.rows[0].klen);
if (fs->have_sorted_output && compare(dest_db, &fs->prev_key, &akey)<0) {
// write everything to the same output.
result = write_rowset_to_file(bl, fs->sorted_output, rows);
} else {
FIDX sfile = FIDX_NULL;
result = extend_fileset(bl, fs, &sfile);
if (result == 0) {
if (fs->have_sorted_output) {
fs->have_sorted_output = FALSE;
result = brtloader_fi_close(&bl->file_infos, fs->sorted_output);
}
if (result == 0) {
result = write_rowset_to_file(bl, sfile, rows);
if (result == 0) {
fs->have_sorted_output = TRUE; fs->sorted_output = sfile;
result = toku_dbt_set(rows.rows[rows.n_rows-1].klen, rows.data + rows.rows[rows.n_rows-1].off, &fs->prev_key, NULL);
}
}
}
}
r = brtloader_fi_close(&bl->file_infos, sfile);
if (r != 0) result = r;
}
}
}
}
destroy_rowset(&rows);
......@@ -1834,8 +1868,7 @@ int merge_files (struct merge_fileset *fs,
if (result!=0) break;
}
toku_free(fs->data_fidxs);
destroy_merge_fileset(fs);
*fs = next_file_set;
// Update the progress
......
......@@ -74,6 +74,8 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r);
// destroy_rowset(&aset);
brtloader_fi_close_all(&bl.file_infos);
QUEUE q;
r = queue_create(&q, 0xFFFFFFFF); // infinite queue.
assert(r==0);
......
......@@ -110,6 +110,8 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
brt_loader_set_error_function(&bl.error_callback, err_cb, NULL);
r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints); CKERR(r);
// destroy_rowset(&aset);
brtloader_fi_close_all(&bl.file_infos);
QUEUE q;
r = queue_create(&q, 0xFFFFFFFF); // infinite queue.
......
......@@ -308,6 +308,8 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
// destroy_rowset(&bset);
for (int i=0; i<2; i++) assert(fs.data_fidxs[i].idx != -1);
brtloader_fi_close_all(&bl.file_infos);
QUEUE q;
r = queue_create(&q, 0xFFFFFFFF); // infinite queue.
assert(r==0);
......
......@@ -10,6 +10,17 @@ toku_init_dbt (DBT *ybt) {
return ybt;
}
void
toku_destroy_dbt(DBT *dbt) {
switch (dbt->flags) {
case DB_DBT_MALLOC:
case DB_DBT_REALLOC:
toku_free(dbt->data);
dbt->data = NULL;
break;
}
}
DBT*
toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) {
toku_init_dbt(dbt);
......
......@@ -14,6 +14,7 @@ extern "C" {
#endif
DBT* toku_init_dbt (DBT *);
void toku_destroy_dbt (DBT *);
DBT *toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len);
int toku_dbt_set (ITEMLEN len, bytevec val, DBT *d, struct simple_dbt *sdbt);
int toku_dbt_set_value (DBT *, bytevec *val, ITEMLEN vallen, void **staticptrp, BOOL ybt1_disposable);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment