Commit 73c323a8 authored by Bradley C. Kuszmaul, committed by Yoni Fogel

Fix #2533. (Avoid runts and don't make maximal fanout in loader). close[t:2533]

git-svn-id: file:///svn/toku/tokudb@20459 c7de825b-a66e-492c-adef-691d508d4ae1
parent 79f16c50
......@@ -119,9 +119,11 @@ struct brtloader_s {
DB *src_db;
int N;
DB **dbs;
const struct descriptor **descriptors; // N of these
const char **new_fnames_in_env; // the file names that the final data will be written to (relative to env).
DB **dbs; // N of these
const struct descriptor **descriptors; // N of these.
const char **new_fnames_in_env; // N of these. The file names that the final data will be written to (relative to env).
uint64_t *extracted_datasizes; // N of these.
struct rowset primary_rowset; // the primary rows that have been put, but the secondary rows haven't been generated.
struct rowset primary_rowset_temp; // the primary rows that are being worked on by the extractor_thread.
......@@ -168,12 +170,13 @@ u_int64_t toku_brt_loader_get_n_rows(BRTLOADER bl);
// The data passed into a fractal_thread via pthread_create.
struct fractal_thread_args {
BRTLOADER bl;
BRTLOADER bl;
const struct descriptor *descriptor;
int fd; // write the brt into tfd.
int progress_allocation;
QUEUE q;
int errno_result; // the final result.
int fd; // write the brt into tfd.
int progress_allocation;
QUEUE q;
uint64_t total_disksize_estimate;
int errno_result; // the final result.
};
void toku_brt_loader_set_n_rows(BRTLOADER bl, u_int64_t n_rows);
......@@ -200,11 +203,13 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func,
int progress_allocation);
int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
// This is probably only for testing.
int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
const struct descriptor *descriptor,
int fd, // write to here
int progress_allocation,
QUEUE q);
int fd, // write to here
int progress_allocation,
QUEUE q,
uint64_t total_disksize_estimate);
int brt_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *);
......@@ -229,6 +234,8 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error);
enum { disksize_row_overhead = 9 }; // how much overhead for a row in the fractal tree
C_END
#endif
......@@ -295,6 +295,7 @@ void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error) {
if (bl->new_fnames_in_env)
toku_free((char*)bl->new_fnames_in_env[i]);
}
toku_free(bl->extracted_datasizes);
toku_free(bl->new_fnames_in_env);
toku_free(bl->bt_compare_funs);
toku_free((char*)bl->temp_file_template);
......@@ -406,6 +407,7 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
for (int i=0; i<N; i++) bl->descriptors[i]=descriptors[i];
MY_CALLOC_N(N, bl->new_fnames_in_env);
for (int i=0; i<N; i++) SET_TO_MY_STRDUP(bl->new_fnames_in_env[i], new_fnames_in_env[i]);
MY_CALLOC_N(N, bl->extracted_datasizes); // the calloc_n zeroed everything, which is what we want
MY_CALLOC_N(N, bl->bt_compare_funs);
for (int i=0; i<N; i++) bl->bt_compare_funs[i] = bt_compare_functions[i];
......@@ -951,10 +953,11 @@ static DBT make_dbt (void *data, u_int32_t size) {
}
// gcc 4.1 does not like f&a
// Previously this macro was defined without "()". This macro should look like a function call not a variable dereference, however.
#if defined(__cilkplusplus)
#define inc_error_count __sync_fetch_and_add(&error_count, 1)
#define inc_error_count() __sync_fetch_and_add(&error_count, 1)
#else
#define inc_error_count error_count++
#define inc_error_count() error_count++
#endif
CILK_BEGIN
......@@ -999,11 +1002,13 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &skey, &sval, &pkey, &pval, NULL);
if (r != 0) {
error_codes[i] = r;
inc_error_count;
inc_error_count();
break;
}
}
bl->extracted_datasizes[i] += skey.size + sval.size + disksize_row_overhead;
if (row_wont_fit(rows, skey.size + sval.size)) {
//printf("%s:%d rows.n_rows=%ld rows.n_bytes=%ld\n", __FILE__, __LINE__, rows->n_rows, rows->n_bytes);
BL_TRACE(blt_extractor);
......@@ -1014,14 +1019,14 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
init_rowset(rows, memory_per_rowset(bl)); // we passed the contents of rows to sort_and_write_rows.
if (r != 0) {
error_codes[i] = r;
inc_error_count;
inc_error_count();
break;
}
}
int r = add_row(rows, &skey, &sval);
if (r != 0) {
error_codes[i] = r;
inc_error_count;
inc_error_count();
break;
}
......@@ -2069,7 +2074,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
const struct descriptor *descriptor,
int fd, // write to here
int progress_allocation,
QUEUE q)
QUEUE q,
uint64_t total_disksize_estimate)
// Effect: Consume a sequence of rowsets work from a queue, creating a fractal tree. Closes fd.
{
int result = 0;
......@@ -2131,6 +2137,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
u_int64_t n_rows_remaining = bl->n_rows;
u_int64_t old_n_rows_remaining = bl->n_rows;
uint64_t used_estimate = 0; // how much diskspace have we used up?
while (1) {
void *item;
{
......@@ -2149,8 +2157,22 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
DBT key = make_dbt(output_rowset->data+output_rowset->rows[i].off, output_rowset->rows[i].klen);
DBT val = make_dbt(output_rowset->data+output_rowset->rows[i].off + output_rowset->rows[i].klen, output_rowset->rows[i].vlen);
if (lbuf->dbuf.off >= nodesize) {
used_estimate += key.size + val.size + disksize_row_overhead;
// Spawn off a node if
// a) this item would make the nodesize too big, or
// b) the remaining amount won't fit in the current node and the current node's data is more than the remaining amount
int remaining_amount = total_disksize_estimate - used_estimate;
int used_here = lbuf->dbuf.off + 1000; // leave 1000 for various overheads.
int target_size = (nodesize*7L)/8; // use only 7/8 of the node.
int used_here_with_next_key = used_here + key.size + val.size + disksize_row_overhead;
if ((used_here_with_next_key >= target_size)
|| (used_here + remaining_amount >= target_size
&& lbuf->dbuf.off > remaining_amount)) {
//if (used_here_with_next_key < target_size) {
// printf("%s:%d Runt avoidance: used_here=%d, remaining_amount=%d target_size=%d dbuf.off=%d\n", __FILE__, __LINE__, used_here, remaining_amount, target_size, lbuf->dbuf.off);
//}
int progress_this_node = progress_allocation * (double)(old_n_rows_remaining - n_rows_remaining)/(double)old_n_rows_remaining;
progress_allocation -= progress_this_node;
old_n_rows_remaining = n_rows_remaining;
......@@ -2194,6 +2216,11 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
result = bl->panic_errno; goto error;
}
// We haven't paniced, so the sum should add up.
if (result==0) {
invariant(used_estimate == total_disksize_estimate);
}
n_pivots++;
{
......@@ -2279,17 +2306,18 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
}
CILK_END
int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
const struct descriptor *descriptor,
int fd, // write to here
int progress_allocation,
QUEUE q)
int fd, // write to here
int progress_allocation,
QUEUE q,
uint64_t total_disksize_estimate)
// This is probably only for testing.
{
#if defined(__cilkplusplus)
return cilk::run(toku_loader_write_brt_from_q, bl, descriptor, fd, progress_allocation, q);
return cilk::run(toku_loader_write_brt_from_q, bl, descriptor, fd, progress_allocation, q, total_disksize_estimate);
#else
return toku_loader_write_brt_from_q (bl, descriptor, fd, progress_allocation, q);
return toku_loader_write_brt_from_q (bl, descriptor, fd, progress_allocation, q, total_disksize_estimate);
#endif
}
......@@ -2298,9 +2326,9 @@ static void* fractal_thread (void *ftav) {
BL_TRACE(blt_start_fractal_thread);
struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav;
#if defined(__cilkplusplus)
int r = cilk::run(toku_loader_write_brt_from_q, fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q);
int r = cilk::run(toku_loader_write_brt_from_q, fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate);
#else
int r = toku_loader_write_brt_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q);
int r = toku_loader_write_brt_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate);
#endif
fta->errno_result = r;
return NULL;
......@@ -2315,6 +2343,7 @@ static int loader_do_i (BRTLOADER bl,
int progress_allocation // how much progress do I need to add into bl->progress by the end..
)
/* Effect: Handle the file creating for one particular DB in the bulk loader. */
/* Requires: The data is fully extracted, so we can do merges out of files and write the brt file. */
{
//printf("doing i use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
struct merge_fileset *fs = &(bl->fs[which_db]);
......@@ -2337,12 +2366,13 @@ static int loader_do_i (BRTLOADER bl,
}
// This structure must stay live until the join below.
struct fractal_thread_args fta = {bl,
descriptor,
fd,
progress_allocation,
bl->fractal_queues[which_db],
0 // result
struct fractal_thread_args fta = {.bl = bl,
.descriptor = descriptor,
.fd = fd,
.progress_allocation = progress_allocation,
.q = bl->fractal_queues[which_db],
.total_disksize_estimate = bl->extracted_datasizes[which_db],
.errno_result = 0
};
r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta);
......@@ -2924,7 +2954,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
next_sts.n_subtrees_limit = 1;
XMALLOC_N(next_sts.n_subtrees_limit, next_sts.subtrees);
const int n_per_block = 16;
const int n_per_block = 15;
int64_t n_subtrees_used = 0;
while (sts->n_subtrees - n_subtrees_used >= n_per_block*2) {
// grab the first N_PER_BLOCK and build a node.
......
......@@ -171,6 +171,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
// put rows in the row set
struct rowset aset;
uint64_t size_est = 0;
init_rowset(&aset, toku_brtloader_get_rowset_budget_for_testing());
for (int i=0; i<n; i++) {
DBT key = {.size=sizeof i,
......@@ -178,6 +179,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
DBT val = {.size=sizeof i,
.data=&i};
add_row(&aset, &key, &val);
size_est += key.size + val.size + disksize_row_overhead;
}
toku_brt_loader_set_n_rows(&bl, n);
......@@ -199,6 +201,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
assert(r==0);
size_t num_found = 0;
size_t found_size_est = 0;
while (1) {
void *v;
r = queue_deq(q, &v, NULL, NULL);
......@@ -206,11 +209,12 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
struct rowset *rs = (struct rowset *)v;
if (verbose) printf("v=%p\n", v);
for (size_t i=num_found; i<rs->n_rows; i++) {
for (size_t i=0; i<rs->n_rows; i++) {
struct row *row = &rs->rows[i];
assert(row->klen==sizeof(int));
assert(row->vlen==sizeof(int));
assert((int)i==*(int*)(rs->data+row->off));
assert((int)(num_found+i)==*(int*)(rs->data+row->off));
found_size_est += row->klen + row->vlen + disksize_row_overhead;
}
num_found += rs->n_rows;
......@@ -219,6 +223,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
assert(r==0);
}
assert((int)num_found == n);
if (!expect_error) assert(found_size_est == size_est);
r = queue_eof(q2);
assert(r==0);
......@@ -238,7 +243,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
toku_set_func_pwrite(bad_pwrite);
brt_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est);
// if (!(expect_error ? r != 0 : r == 0)) printf("WARNING%%d expect_error=%d r=%d\n", __LINE__, expect_error, r);
assert(expect_error ? r != 0 : r == 0);
......
......@@ -94,6 +94,7 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
// put rows in the row set
struct rowset aset;
uint64_t size_est = 0;
init_rowset(&aset, toku_brtloader_get_rowset_budget_for_testing());
for (int i=0; i<n; i++) {
DBT key = {.size=sizeof i,
......@@ -101,7 +102,8 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
DBT val = {.size=sizeof i,
.data=&i};
add_row(&aset, &key, &val);
}
size_est += key.size + val.size + disksize_row_overhead;
}
toku_brt_loader_set_n_rows(&bl, n);
......@@ -121,6 +123,7 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
assert(r==0);
size_t num_found = 0;
size_t found_size_est = 0;
while (1) {
void *v;
r = queue_deq(q, &v, NULL, NULL);
......@@ -128,12 +131,13 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
struct rowset *rs = (struct rowset *)v;
if (verbose) printf("v=%p\n", v);
for (size_t i=num_found; i<rs->n_rows; i++) {
for (size_t i=0; i<rs->n_rows; i++) {
struct row *row = &rs->rows[i];
assert(row->klen==sizeof(int));
assert(row->vlen==sizeof(int));
assert((int)i==*(int*)(rs->data+row->off));
}
assert((int)(num_found+i)==*(int*)(rs->data+row->off));
found_size_est += row->klen + row->vlen + disksize_row_overhead;
}
num_found += rs->n_rows;
......@@ -141,7 +145,8 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
assert(r==0);
}
assert((int)num_found == n);
assert(found_size_est == size_est);
r = queue_eof(q2);
assert(r==0);
......@@ -154,7 +159,7 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
assert(fd>=0);
if (verbose) traceit("write to file");
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est);
assert(r==0);
r = queue_destroy(q2);
......
......@@ -229,7 +229,8 @@ static void test_read_write_rows (char *template) {
static void fill_rowset (struct rowset *rows,
int keys[],
const char *vals[],
int n) {
int n,
uint64_t *size_est) {
init_rowset(rows, toku_brtloader_get_rowset_budget_for_testing());
for (int i=0; i<n; i++) {
DBT key = {.size=sizeof(keys[i]),
......@@ -237,6 +238,7 @@ static void fill_rowset (struct rowset *rows,
DBT val = {.size=strlen(vals[i]),
.data=(void *)vals[i]};
add_row(rows, &key, &val);
*size_est += key.size + val.size + disksize_row_overhead;
}
}
......@@ -294,8 +296,9 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
int sorted_keys[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
const char *sorted_vals[] = { "0", "a", "b", "c", "d", "e", "f", "g", "h", "i" };
struct rowset aset, bset;
fill_rowset(&aset, a_keys, a_vals, 6);
fill_rowset(&bset, b_keys, b_vals, 4);
uint64_t size_est = 0;
fill_rowset(&aset, a_keys, a_vals, 6, &size_est);
fill_rowset(&bset, b_keys, b_vals, 4, &size_est);
toku_brt_loader_set_n_rows(&bl, 6+4);
......@@ -320,7 +323,7 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO);
assert(fd>=0);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q, size_est);
assert(r==0);
destroy_merge_fileset(&fs);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.