Commit d98b2702 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge in memory management for the extractor thread. Refs #2613. [t:2613].

{{{
svn merge -r 20278:20301 https://svn.tokutek.com/tokudb/toku/tokudb.2613
}}}
.


git-svn-id: file:///svn/toku/tokudb@20302 c7de825b-a66e-492c-adef-691d508d4ae1
parent 1380e291
...@@ -47,13 +47,14 @@ struct row { ...@@ -47,13 +47,14 @@ struct row {
int klen,vlen; int klen,vlen;
}; };
struct rowset { struct rowset {
uint64_t memory_budget;
size_t n_rows, n_rows_limit; size_t n_rows, n_rows_limit;
struct row *rows; struct row *rows;
size_t n_bytes, n_bytes_limit; size_t n_bytes, n_bytes_limit;
char *data; char *data;
}; };
int init_rowset (struct rowset *rows); int init_rowset (struct rowset *rows, uint64_t memory_budget);
void destroy_rowset (struct rowset *rows); void destroy_rowset (struct rowset *rows);
void add_row (struct rowset *rows, DBT *key, DBT *val); void add_row (struct rowset *rows, DBT *key, DBT *val);
......
...@@ -78,7 +78,6 @@ static size_t do_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *strea ...@@ -78,7 +78,6 @@ static size_t do_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *strea
// 1024 is the right size_factor for production. // 1024 is the right size_factor for production.
// Different values for these sizes may be used for testing. // Different values for these sizes may be used for testing.
static uint32_t size_factor = 1024; static uint32_t size_factor = 1024;
static size_t data_buffer_limit = 1024*1024*64;
static int nodesize = (1<<22); static int nodesize = (1<<22);
...@@ -87,10 +86,15 @@ void ...@@ -87,10 +86,15 @@ void
toku_brtloader_set_size_factor(uint32_t factor) { toku_brtloader_set_size_factor(uint32_t factor) {
// For test purposes only // For test purposes only
size_factor = factor; size_factor = factor;
data_buffer_limit = 1024*size_factor*64;
nodesize = (size_factor==1) ? (1<<15) : (1<<22); nodesize = (size_factor==1) ? (1<<15) : (1<<22);
} }
uint64_t
toku_brtloader_get_rowset_budget_for_testing (void)
// For test purposes only. In production, the rowset size is determined by negotation with the cachetable for some memory. (See #2613).
{
return 16ULL*size_factor*1024ULL;
}
static int add_big_buffer(struct file_info *file) { static int add_big_buffer(struct file_info *file) {
int result = 0; int result = 0;
...@@ -311,6 +315,22 @@ static void brtloader_destroy (BRTLOADER bl, BOOL is_error) { ...@@ -311,6 +315,22 @@ static void brtloader_destroy (BRTLOADER bl, BOOL is_error) {
static void *extractor_thread (void*); static void *extractor_thread (void*);
enum { EXTRACTOR_QUEUE_DEPTH = 2};
static uint64_t memory_per_rowset (BRTLOADER bl) {
// There is a primary rowset being maintained by the foreground thread.
// There could be two more in the queue.
// There is one rowset for each index (bl->N) being filled in.
// Later we may have sort_and_write operations spawning in parallel, and will need to account for that.
int n_copies = (1 // primary rowset
+2 // the two primaries in the queue
+bl->N // the N rowsets being constructed by the extrator thread.
+1 // Give the extractor thread one more so that it can have temporary space for sorting. This is overkill.
);
return bl->reserved_memory/(n_copies);
}
// RFP 2535 error recovery in the loader open needs to be replaced // RFP 2535 error recovery in the loader open needs to be replaced
int toku_brt_loader_open (/* out */ BRTLOADER *blp, int toku_brt_loader_open (/* out */ BRTLOADER *blp,
CACHETABLE cachetable, CACHETABLE cachetable,
...@@ -350,6 +370,9 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -350,6 +370,9 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
bl->cachetable = cachetable; bl->cachetable = cachetable;
if (bl->cachetable) if (bl->cachetable)
bl->reserved_memory = toku_cachetable_reserve_memory(bl->cachetable, 0.5); bl->reserved_memory = toku_cachetable_reserve_memory(bl->cachetable, 0.5);
else
bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB.
//printf("Reserved memory=%ld\n", bl->reserved_memory);
bl->src_db = src_db; bl->src_db = src_db;
bl->N = N; bl->N = N;
...@@ -382,16 +405,17 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -382,16 +405,17 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
bl->rows = (struct rowset *) toku_malloc(N*sizeof(struct rowset)); if (bl->rows == NULL) return errno; bl->rows = (struct rowset *) toku_malloc(N*sizeof(struct rowset)); if (bl->rows == NULL) return errno;
bl->fs = (struct merge_fileset *) toku_malloc(N*sizeof(struct merge_fileset)); if (bl->rows == NULL) return errno; bl->fs = (struct merge_fileset *) toku_malloc(N*sizeof(struct merge_fileset)); if (bl->rows == NULL) return errno;
for(int i=0;i<N;i++) { for(int i=0;i<N;i++) {
{ int r = init_rowset(&bl->rows[i]); if (r!=0) return r; } { int r = init_rowset(&bl->rows[i], memory_per_rowset(bl)); if (r!=0) return r; }
init_merge_fileset(&bl->fs[i]); init_merge_fileset(&bl->fs[i]);
} }
brt_loader_init_error_callback(&bl->error_callback); brt_loader_init_error_callback(&bl->error_callback);
brt_loader_init_poll_callback(&bl->poll_callback); brt_loader_init_poll_callback(&bl->poll_callback);
{ int r = init_rowset(&bl->primary_rowset); if (r!=0) return r; } { int r = init_rowset(&bl->primary_rowset, memory_per_rowset(bl)); if (r!=0) return r; }
{ int r = queue_create(&bl->primary_rowset_queue, 2); if (r!=0) return r; } { int r = queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH); if (r!=0) return r; }
//printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__); //printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__);
{ int r = toku_pthread_mutex_init(&bl->mutex, NULL); if (r != 0) return r; } { int r = toku_pthread_mutex_init(&bl->mutex, NULL); if (r != 0) return r; }
{ int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl); if (r!=0) return r; } { int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl); if (r!=0) return r; }
...@@ -606,11 +630,13 @@ static int loader_read_row_from_dbufio (DBUFIO_FILESET bfs, int filenum, DBT *ke ...@@ -606,11 +630,13 @@ static int loader_read_row_from_dbufio (DBUFIO_FILESET bfs, int filenum, DBT *ke
} }
int init_rowset (struct rowset *rows) int init_rowset (struct rowset *rows, uint64_t memory_budget)
/* Effect: Initialize a collection of rows to be empty. */ /* Effect: Initialize a collection of rows to be empty. */
{ {
int result = 0; int result = 0;
rows->memory_budget = memory_budget;
rows->rows = NULL; rows->rows = NULL;
rows->data = NULL; rows->data = NULL;
...@@ -620,7 +646,8 @@ int init_rowset (struct rowset *rows) ...@@ -620,7 +646,8 @@ int init_rowset (struct rowset *rows)
if (rows->rows == NULL) if (rows->rows == NULL)
result = errno; result = errno;
rows->n_bytes = 0; rows->n_bytes = 0;
rows->n_bytes_limit = 1024*size_factor*16; rows->n_bytes_limit = (size_factor==1) ? 1024*size_factor*16 : memory_budget;
//printf("%s:%d n_bytes_limit=%ld (size_factor based limit=%d)\n", __FILE__, __LINE__, rows->n_bytes_limit, 1024*size_factor*16);
rows->data = (char *) toku_malloc(rows->n_bytes_limit); rows->data = (char *) toku_malloc(rows->n_bytes_limit);
if (rows->rows==NULL || rows->data==NULL) { if (rows->rows==NULL || rows->data==NULL) {
if (result == 0) if (result == 0)
...@@ -646,7 +673,10 @@ void destroy_rowset (struct rowset *rows) { ...@@ -646,7 +673,10 @@ void destroy_rowset (struct rowset *rows) {
static int row_wont_fit (struct rowset *rows, size_t size) static int row_wont_fit (struct rowset *rows, size_t size)
/* Effect: Return nonzero if adding a row of size SIZE would be too big (bigger than the buffer limit) */ /* Effect: Return nonzero if adding a row of size SIZE would be too big (bigger than the buffer limit) */
{ {
return (data_buffer_limit < rows->n_bytes + size); // Account for the memory used by the data and also the row structures.
size_t memory_in_use = (rows->n_rows*sizeof(struct row)
+ rows->n_bytes);
return (rows->memory_budget < memory_in_use + size);
} }
void add_row (struct rowset *rows, DBT *key, DBT *val) void add_row (struct rowset *rows, DBT *key, DBT *val)
...@@ -769,7 +799,7 @@ static int loader_do_put(BRTLOADER bl, ...@@ -769,7 +799,7 @@ static int loader_do_put(BRTLOADER bl,
BL_TRACE(blt_do_put); BL_TRACE(blt_do_put);
enqueue_for_extraction(bl); enqueue_for_extraction(bl);
BL_TRACE(blt_extract_enq); BL_TRACE(blt_extract_enq);
{int r = init_rowset(&bl->primary_rowset); lazy_assert(r==0);} {int r = init_rowset(&bl->primary_rowset, memory_per_rowset(bl)); lazy_assert(r==0);}
} }
return 0; return 0;
} }
...@@ -857,12 +887,13 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r ...@@ -857,12 +887,13 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
} }
if (row_wont_fit(rows, skey.size + sval.size)) { if (row_wont_fit(rows, skey.size + sval.size)) {
//printf("rows.n_rows=%ld\n", rows.n_rows); //printf("%s:%d rows.n_rows=%ld rows.n_bytes=%ld\n", __FILE__, __LINE__, rows->n_rows, rows->n_bytes);
BL_TRACE(blt_extractor); BL_TRACE(blt_extractor);
int progress_this_sort = 0; // fix? int progress_this_sort = 0; // fix?
int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare, progress_this_sort); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however. int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare, progress_this_sort); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
// If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function.
BL_TRACE(blt_sort_and_write_rows); BL_TRACE(blt_sort_and_write_rows);
init_rowset(rows); // we passed the contents of rows to sort_and_write_rows. init_rowset(rows, memory_per_rowset(bl)); // we passed the contents of rows to sort_and_write_rows.
if (r!=0) { if (r!=0) {
error_codes[i] = r; error_codes[i] = r;
#if defined(__cilkplusplus) #if defined(__cilkplusplus)
...@@ -1360,7 +1391,7 @@ static int merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE ...@@ -1360,7 +1391,7 @@ static int merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE
if (to_q) { if (to_q) {
MALLOC(output_rowset); MALLOC(output_rowset);
assert(output_rowset); assert(output_rowset);
int r = init_rowset(output_rowset); int r = init_rowset(output_rowset, memory_per_rowset(bl));
assert(r==0); assert(r==0);
} }
...@@ -1391,7 +1422,7 @@ static int merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE ...@@ -1391,7 +1422,7 @@ static int merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE
assert(r==0); assert(r==0);
MALLOC(output_rowset); MALLOC(output_rowset);
assert(output_rowset); assert(output_rowset);
r = init_rowset(output_rowset); r = init_rowset(output_rowset, memory_per_rowset(bl));
assert(r==0); assert(r==0);
} }
add_row(output_rowset, &keys[mini], &vals[mini]); add_row(output_rowset, &keys[mini], &vals[mini]);
......
...@@ -43,6 +43,9 @@ void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FIL ...@@ -43,6 +43,9 @@ void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FIL
// For test purposes only // For test purposes only
void toku_brtloader_set_size_factor (uint32_t factor); void toku_brtloader_set_size_factor (uint32_t factor);
// For test purposes only. (In production, the rowset size is determined by negotation with the cachetable for some memory. See #2613.)
uint64_t toku_brtloader_get_rowset_budget_for_testing (void);
int toku_brt_loader_finish_extractor(BRTLOADER bl); int toku_brt_loader_finish_extractor(BRTLOADER bl);
......
...@@ -145,7 +145,7 @@ static void test_extractor(int nrows, int nrowsets, BOOL expect_fail) { ...@@ -145,7 +145,7 @@ static void test_extractor(int nrows, int nrowsets, BOOL expect_fail) {
for (int i = 0 ; i < nrowsets; i++) { for (int i = 0 ; i < nrowsets; i++) {
rowset[i] = (struct rowset *) toku_malloc(sizeof (struct rowset)); rowset[i] = (struct rowset *) toku_malloc(sizeof (struct rowset));
assert(rowset[i]); assert(rowset[i]);
init_rowset(rowset[i]); init_rowset(rowset[i], toku_brtloader_get_rowset_budget_for_testing());
populate_rowset(rowset[i], i, nrows); populate_rowset(rowset[i], i, nrows);
} }
......
...@@ -164,7 +164,7 @@ static void test_extractor(int nrows, int nrowsets, BOOL expect_fail) { ...@@ -164,7 +164,7 @@ static void test_extractor(int nrows, int nrowsets, BOOL expect_fail) {
for (int i = 0 ; i < nrowsets; i++) { for (int i = 0 ; i < nrowsets; i++) {
rowset[i] = (struct rowset *) toku_malloc(sizeof (struct rowset)); rowset[i] = (struct rowset *) toku_malloc(sizeof (struct rowset));
assert(rowset[i]); assert(rowset[i]);
init_rowset(rowset[i]); init_rowset(rowset[i], toku_brtloader_get_rowset_budget_for_testing());
populate_rowset(rowset[i], i, nrows); populate_rowset(rowset[i], i, nrows);
} }
......
...@@ -143,7 +143,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -143,7 +143,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
// put rows in the row set // put rows in the row set
struct rowset aset; struct rowset aset;
init_rowset(&aset); init_rowset(&aset, toku_brtloader_get_rowset_budget_for_testing());
for (int i=0; i<n; i++) { for (int i=0; i<n; i++) {
DBT key = {.size=sizeof i, DBT key = {.size=sizeof i,
.data=&i}; .data=&i};
......
...@@ -91,7 +91,7 @@ static void test_write_dbfile (char *template, int n, char *output_name) { ...@@ -91,7 +91,7 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
// put rows in the row set // put rows in the row set
struct rowset aset; struct rowset aset;
init_rowset(&aset); init_rowset(&aset, toku_brtloader_get_rowset_budget_for_testing());
for (int i=0; i<n; i++) { for (int i=0; i<n; i++) {
DBT key = {.size=sizeof i, DBT key = {.size=sizeof i,
.data=&i}; .data=&i};
......
...@@ -230,7 +230,7 @@ static void fill_rowset (struct rowset *rows, ...@@ -230,7 +230,7 @@ static void fill_rowset (struct rowset *rows,
int keys[], int keys[],
const char *vals[], const char *vals[],
int n) { int n) {
init_rowset(rows); init_rowset(rows, toku_brtloader_get_rowset_budget_for_testing());
for (int i=0; i<n; i++) { for (int i=0; i<n; i++) {
DBT key = {.size=sizeof(keys[i]), DBT key = {.size=sizeof(keys[i]),
.data=&keys[i]}; .data=&keys[i]};
......
...@@ -15,7 +15,8 @@ int NUM_DBS=5; ...@@ -15,7 +15,8 @@ int NUM_DBS=5;
int NUM_ROWS=100000; int NUM_ROWS=100000;
int CHECK_RESULTS=0; int CHECK_RESULTS=0;
int USE_PUTS=0; int USE_PUTS=0;
int CACHESIZE=1024; // MB enum { default_cachesize=1024 }; // MB
int CACHESIZE=default_cachesize; // MB
int ALLOW_DUPS=0; int ALLOW_DUPS=0;
enum {MAGIC=311}; enum {MAGIC=311};
...@@ -358,9 +359,11 @@ static void do_args(int argc, char * const argv[]) { ...@@ -358,9 +359,11 @@ static void do_args(int argc, char * const argv[]) {
} else if (strcmp(argv[0], "-h")==0) { } else if (strcmp(argv[0], "-h")==0) {
resultcode=0; resultcode=0;
do_usage: do_usage:
fprintf(stderr, "Usage: -h -c -d <num_dbs> -r <num_rows> [ -b <num_calls> ]\n%s\n", cmd); fprintf(stderr, "Usage: -h -c -d <num_dbs> -r <num_rows> [ -b <num_calls> ] [-m <megabytes>] [-M]\n%s\n", cmd);
fprintf(stderr, " where -b <num_calls> causes the poll function to return nonzero after <num_calls>\n"); fprintf(stderr, " where -b <num_calls> causes the poll function to return nonzero after <num_calls>\n");
fprintf(stderr, " -e <env> uses <env> to construct the directory (so that different tests of loader-stress-test can run concurrently)\n"); fprintf(stderr, " -e <env> uses <env> to construct the directory (so that different tests of loader-stress-test can run concurrently)\n");
fprintf(stderr, " -m <m> use m MB of memeory for the cachetable (defualt is %d MB)\n", default_cachesize);
fprintf(stderr, " -M use half of physical memory for the cachetable\n");
exit(resultcode); exit(resultcode);
} else if (strcmp(argv[0], "-d")==0) { } else if (strcmp(argv[0], "-d")==0) {
argc--; argv++; argc--; argv++;
...@@ -393,6 +396,8 @@ static void do_args(int argc, char * const argv[]) { ...@@ -393,6 +396,8 @@ static void do_args(int argc, char * const argv[]) {
} else if (strcmp(argv[0], "-m")==0) { } else if (strcmp(argv[0], "-m")==0) {
argc--; argv++; argc--; argv++;
CACHESIZE = atoi(argv[0]); CACHESIZE = atoi(argv[0]);
} else if (strcmp(argv[0], "-M")==0) {
CACHESIZE = (toku_os_get_phys_memory_size()/(1024*1024))/2;
} else if (strcmp(argv[0], "-y")==0) { } else if (strcmp(argv[0], "-y")==0) {
ALLOW_DUPS = 1; ALLOW_DUPS = 1;
} else if (strcmp(argv[0], "-b")==0) { } else if (strcmp(argv[0], "-b")==0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment