Commit 42921bb8 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3163 use the nodesize configured in a db in the brtloader refs[t:3163]

git-svn-id: file:///svn/toku/tokudb@26909 c7de825b-a66e-492c-adef-691d508d4ae1
parent 22455dbb
......@@ -187,6 +187,7 @@ struct fractal_thread_args {
uint64_t total_disksize_estimate;
int errno_result; // the final result.
int which_db;
uint32_t target_nodesize;
};
void toku_brt_loader_set_n_rows(BRTLOADER bl, u_int64_t n_rows);
......@@ -218,7 +219,8 @@ int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
int progress_allocation,
QUEUE q,
uint64_t total_disksize_estimate,
int which_db);
int which_db,
uint32_t target_nodesize);
int brt_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *);
......
......@@ -70,7 +70,7 @@ static size_t do_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *strea
// 1024 is the right size_factor for production.
// Different values for these sizes may be used for testing.
static uint32_t size_factor = 1024;
static int nodesize = (1<<22);
static uint32_t default_loader_nodesize = (1<<22);
enum { EXTRACTOR_QUEUE_DEPTH = 2,
FILE_BUFFER_SIZE = 1<<24,
......@@ -88,7 +88,7 @@ void
toku_brtloader_set_size_factor(uint32_t factor) {
// For test purposes only
size_factor = factor;
nodesize = (size_factor==1) ? (1<<15) : (1<<22);
default_loader_nodesize = (size_factor==1) ? (1<<15) : (1<<22);
}
uint64_t
......@@ -464,7 +464,7 @@ static int64_t memory_avail_during_merge(BRTLOADER bl, BOOL is_fractal_node) {
int64_t avail_memory = bl->reserved_memory;
if (is_fractal_node) {
// reserve space for the fractal writer thread buffers
avail_memory -= (int64_t)brt_loader_get_fractal_workers_count(bl) * (int64_t)nodesize * 2; // compressed and uncompressed buffers
avail_memory -= (int64_t)brt_loader_get_fractal_workers_count(bl) * (int64_t)default_loader_nodesize * 2; // compressed and uncompressed buffers
}
return avail_memory;
}
......@@ -2215,7 +2215,7 @@ static inline long int loader_random(void) {
return r;
}
static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc), int64_t lblocknum, TXNID xid) {
static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc), int64_t lblocknum, TXNID xid, uint32_t target_nodesize) {
invariant(lblocknum < out->n_translations_limit);
struct leaf_buf *XMALLOC(lbuf);
lbuf->blocknum = lblocknum;
......@@ -2227,7 +2227,7 @@ static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc)
putbuf_int32(&lbuf->dbuf, layout_version);
putbuf_int32(&lbuf->dbuf, layout_version); // layout_version original
putbuf_int32(&lbuf->dbuf, nodesize);
putbuf_int32(&lbuf->dbuf, target_nodesize);
putbuf_int32(&lbuf->dbuf, flags);
putbuf_int32(&lbuf->dbuf, height);
lbuf->rand4fingerprint = loader_random();
......@@ -2253,11 +2253,11 @@ static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc)
CILK_BEGIN
static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, BRTLOADER bl);
static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor);
static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize);
CILK_END
static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int keylen, unsigned char *val, int vallen);
static int write_translation_table (struct dbout *out, long long *off_of_translation_p);
static int write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk, BLOCKNUM root_blocknum_on_disk, LSN load_lsn, TXNID root_xid);
static int write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk, BLOCKNUM root_blocknum_on_disk, LSN load_lsn, TXNID root_xid, uint32_t target_nodesize);
static void drain_writer_q(QUEUE q) {
void *item;
......@@ -2279,7 +2279,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
int progress_allocation,
QUEUE q,
uint64_t total_disksize_estimate,
int which_db)
int which_db,
uint32_t target_nodesize)
// Effect: Consume a sequence of rowsets work from a queue, creating a fractal tree. Closes fd.
{
// set the number of fractal tree writer threads so that we can partition memory in the merger
......@@ -2344,7 +2345,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
if (bl->root_xids_that_created && bl->load_root_xid != bl->root_xids_that_created[which_db]) {
le_xid = bl->load_root_xid;
}
struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock, le_xid);
struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize);
u_int64_t n_rows_remaining = bl->n_rows;
u_int64_t old_n_rows_remaining = bl->n_rows;
......@@ -2375,7 +2376,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
// b) the remaining amount won't fit in the current node and the current node's data is more than the remaining amount
int remaining_amount = total_disksize_estimate - used_estimate;
int used_here = lbuf->dbuf.off + 1000; // leave 1000 for various overheads.
int target_size = (nodesize*7L)/8; // use only 7/8 of the node.
int target_size = (target_nodesize*7L)/8; // use only 7/8 of the node.
int used_here_with_next_key = used_here + key.size + val.size + disksize_row_overhead;
if ((used_here_with_next_key >= target_size)
|| (used_here + remaining_amount >= target_size
......@@ -2408,7 +2409,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
if (result == 0) result = r;
break;
}
lbuf = start_leaf(&out, descriptor, lblock, le_xid);
lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize);
}
add_pair_to_leafnode(lbuf, (unsigned char *) key.data, key.size, (unsigned char *) val.data, val.size);
......@@ -2453,7 +2454,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
}
}
r = write_nonleaves(bl, pivots_file, &out, &sts, descriptor);
r = write_nonleaves(bl, pivots_file, &out, &sts, descriptor, target_nodesize);
if (r) {
result = r; goto error;
}
......@@ -2497,7 +2498,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
if (bl->root_xids_that_created) {
root_xid_that_created = bl->root_xids_that_created[which_db];
}
r = write_header(&out, off_of_translation, (out.n_translations+1)*16+4, root_block, bl->load_lsn, root_xid_that_created);
r = write_header(&out, off_of_translation, (out.n_translations+1)*16+4, root_block, bl->load_lsn, root_xid_that_created, target_nodesize);
if (r) {
result = r; goto error;
}
......@@ -2538,13 +2539,15 @@ int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
int progress_allocation,
QUEUE q,
uint64_t total_disksize_estimate,
int which_db)
int which_db,
uint32_t target_nodesize)
// This is probably only for testing.
{
target_nodesize = target_nodesize == 0 ? default_loader_nodesize : target_nodesize;
#if defined(__cilkplusplus)
return cilk::run(toku_loader_write_brt_from_q, bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db);
return cilk::run(toku_loader_write_brt_from_q, bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize);
#else
return toku_loader_write_brt_from_q (bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db);
return toku_loader_write_brt_from_q (bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize);
#endif
}
......@@ -2553,9 +2556,9 @@ static void* fractal_thread (void *ftav) {
BL_TRACE(blt_start_fractal_thread);
struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav;
#if defined(__cilkplusplus)
int r = cilk::run(toku_loader_write_brt_from_q, fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db);
int r = cilk::run(toku_loader_write_brt_from_q, fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db, fta->target_nodesize);
#else
int r = toku_loader_write_brt_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db);
int r = toku_loader_write_brt_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db, fta->target_nodesize);
#endif
fta->errno_result = r;
return NULL;
......@@ -2591,7 +2594,11 @@ static int loader_do_i (BRTLOADER bl,
if (fd < 0) {
r = errno; goto error;
}
uint32_t target_nodesize;
r = dest_db->get_pagesize(dest_db, &target_nodesize);
invariant_zero(r);
// This structure must stay live until the join below.
struct fractal_thread_args fta = { bl,
descriptor,
......@@ -2600,7 +2607,9 @@ static int loader_do_i (BRTLOADER bl,
bl->fractal_queues[which_db],
bl->extracted_datasizes[which_db],
0,
which_db };
which_db,
target_nodesize,
};
r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta);
if (r) {
......@@ -2848,7 +2857,7 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
for (int i = 0; i < n_sub_blocks; i++)
sub_block_init(&sub_block[i]);
set_all_sub_block_sizes(uncompressed_len, sub_block_size, n_sub_blocks, sub_block);
// allocate space for the compressed bufer
int bound = get_sum_compressed_size_bound(n_sub_blocks, sub_block);
unsigned char *MALLOC_N(header_len + bound, compressed_buf);
......@@ -2946,14 +2955,16 @@ static int write_translation_table (struct dbout *out, long long *off_of_transla
}
static int write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk, BLOCKNUM root_blocknum_on_disk, LSN load_lsn, TXNID root_xid_that_created) {
static int
write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk, BLOCKNUM root_blocknum_on_disk,
LSN load_lsn, TXNID root_xid_that_created, uint32_t target_nodesize) {
int result = 0;
struct brt_header h; memset(&h, 0, sizeof h);
h.layout_version = BRT_LAYOUT_VERSION;
h.checkpoint_count = 1;
h.checkpoint_lsn = load_lsn;
h.nodesize = nodesize;
h.nodesize = target_nodesize;
h.root = root_blocknum_on_disk;
h.flags = 0;
h.layout_version_original = BRT_LAYOUT_VERSION;
......@@ -3081,7 +3092,7 @@ CILK_BEGIN
static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknum_of_new_node, int n_children,
DBT *pivots, /* must free this array, as well as the things it points t */
struct subtree_info *subtree_info, int height, const DESCRIPTOR UU(desc))
struct subtree_info *subtree_info, int height, const DESCRIPTOR UU(desc), uint32_t target_nodesize)
{
//Nodes do not currently touch descriptors
invariant(height>0);
......@@ -3089,7 +3100,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
int result = 0;
BRTNODE XMALLOC(node);
node->nodesize = nodesize;
node->nodesize = target_nodesize;
node->thisnodename = make_blocknum(blocknum_of_new_node);
node->layout_version = BRT_LAYOUT_VERSION;
node->layout_version_original = BRT_LAYOUT_VERSION;
......@@ -3174,7 +3185,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
brt_loader_set_panic(bl, result, TRUE);
}
static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor) {
static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize) {
int result = 0;
int height = 1;
......@@ -3222,7 +3233,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
result = r;
break;
} else {
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor); // frees all the data structures that go into making the node.
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor, target_nodesize); // frees all the data structures that go into making the node.
n_subtrees_used += n_per_block;
}
}
......@@ -3245,7 +3256,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
if (r) {
result = r;
} else {
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor);
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor, target_nodesize);
n_blocks_left -= n_first;
n_subtrees_used += n_first;
}
......@@ -3264,7 +3275,7 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
if (r) {
result = r;
} else {
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor);
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor, target_nodesize);
n_subtrees_used += n_blocks_left;
}
}
......
......@@ -132,7 +132,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
brt_loader_set_error_function(&bl.error_callback, NULL, NULL);
brt_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0, 0);
// if (!(expect_error ? r != 0 : r == 0)) printf("WARNING%%d expect_error=%d r=%d\n", __LINE__, expect_error, r);
assert(expect_error ? r != 0 : r == 0);
......
......@@ -163,7 +163,7 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
assert(fd>=0);
if (verbose) traceit("write to file");
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0, 0);
assert(r==0);
r = queue_destroy(q2);
......
......@@ -325,7 +325,7 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO);
assert(fd>=0);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q, size_est, 0);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q, size_est, 0, 0);
assert(r==0);
destroy_merge_fileset(&fs);
......
......@@ -19,6 +19,7 @@ struct kv_pair {
struct kv_pair kv_pairs[NUM_KV_PAIRS] = {{1,4},
{2,5},
{3,6}};
static uint32_t block_size = 0;
static int put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val) {
......@@ -132,6 +133,9 @@ static void run_test(void)
r = db_create(&dbs[i], env, 0); CKERR(r);
r = dbs[i]->set_descriptor(dbs[i], 1, &desc); CKERR(r);
dbs[i]->app_private = &idx[i];
if (block_size != 0) {
r = dbs[i]->set_pagesize(dbs[i], block_size); CKERR(r);
}
snprintf(name, sizeof(name), "db_%04x", i);
r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
}
......@@ -173,6 +177,9 @@ static void do_args(int argc, char * const argv[]) {
exit(resultcode);
} else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1;
} else if (strcmp(argv[0], "--block_size") == 0) {
argc--; argv++;
block_size = atoi(argv[0]);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment