Commit f275c656 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

[t:3218] Merge the #3218 changes back onto the main line. The tests ran on...

[t:3218] Merge the #3218 changes back onto the main line.  The tests ran on pointy (43 minutes elapsed, by the way.  I did this merge (and added a comment):
{{{
svn merge -r27967:28141 https://svn.tokutek.com/tokudb/toku/tokudb.3218
}}}
Refs #3218.


git-svn-id: file:///svn/toku/tokudb@28175 c7de825b-a66e-492c-adef-691d508d4ae1
parent d3361396
...@@ -9,17 +9,12 @@ ...@@ -9,17 +9,12 @@
// It's not very fast at allocating or freeing. // It's not very fast at allocating or freeing.
// Previous implementation used next_fit, but now use first_fit since we are moving blocks around to reduce file size. // Previous implementation used next_fit, but now use first_fit since we are moving blocks around to reduce file size.
struct blockpair {
u_int64_t offset;
u_int64_t size;
};
struct block_allocator { struct block_allocator {
u_int64_t reserve_at_beginning; // How much to reserve at the beginning u_int64_t reserve_at_beginning; // How much to reserve at the beginning
u_int64_t alignment; // Block alignment u_int64_t alignment; // Block alignment
u_int64_t n_blocks; // How many blocks u_int64_t n_blocks; // How many blocks
u_int64_t blocks_array_size; // How big is the blocks_array. Must be >= n_blocks. u_int64_t blocks_array_size; // How big is the blocks_array. Must be >= n_blocks.
struct blockpair *blocks_array; // These blocks are sorted by address. struct block_allocator_blockpair *blocks_array; // These blocks are sorted by address.
u_int64_t n_bytes_in_use; // including the reserve_at_beginning u_int64_t n_bytes_in_use; // including the reserve_at_beginning
}; };
...@@ -77,42 +72,100 @@ destroy_block_allocator (BLOCK_ALLOCATOR *bap) { ...@@ -77,42 +72,100 @@ destroy_block_allocator (BLOCK_ALLOCATOR *bap) {
} }
static void static void
grow_blocks_array (BLOCK_ALLOCATOR ba) { grow_blocks_array_by (BLOCK_ALLOCATOR ba, u_int64_t n_to_add) {
if (ba->n_blocks >= ba->blocks_array_size) { if (ba->n_blocks + n_to_add > ba->blocks_array_size) {
ba->blocks_array_size *= 2; int new_size = ba->n_blocks + n_to_add;
int at_least = ba->blocks_array_size * 2;
if (at_least > new_size) {
new_size = at_least;
}
ba->blocks_array_size = new_size;
XREALLOC_N(ba->blocks_array_size, ba->blocks_array); XREALLOC_N(ba->blocks_array_size, ba->blocks_array);
} }
} }
static void
grow_blocks_array (BLOCK_ALLOCATOR ba) {
grow_blocks_array_by(ba, 1);
}
static void
merge_blockpairs_into (u_int64_t d, struct block_allocator_blockpair dst[/*d*/],
u_int64_t s, struct block_allocator_blockpair src[/*s*/])
// Effect: Merge dst[d] and src[s] into dst[d+s], merging in place.
// Initially dst and src hold sorted arrays (sorted by increasing offset).
// Finally dst contains all d+s elements sorted in order.
// dst must be large enough.
// Requires no overlaps.
{
u_int64_t tail = d+s;
while (d>0 && s>0) {
struct block_allocator_blockpair *dp = &dst[d-1];
struct block_allocator_blockpair *sp = &src[s-1];
struct block_allocator_blockpair *tp = &dst[tail-1];
assert(tail>0);
if (dp->offset > sp->offset) {
*tp = *dp;
d--;
tail--;
} else {
*tp = *sp;
s--;
tail--;
}
}
while (d>0) {
struct block_allocator_blockpair *dp = &dst[d-1];
struct block_allocator_blockpair *tp = &dst[tail-1];
*tp = *dp;
d--;
tail--;
}
while (s>0) {
struct block_allocator_blockpair *sp = &src[s-1];
struct block_allocator_blockpair *tp = &dst[tail-1];
*tp = *sp;
s--;
tail--;
}
}
static int
compare_blockpairs (const void *av, const void *bv) {
const struct block_allocator_blockpair *a = av;
const struct block_allocator_blockpair *b = bv;
if (a->offset < b->offset) return -1;
if (a->offset > b->offset) return +1;
return 0;
}
void void
block_allocator_alloc_block_at (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t offset) { block_allocator_alloc_blocks_at (BLOCK_ALLOCATOR ba, u_int64_t n_blocks, struct block_allocator_blockpair *pairs)
assert(offset%ba->alignment == 0); {
u_int64_t i;
VALIDATE(ba);
assert(offset >= ba->reserve_at_beginning);
grow_blocks_array(ba);
// Just do a linear search for the block
ba->n_bytes_in_use += size;
for (i=0; i<ba->n_blocks; i++) {
if (ba->blocks_array[i].offset > offset) {
// allocate it in that slot
// Don't do error checking, since we require that the blocks don't overlap.
// Slide everything over
memmove(ba->blocks_array+i+1, ba->blocks_array+i, (ba->n_blocks - i)*sizeof(struct blockpair));
ba->blocks_array[i].offset = offset;
ba->blocks_array[i].size = size;
ba->n_blocks++;
VALIDATE(ba); VALIDATE(ba);
return; qsort(pairs, n_blocks, sizeof(*pairs), compare_blockpairs);
for (u_int64_t i=0; i<n_blocks; i++) {
assert(pairs[i].offset >= ba->reserve_at_beginning);
assert(pairs[i].offset%ba->alignment == 0);
ba->n_bytes_in_use += pairs[i].size;
} }
} grow_blocks_array_by(ba, n_blocks);
// Goes at the end merge_blockpairs_into(ba->n_blocks, ba->blocks_array,
ba->blocks_array[ba->n_blocks].offset = offset; n_blocks, pairs);
ba->blocks_array[ba->n_blocks].size = size; ba->n_blocks += n_blocks;
ba->n_blocks++;
VALIDATE(ba); VALIDATE(ba);
} }
void
block_allocator_alloc_block_at (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t offset) {
struct block_allocator_blockpair p = {.size = size, .offset=offset};
// Just do a linear search for the block.
// This data structure is a sorted array (no gaps or anything), so the search isn't really making this any slower than the insertion.
// To speed up the insertion when opening a file, we provide the block_allocator_alloc_blocks_at function.
block_allocator_alloc_blocks_at(ba, 1, &p);
}
static inline u_int64_t static inline u_int64_t
align (u_int64_t value, BLOCK_ALLOCATOR ba) align (u_int64_t value, BLOCK_ALLOCATOR ba)
// Effect: align a value by rounding up. // Effect: align a value by rounding up.
...@@ -137,8 +190,8 @@ block_allocator_alloc_block (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t *offs ...@@ -137,8 +190,8 @@ block_allocator_alloc_block (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t *offs
u_int64_t end_of_reserve = align(ba->reserve_at_beginning, ba); u_int64_t end_of_reserve = align(ba->reserve_at_beginning, ba);
if (end_of_reserve + size <= ba->blocks_array[0].offset ) { if (end_of_reserve + size <= ba->blocks_array[0].offset ) {
// Check to see if the space immediately after the reserve is big enough to hold the new block. // Check to see if the space immediately after the reserve is big enough to hold the new block.
struct blockpair *bp = &ba->blocks_array[0]; struct block_allocator_blockpair *bp = &ba->blocks_array[0];
memmove(bp+1, bp, (ba->n_blocks)*sizeof(struct blockpair)); memmove(bp+1, bp, (ba->n_blocks)*sizeof(*bp));
bp[0].offset = end_of_reserve; bp[0].offset = end_of_reserve;
bp[0].size = size; bp[0].size = size;
ba->n_blocks++; ba->n_blocks++;
...@@ -149,13 +202,13 @@ block_allocator_alloc_block (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t *offs ...@@ -149,13 +202,13 @@ block_allocator_alloc_block (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t *offs
} }
for (u_int64_t blocknum = 0; blocknum +1 < ba->n_blocks; blocknum ++) { for (u_int64_t blocknum = 0; blocknum +1 < ba->n_blocks; blocknum ++) {
// Consider the space after blocknum // Consider the space after blocknum
struct blockpair *bp = &ba->blocks_array[blocknum]; struct block_allocator_blockpair *bp = &ba->blocks_array[blocknum];
u_int64_t this_offset = bp[0].offset; u_int64_t this_offset = bp[0].offset;
u_int64_t this_size = bp[0].size; u_int64_t this_size = bp[0].size;
u_int64_t answer_offset = align(this_offset + this_size, ba); u_int64_t answer_offset = align(this_offset + this_size, ba);
if (answer_offset + size > bp[1].offset) continue; // The block we want doesn't fit after this block. if (answer_offset + size > bp[1].offset) continue; // The block we want doesn't fit after this block.
// It fits, so allocate it here. // It fits, so allocate it here.
memmove(bp+2, bp+1, (ba->n_blocks - blocknum -1)*sizeof(struct blockpair)); memmove(bp+2, bp+1, (ba->n_blocks - blocknum -1)*sizeof(*bp));
bp[1].offset = answer_offset; bp[1].offset = answer_offset;
bp[1].size = size; bp[1].size = size;
ba->n_blocks++; ba->n_blocks++;
...@@ -164,7 +217,8 @@ block_allocator_alloc_block (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t *offs ...@@ -164,7 +217,8 @@ block_allocator_alloc_block (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t *offs
return; return;
} }
// It didn't fit anywhere, so fit it on the end. // It didn't fit anywhere, so fit it on the end.
struct blockpair *bp = &ba->blocks_array[ba->n_blocks]; assert(ba->n_blocks < ba->blocks_array_size);
struct block_allocator_blockpair *bp = &ba->blocks_array[ba->n_blocks];
u_int64_t answer_offset = align(bp[-1].offset+bp[-1].size, ba); u_int64_t answer_offset = align(bp[-1].offset+bp[-1].size, ba);
bp->offset = answer_offset; bp->offset = answer_offset;
bp->size = size; bp->size = size;
...@@ -206,7 +260,7 @@ block_allocator_free_block (BLOCK_ALLOCATOR ba, u_int64_t offset) { ...@@ -206,7 +260,7 @@ block_allocator_free_block (BLOCK_ALLOCATOR ba, u_int64_t offset) {
int64_t bn = find_block(ba, offset); int64_t bn = find_block(ba, offset);
assert(bn>=0); // we require that there is a block with that offset. Might as well abort if no such block exists. assert(bn>=0); // we require that there is a block with that offset. Might as well abort if no such block exists.
ba->n_bytes_in_use -= ba->blocks_array[bn].size; ba->n_bytes_in_use -= ba->blocks_array[bn].size;
memmove(&ba->blocks_array[bn], &ba->blocks_array[bn+1], (ba->n_blocks-bn-1) * sizeof(struct blockpair)); memmove(&ba->blocks_array[bn], &ba->blocks_array[bn+1], (ba->n_blocks-bn-1) * sizeof(struct block_allocator_blockpair));
ba->n_blocks--; ba->n_blocks--;
VALIDATE(ba); VALIDATE(ba);
} }
...@@ -222,7 +276,7 @@ u_int64_t ...@@ -222,7 +276,7 @@ u_int64_t
block_allocator_allocated_limit (BLOCK_ALLOCATOR ba) { block_allocator_allocated_limit (BLOCK_ALLOCATOR ba) {
if (ba->n_blocks==0) return ba->reserve_at_beginning; if (ba->n_blocks==0) return ba->reserve_at_beginning;
else { else {
struct blockpair *last = &ba->blocks_array[ba->n_blocks-1]; struct block_allocator_blockpair *last = &ba->blocks_array[ba->n_blocks-1];
return last->offset + last->size; return last->offset + last->size;
} }
} }
...@@ -260,7 +314,7 @@ block_allocator_get_unused_statistics(BLOCK_ALLOCATOR ba, TOKU_DB_FRAGMENTATION ...@@ -260,7 +314,7 @@ block_allocator_get_unused_statistics(BLOCK_ALLOCATOR ba, TOKU_DB_FRAGMENTATION
if (ba->n_blocks > 0) { if (ba->n_blocks > 0) {
//Deal with space before block 0 and after reserve: //Deal with space before block 0 and after reserve:
{ {
struct blockpair *bp = &ba->blocks_array[0]; struct block_allocator_blockpair *bp = &ba->blocks_array[0];
assert(bp->offset >= align(ba->reserve_at_beginning, ba)); assert(bp->offset >= align(ba->reserve_at_beginning, ba));
uint64_t free_space = bp->offset - align(ba->reserve_at_beginning, ba); uint64_t free_space = bp->offset - align(ba->reserve_at_beginning, ba);
if (free_space > 0) { if (free_space > 0) {
...@@ -275,7 +329,7 @@ block_allocator_get_unused_statistics(BLOCK_ALLOCATOR ba, TOKU_DB_FRAGMENTATION ...@@ -275,7 +329,7 @@ block_allocator_get_unused_statistics(BLOCK_ALLOCATOR ba, TOKU_DB_FRAGMENTATION
//Deal with space between blocks: //Deal with space between blocks:
for (u_int64_t blocknum = 0; blocknum +1 < ba->n_blocks; blocknum ++) { for (u_int64_t blocknum = 0; blocknum +1 < ba->n_blocks; blocknum ++) {
// Consider the space after blocknum // Consider the space after blocknum
struct blockpair *bp = &ba->blocks_array[blocknum]; struct block_allocator_blockpair *bp = &ba->blocks_array[blocknum];
uint64_t this_offset = bp[0].offset; uint64_t this_offset = bp[0].offset;
uint64_t this_size = bp[0].size; uint64_t this_size = bp[0].size;
uint64_t end_of_this_block = align(this_offset+this_size, ba); uint64_t end_of_this_block = align(this_offset+this_size, ba);
...@@ -292,7 +346,7 @@ block_allocator_get_unused_statistics(BLOCK_ALLOCATOR ba, TOKU_DB_FRAGMENTATION ...@@ -292,7 +346,7 @@ block_allocator_get_unused_statistics(BLOCK_ALLOCATOR ba, TOKU_DB_FRAGMENTATION
//Deal with space after last block //Deal with space after last block
{ {
struct blockpair *bp = &ba->blocks_array[ba->n_blocks-1]; struct block_allocator_blockpair *bp = &ba->blocks_array[ba->n_blocks-1];
uint64_t this_offset = bp[0].offset; uint64_t this_offset = bp[0].offset;
uint64_t this_size = bp[0].size; uint64_t this_size = bp[0].size;
uint64_t end_of_this_block = align(this_offset+this_size, ba); uint64_t end_of_this_block = align(this_offset+this_size, ba);
......
...@@ -62,13 +62,24 @@ void ...@@ -62,13 +62,24 @@ void
block_allocator_alloc_block_at (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t offset); block_allocator_alloc_block_at (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t offset);
// Effect: Allocate a block of the specified size at a particular offset. // Effect: Allocate a block of the specified size at a particular offset.
// Aborts if anything goes wrong. // Aborts if anything goes wrong.
// The performance of this function may be as bad as Theta(N), where N is the number of blocks currently in use.
// Usage note: To allocate several blocks (e.g., when opening a BRT), use block_allocator_alloc_blocks_at().
// Requires: The resulting block may not overlap any other allocated block. // Requires: The resulting block may not overlap any other allocated block.
// And the offset must be a multiple of the block alignment. // And the offset must be a multiple of the block alignment.
// Parameters: // Parameters:
// ba (IN/OUT): The block allocator. (Modifies ba.) // ba (IN/OUT): The block allocator. (Modifies ba.)
// size (IN): The size of the block. // size (IN): The size of the block.
// offset (IN): The location of the block. // offset (IN): The location of the block.
//
struct block_allocator_blockpair {
u_int64_t offset;
u_int64_t size;
};
void
block_allocator_alloc_blocks_at (BLOCK_ALLOCATOR ba, u_int64_t n_blocks, struct block_allocator_blockpair *pairs);
// Effect: Take pairs in any order, and add them all, as if we did block_allocator_alloc_block() on each pair.
// This should run in time O(N + M log M) where N is the number of blocks in ba, and M is the number of new blocks.
void void
block_allocator_alloc_block (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t *offset); block_allocator_alloc_block (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t *offset);
......
...@@ -785,14 +785,20 @@ static void ...@@ -785,14 +785,20 @@ static void
blocktable_note_translation (BLOCK_ALLOCATOR allocator, struct translation *t) { blocktable_note_translation (BLOCK_ALLOCATOR allocator, struct translation *t) {
//This is where the space for them will be reserved (in addition to normal blocks). //This is where the space for them will be reserved (in addition to normal blocks).
//See RESERVED_BLOCKNUMS //See RESERVED_BLOCKNUMS
int64_t i;
for (i=0; i<t->smallest_never_used_blocknum.b; i++) { // Previously this added blocks one at a time. Now we make an array and pass it in so it can be sorted and merged. See #3218.
struct block_allocator_blockpair *MALLOC_N(t->smallest_never_used_blocknum.b, pairs);
u_int64_t n_pairs = 0;
for (int64_t i=0; i<t->smallest_never_used_blocknum.b; i++) {
struct block_translation_pair pair = t->block_translation[i]; struct block_translation_pair pair = t->block_translation[i];
if (pair.size > 0) { if (pair.size > 0) {
assert(pair.u.diskoff != diskoff_unused); assert(pair.u.diskoff != diskoff_unused);
block_allocator_alloc_block_at(allocator, pair.size, pair.u.diskoff); pairs[n_pairs++] = (struct block_allocator_blockpair){.size = pair.size,
.offset = pair.u.diskoff};
} }
} }
block_allocator_alloc_blocks_at(allocator, n_pairs, pairs);
toku_free(pairs);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment