Commit 2b27ce98, authored by Bradley C. Kuszmaul, committed by Yoni Fogel

Merge in the changes that make the reads not use fidx locks too. Refs #2571. [t:2571]

{{{
svn merge -r 20067:20072 https://svn.tokutek.com/tokudb/toku/tokudb.2571
}}}
.


git-svn-id: file:///svn/toku/tokudb@20073 c7de825b-a66e-492c-adef-691d508d4ae1
parent 23f72d66
...@@ -58,7 +58,7 @@ void destroy_rowset (struct rowset *rows); ...@@ -58,7 +58,7 @@ void destroy_rowset (struct rowset *rows);
void add_row (struct rowset *rows, DBT *key, DBT *val); void add_row (struct rowset *rows, DBT *key, DBT *val);
int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, u_int64_t *dataoff, BRTLOADER bl); int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, u_int64_t *dataoff, BRTLOADER bl);
int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl); int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl);
struct merge_fileset { struct merge_fileset {
int n_temp_files, n_temp_files_limit; int n_temp_files, n_temp_files_limit;
......
...@@ -381,7 +381,7 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOAD ...@@ -381,7 +381,7 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOAD
return 0; return 0;
} }
static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOADER bl) static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER bl)
/* Effect: this is a wrapper for fread that returns 0 on success, otherwise returns an error number. /* Effect: this is a wrapper for fread that returns 0 on success, otherwise returns an error number.
* Arguments: * Arguments:
* ptr read data into here. * ptr read data into here.
...@@ -392,7 +392,6 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD ...@@ -392,7 +392,6 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD
* Return value: 0 on success, an error number otherwise. * Return value: 0 on success, an error number otherwise.
*/ */
{ {
FILE *stream = toku_bl_fidx2file(bl, streami);
size_t r = fread(ptr, size, nmemb, stream); size_t r = fread(ptr, size, nmemb, stream);
if (r==0) { if (r==0) {
if (feof(stream)) return EOF; if (feof(stream)) return EOF;
...@@ -420,18 +419,18 @@ static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, BRTLOADER ...@@ -420,18 +419,18 @@ static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, BRTLOADER
return 0; return 0;
} }
static int bl_read_dbt (/*in*/DBT *dbt, FIDX datafile, BRTLOADER bl) static int bl_read_dbt (/*in*/DBT *dbt, FILE *stream, BRTLOADER bl)
{ {
int len; int len;
{ {
int r; int r;
if ((r = bl_fread(&len, sizeof(len), 1, datafile, bl))) return r; if ((r = bl_fread(&len, sizeof(len), 1, stream, bl))) return r;
assert(len>=0); assert(len>=0);
} }
if ((int)dbt->ulen<len) { dbt->ulen=len; dbt->data=toku_xrealloc(dbt->data, len); } if ((int)dbt->ulen<len) { dbt->ulen=len; dbt->data=toku_xrealloc(dbt->data, len); }
{ {
int r; int r;
if ((r = bl_fread(dbt->data, 1, len, datafile, bl))) return r; if ((r = bl_fread(dbt->data, 1, len, stream, bl))) return r;
} }
dbt->size = len; dbt->size = len;
return 0; return 0;
...@@ -459,7 +458,7 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, u_int64_t *data ...@@ -459,7 +458,7 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, u_int64_t *data
return 0; return 0;
} }
int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl) int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl)
/* Effect: Read a key value pair from a file. The DBTs must have DB_DBT_REALLOC set. /* Effect: Read a key value pair from a file. The DBTs must have DB_DBT_REALLOC set.
* Arguments: * Arguments:
* f where to read it from. * f where to read it from.
...@@ -1162,7 +1161,7 @@ int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *f ...@@ -1162,7 +1161,7 @@ int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *f
#endif #endif
} }
static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_data[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation) static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_fidxs[/*n_sources*/], FILE *srcs_files[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation)
/* Effect: Given an array of FILE*'s each containing sorted, merge the data and write it to an output. All the files remain open after the merge. /* Effect: Given an array of FILE*'s each containing sorted, merge the data and write it to an output. All the files remain open after the merge.
* This merge is performed in one pass, so don't pass too many files in. If you need a tree of merges do it elsewhere. * This merge is performed in one pass, so don't pass too many files in. If you need a tree of merges do it elsewhere.
* If TO_Q is true then we write rowsets into queue Q. Otherwise we write into dest_data. * If TO_Q is true then we write rowsets into queue Q. Otherwise we write into dest_data.
...@@ -1204,7 +1203,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1204,7 +1203,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
// load pqueue with first value from each source // load pqueue with first value from each source
for (int i=0; i<n_sources; i++) { for (int i=0; i<n_sources; i++) {
BL_TRACE_QUIET(blt_do_i); BL_TRACE_QUIET(blt_do_i);
int r = loader_read_row(srcs_data[i], &keys[i], &vals[i], bl); int r = loader_read_row(srcs_files[i], &keys[i], &vals[i], bl);
BL_TRACE_QUIET(blt_read_row); BL_TRACE_QUIET(blt_read_row);
if (r==EOF) continue; // if the file is empty, don't initialize the pqueue. if (r==EOF) continue; // if the file is empty, don't initialize the pqueue.
if (r!=0) return r; if (r!=0) return r;
...@@ -1225,7 +1224,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1225,7 +1224,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
dataoff[i] = 0; dataoff[i] = 0;
{ int r2 = toku_pthread_mutex_lock(&bl->file_infos.lock); assert(r2==0); } { int r2 = toku_pthread_mutex_lock(&bl->file_infos.lock); assert(r2==0); }
n_rows += bl->file_infos.file_infos[srcs_data[i].idx].n_rows; n_rows += bl->file_infos.file_infos[srcs_fidxs[i].idx].n_rows;
{ int r2 = toku_pthread_mutex_unlock(&bl->file_infos.lock); assert(r2==0); } { int r2 = toku_pthread_mutex_unlock(&bl->file_infos.lock); assert(r2==0); }
} }
u_int64_t n_rows_done = 0; u_int64_t n_rows_done = 0;
...@@ -1278,15 +1277,15 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1278,15 +1277,15 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
{ {
// read next row from file that just sourced min value // read next row from file that just sourced min value
BL_TRACE_QUIET(blt_do_i); BL_TRACE_QUIET(blt_do_i);
r = loader_read_row(srcs_data[mini], &keys[mini], &vals[mini], bl); r = loader_read_row(srcs_files[mini], &keys[mini], &vals[mini], bl);
BL_TRACE_QUIET(blt_read_row); BL_TRACE_QUIET(blt_read_row);
if (r!=0) { if (r!=0) {
if (feof(toku_bl_fidx2file(bl, srcs_data[mini]))) { if (feof(srcs_files[mini])) {
// on feof, queue size permanently smaller // on feof, queue size permanently smaller
toku_free(keys[mini].data); toku_free(keys[mini].data);
toku_free(vals[mini].data); toku_free(vals[mini].data);
} else { } else {
r = ferror(toku_bl_fidx2file(bl, srcs_data[mini])); r = ferror(srcs_files[mini]);
assert(r!=0); assert(r!=0);
return r; return r;
} }
...@@ -1389,13 +1388,18 @@ int merge_files (struct merge_fileset *fs, ...@@ -1389,13 +1388,18 @@ int merge_files (struct merge_fileset *fs,
//printf("%s:%d merging\n", __FILE__, __LINE__); //printf("%s:%d merging\n", __FILE__, __LINE__);
FIDX merged_data = FIDX_NULL; FIDX merged_data = FIDX_NULL;
FIDX *MALLOC_N(n_to_merge, datafiles); FIDX *MALLOC_N(n_to_merge, data_fidxs);
for (int i=0; i<n_to_merge; i++) datafiles[i] = FIDX_NULL; FILE **MALLOC_N(n_to_merge, data_files);
for (int i=0; i<n_to_merge; i++) {
data_fidxs[i] = FIDX_NULL;
data_files[i] = NULL;
}
for (int i=0; i<n_to_merge; i++) { for (int i=0; i<n_to_merge; i++) {
int idx = fs->n_temp_files -1 -i; int idx = fs->n_temp_files -1 -i;
datafiles[i] = fs->data_fidxs[idx]; data_fidxs[i] = fs->data_fidxs[idx];
result = brtloader_fi_reopen(&bl->file_infos, datafiles[i], "r"); result = brtloader_fi_reopen(&bl->file_infos, data_fidxs[i], "r");
if (result) { printf("%s:%d r=%d\n", __FILE__, __LINE__, result); break; } if (result) { printf("%s:%d r=%d\n", __FILE__, __LINE__, result); break; }
data_files[i] = toku_bl_fidx2file(bl, data_fidxs[i]);
} }
if (result==0 && !to_queue) { if (result==0 && !to_queue) {
result = extend_fileset(bl, &next_file_set, &merged_data); result = extend_fileset(bl, &next_file_set, &merged_data);
...@@ -1403,7 +1407,7 @@ int merge_files (struct merge_fileset *fs, ...@@ -1403,7 +1407,7 @@ int merge_files (struct merge_fileset *fs,
} }
if (result==0) { if (result==0) {
result = merge_some_files(to_queue, merged_data, output_q, n_to_merge, datafiles, bl, which_db, dest_db, compare, progress_allocation_for_this_subpass); result = merge_some_files(to_queue, merged_data, output_q, n_to_merge, data_fidxs, data_files, bl, which_db, dest_db, compare, progress_allocation_for_this_subpass);
// if result!=0, fall through // if result!=0, fall through
if (result==0) { if (result==0) {
/*nothing*/;// this is gratuitous, but we need something to give code coverage tools to help us know that it's important to distinguish between result==0 and result!=0 /*nothing*/;// this is gratuitous, but we need something to give code coverage tools to help us know that it's important to distinguish between result==0 and result!=0
...@@ -1412,16 +1416,17 @@ int merge_files (struct merge_fileset *fs, ...@@ -1412,16 +1416,17 @@ int merge_files (struct merge_fileset *fs,
//printf("%s:%d merged\n", __FILE__, __LINE__); //printf("%s:%d merged\n", __FILE__, __LINE__);
for (int i=0; i<n_to_merge; i++) { for (int i=0; i<n_to_merge; i++) {
if (!fidx_is_null(datafiles[i])) { if (!fidx_is_null(data_fidxs[i])) {
{ {
int r = brtloader_fi_close(&bl->file_infos, datafiles[i]); int r = brtloader_fi_close(&bl->file_infos, data_fidxs[i]);
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
} }
{ {
int r = brtloader_fi_unlink(&bl->file_infos, datafiles[i]); int r = brtloader_fi_unlink(&bl->file_infos, data_fidxs[i]);
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
} }
datafiles[i] = FIDX_NULL; data_fidxs[i] = FIDX_NULL;
data_files[i] = NULL;
} }
} }
...@@ -1430,7 +1435,8 @@ int merge_files (struct merge_fileset *fs, ...@@ -1430,7 +1435,8 @@ int merge_files (struct merge_fileset *fs,
int r = brtloader_fi_close(&bl->file_infos, merged_data); int r = brtloader_fi_close(&bl->file_infos, merged_data);
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
} }
toku_free(datafiles); toku_free(data_files);
toku_free(data_fidxs);
if (result!=0) break; if (result!=0) break;
} }
...@@ -2249,9 +2255,10 @@ static int read_some_pivots (FIDX pivots_file, int n_to_read, BRTLOADER bl, ...@@ -2249,9 +2255,10 @@ static int read_some_pivots (FIDX pivots_file, int n_to_read, BRTLOADER bl,
/*out*/ DBT pivots[/*n_to_read*/]) /*out*/ DBT pivots[/*n_to_read*/])
// pivots is an array to be filled in. The pivots array is uninitialized. // pivots is an array to be filled in. The pivots array is uninitialized.
{ {
FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);
for (int i=0; i<n_to_read; i++) { for (int i=0; i<n_to_read; i++) {
memset(&pivots[i], 0, sizeof pivots[i]); memset(&pivots[i], 0, sizeof pivots[i]);
int r = bl_read_dbt(&pivots[i], pivots_file, bl); int r = bl_read_dbt(&pivots[i], pivots_stream, bl);
if (r!=0) return r; if (r!=0) return r;
}; };
return 0; return 0;
......
...@@ -201,7 +201,7 @@ static void test_read_write_rows (char *template) { ...@@ -201,7 +201,7 @@ static void test_read_write_rows (char *template) {
{ {
int n_read=0; int n_read=0;
DBT key={.size=0}, val={.size=0}; DBT key={.size=0}, val={.size=0};
while (0==loader_read_row(file, &key, &val, &bl)) { while (0==loader_read_row(toku_bl_fidx2file(&bl, file), &key, &val, &bl)) {
assert(strlen(keystrings[n_read])==key.size); assert(strlen(keystrings[n_read])==key.size);
assert(strlen(valstrings[n_read])==val.size); assert(strlen(valstrings[n_read])==val.size);
assert(0==memcmp(keystrings[n_read], key.data, key.size)); assert(0==memcmp(keystrings[n_read], key.data, key.size));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment