Commit 07f79708 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge 2577c onto main line. Refs #2755c. [t:2755c]

{{{
svn merge -r22020:22070 https://svn.tokutek.com/tokudb/toku/tokudb.2755c
}}}
.


git-svn-id: file:///svn/toku/tokudb@22160 c7de825b-a66e-492c-adef-691d508d4ae1
parent c2162500
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
#include <cilk.h> #include <cilk.h>
#include <cilk_mutex.h> #include <cilk_mutex.h>
#include <fake_mutex.h> #include <fake_mutex.h>
#define cilk_worker_count (cilk::current_worker_count())
#else #else
// maybe #include <cilk_stub.h> // maybe #include <cilk_stub.h>
#if !defined(CILK_STUB) #if !defined(CILK_STUB)
...@@ -44,6 +45,7 @@ ...@@ -44,6 +45,7 @@
#define cilk_sync #define cilk_sync
#define cilk_for for #define cilk_for for
#endif #endif
#define cilk_worker_count 1
#endif #endif
// mark everything as C and selectively mark cilk functions // mark everything as C and selectively mark cilk functions
...@@ -71,8 +73,10 @@ static int nodesize = (1<<22); ...@@ -71,8 +73,10 @@ static int nodesize = (1<<22);
enum { EXTRACTOR_QUEUE_DEPTH = 2, enum { EXTRACTOR_QUEUE_DEPTH = 2,
FILE_BUFFER_SIZE = 1<<24, FILE_BUFFER_SIZE = 1<<24,
MIN_ROWSET_MEMORY = 1<<23, MIN_ROWSET_MEMORY = 1<<23,
MERGE_BUF_SIZE = 1<<24, MIN_MERGE_FANIN = 2,
MIN_MERGE_FANIN = 4, MERGE_QUEUE_DEPTH = 3,
TARGET_MERGE_BUF_SIZE = 1<<24, // we'd like the merge buffer to be this big.
MIN_MERGE_BUF_SIZE = 1<<20 // always use at least this much
}; };
...@@ -351,7 +355,7 @@ static void *extractor_thread (void*); ...@@ -351,7 +355,7 @@ static void *extractor_thread (void*);
#define MAX(a,b) (((a)<(b)) ? (b) : (a)) #define MAX(a,b) (((a)<(b)) ? (b) : (a))
static uint64_t memory_per_rowset (BRTLOADER bl) static uint64_t memory_per_rowset_during_extract (BRTLOADER bl)
// Return how much memory can be allocated for each rowset. // Return how much memory can be allocated for each rowset.
{ {
if (size_factor==1) { if (size_factor==1) {
...@@ -362,7 +366,7 @@ static uint64_t memory_per_rowset (BRTLOADER bl) ...@@ -362,7 +366,7 @@ static uint64_t memory_per_rowset (BRTLOADER bl)
// There is one rowset for each index (bl->N) being filled in. // There is one rowset for each index (bl->N) being filled in.
// Later we may have sort_and_write operations spawning in parallel, and will need to account for that. // Later we may have sort_and_write operations spawning in parallel, and will need to account for that.
int n_copies = (1 // primary rowset int n_copies = (1 // primary rowset
+2 // the two primaries in the queue +EXTRACTOR_QUEUE_DEPTH // the number of primaries in the queue
+bl->N // the N rowsets being constructed by the extrator thread. +bl->N // the N rowsets being constructed by the extrator thread.
+1 // Give the extractor thread one more so that it can have temporary space for sorting. This is overkill. +1 // Give the extractor thread one more so that it can have temporary space for sorting. This is overkill.
); );
...@@ -372,14 +376,35 @@ static uint64_t memory_per_rowset (BRTLOADER bl) ...@@ -372,14 +376,35 @@ static uint64_t memory_per_rowset (BRTLOADER bl)
} }
} }
static int merge_fanin (BRTLOADER bl) // To compute a merge, we have a certain amount of memory to work with.
// Return the fanin // We perform only one fanin at a time.
{ // If the fanout is F then we are using
// assume we only perform one fanin at a time. // F merges. Each merge uses
int tentative_fanin = ((int64_t)(bl->reserved_memory - FILE_BUFFER_SIZE))/MERGE_BUF_SIZE; // MERGE_QUEUE_DEPTH buffers for double buffering. Each buffer is of size at least MERGE_BUF_SIZE
int result = MAX(tentative_fanin, (int)MIN_MERGE_FANIN); // so the memory is
//printf("%s:%d Mergefanin=%d (memory=%ld)\n", __FILE__, __LINE__, result, bl->reserved_memory); // F*MERGE_BUF_SIZE*MERGE_QUEUE_DEPTH storage.
return result; // We use some additional space to buffer the outputs.
// That's FILE_BUFFER_SIZE for writing to a merge file if we are writing to a mergefile.
// And we have MERGE_QUEUE_DEPTH*MERGE_BUF_SIZE*bl->N buffers for queue
// And if we are doing a fractal, each worker could have have a fractal tree that it's working on.
static int64_t memory_avail_during_merge(BRTLOADER bl, BOOL is_fractal_node) {
int64_t extra_reserved_memory_for_queues = (int64_t)MERGE_QUEUE_DEPTH*(int64_t)TARGET_MERGE_BUF_SIZE* (int64_t)bl->N;
int64_t extra_reserved_memory_for_fractal = (is_fractal_node ? cilk_worker_count*(int64_t)nodesize : 0);
int64_t extra_reserved_memory = extra_reserved_memory_for_queues + extra_reserved_memory_for_fractal;
return bl->reserved_memory-extra_reserved_memory;
}
static int merge_fanin (BRTLOADER bl, BOOL is_fractal_node) {
int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
int64_t min_buffers_needed = (int64_t)MIN_MERGE_FANIN*(int64_t)TARGET_MERGE_BUF_SIZE*(int64_t)MERGE_QUEUE_DEPTH;
if (memory_avail<min_buffers_needed) return MIN_MERGE_FANIN;
else return memory_avail/((int64_t)TARGET_MERGE_BUF_SIZE*(int64_t)MERGE_QUEUE_DEPTH);
}
static uint64_t memory_per_rowset_during_merge (BRTLOADER bl, int merge_factor, BOOL is_fractal_node // if it is being sent to a q
) {
int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
return MAX(memory_avail/merge_factor, (int64_t)MIN_MERGE_BUF_SIZE);
} }
int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
...@@ -450,7 +475,7 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, ...@@ -450,7 +475,7 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
MY_CALLOC_N(N, bl->last_key); MY_CALLOC_N(N, bl->last_key);
for(int i=0;i<N;i++) { for(int i=0;i<N;i++) {
{ {
int r = init_rowset(&bl->rows[i], memory_per_rowset(bl)); int r = init_rowset(&bl->rows[i], memory_per_rowset_during_extract(bl));
if (r!=0) { toku_brtloader_internal_destroy(bl, TRUE); return r; } if (r!=0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
} }
init_merge_fileset(&bl->fs[i]); init_merge_fileset(&bl->fs[i]);
...@@ -464,7 +489,7 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, ...@@ -464,7 +489,7 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
brt_loader_init_poll_callback(&bl->poll_callback); brt_loader_init_poll_callback(&bl->poll_callback);
{ {
int r = init_rowset(&bl->primary_rowset, memory_per_rowset(bl)); int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl));
if (r!=0) { toku_brtloader_internal_destroy(bl, TRUE); return r; } if (r!=0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
} }
{ int r = queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH); { int r = queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH);
...@@ -921,7 +946,7 @@ static int loader_do_put(BRTLOADER bl, ...@@ -921,7 +946,7 @@ static int loader_do_put(BRTLOADER bl,
enqueue_for_extraction(bl); enqueue_for_extraction(bl);
BL_TRACE(blt_extract_enq); BL_TRACE(blt_extract_enq);
{ {
int r = init_rowset(&bl->primary_rowset, memory_per_rowset(bl)); int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl));
// bl->primary_rowset will get destroyed by toku_brt_loader_abort // bl->primary_rowset will get destroyed by toku_brt_loader_abort
if (r != 0) if (r != 0)
result = r; result = r;
...@@ -1032,7 +1057,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r ...@@ -1032,7 +1057,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however. int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
// If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function. // If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function.
BL_TRACE(blt_sort_and_write_rows); BL_TRACE(blt_sort_and_write_rows);
init_rowset(rows, memory_per_rowset(bl)); // we passed the contents of rows to sort_and_write_rows. init_rowset(rows, memory_per_rowset_during_extract(bl)); // we passed the contents of rows to sort_and_write_rows.
if (r != 0) { if (r != 0) {
error_codes[i] = r; error_codes[i] = r;
inc_error_count(); inc_error_count();
...@@ -1537,7 +1562,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1537,7 +1562,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
struct rowset *output_rowset = NULL; struct rowset *output_rowset = NULL;
if (result==0 && to_q) { if (result==0 && to_q) {
XMALLOC(output_rowset); // freed in cleanup XMALLOC(output_rowset); // freed in cleanup
int r = init_rowset(output_rowset, memory_per_rowset(bl)); int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q));
if (r!=0) result = r; if (r!=0) result = r;
} }
...@@ -1568,7 +1593,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1568,7 +1593,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
} }
XMALLOC(output_rowset); // freed in cleanup XMALLOC(output_rowset); // freed in cleanup
{ {
int r = init_rowset(output_rowset, memory_per_rowset(bl)); int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q));
if (r!=0) { if (r!=0) {
result = r; result = r;
break; break;
...@@ -1678,7 +1703,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1678,7 +1703,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
} }
} }
if (result==0) { if (result==0) {
int r = create_dbufio_fileset(&bfs, n_sources, fds, MERGE_BUF_SIZE); int r = create_dbufio_fileset(&bfs, n_sources, fds, memory_per_rowset_during_merge(bl, n_sources, to_q));
if (r!=0) { result = r; } if (r!=0) { result = r; }
} }
...@@ -1733,9 +1758,12 @@ int merge_files (struct merge_fileset *fs, ...@@ -1733,9 +1758,12 @@ int merge_files (struct merge_fileset *fs,
{ {
//printf(" merge_files %d files\n", fs->n_temp_files); //printf(" merge_files %d files\n", fs->n_temp_files);
//printf(" merge_files use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation); //printf(" merge_files use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
const int mergelimit = (size_factor == 1) ? 4 : merge_fanin(bl); const int final_mergelimit = (size_factor == 1) ? 4 : merge_fanin(bl, TRUE); // try for a merge to the leaf level
int n_passes_left = (fs->n_temp_files==1) ? 1 : n_passes(fs->n_temp_files, mergelimit); const int earlier_mergelimit = (size_factor == 1) ? 4 : merge_fanin(bl, FALSE); // try for a merge at nonleaf.
//printf("%d files, %d per pass, %d passes\n", fs->n_temp_files, mergelimit, n_passes_left); int n_passes_left = (fs->n_temp_files<=final_mergelimit)
? 1
: 1+n_passes((fs->n_temp_files+final_mergelimit-1)/final_mergelimit, earlier_mergelimit);
//printf("%d files, %d on last pass, %d on earlier passes, %d passes\n", fs->n_temp_files, final_mergelimit, earlier_mergelimit, n_passes_left);
int result = 0; int result = 0;
while (fs->n_temp_files > 0) { while (fs->n_temp_files > 0) {
int progress_allocation_for_this_pass = progress_allocation/n_passes_left; int progress_allocation_for_this_pass = progress_allocation/n_passes_left;
...@@ -1744,11 +1772,11 @@ int merge_files (struct merge_fileset *fs, ...@@ -1744,11 +1772,11 @@ int merge_files (struct merge_fileset *fs,
invariant(fs->n_temp_files>0); invariant(fs->n_temp_files>0);
struct merge_fileset next_file_set; struct merge_fileset next_file_set;
BOOL to_queue = (BOOL)(fs->n_temp_files <= mergelimit); BOOL to_queue = (BOOL)(fs->n_temp_files <= final_mergelimit);
init_merge_fileset(&next_file_set); init_merge_fileset(&next_file_set);
while (fs->n_temp_files>0) { while (fs->n_temp_files>0) {
// grab some files and merge them. // grab some files and merge them.
int n_to_merge = int_min(mergelimit, fs->n_temp_files); int n_to_merge = int_min(to_queue?final_mergelimit:earlier_mergelimit, fs->n_temp_files);
// We are about to do n_to_merge/n_temp_files of the remaining for this pass. // We are about to do n_to_merge/n_temp_files of the remaining for this pass.
int progress_allocation_for_this_subpass = progress_allocation_for_this_pass * (double)n_to_merge / (double)fs->n_temp_files; int progress_allocation_for_this_subpass = progress_allocation_for_this_pass * (double)n_to_merge / (double)fs->n_temp_files;
...@@ -2414,7 +2442,7 @@ static int loader_do_i (BRTLOADER bl, ...@@ -2414,7 +2442,7 @@ static int loader_do_i (BRTLOADER bl,
progress_allocation -= allocation_for_merge; progress_allocation -= allocation_for_merge;
int r; int r;
r = queue_create(&bl->fractal_queues[which_db], 3); r = queue_create(&bl->fractal_queues[which_db], MERGE_QUEUE_DEPTH);
if (r) goto error; if (r) goto error;
{ {
......
...@@ -2,6 +2,9 @@ ...@@ -2,6 +2,9 @@
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#ident "$Id$" #ident "$Id$"
// Need to use malloc for the malloc instrumentation tests
#define TOKU_ALLOW_DEPRECATED
#include "test.h" #include "test.h"
#include "toku_pthread.h" #include "toku_pthread.h"
#include "toku_atomic.h" #include "toku_atomic.h"
...@@ -22,6 +25,44 @@ int ALLOW_DUPS=0; ...@@ -22,6 +25,44 @@ int ALLOW_DUPS=0;
enum {MAGIC=311}; enum {MAGIC=311};
char *datadir = NULL; char *datadir = NULL;
BOOL check_est = TRUE; // do check the estimates by default BOOL check_est = TRUE; // do check the estimates by default
BOOL footprint_print = FALSE; // print memory footprint info
// Code for showing memory footprint information.
pthread_mutex_t my_lock = PTHREAD_MUTEX_INITIALIZER;
size_t hiwater;
size_t water;
size_t hiwater_start;
static long long mcount = 0, fcount=0;
size_t malloc_usable_size(void *p);
static void my_free(void*p) {
if (p) {
water-=malloc_usable_size(p);
}
free(p);
}
static void *my_malloc(size_t size) {
void *r = malloc(size);
if (r) {
water += malloc_usable_size(r);
if (water>hiwater) hiwater=water;
}
return r;
}
static void *my_realloc(void *p, size_t size) {
size_t old_usable = p ? malloc_usable_size(p) : 0;
void *r = realloc(p, size);
if (r) {
water -= old_usable;
water += malloc_usable_size(r);
}
return r;
}
// //
// Functions to create unique key/value pairs, row generators, checkers, ... for each of NUM_DBS // Functions to create unique key/value pairs, row generators, checkers, ... for each of NUM_DBS
...@@ -271,8 +312,11 @@ static void test_loader(DB **dbs) ...@@ -271,8 +312,11 @@ static void test_loader(DB **dbs)
// create and initialize loader // create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0); r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r); CKERR(r);
hiwater_start = hiwater;
if (footprint_print) printf("%s:%d Hiwater=%ld water=%ld\n", __FILE__, __LINE__, hiwater, water);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags); r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
CKERR(r); CKERR(r);
if (footprint_print) printf("%s:%d Hiwater=%ld water=%ld\n", __FILE__, __LINE__, hiwater, water);
r = loader->set_error_callback(loader, NULL, NULL); r = loader->set_error_callback(loader, NULL, NULL);
CKERR(r); CKERR(r);
r = loader->set_poll_function(loader, poll_function, expect_poll_void); r = loader->set_poll_function(loader, poll_function, expect_poll_void);
...@@ -302,7 +346,9 @@ static void test_loader(DB **dbs) ...@@ -302,7 +346,9 @@ static void test_loader(DB **dbs)
// close the loader // close the loader
printf("%9.6fs closing\n", elapsed_time()); printf("%9.6fs closing\n", elapsed_time());
if (footprint_print) printf("%s:%d Hiwater=%ld water=%ld\n", __FILE__, __LINE__, hiwater, water);
r = loader->close(loader); r = loader->close(loader);
if (footprint_print) printf("%s:%d Hiwater=%ld water=%ld (extra hiwater=%ldM)\n", __FILE__, __LINE__, hiwater, water, (hiwater-hiwater_start)/(1024*1024));
printf("%9.6fs done\n", elapsed_time()); printf("%9.6fs done\n", elapsed_time());
CKERR2s(r,0,TOKUDB_CANCELED); CKERR2s(r,0,TOKUDB_CANCELED);
...@@ -414,11 +460,13 @@ static void run_test(void) ...@@ -414,11 +460,13 @@ static void run_test(void)
toku_free(dbs); toku_free(dbs);
} }
// ------------ infrastructure ---------- // ------------ infrastructure ----------
static void do_args(int argc, char * const argv[]); static void do_args(int argc, char * const argv[]);
int test_main(int argc, char * const *argv) { int test_main(int argc, char * const *argv) {
do_args(argc, argv); do_args(argc, argv);
run_test(); run_test();
if (free_me) toku_free(free_me); if (free_me) toku_free(free_me);
...@@ -432,10 +480,26 @@ int test_main(int argc, char * const *argv) { ...@@ -432,10 +480,26 @@ int test_main(int argc, char * const *argv) {
} }
toku_free(progress_infos); toku_free(progress_infos);
} }
if (footprint_print) {
printf("%s:%d Hiwater=%ld water=%ld (extra hiwater=%ldM) mcount=%lld fcount=%lld\n", __FILE__, __LINE__, hiwater, water, (hiwater-hiwater_start)/(1024*1024), mcount, fcount);
extern void malloc_stats(void);
malloc_stats();
}
return 0; return 0;
} }
static void do_args(int argc, char * const argv[]) { static void do_args(int argc, char * const argv[]) {
// Must look for "-f" right away before we malloc anything.
for (int i=1; i<argc; i++) {
if (strcmp(argv[i], "-f")) {
db_env_set_func_malloc(my_malloc);
db_env_set_func_realloc(my_realloc);
db_env_set_func_free(my_free);
}
}
int resultcode; int resultcode;
char *cmd = argv[0]; char *cmd = argv[0];
argc--; argv++; argc--; argv++;
...@@ -452,11 +516,13 @@ static void do_args(int argc, char * const argv[]) { ...@@ -452,11 +516,13 @@ static void do_args(int argc, char * const argv[]) {
resultcode=0; resultcode=0;
do_usage: do_usage:
fprintf(stderr, "Usage: -h -c -d <num_dbs> -r <num_rows> [ -b <num_calls> ] [-m <megabytes>] [-M]\n%s\n", cmd); fprintf(stderr, "Usage: -h -c -d <num_dbs> -r <num_rows> [ -b <num_calls> ] [-m <megabytes>] [-M]\n%s\n", cmd);
fprintf(stderr, " where -b <num_calls> causes the poll function to return nonzero after <num_calls>\n"); fprintf(stderr, " where -d <num_dbs> is the number of dictionaries to build (primary & secondary). (Default=%d)\n", NUM_DBS);
fprintf(stderr, " -b <num_calls> causes the poll function to return nonzero after <num_calls>\n");
fprintf(stderr, " -e <env> uses <env> to construct the directory (so that different tests can run concurrently)\n"); fprintf(stderr, " -e <env> uses <env> to construct the directory (so that different tests can run concurrently)\n");
fprintf(stderr, " -m <m> use m MB of memory for the cachetable (default is %d MB)\n", CACHESIZE); fprintf(stderr, " -m <m> use m MB of memory for the cachetable (default is %d MB)\n", CACHESIZE);
fprintf(stderr, " -M use %d MB of memory for the cachetable\n", old_default_cachesize); fprintf(stderr, " -M use %d MB of memory for the cachetable\n", old_default_cachesize);
fprintf(stderr, " -s use size factor of 1 and count temporary files\n"); fprintf(stderr, " -s use size factor of 1 and count temporary files\n");
fprintf(stderr, " -f print memory footprint information at various points in the load\n");
exit(resultcode); exit(resultcode);
} else if (strcmp(argv[0], "-d")==0) { } else if (strcmp(argv[0], "-d")==0) {
argc--; argv++; argc--; argv++;
...@@ -479,6 +545,8 @@ static void do_args(int argc, char * const argv[]) { ...@@ -479,6 +545,8 @@ static void do_args(int argc, char * const argv[]) {
} else if (strcmp(argv[0],"-q")==0) { } else if (strcmp(argv[0],"-q")==0) {
verbose--; verbose--;
if (verbose<0) verbose=0; if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "-f")==0) {
footprint_print = TRUE;
} else if (strcmp(argv[0], "-r")==0) { } else if (strcmp(argv[0], "-r")==0) {
argc--; argv++; argc--; argv++;
NUM_ROWS = atoi(argv[0]); NUM_ROWS = atoi(argv[0]);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment