Commit 44d24685 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#2833 refs[t:2833] fix the brtloader to use less memory during key extraction and merge sorting

git-svn-id: file:///svn/toku/tokudb@22581 c7de825b-a66e-492c-adef-691d508d4ae1
parent 67c3f6d9
...@@ -75,6 +75,7 @@ enum { EXTRACTOR_QUEUE_DEPTH = 2, ...@@ -75,6 +75,7 @@ enum { EXTRACTOR_QUEUE_DEPTH = 2,
MIN_ROWSET_MEMORY = 1<<23, MIN_ROWSET_MEMORY = 1<<23,
MIN_MERGE_FANIN = 2, MIN_MERGE_FANIN = 2,
FRACTAL_WRITER_QUEUE_DEPTH = 3, FRACTAL_WRITER_QUEUE_DEPTH = 3,
FRACTAL_WRITER_ROWSETS = FRACTAL_WRITER_QUEUE_DEPTH + 2,
DBUFIO_DEPTH = 2, DBUFIO_DEPTH = 2,
TARGET_MERGE_BUF_SIZE = 1<<24, // we'd like the merge buffer to be this big. TARGET_MERGE_BUF_SIZE = 1<<24, // we'd like the merge buffer to be this big.
MIN_MERGE_BUF_SIZE = 1<<20 // always use at least this much MIN_MERGE_BUF_SIZE = 1<<20 // always use at least this much
...@@ -398,7 +399,8 @@ static uint64_t memory_per_rowset_during_extract (BRTLOADER bl) ...@@ -398,7 +399,8 @@ static uint64_t memory_per_rowset_during_extract (BRTLOADER bl)
// Later we may have sort_and_write operations spawning in parallel, and will need to account for that. // Later we may have sort_and_write operations spawning in parallel, and will need to account for that.
int n_copies = (1 // primary rowset int n_copies = (1 // primary rowset
+EXTRACTOR_QUEUE_DEPTH // the number of primaries in the queue +EXTRACTOR_QUEUE_DEPTH // the number of primaries in the queue
+bl->N // the N rowsets being constructed by the extrator thread. +bl->N // the N rowsets being constructed by the extractor thread.
+bl->N // the N sort buffers
+1 // Give the extractor thread one more so that it can have temporary space for sorting. This is overkill. +1 // Give the extractor thread one more so that it can have temporary space for sorting. This is overkill.
); );
int64_t extra_reserved_memory = bl->N * FILE_BUFFER_SIZE; // for each index we are writing to a file at any given time. int64_t extra_reserved_memory = bl->N * FILE_BUFFER_SIZE; // for each index we are writing to a file at any given time.
...@@ -435,35 +437,40 @@ CILK_END ...@@ -435,35 +437,40 @@ CILK_END
// F merges. Each merge uses // F merges. Each merge uses
// DBUFIO_DEPTH buffers for double buffering. Each buffer is of size at least MERGE_BUF_SIZE // DBUFIO_DEPTH buffers for double buffering. Each buffer is of size at least MERGE_BUF_SIZE
// so the memory is // so the memory is
// F*MERGE_BUF_SIZE*DOUBLE_BUFFER storage. // F*MERGE_BUF_SIZE*DBUFIO_DEPTH storage.
// We use some additional space to buffer the outputs. // We use some additional space to buffer the outputs.
// That's FILE_BUFFER_SIZE for writing to a merge file if we are writing to a mergefile. // That's FILE_BUFFER_SIZE for writing to a merge file if we are writing to a mergefile.
// And we have FRACTAL_WRITER_QUEUE_DEPTH*MERGE_BUF_SIZE per queue // And we have FRACTAL_WRITER_ROWSETS*MERGE_BUF_SIZE per queue
// And if we are doing a fractal, each worker could have have a fractal tree that it's working on. // And if we are doing a fractal, each worker could have have a fractal tree that it's working on.
//
// DBUFIO_DEPTH*F*MERGE_BUF_SIZE + FRACTAL_WRITER_ROWSETS*MERGE_BUF_SIZE + WORKERS*NODESIZE*2 <= RESERVED_MEMORY
static int64_t memory_avail_during_merge(BRTLOADER bl, BOOL is_fractal_node) { static int64_t memory_avail_during_merge(BRTLOADER bl, BOOL is_fractal_node) {
int64_t extra_reserved_memory = 0; // avail memory = reserved memory - WORKERS*NODESIZE*2 for the last merge stage only
int64_t avail_memory = bl->reserved_memory;
if (is_fractal_node) { if (is_fractal_node) {
int64_t extra_reserved_memory_for_queues = (int64_t)FRACTAL_WRITER_QUEUE_DEPTH * (int64_t)TARGET_MERGE_BUF_SIZE; // * bl->N if we run multiple merges in parallel // reserve space for the fractal writer thread buffers
extra_reserved_memory += extra_reserved_memory_for_queues; avail_memory -= (int64_t)brt_loader_get_fractal_workers_count(bl) * (int64_t)nodesize * 2; // compressed and uncompressed buffers
int64_t extra_reserved_memory_for_fractal = (int64_t)brt_loader_get_fractal_workers_count(bl) * (int64_t)nodesize * 2; // compressed and uncompressed buffers
extra_reserved_memory += extra_reserved_memory_for_fractal;
} }
return bl->reserved_memory - extra_reserved_memory; return avail_memory;
} }
static int merge_fanin (BRTLOADER bl, BOOL is_fractal_node) { static int merge_fanin (BRTLOADER bl, BOOL is_fractal_node) {
// return number of temp files to read in this pass // return number of temp files to read in this pass
int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node); int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
int fanin = memory_avail / ((int64_t)DBUFIO_DEPTH * (int64_t)TARGET_MERGE_BUF_SIZE); int64_t nbuffers = memory_avail / (int64_t)TARGET_MERGE_BUF_SIZE;
if (fanin < MIN_MERGE_FANIN) if (is_fractal_node)
fanin = MIN_MERGE_FANIN; nbuffers -= FRACTAL_WRITER_ROWSETS;
return fanin; return MAX(nbuffers / (int64_t)DBUFIO_DEPTH, (int)MIN_MERGE_FANIN);
} }
static uint64_t memory_per_rowset_during_merge (BRTLOADER bl, int merge_factor, BOOL is_fractal_node // if it is being sent to a q static uint64_t memory_per_rowset_during_merge (BRTLOADER bl, int merge_factor, BOOL is_fractal_node // if it is being sent to a q
) { ) {
int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node); int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
return MAX(memory_avail/(DBUFIO_DEPTH * merge_factor), (int64_t)MIN_MERGE_BUF_SIZE); int64_t nbuffers = DBUFIO_DEPTH * merge_factor;
if (is_fractal_node)
nbuffers += FRACTAL_WRITER_ROWSETS;
return MAX(memory_avail / nbuffers, (int64_t)MIN_MERGE_BUF_SIZE);
} }
int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment