Commit d57849d9 authored by Bradley C. Kuszmaul, committed by Yoni Fogel

Merge the memory management into the main line. Refs #2613. [t:2613].

{{{
svn merge -r 20329:20340 https://svn.tokutek.com/tokudb/toku/tokudb.2613
}}}
.


git-svn-id: file:///svn/toku/tokudb@20341 c7de825b-a66e-492c-adef-691d508d4ae1
parent ee21e2d8
@@ -80,6 +80,12 @@ static size_t do_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *strea
 static uint32_t size_factor = 1024;
 static int nodesize = (1<<22);
+enum { EXTRACTOR_QUEUE_DEPTH = 2,
+       FILE_BUFFER_SIZE  = 1<<24,
+       MIN_ROWSET_MEMORY = 1<<23,
+       MERGE_BUF_SIZE    = 1<<24,
+       MIN_MERGE_FANIN   = 4,
+};
 
 void
@@ -186,7 +192,7 @@ static int open_file_add (struct file_infos *fi,
     fi->file_infos[fi->n_files].fname = fname;
     fi->file_infos[fi->n_files].file = file;
     fi->file_infos[fi->n_files].n_rows = 0;
-    fi->file_infos[fi->n_files].buffer_size = 1<<20;
+    fi->file_infos[fi->n_files].buffer_size = FILE_BUFFER_SIZE;
     fi->file_infos[fi->n_files].buffer = NULL;
     result = add_big_buffer(&fi->file_infos[fi->n_files]);
     if (result == 0) {
@@ -324,9 +330,11 @@ static void brtloader_destroy (BRTLOADER bl, BOOL is_error) {
 static void *extractor_thread (void*);
-enum { EXTRACTOR_QUEUE_DEPTH = 2};
+#define MAX(a,b) (((a)<(b)) ? (b) : (a))
-static uint64_t memory_per_rowset (BRTLOADER bl) {
+static uint64_t memory_per_rowset (BRTLOADER bl)
+// Return how much memory can be allocated for each rowset.
+{
     // There is a primary rowset being maintained by the foreground thread.
     // There could be two more in the queue.
     // There is one rowset for each index (bl->N) being filled in.
@@ -335,12 +343,23 @@ static uint64_t memory_per_rowset (BRTLOADER bl) {
                       +2     // the two primaries in the queue
                       +bl->N // the N rowsets being constructed by the extractor thread.
                       +1     // Give the extractor thread one more so that it can have temporary space for sorting.  This is overkill.
                       );
-    return bl->reserved_memory/(n_copies);
+    int64_t extra_reserved_memory = bl->N * FILE_BUFFER_SIZE; // for each index we are writing to a file at any given time.
+    int64_t tentative_rowset_size = ((int64_t)(bl->reserved_memory - extra_reserved_memory))/(n_copies);
+    return MAX(tentative_rowset_size, (int64_t)MIN_ROWSET_MEMORY);
 }
+
+static int merge_fanin (BRTLOADER bl)
+// Return the fanin
+{
+    // assume we only perform one fanin at a time.
+    int tentative_fanin = ((int64_t)(bl->reserved_memory - FILE_BUFFER_SIZE))/MERGE_BUF_SIZE;
+    int result = MAX(tentative_fanin, (int)MIN_MERGE_FANIN);
+    //printf("%s:%d Mergefanin=%d (memory=%ld)\n", __FILE__, __LINE__, result, bl->reserved_memory);
+    return result;
+}
 
-// lazy cleanup on error paths, ticket #2591
+// LAZY cleanup on error paths, ticket #2591
 int toku_brt_loader_open (/* out */ BRTLOADER *blp,
                           CACHETABLE cachetable,
                           generate_row_for_put_func g,
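The two sizing helpers above draw on the same budget: memory_per_rowset() first sets aside one FILE_BUFFER_SIZE write buffer per index, then divides what remains among the 1 + 2 + N + 1 rowsets that can be alive at once, while merge_fanin() bounds how many temp files one merge pass may read so that the per-input MERGE_BUF_SIZE buffers fit after holding back one file buffer. A minimal standalone sketch of that arithmetic, using hypothetical values for reserved_memory and N that are not taken from the commit:

{{{
#include <stdio.h>
#include <stdint.h>

#define MAX(a,b) (((a)<(b)) ? (b) : (a))

enum { FILE_BUFFER_SIZE  = 1<<24,   // 16 MiB per-index write buffer
       MIN_ROWSET_MEMORY = 1<<23,   //  8 MiB floor per rowset
       MERGE_BUF_SIZE    = 1<<24,   // 16 MiB per merge input
       MIN_MERGE_FANIN   = 4 };

int main(void) {
    int64_t reserved_memory = 512LL<<20; // hypothetical: 512 MiB reserved by the loader
    int64_t N = 4;                       // hypothetical: 4 indexes being loaded

    // memory_per_rowset: 1 primary + 2 queued + N per-index + 1 sort scratch copies,
    // after reserving one file-write buffer per index.
    int64_t n_copies = 1 + 2 + N + 1;
    int64_t extra    = N * FILE_BUFFER_SIZE;
    int64_t rowset   = MAX((reserved_memory - extra) / n_copies, (int64_t)MIN_ROWSET_MEMORY);

    // merge_fanin: each merge input gets a MERGE_BUF_SIZE read buffer;
    // one FILE_BUFFER_SIZE is held back for the merge output.
    int64_t fanin = MAX((reserved_memory - FILE_BUFFER_SIZE) / MERGE_BUF_SIZE,
                        (int64_t)MIN_MERGE_FANIN);

    printf("rowset bytes = %lld, merge fanin = %lld\n",
           (long long)rowset, (long long)fanin); // 56 MiB rowsets, fanin 31 here
    return 0;
}
}}}

With these illustrative numbers the loader gets 56 MiB rowsets and a 31-way merge fanin; the MIN_ROWSET_MEMORY and MIN_MERGE_FANIN floors only come into play when reserved_memory is small.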
@@ -1547,8 +1566,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
 	}
     }
     if (result==0) {
-	size_t bufsize = 1<<21; // This constant needs to be adjusted.
-	int r = create_dbufio_fileset(&bfs, n_sources, fds, bufsize);
+	int r = create_dbufio_fileset(&bfs, n_sources, fds, MERGE_BUF_SIZE);
 	if (r!=0) { result = r; }
     }
@@ -1599,9 +1617,9 @@ int merge_files (struct merge_fileset *fs,
  * (however the fs will still need to be deallocated.)
  */
 {
-    //printf(" merge_files %d files\n", fs->n_temp_files);
     //printf(" merge_files use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
-    const int mergelimit = (size_factor == 1) ? 4 : 256;
-    // const int mergelimit = (size_factor == 1) ? 4 : 16;
+    const int mergelimit = (size_factor == 1) ? 4 : merge_fanin(bl);
     int n_passes_left = (fs->n_temp_files==1) ? 1 : n_passes(fs->n_temp_files, mergelimit);
     //printf("%d files, %d per pass, %d passes\n", fs->n_temp_files, mergelimit, n_passes_left);
     int result = 0;
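Replacing the fixed mergelimit of 256 with merge_fanin(bl) ties the number of merge passes to the memory actually reserved. n_passes() itself is outside this hunk; assuming it simply counts how many fanin-way rounds are needed to reduce the temp-file count to one, a hedged sketch of that relationship (the helper name and values below are illustrative, not from the source):

{{{
#include <stdio.h>

// Hypothetical stand-in for the loader's n_passes(): count how many
// fanin-way merge rounds it takes to reduce n files down to one.
static int n_passes_sketch(int n, int fanin) {
    int passes = 0;
    while (n > 1) {
        n = (n + fanin - 1) / fanin; // each pass merges up to `fanin` files into one
        passes++;
    }
    return passes;
}

int main(void) {
    // Illustrative values only: 1000 temp files.
    printf("fanin 256 -> %d passes\n", n_passes_sketch(1000, 256)); // 2
    printf("fanin  31 -> %d passes\n", n_passes_sketch(1000, 31));  // 3
    printf("fanin   4 -> %d passes\n", n_passes_sketch(1000, 4));   // 5
    return 0;
}
}}}

Even a modest fanin keeps the pass count low, which is why a floor of MIN_MERGE_FANIN = 4 remains workable when little memory is reserved.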