Commit ac9464bc authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge the 2571 double-buffering stuff over and delete the 2571 branch. Double...

Merge the 2571 double-buffering stuff over and delete the 2571 branch.  Double buffering of the file merge is complete.  Fixes #2571. close[t:2571]
{{{
svn merge -r 20199:20206 https://svn.tokutek.com/tokudb/toku/tokudb.2571
}}}
.


git-svn-id: file:///svn/toku/tokudb@20208 c7de825b-a66e-492c-adef-691d508d4ae1
parent 182aefc6
...@@ -58,7 +58,7 @@ void destroy_rowset (struct rowset *rows); ...@@ -58,7 +58,7 @@ void destroy_rowset (struct rowset *rows);
void add_row (struct rowset *rows, DBT *key, DBT *val); void add_row (struct rowset *rows, DBT *key, DBT *val);
int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, u_int64_t *dataoff, BRTLOADER bl); int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, u_int64_t *dataoff, BRTLOADER bl);
int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl); int loader_read_row (FILE *f, DBT *key, DBT *val);
struct merge_fileset { struct merge_fileset {
int n_temp_files, n_temp_files_limit; int n_temp_files, n_temp_files_limit;
......
...@@ -23,6 +23,8 @@ ...@@ -23,6 +23,8 @@
#include "sub_block_map.h" #include "sub_block_map.h"
#include "pqueue.h" #include "pqueue.h"
#include "trace_mem.h" #include "trace_mem.h"
#include "dbufio.h"
// to turn on tracing, // to turn on tracing,
// cd .../newbrt // cd .../newbrt
// edit trace_mem.h, set #define BL_DO_TRACE 1 // edit trace_mem.h, set #define BL_DO_TRACE 1
...@@ -178,7 +180,8 @@ int brtloader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode) { ...@@ -178,7 +180,8 @@ int brtloader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode) {
fi->file_infos[i].file = fopen(fi->file_infos[i].fname, mode); fi->file_infos[i].file = fopen(fi->file_infos[i].fname, mode);
if (fi->file_infos[i].file==NULL) { result = errno; goto error; } if (fi->file_infos[i].file==NULL) { result = errno; goto error; }
fi->file_infos[i].is_open = TRUE; fi->file_infos[i].is_open = TRUE;
add_big_buffer(&fi->file_infos[i]); // No longer need the big buffer for reopened files. Don't allocate the space, we need it elsewhere.
//add_big_buffer(&fi->file_infos[i]);
fi->n_files_open++; fi->n_files_open++;
r = toku_pthread_mutex_unlock(&fi->lock); assert(r==0); r = toku_pthread_mutex_unlock(&fi->lock); assert(r==0);
error: error:
...@@ -266,6 +269,7 @@ static void brtloader_destroy (BRTLOADER bl, BOOL is_error) { ...@@ -266,6 +269,7 @@ static void brtloader_destroy (BRTLOADER bl, BOOL is_error) {
brt_loader_destroy_error_callback(&bl->error_callback); brt_loader_destroy_error_callback(&bl->error_callback);
brt_loader_destroy_poll_callback(&bl->poll_callback); brt_loader_destroy_poll_callback(&bl->poll_callback);
toku_free(bl);
} }
static void *extractor_thread (void*); static void *extractor_thread (void*);
...@@ -401,14 +405,13 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOAD ...@@ -401,14 +405,13 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOAD
return 0; return 0;
} }
static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER UU(bl)) static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream)
/* Effect: this is a wrapper for fread that returns 0 on success, otherwise returns an error number. /* Effect: this is a wrapper for fread that returns 0 on success, otherwise returns an error number.
* Arguments: * Arguments:
* ptr read data into here. * ptr read data into here.
* size size of data element to be read. * size size of data element to be read.
* nmemb number of data elements to be read. * nmemb number of data elements to be read.
* stream where to read the data from. * stream where to read the data from.
* bl passed so we can panic the brtloader if something goes wrong (recording the error number.
* Return value: 0 on success, an error number otherwise. * Return value: 0 on success, an error number otherwise.
*/ */
{ {
...@@ -438,23 +441,62 @@ static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, BRTLOADER ...@@ -438,23 +441,62 @@ static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, BRTLOADER
return 0; return 0;
} }
static int bl_read_dbt (/*in*/DBT *dbt, FILE *stream, BRTLOADER bl) static int bl_read_dbt (/*in*/DBT *dbt, FILE *stream)
{ {
int len; int len;
{ {
int r; int r;
if ((r = bl_fread(&len, sizeof(len), 1, stream, bl))) return r; if ((r = bl_fread(&len, sizeof(len), 1, stream))) return r;
assert(len>=0); assert(len>=0);
} }
if ((int)dbt->ulen<len) { dbt->ulen=len; dbt->data=toku_xrealloc(dbt->data, len); } if ((int)dbt->ulen<len) { dbt->ulen=len; dbt->data=toku_xrealloc(dbt->data, len); }
{ {
int r; int r;
if ((r = bl_fread(dbt->data, 1, len, stream, bl))) return r; if ((r = bl_fread(dbt->data, 1, len, stream))) return r;
} }
dbt->size = len; dbt->size = len;
return 0; return 0;
} }
static int bl_read_dbt_from_dbufio (/*in*/DBT *dbt, DBUFIO_FILESET bfs, int filenum)
{
int result = 0;
u_int32_t len;
{
size_t n_read;
int r = dbufio_fileset_read(bfs, filenum, &len, sizeof(len), &n_read);
if (r!=0) {
result = r;
} else if (n_read<sizeof(len)) {
result = ENODATA; // must have run out of data prematurely. This is not EOF, it's a real error.
}
}
if (result==0) {
if (dbt->ulen<len) {
void * data = toku_realloc(dbt->data, len);
if (data==NULL) {
result = errno;
} else {
dbt->ulen=len;
dbt->data=data;
}
}
}
if (result==0) {
size_t n_read;
int r = dbufio_fileset_read(bfs, filenum, dbt->data, len, &n_read);
if (r!=0) {
result = r;
} else if (n_read<len) {
result = ENODATA; // must have run out of data prematurely. This is not EOF, it's a real error.
} else {
dbt->size = len;
}
}
return result;
}
int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, u_int64_t *dataoff, BRTLOADER bl) int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, u_int64_t *dataoff, BRTLOADER bl)
/* Effect: Given a key and a val (both DBTs), write them to a file. Increment *dataoff so that it's up to date. /* Effect: Given a key and a val (both DBTs), write them to a file. Increment *dataoff so that it's up to date.
* Arguments: * Arguments:
...@@ -477,7 +519,28 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, u_int64_t *data ...@@ -477,7 +519,28 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, u_int64_t *data
return 0; return 0;
} }
int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl) int loader_read_row (FILE *f, DBT *key, DBT *val)
/* Effect: Read a key value pair from a file. The DBTs must have DB_DBT_REALLOC set.
* Arguments:
* f where to read it from.
* key, val read it into these.
* bl passed so we can panic if needed.
* Return value: 0 on success, an error number otherwise.
* Requires: The DBTs must have DB_DBT_REALLOC
*/
{
{
int r = bl_read_dbt(key, f);
if (r!=0) return r;
}
{
int r = bl_read_dbt(val, f);
if (r!=0) return r;
}
return 0;
}
static int loader_read_row_from_dbufio (DBUFIO_FILESET bfs, int filenum, DBT *key, DBT *val)
/* Effect: Read a key value pair from a file. The DBTs must have DB_DBT_REALLOC set. /* Effect: Read a key value pair from a file. The DBTs must have DB_DBT_REALLOC set.
* Arguments: * Arguments:
* f where to read it from. * f where to read it from.
...@@ -488,11 +551,11 @@ int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl) ...@@ -488,11 +551,11 @@ int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl)
*/ */
{ {
{ {
int r = bl_read_dbt(key, f, bl); int r = bl_read_dbt_from_dbufio(key, bfs, filenum);
if (r!=0) return r; if (r!=0) return r;
} }
{ {
int r = bl_read_dbt(val, f, bl); int r = bl_read_dbt_from_dbufio(val, bfs, filenum);
if (r!=0) return r; if (r!=0) return r;
} }
return 0; return 0;
...@@ -1181,13 +1244,12 @@ int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *f ...@@ -1181,13 +1244,12 @@ int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *f
#endif #endif
} }
static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_fidxs[/*n_sources*/], FILE *srcs_files[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation) static int merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation)
/* Effect: Given an array of FILE*'s each containing sorted, merge the data and write it to an output. All the files remain open after the merge. /* Effect: Given an array of FILE*'s each containing sorted, merge the data and write it to an output. All the files remain open after the merge.
* This merge is performed in one pass, so don't pass too many files in. If you need a tree of merges do it elsewhere. * This merge is performed in one pass, so don't pass too many files in. If you need a tree of merges do it elsewhere.
* If TO_Q is true then we write rowsets into queue Q. Otherwise we write into dest_data. * If TO_Q is true then we write rowsets into queue Q. Otherwise we write into dest_data.
* Modifies: May modify the arrays of files (but if modified, it must be a permutation so the caller can use that array to close everything.) * Modifies: May modify the arrays of files (but if modified, it must be a permutation so the caller can use that array to close everything.)
* Requires: The number of sources is at least one, and each of the input files must have at least one row in it. * Requires: The number of sources is at least one, and each of the input files must have at least one row in it.
* Implementation note: Currently this code uses a really stupid heap O(n) time per pop instead of O(log n), but we'll fix that soon.
* Arguments: * Arguments:
* to_q boolean indicating that output is queue (true) or a file (false) * to_q boolean indicating that output is queue (true) or a file (false)
* dest_data where to write the sorted data * dest_data where to write the sorted data
...@@ -1197,6 +1259,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1197,6 +1259,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
* bl the brtloader. * bl the brtloader.
* dest_db the destination DB (used in the comparison function). * dest_db the destination DB (used in the comparison function).
* Return value: 0 on success, otherwise an error number. * Return value: 0 on success, otherwise an error number.
* The fidxs are not closed by this function.
*/ */
{ {
FILE *dest_stream = to_q ? NULL : toku_bl_fidx2file(bl, dest_data); FILE *dest_stream = to_q ? NULL : toku_bl_fidx2file(bl, dest_data);
...@@ -1223,7 +1286,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1223,7 +1286,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
// load pqueue with first value from each source // load pqueue with first value from each source
for (int i=0; i<n_sources; i++) { for (int i=0; i<n_sources; i++) {
BL_TRACE_QUIET(blt_do_i); BL_TRACE_QUIET(blt_do_i);
int r = loader_read_row(srcs_files[i], &keys[i], &vals[i], bl); int r = loader_read_row_from_dbufio(bfs, i, &keys[i], &vals[i]);
BL_TRACE_QUIET(blt_read_row); BL_TRACE_QUIET(blt_read_row);
if (r==EOF) continue; // if the file is empty, don't initialize the pqueue. if (r==EOF) continue; // if the file is empty, don't initialize the pqueue.
if (r!=0) return r; if (r!=0) return r;
...@@ -1297,16 +1360,14 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1297,16 +1360,14 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
{ {
// read next row from file that just sourced min value // read next row from file that just sourced min value
BL_TRACE_QUIET(blt_do_i); BL_TRACE_QUIET(blt_do_i);
r = loader_read_row(srcs_files[mini], &keys[mini], &vals[mini], bl); r = loader_read_row_from_dbufio(bfs, mini, &keys[mini], &vals[mini]);
BL_TRACE_QUIET(blt_read_row); BL_TRACE_QUIET(blt_read_row);
if (r!=0) { if (r!=0) {
if (feof(srcs_files[mini])) { if (r==EOF) {
// on feof, queue size permanently smaller // on feof, queue size permanently smaller
toku_free(keys[mini].data); toku_free(keys[mini].data);
toku_free(vals[mini].data); toku_free(vals[mini].data);
} else { } else {
r = ferror(srcs_files[mini]);
assert(r!=0);
return r; return r;
} }
} }
...@@ -1349,8 +1410,44 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1349,8 +1410,44 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
return update_progress(progress_allocation, bl, "end of merge_some_files"); return update_progress(progress_allocation, bl, "end of merge_some_files");
} }
static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_fidxs[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation)
{
int result = 0;
DBUFIO_FILESET bfs = NULL;
int *MALLOC_N(n_sources, fds);
if (fds==NULL) result=errno;
if (result==0) {
for (int i=0; i<n_sources; i++) {
int r = fileno(toku_bl_fidx2file(bl, srcs_fidxs[i])); // we rely on the fact that when the files are closed, the fd is also closed.
if (r==-1) {
result=errno;
break;
}
fds[i] = r;
}
}
if (result==0) {
size_t bufsize = 1<<21; // This constant needs to be adjusted.
int r = create_dbufio_fileset(&bfs, n_sources, fds, bufsize);
if (r!=0) { result = r; }
}
if (result==0) {
int r = merge_some_files_using_dbufio (to_q, dest_data, q, n_sources, bfs, srcs_fidxs, bl, which_db, dest_db, compare, progress_allocation);
if (r!=0) { result = r; }
}
if (bfs!=NULL) {
int r = destroy_dbufio_fileset(bfs);
if (r!=0 && result==0) result=r;
bfs = NULL;
}
if (fds!=NULL) {
toku_free(fds);
fds = NULL;
}
return result;
}
static int int_min (int a, int b) static int int_min (int a, int b)
{ {
...@@ -1409,17 +1506,14 @@ int merge_files (struct merge_fileset *fs, ...@@ -1409,17 +1506,14 @@ int merge_files (struct merge_fileset *fs,
FIDX merged_data = FIDX_NULL; FIDX merged_data = FIDX_NULL;
FIDX *MALLOC_N(n_to_merge, data_fidxs); FIDX *MALLOC_N(n_to_merge, data_fidxs);
FILE **MALLOC_N(n_to_merge, data_files);
for (int i=0; i<n_to_merge; i++) { for (int i=0; i<n_to_merge; i++) {
data_fidxs[i] = FIDX_NULL; data_fidxs[i] = FIDX_NULL;
data_files[i] = NULL;
} }
for (int i=0; i<n_to_merge; i++) { for (int i=0; i<n_to_merge; i++) {
int idx = fs->n_temp_files -1 -i; int idx = fs->n_temp_files -1 -i;
data_fidxs[i] = fs->data_fidxs[idx]; data_fidxs[i] = fs->data_fidxs[idx];
result = brtloader_fi_reopen(&bl->file_infos, data_fidxs[i], "r"); result = brtloader_fi_reopen(&bl->file_infos, data_fidxs[i], "r");
if (result) { printf("%s:%d r=%d\n", __FILE__, __LINE__, result); break; } if (result) { printf("%s:%d r=%d\n", __FILE__, __LINE__, result); break; }
data_files[i] = toku_bl_fidx2file(bl, data_fidxs[i]);
} }
if (result==0 && !to_queue) { if (result==0 && !to_queue) {
result = extend_fileset(bl, &next_file_set, &merged_data); result = extend_fileset(bl, &next_file_set, &merged_data);
...@@ -1427,7 +1521,7 @@ int merge_files (struct merge_fileset *fs, ...@@ -1427,7 +1521,7 @@ int merge_files (struct merge_fileset *fs,
} }
if (result==0) { if (result==0) {
result = merge_some_files(to_queue, merged_data, output_q, n_to_merge, data_fidxs, data_files, bl, which_db, dest_db, compare, progress_allocation_for_this_subpass); result = merge_some_files(to_queue, merged_data, output_q, n_to_merge, data_fidxs, bl, which_db, dest_db, compare, progress_allocation_for_this_subpass);
// if result!=0, fall through // if result!=0, fall through
if (result==0) { if (result==0) {
/*nothing*/;// this is gratuitous, but we need something to give code coverage tools to help us know that it's important to distinguish between result==0 and result!=0 /*nothing*/;// this is gratuitous, but we need something to give code coverage tools to help us know that it's important to distinguish between result==0 and result!=0
...@@ -1446,7 +1540,6 @@ int merge_files (struct merge_fileset *fs, ...@@ -1446,7 +1540,6 @@ int merge_files (struct merge_fileset *fs,
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
} }
data_fidxs[i] = FIDX_NULL; data_fidxs[i] = FIDX_NULL;
data_files[i] = NULL;
} }
} }
...@@ -1455,7 +1548,6 @@ int merge_files (struct merge_fileset *fs, ...@@ -1455,7 +1548,6 @@ int merge_files (struct merge_fileset *fs,
int r = brtloader_fi_close(&bl->file_infos, merged_data); int r = brtloader_fi_close(&bl->file_infos, merged_data);
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
} }
toku_free(data_files);
toku_free(data_fidxs); toku_free(data_fidxs);
if (result!=0) break; if (result!=0) break;
...@@ -2334,7 +2426,7 @@ static int read_some_pivots (FIDX pivots_file, int n_to_read, BRTLOADER bl, ...@@ -2334,7 +2426,7 @@ static int read_some_pivots (FIDX pivots_file, int n_to_read, BRTLOADER bl,
int result = 0; int result = 0;
for (int i = 0; i < n_to_read; i++) { for (int i = 0; i < n_to_read; i++) {
int r = bl_read_dbt(&pivots[i], pivots_stream, bl); int r = bl_read_dbt(&pivots[i], pivots_stream);
if (r != 0) { if (r != 0) {
result = r; result = r;
break; break;
......
...@@ -201,7 +201,7 @@ static void test_read_write_rows (char *template) { ...@@ -201,7 +201,7 @@ static void test_read_write_rows (char *template) {
{ {
int n_read=0; int n_read=0;
DBT key={.size=0}, val={.size=0}; DBT key={.size=0}, val={.size=0};
while (0==loader_read_row(toku_bl_fidx2file(&bl, file), &key, &val, &bl)) { while (0==loader_read_row(toku_bl_fidx2file(&bl, file), &key, &val)) {
assert(strlen(keystrings[n_read])==key.size); assert(strlen(keystrings[n_read])==key.size);
assert(strlen(valstrings[n_read])==val.size); assert(strlen(valstrings[n_read])==val.size);
assert(0==memcmp(keystrings[n_read], key.data, key.size)); assert(0==memcmp(keystrings[n_read], key.data, key.size));
......
...@@ -57,6 +57,7 @@ struct __toku_loader_internal { ...@@ -57,6 +57,7 @@ struct __toku_loader_internal {
* free_loader_resources() frees all of the resources associated with * free_loader_resources() frees all of the resources associated with
* struct __toku_loader_internal * struct __toku_loader_internal
* assumes any previously freed items set the field pointer to NULL * assumes any previously freed items set the field pointer to NULL
* Requires that the brt_loader is closed or destroyed before calling this function.
*/ */
static void free_loader_resources(DB_LOADER *loader) static void free_loader_resources(DB_LOADER *loader)
{ {
...@@ -86,7 +87,6 @@ static void free_loader_resources(DB_LOADER *loader) ...@@ -86,7 +87,6 @@ static void free_loader_resources(DB_LOADER *loader)
toku_free(loader->i->inames_in_env); toku_free(loader->i->inames_in_env);
} }
if (loader->i->temp_file_template) toku_free(loader->i->temp_file_template); if (loader->i->temp_file_template) toku_free(loader->i->temp_file_template);
if (loader->i->brt_loader) toku_free(loader->i->brt_loader);
// loader->i // loader->i
toku_free(loader->i); toku_free(loader->i);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment