Commit d33db3f8 authored by Yoni Fogel

Addresses #1463

Abstracted out the block translation table and the block allocator
into BLOCK_TABLE
All use is done by accessors surrounded by locks.


git-svn-id: file:///svn/toku/tokudb@9360 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7cbba1ab
...@@ -37,6 +37,7 @@ build default: bins libs $(TEST_NEWBRT) ...@@ -37,6 +37,7 @@ build default: bins libs $(TEST_NEWBRT)
BRT_SOURCES = \ BRT_SOURCES = \
block_allocator \ block_allocator \
block_table \
bread \ bread \
brt-serialize \ brt-serialize \
brt-verify \ brt-verify \
......
...@@ -6,6 +6,12 @@ ...@@ -6,6 +6,12 @@
#include "brttypes.h" #include "brttypes.h"
#define BLOCK_ALLOCATOR_ALIGNMENT 4096
// How much must be reserved at the beginning for the block?
// The actual header is 8+4+4+8+8_4+8+ the length of the db names + 1 pointer for each root.
// So 4096 should be enough.
#define BLOCK_ALLOCATOR_HEADER_RESERVE 4096
// A block allocator manages the allocation of variable-sized blocks. // A block allocator manages the allocation of variable-sized blocks.
// The translation of block numbers to addresses is handled elsewhere. // The translation of block numbers to addresses is handled elsewhere.
// The allocation of block numbers is handled elsewhere. // The allocation of block numbers is handled elsewhere.
......
//TODO: What about h->block_translation_size_on_disk
//TODO: What about h->block_translation_address_on_disk
//TODO: What about h->block_allocator
#include "toku_portability.h"
#include "brttypes.h"
#include "block_table.h"
#include "memory.h"
#include "toku_assert.h"
#include "toku_pthread.h"
#include "block_allocator.h"
#include "rbuf.h"
#include "wbuf.h"
// In-memory state of the block table: the blocknum -> (diskoff, size) translation
// vector, the blocknum free list, and the block allocator that hands out disk space.
// All fields are read and written by the accessors in this file.
struct block_table {
// This is the map from block numbers to offsets
//int n_blocks, n_blocks_array_size;
//struct block_descriptor *blocks;
BLOCKNUM free_blocks; // free list for blocks. Use -1 to indicate that there are no free blocks
BLOCKNUM unused_blocks; // first unused block
u_int64_t translated_blocknum_limit; // number of entries in block_translation (valid blocknums are 0..limit-1)
struct block_translation_pair *block_translation; // NULL when no translation table has been allocated yet
// Where and how big is the block translation vector stored on disk.
// The size of the on_disk buffer may no longer match the translated_blocknum_limit field, since blocks may have been allocated or freed.
// We need to remember this old information so we can free it properly.
u_int64_t block_translation_size_on_disk; // the size of the block containing the translation (4 bytes of checksum plus sizeof(struct block_translation_pair) per entry; see update_size_on_disk)
u_int64_t block_translation_address_on_disk; // 0 if there is no memory allocated
// The in-memory data structure for block allocation
BLOCK_ALLOCATOR block_allocator;
};
// Sentinel values stored in the translation table / free list. Both are -1.
static const DISKOFF diskoff_is_null = (DISKOFF)-1; // in a freelist, this indicates end of list
static const DISKOFF size_is_free = (DISKOFF)-1; // stored in the size field of a blocknum that is on the free list
static void
extend_block_translation(BLOCK_TABLE bt, BLOCKNUM blocknum)
// Effect: Make sure the translation table has an entry for blocknum.
//  If the table is too short it is reallocated to hold exactly blocknum.b+1 entries,
//  and every newly added entry gets diskoff==0 and size==0.
//  Does nothing if blocknum is already below translated_blocknum_limit.
{
    assert(0<=blocknum.b);
    if ((u_int64_t)blocknum.b < bt->translated_blocknum_limit) {
        return; // already covered by the table
    }
    // A table that was never allocated must have a limit of zero.
    if (bt->block_translation == 0) assert(bt->translated_blocknum_limit==0);
    const u_int64_t first_new   = bt->translated_blocknum_limit;
    const u_int64_t grown_limit = blocknum.b + 1;
    XREALLOC_N(grown_limit, bt->block_translation);
    u_int64_t idx = first_new;
    while (idx < grown_limit) {
        bt->block_translation[idx].diskoff = 0;
        bt->block_translation[idx].size    = 0;
        idx++;
    }
    bt->translated_blocknum_limit = grown_limit;
}
static inline void
verify(BLOCK_TABLE bt, BLOCKNUM b) {
    // Assert that b indexes an existing translation entry: b lies in [0, limit).
    assert(b.b >= 0);
    assert(bt->translated_blocknum_limit > (u_int64_t)b.b);
}
// A single file-scope mutex guards every block table handled by this file.
static toku_pthread_mutex_t blocktable_mutex = TOKU_PTHREAD_MUTEX_INITIALIZER;
// Set to 1 by lock_for_blocktable() and back to 0 by unlock_for_blocktable();
// the *_unlocked entry points assert on it.
static int blocktable_is_locked=0;
// Initialize the blocktable mutex (default attributes); asserts on failure.
void toku_blocktable_lock_init(void) {
    int rc = toku_pthread_mutex_init(&blocktable_mutex, NULL);
    assert(rc == 0);
}
// Destroy the blocktable mutex; asserts on failure.
void toku_blocktable_lock_destroy(void) {
    int rc = toku_pthread_mutex_destroy(&blocktable_mutex);
    assert(rc == 0);
}
static inline void
lock_for_blocktable (void) {
    // Acquire blocktable_mutex, then record that it is held so the
    // *_unlocked entry points can assert on it.
    int rc = toku_pthread_mutex_lock(&blocktable_mutex);
    assert(rc==0);
    blocktable_is_locked = 1;
}
static inline void
unlock_for_blocktable (void) {
    // Clear the "held" flag while we still own the mutex, then release it.
    blocktable_is_locked = 0;
    int rc = toku_pthread_mutex_unlock(&blocktable_mutex);
    assert(rc==0);
}
// Return the disk block starting at offset to the block allocator.
// Does not take the blocktable lock itself.
static void
block_free(BLOCK_TABLE bt, u_int64_t offset)
{
    block_allocator_free_block(bt->block_allocator, offset);
}
// If blocknum b currently has disk space (size > 0), release that space and
// reset the translation entry to diskoff==0, size==0.  Otherwise do nothing.
// b must already be within the translation table.
static void
block_free_blocknum(BLOCK_TABLE bt, BLOCKNUM b) {
    verify(bt, b);
    if (bt->block_translation[b.b].size <= 0) {
        return; // nothing on disk for this blocknum
    }
    block_free(bt, bt->block_translation[b.b].diskoff);
    bt->block_translation[b.b].size    = 0;
    bt->block_translation[b.b].diskoff = 0;
}
// Ask the block allocator for size bytes; the resulting disk offset is stored in *offset.
// Does not take the blocktable lock itself.
static void
block_alloc(BLOCK_TABLE bt, u_int64_t size, u_int64_t *offset)
{
    block_allocator_alloc_block(bt->block_allocator, size, offset);
}
// Allocate size bytes of disk space for blocknum b and record the resulting
// (offset, size) pair in the translation table.  The offset is also returned in *offset.
// b must already be within the translation table.
static void
block_alloc_and_set_translation(BLOCK_TABLE bt, BLOCKNUM b, u_int64_t size, u_int64_t *offset) {
    verify(bt, b);
    block_alloc(bt, size, offset);
    bt->block_translation[b.b].size    = size;
    bt->block_translation[b.b].diskoff = *offset;
}
// Locked wrapper around block_alloc(): allocate size bytes and return the offset in *offset.
// No translation entry is touched.
void
toku_block_alloc(BLOCK_TABLE bt, u_int64_t size, u_int64_t *offset)
{
    lock_for_blocktable();
    block_alloc(bt, size, offset);
    unlock_for_blocktable();
}
// Locked wrapper around block_free(): release the disk block that starts at offset.
// No translation entry is touched.
void
toku_block_free(BLOCK_TABLE bt, u_int64_t offset)
{
    lock_for_blocktable();
    block_free(bt, offset);
    unlock_for_blocktable();
}
// Recompute block_translation_size_on_disk from the current number of entries:
// one translation pair per blocknum below the limit, plus a 4-byte checksum.
static void
update_size_on_disk(BLOCK_TABLE bt) {
    const u_int64_t entries_bytes = bt->translated_blocknum_limit * sizeof(bt->block_translation[0]);
    bt->block_translation_size_on_disk = 4 + entries_bytes; // 4 for checksum
}
// Give blocknum b a fresh disk block of size bytes, all under the blocktable lock:
//  1. grow the translation table so that b has an entry,
//  2. release whatever disk space b held before (if any),
//  3. allocate the new space and record it; the new offset is returned in *offset.
void
toku_block_realloc(BLOCK_TABLE bt, BLOCKNUM b, u_int64_t size, u_int64_t *offset)
{
    lock_for_blocktable();
    extend_block_translation(bt, b);
    block_free_blocknum(bt, b);
    block_alloc_and_set_translation(bt, b, size, offset);
    unlock_for_blocktable();
}
// Take the blocktable lock so that a sequence of *_unlocked calls can run atomically.
// Must be paired with toku_block_unlock_for_multiple_operations().
void
toku_block_lock_for_multiple_operations(void)
{
    lock_for_blocktable();
}
// Release the lock taken by toku_block_lock_for_multiple_operations().
// Asserts that the lock is currently marked as held.
void
toku_block_unlock_for_multiple_operations(void)
{
    assert(blocktable_is_locked);
    unlock_for_blocktable();
}
// Move the on-disk home of the translation table: free the old location (if there
// was one), recompute the serialized size, and allocate a new location of that size.
// Caller must hold the blocktable lock.
void
toku_block_realloc_translation_unlocked(BLOCK_TABLE bt) {
    assert(blocktable_is_locked);
    const u_int64_t previous_address = bt->block_translation_address_on_disk;
    if (previous_address != 0) {
        // address 0 means the table was never placed on disk
        block_allocator_free_block(bt->block_allocator, previous_address);
    }
    update_size_on_disk(bt);
    block_allocator_alloc_block(bt->block_allocator,
                                bt->block_translation_size_on_disk,
                                &bt->block_translation_address_on_disk);
}
// Serialize the head of the blocknum free list into wbuf.
// Caller must hold the blocktable lock.
void
toku_block_wbuf_free_blocks_unlocked(BLOCK_TABLE bt, struct wbuf *wbuf)
{
    assert(blocktable_is_locked);
    wbuf_BLOCKNUM(wbuf, bt->free_blocks);
}
// Serialize the first never-used blocknum into wbuf.
// Caller must hold the blocktable lock.
void
toku_block_wbuf_unused_blocks_unlocked(BLOCK_TABLE bt, struct wbuf *wbuf)
{
    assert(blocktable_is_locked);
    wbuf_BLOCKNUM(wbuf, bt->unused_blocks);
}
// Serialize the number of translation entries into wbuf.
// Caller must hold the blocktable lock.
void
toku_block_wbuf_translated_blocknum_limit_unlocked(BLOCK_TABLE bt, struct wbuf *wbuf)
{
    assert(blocktable_is_locked);
    wbuf_ulonglong(wbuf, bt->translated_blocknum_limit);
}
// Serialize the disk address of the translation table into wbuf.
// Caller must hold the blocktable lock.
void
toku_block_wbuf_block_translation_address_on_disk_unlocked(BLOCK_TABLE bt, struct wbuf *wbuf)
{
    assert(blocktable_is_locked);
    wbuf_DISKOFF(wbuf, bt->block_translation_address_on_disk);
}
// Build the serialized image of the translation table in a freshly malloc'ed buffer.
//  w        is initialized here over a new buffer of exactly the serialized size;
//           the buffer (w->buf) is allocated with toku_malloc and not freed here.
//  *size    receives the number of bytes in the image.
//  *address receives the disk address where the image belongs.
// Layout: for every entry, diskoff then size (each written with wbuf_ulonglong),
// followed by the checksum of everything written before it.
// Caller must hold the blocktable lock.
void
toku_block_wbuf_init_and_fill_unlocked(BLOCK_TABLE bt, struct wbuf *w,
                                       u_int64_t *size, u_int64_t *address) {
    assert(blocktable_is_locked);
    update_size_on_disk(bt);
    const u_int64_t image_bytes = bt->block_translation_size_on_disk;
    wbuf_init(w, toku_malloc(image_bytes), image_bytes);
    assert(w->size==image_bytes);
    u_int64_t entry = 0;
    while (entry < bt->translated_blocknum_limit) {
        wbuf_ulonglong(w, bt->block_translation[entry].diskoff);
        wbuf_ulonglong(w, bt->block_translation[entry].size);
        entry++;
    }
    u_int32_t checksum = x1764_finish(&w->checksum);
    wbuf_int(w, checksum);
    *address = bt->block_translation_address_on_disk;
    *size    = image_bytes;
}
// Return the disk offset recorded for blocknum b.
// Asserts that b is within the translation table.  Takes the blocktable lock.
DISKOFF
toku_block_get_offset(BLOCK_TABLE bt, BLOCKNUM b) {
    DISKOFF offset;
    lock_for_blocktable();
    verify(bt, b);
    offset = bt->block_translation[b.b].diskoff;
    unlock_for_blocktable();
    return offset;
}
// Return the size recorded for blocknum b.
// Asserts that b is within the translation table.  Takes the blocktable lock.
DISKOFF
toku_block_get_size(BLOCK_TABLE bt, BLOCKNUM b) {
    DISKOFF nbytes;
    lock_for_blocktable();
    verify(bt, b);
    nbytes = bt->block_translation[b.b].size;
    unlock_for_blocktable();
    return nbytes;
}
// Effect: Hand out a block number.
//  If the free list is empty, the next never-used blocknum is returned and
//  unused_blocks is advanced.  Otherwise the head of the free list is popped
//  and its translation entry is reset to size 0.
//  *res receives the blocknum, *dirty is set to 1.  Always returns 0.
//  Takes the blocktable lock.  The logger argument is unused.
int
toku_allocate_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *res, int *dirty, TOKULOGGER UU(logger)) {
    lock_for_blocktable();
    BLOCKNUM result;
    if (bt->free_blocks.b == diskoff_is_null) {
        // no blocks in the free list
        result = bt->unused_blocks;
        bt->unused_blocks.b++;
    } else {
        result = bt->free_blocks;
        // The head of the free list must index an existing translation entry.
        verify(bt, result);
        // A block on the free list must be marked free.  (This used to be an
        // assignment, `=`, which made the check vacuous.)
        assert(bt->block_translation[result.b].size == size_is_free);
        bt->block_translation[result.b].size = 0;
        bt->free_blocks.b = bt->block_translation[result.b].diskoff; // pop the freelist
    }
    assert(result.b>0);
    *res = result;
    *dirty = 1;
    unlock_for_blocktable();
    return 0;
}
////CONVERTED above already
//TODO: Convert below
int
toku_free_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *b, int *dirty, TOKULOGGER UU(logger))
// Effect: Free a diskblock: release its disk space (if it has any) and push the
//  blocknum onto the free list.  On return *b is zeroed and *dirty is set to 1.
//  Always returns 0.  Takes the blocktable lock.
// Watch out for the case where the disk block was never yet written to disk:
//  the translation table is extended first so the blocknum always has an entry.
{
    lock_for_blocktable();
    extend_block_translation(bt, *b);
    // A size <= 0 in the translation means no disk block is allocated,
    // so only positive sizes have something to give back to the allocator.
    if (0 < bt->block_translation[b->b].size) {
        block_allocator_free_block(bt->block_allocator,
                                   bt->block_translation[b->b].diskoff);
    }
    verify(bt, *b);
    // Freeing a blocknum that is already on the free list is a bug.
    assert(bt->block_translation[b->b].size != size_is_free);
    // Push onto the free list: diskoff doubles as the "next free blocknum" link.
    bt->block_translation[b->b].diskoff = bt->free_blocks.b;
    bt->block_translation[b->b].size    = size_is_free;
    bt->free_blocks.b = b->b;
    b->b = 0;
    *dirty = 1;
    unlock_for_blocktable();
    return 0;
}
//Verify there are no free blocks (the free list head is the -1 end-of-list marker).
//NOTE(review): unlike the other accessors this reads free_blocks without taking
//the blocktable lock -- confirm whether callers already hold it.
void
toku_block_verify_no_free_blocks(BLOCK_TABLE bt)
{
    assert(-1 == bt->free_blocks.b);
}
//Verify a block has been allocated at least once,
//i.e. 0 <= b < unused_blocks.  Takes the blocktable lock.
void
toku_verify_diskblocknumber_allocated(BLOCK_TABLE bt, BLOCKNUM b) {
    lock_for_blocktable();
    assert(b.b >= 0);
    assert(bt->unused_blocks.b > b.b);
    unlock_for_blocktable();
}
// Locked wrapper: return block_allocator_allocated_limit() for this table's allocator.
u_int64_t toku_block_allocator_allocated_limit(BLOCK_TABLE bt) {
    u_int64_t limit;
    lock_for_blocktable();
    limit = block_allocator_allocated_limit(bt->block_allocator);
    unlock_for_blocktable();
    return limit;
}
// Print the whole translation table to f on one line, as
// "Block translation:" followed by " <blocknum>: <diskoff> <size>" per entry.
// Takes the blocktable lock for the duration of the dump.
void
toku_block_dump_translation_table(FILE *f, BLOCK_TABLE bt) {
    lock_for_blocktable();
    fprintf(f, "Block translation:");
    u_int64_t blk = 0;
    while (blk < bt->translated_blocknum_limit) {
        fprintf(f, " %" PRIu64 ": %" PRId64 " %" PRId64,
                blk,
                bt->block_translation[blk].diskoff,
                bt->block_translation[blk].size);
        blk++;
    }
    fprintf(f, "\n");
    unlock_for_blocktable();
}
// Print one translation entry to stdout as "<index>: <diskoff> <size>".
// Despite its name, offset is an index into the translation table (a blocknum).
// Prints nothing when the index is outside the table.  Takes the blocktable lock.
void
toku_block_dump_translation(BLOCK_TABLE bt, u_int64_t offset) {
    lock_for_blocktable();
    if (bt->translated_blocknum_limit > offset) {
        printf("%" PRIu64 ": %" PRId64 " %" PRId64 "\n",
               offset,
               bt->block_translation[offset].diskoff,
               bt->block_translation[offset].size);
    }
    unlock_for_blocktable();
}
// Overwrite the "first unused blocknum" with newunused, under the blocktable lock.
void
toku_block_recovery_set_unused_blocks(BLOCK_TABLE bt, BLOCKNUM newunused)
{
    lock_for_blocktable();
    bt->unused_blocks = newunused;
    unlock_for_blocktable();
}
// Overwrite the head of the blocknum free list with newfree, under the blocktable lock.
void
toku_block_recovery_set_free_blocks(BLOCK_TABLE bt, BLOCKNUM newfree)
{
    lock_for_blocktable();
    bt->free_blocks = newfree;
    unlock_for_blocktable();
}
// Copy the first n bytes of the in-memory translation table into p.
//  n must not exceed the size of the table
//  (translated_blocknum_limit entries of struct block_translation_pair);
//  this is asserted so that an oversized request cannot read past the table.
//  p must have room for n bytes.  Takes the blocktable lock.
void
toku_block_memcpy_translation_table(BLOCK_TABLE bt, size_t n, void *p) {
    lock_for_blocktable();
    assert(n <= bt->translated_blocknum_limit * sizeof(bt->block_translation[0]));
    // The table pointer is NULL while the table is empty; memcpy must not be
    // handed a NULL source even for a zero-length copy.
    if (n > 0) {
        memcpy(p, bt->block_translation, n);
    }
    unlock_for_blocktable();
}
// Return the number of entries in the translation table (read under the blocktable lock).
u_int64_t
toku_block_get_translated_blocknum_limit(BLOCK_TABLE bt) {
    u_int64_t limit;
    lock_for_blocktable();
    limit = bt->translated_blocknum_limit;
    unlock_for_blocktable();
    return limit;
}
// Return the head of the blocknum free list (read under the blocktable lock).
BLOCKNUM
toku_block_get_free_blocks(BLOCK_TABLE bt) {
    BLOCKNUM head;
    lock_for_blocktable();
    head = bt->free_blocks;
    unlock_for_blocktable();
    return head;
}
// Return the first never-used blocknum (read under the blocktable lock).
BLOCKNUM
toku_block_get_unused_blocks(BLOCK_TABLE bt) {
    BLOCKNUM first_unused;
    lock_for_blocktable();
    first_unused = bt->unused_blocks;
    unlock_for_blocktable();
    return first_unused;
}
// Tear down a block table created by toku_blocktable_create():
// free the translation vector, destroy the block allocator, and free the
// table structure itself.  *btp is set to NULL so the caller cannot reuse it.
// Takes the blocktable lock.
void
toku_blocktable_destroy(BLOCK_TABLE *btp) {
    lock_for_blocktable();
    BLOCK_TABLE bt = *btp;
    *btp = NULL;
    toku_free(bt->block_translation);
    bt->block_translation = NULL;
    destroy_block_allocator(&bt->block_allocator);
    // The struct was allocated with XMALLOC in toku_blocktable_create() and the
    // caller's only handle was just cleared, so it must be released here.
    toku_free(bt);
    unlock_for_blocktable();
}
// Replace the in-memory translation table with the caller's table of limit entries.
// Any existing table is freed first; the block table stores the table pointer
// directly (no copy is made).  The block allocator is not updated.
// Takes the blocktable lock.
void
toku_blocktable_debug_set_translation(BLOCK_TABLE bt,
                                      u_int64_t limit,
                                      struct block_translation_pair *table) {
    lock_for_blocktable();
    if (bt->block_translation) {
        toku_free(bt->block_translation);
    }
    bt->block_translation         = table;
    bt->translated_blocknum_limit = limit;
    unlock_for_blocktable();
}
// Create a block table and return it in *btp.
//  free_blocks / unused_blocks / translated_blocknum_limit: initial values for the
//      corresponding fields.
//  block_translation_address_on_disk: where the translation table lives on disk,
//      or 0 if there is none.
//  block_translation_size_on_disk: must equal 4 + limit*sizeof(pair), or be 0
//      when the address is also 0 (asserted).
//  buffer: serialized translation table (diskoff,size pairs) to load from.
//      Must be NULL when the address is 0 (asserted); read but not freed here.
// While loading, the translation table's own disk block and every entry with
// size > 0 are registered with the block allocator.
// Takes the blocktable lock.
void
toku_blocktable_create(BLOCK_TABLE *btp,
                       BLOCKNUM free_blocks,
                       BLOCKNUM unused_blocks,
                       u_int64_t translated_blocknum_limit,
                       u_int64_t block_translation_address_on_disk,
                       u_int64_t block_translation_size_on_disk,
                       unsigned char *buffer) {
    lock_for_blocktable();
    BLOCK_TABLE bt;
    XMALLOC(bt);
    bt->free_blocks   = free_blocks;
    bt->unused_blocks = unused_blocks;
    bt->translated_blocknum_limit         = translated_blocknum_limit;
    bt->block_translation_address_on_disk = block_translation_address_on_disk;

    // Derive the expected on-disk size and cross-check it against the caller's value.
    // A table that was never written (address 0, size 0) is recorded as size 0.
    update_size_on_disk(bt);
    if (block_translation_size_on_disk == 0 && block_translation_address_on_disk == 0) {
        bt->block_translation_size_on_disk = 0;
    }
    assert(block_translation_size_on_disk==bt->block_translation_size_on_disk);

    // Set up the block allocator, then the translation vector.
    create_block_allocator(&bt->block_allocator, BLOCK_ALLOCATOR_HEADER_RESERVE, BLOCK_ALLOCATOR_ALIGNMENT);
    if (block_translation_address_on_disk != 0) {
        XMALLOC_N(translated_blocknum_limit, bt->block_translation);
        // Mark where the translation table is stored on disk.
        block_allocator_alloc_block_at(bt->block_allocator, bt->block_translation_size_on_disk, bt->block_translation_address_on_disk);
        // Load translations from the buffer (everything except the trailing checksum).
        struct rbuf rt;
        rt.buf   = buffer;
        rt.ndone = 0;
        rt.size  = bt->block_translation_size_on_disk-4;//4==checksum
        assert(rt.size>0);
        u_int64_t blk = 0;
        while (blk < bt->translated_blocknum_limit) {
            bt->block_translation[blk].diskoff = rbuf_diskoff(&rt);
            bt->block_translation[blk].size    = rbuf_diskoff(&rt);
            // Only entries with a positive size occupy disk space.
            if (bt->block_translation[blk].size > 0) {
                block_allocator_alloc_block_at(bt->block_allocator, bt->block_translation[blk].size, bt->block_translation[blk].diskoff);
            }
            blk++;
        }
    }
    else {
        // Nothing on disk: start with no translation vector.
        bt->block_translation = NULL;
        assert(buffer==NULL);
    }
    *btp = bt;
    unlock_for_blocktable();
}
// Create an empty block table: no free blocks (-1), first unused blocknum 2,
// no translation entries and nothing stored on disk.
void
toku_blocktable_create_new(BLOCK_TABLE *btp) {
    const BLOCKNUM no_free_blocks = make_blocknum(-1);
    const BLOCKNUM first_unused   = make_blocknum(2);
    toku_blocktable_create(btp, no_free_blocks, first_unused, 0, 0, 0, NULL);
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef BLOCKTABLE_H
#define BLOCKTABLE_H
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
// Public interface to the block table: the mapping from block numbers to
// (disk offset, size), the block number free list, and the block allocator.
// Functions without an _unlocked suffix take the blocktable lock themselves
// (except toku_block_verify_no_free_blocks, which does not lock).
// BLOCK_TABLE is an opaque handle; the structure is defined in block_table.c.
typedef struct block_table *BLOCK_TABLE;
//Needed by tests, brtdump
// One entry of the translation table.
struct block_translation_pair {
DISKOFF diskoff; // When in free list, set to the next free block. In this case it's really a BLOCKNUM.
DISKOFF size; // set to 0xFFFFFFFFFFFFFFFF for free
};
// Initialize / destroy the mutex that protects the block table.
void toku_blocktable_lock_init(void);
void toku_blocktable_lock_destroy(void);
// Free any disk space held by blocknum b, allocate size bytes for it, record the
// translation and return the new offset in *offset.
void toku_block_realloc(BLOCK_TABLE bt, BLOCKNUM b, u_int64_t size, u_int64_t *offset);
// Raw allocator access: allocate size bytes / free the block at offset, without touching any translation entry.
void toku_block_alloc(BLOCK_TABLE bt, u_int64_t size, u_int64_t *offset);
void toku_block_free(BLOCK_TABLE bt, u_int64_t offset);
// Look up the recorded disk offset / size of blocknum b (b must be in the table).
DISKOFF toku_block_get_offset(BLOCK_TABLE bt, BLOCKNUM b);
DISKOFF toku_block_get_size(BLOCK_TABLE bt, BLOCKNUM b);
// Allocate a block number (from the free list if possible) / return one to the free list.
// Both set *dirty to 1 and return 0.
int toku_allocate_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *res, int *dirty, TOKULOGGER logger);
int toku_free_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *b, int *dirty, TOKULOGGER logger);
// Assertions: b has been handed out at least once / the free list is empty.
void toku_verify_diskblocknumber_allocated(BLOCK_TABLE bt, BLOCKNUM b);
void toku_block_verify_no_free_blocks(BLOCK_TABLE bt);
// Return the block allocator's allocated limit.
u_int64_t toku_block_allocator_allocated_limit(BLOCK_TABLE bt);
// Debug output: dump the whole table to f / one entry (index `offset`) to stdout.
void toku_block_dump_translation_table(FILE *f, BLOCK_TABLE bt);
void toku_block_dump_translation(BLOCK_TABLE bt, u_int64_t offset);
// Free the table's resources and set *btp to NULL.
void toku_blocktable_destroy(BLOCK_TABLE *btp);
// Replace the in-memory translation table with `table` (limit entries); the pointer is stored, not copied.
void toku_blocktable_debug_set_translation(BLOCK_TABLE bt,
u_int64_t limit,
struct block_translation_pair *table);
// Create a block table from header values and, when the address is nonzero,
// the serialized translation table in buffer (buffer must be NULL when the address is 0).
void toku_blocktable_create(BLOCK_TABLE *btp,
BLOCKNUM free_blocks,
BLOCKNUM unused_blocks,
u_int64_t translated_blocknum_limit,
u_int64_t block_translation_address_on_disk,
u_int64_t block_translation_size_on_disk,
unsigned char *buffer);
// Create an empty block table (no free blocks, first unused blocknum 2, nothing on disk).
void toku_blocktable_create_new(BLOCK_TABLE *bt);
// Overwrite unused_blocks / the free list head.
void toku_block_recovery_set_unused_blocks(BLOCK_TABLE bt, BLOCKNUM newunused);
void toku_block_recovery_set_free_blocks(BLOCK_TABLE bt, BLOCKNUM newfree);
// Simple locked getters.
BLOCKNUM toku_block_get_unused_blocks(BLOCK_TABLE bt);
BLOCKNUM toku_block_get_free_blocks(BLOCK_TABLE bt);
u_int64_t toku_block_get_translated_blocknum_limit(BLOCK_TABLE bt);
// Copy the first n bytes of the in-memory translation table into p.
void toku_block_memcpy_translation_table(BLOCK_TABLE bt, size_t n, void *p);
//Unlocked/multi ops
// Bracket a sequence of the *_unlocked calls below with these two functions;
// each *_unlocked function asserts that the lock is held.
void toku_block_lock_for_multiple_operations(void);
void toku_block_unlock_for_multiple_operations(void);
// Free the translation table's old disk location (if any) and allocate a new one of the current size.
void toku_block_realloc_translation_unlocked(BLOCK_TABLE bt);
// Serialize individual header fields into wbuf.
void toku_block_wbuf_free_blocks_unlocked(BLOCK_TABLE bt, struct wbuf *wbuf);
void toku_block_wbuf_unused_blocks_unlocked(BLOCK_TABLE bt, struct wbuf *wbuf);
void toku_block_wbuf_translated_blocknum_limit_unlocked(BLOCK_TABLE bt, struct wbuf *wbuf);
void toku_block_wbuf_block_translation_address_on_disk_unlocked(BLOCK_TABLE bt, struct wbuf *wbuf);
// Initialize w over a newly malloc'ed buffer holding the serialized translation table
// (entries followed by a checksum); returns its size and target disk address.
// The function does not free w->buf.
void toku_block_wbuf_init_and_fill_unlocked(BLOCK_TABLE bt, struct wbuf *w,
u_int64_t *size, u_int64_t *address);
#endif
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
typedef void *OMTVALUE; typedef void *OMTVALUE;
#include "omt.h" #include "omt.h"
#include "leafentry.h" #include "leafentry.h"
#include "block_table.h"
#ifndef BRT_FANOUT #ifndef BRT_FANOUT
#define BRT_FANOUT 16 #define BRT_FANOUT 16
...@@ -113,11 +114,6 @@ struct remembered_hash { ...@@ -113,11 +114,6 @@ struct remembered_hash {
u_int32_t fullhash; // fullhash is the hashed value of fnum and root. u_int32_t fullhash; // fullhash is the hashed value of fnum and root.
}; };
struct block_translation_pair {
DISKOFF diskoff; // When in free list, set to the next free block. In this case it's really a BLOCKNUM.
DISKOFF size; // set to 0xFFFFFFFFFFFFFFFF for free
};
// The brt_header is not managed by the cachetable. Instead, it hangs off the cachefile as userdata. // The brt_header is not managed by the cachetable. Instead, it hangs off the cachefile as userdata.
struct brt_header { struct brt_header {
...@@ -137,23 +133,7 @@ struct brt_header { ...@@ -137,23 +133,7 @@ struct brt_header {
u_int64_t root_put_counter; // the generation number of the brt u_int64_t root_put_counter; // the generation number of the brt
// This is the map from block numbers to offsets BLOCK_TABLE blocktable;
//int n_blocks, n_blocks_array_size;
//struct block_descriptor *blocks;
BLOCKNUM free_blocks; // free list for blocks. Use -1 to indicate that there are no free blocks
BLOCKNUM unused_blocks; // first unused block
u_int64_t translated_blocknum_limit;
struct block_translation_pair *block_translation;
// Where and how big is the block translation vector stored on disk.
// The size of the on_disk buffer may no longer match the max_blocknum_translated field, since blocks may have been allocated or freed.
// We need to remember this old information so we can free it properly.
u_int64_t block_translation_size_on_disk; // the size of the block containing the translation (i.e. 8 times the number of entries)
u_int64_t block_translation_address_on_disk; // 0 if there is no memory allocated
// The in-memory data structure for block allocation
BLOCK_ALLOCATOR block_allocator;
}; };
struct brt { struct brt {
...@@ -292,12 +272,6 @@ void toku_brtheader_free (struct brt_header *h); ...@@ -292,12 +272,6 @@ void toku_brtheader_free (struct brt_header *h);
int toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **error_string); int toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **error_string);
int toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v); int toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v);
#define BLOCK_ALLOCATOR_ALIGNMENT 4096
// How much must be reserved at the beginning for the block?
// The actual header is 8+4+4+8+8_4+8+ the length of the db names + 1 pointer for each root.
// So 4096 should be enough.
#define BLOCK_ALLOCATOR_HEADER_RESERVE 4096
int toku_db_badformat(void); int toku_db_badformat(void);
#endif #endif
...@@ -353,24 +353,17 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -353,24 +353,17 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
int r; int r;
{ {
lock_for_pwrite(); lock_for_pwrite();
//TODO: #1463 START (might not be the entire range
// If the node has never been written, then write the whole buffer, including the zeros // If the node has never been written, then write the whole buffer, including the zeros
assert(blocknum.b>=0); assert(blocknum.b>=0);
//printf("%s:%d h=%p\n", __FILE__, __LINE__, h); //printf("%s:%d h=%p\n", __FILE__, __LINE__, h);
//printf("%s:%d translated_blocknum_limit=%lu blocknum.b=%lu\n", __FILE__, __LINE__, h->translated_blocknum_limit, blocknum.b); //printf("%s:%d translated_blocknum_limit=%lu blocknum.b=%lu\n", __FILE__, __LINE__, h->translated_blocknum_limit, blocknum.b);
//printf("%s:%d allocator=%p\n", __FILE__, __LINE__, h->block_allocator); //printf("%s:%d allocator=%p\n", __FILE__, __LINE__, h->block_allocator);
//printf("%s:%d bt=%p\n", __FILE__, __LINE__, h->block_translation); //printf("%s:%d bt=%p\n", __FILE__, __LINE__, h->block_translation);
extend_block_translation(blocknum, h);
if (h->block_translation[blocknum.b].size > 0) {
block_allocator_free_block(h->block_allocator, h->block_translation[blocknum.b].diskoff);
h->block_translation[blocknum.b].diskoff = 0;
h->block_translation[blocknum.b].size = 0;
}
h->dirty = 1; // Allocating a block dirties the header. h->dirty = 1; // Allocating a block dirties the header.
size_t n_to_write = uncompressed_magic_len + compression_header_len + compressed_len; size_t n_to_write = uncompressed_magic_len + compression_header_len + compressed_len;
u_int64_t offset; u_int64_t offset;
block_allocator_alloc_block(h->block_allocator, n_to_write, &offset); toku_block_realloc(h->blocktable, blocknum, n_to_write, &offset);
h->block_translation[blocknum.b].diskoff = offset;
h->block_translation[blocknum.b].size = n_to_write;
ssize_t n_wrote; ssize_t n_wrote;
r=toku_pwrite_extend(fd, compressed_buf, n_to_write, offset, &n_wrote); r=toku_pwrite_extend(fd, compressed_buf, n_to_write, offset, &n_wrote);
if (r) { if (r) {
...@@ -378,6 +371,7 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -378,6 +371,7 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
} else { } else {
r=0; r=0;
} }
//TODO: #1463 END
unlock_for_pwrite(); unlock_for_pwrite();
} }
...@@ -391,8 +385,7 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -391,8 +385,7 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h) { int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h) {
if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b); if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);
if (h->panic) return h->panic; if (h->panic) return h->panic;
assert(0 <= blocknum.b && (u_int64_t)blocknum.b < h->translated_blocknum_limit); DISKOFF offset = toku_block_get_offset(h->blocktable, blocknum);
DISKOFF offset = h->block_translation[blocknum.b].diskoff;
TAGMALLOC(BRTNODE, result); TAGMALLOC(BRTNODE, result);
struct rbuf rc; struct rbuf rc;
int i; int i;
...@@ -714,16 +707,19 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h) ...@@ -714,16 +707,19 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h)
wbuf_int (wbuf, size); wbuf_int (wbuf, size);
wbuf_int (wbuf, BRT_LAYOUT_VERSION); wbuf_int (wbuf, BRT_LAYOUT_VERSION);
wbuf_int (wbuf, h->nodesize); wbuf_int (wbuf, h->nodesize);
wbuf_BLOCKNUM(wbuf, h->free_blocks); //TODO: Use 'prelocked/unlocked' versions to make this atomic
wbuf_BLOCKNUM(wbuf, h->unused_blocks); //TODO: #1463 START
toku_block_realloc_translation_unlocked(h->blocktable);
toku_block_wbuf_free_blocks_unlocked(h->blocktable, wbuf);
toku_block_wbuf_unused_blocks_unlocked(h->blocktable, wbuf);
//TODO: #1463 END
wbuf_int (wbuf, h->n_named_roots); wbuf_int (wbuf, h->n_named_roots);
if (h->block_translation_address_on_disk != 0) { //TODO: #1463 START
block_allocator_free_block(h->block_allocator, h->block_translation_address_on_disk);
}
block_allocator_alloc_block(h->block_allocator, 4 + 16*h->translated_blocknum_limit, &h->block_translation_address_on_disk);
//printf("%s:%d bta=%lu size=%lu\n", __FILE__, __LINE__, h->block_translation_address_on_disk, 4 + 16*h->translated_blocknum_limit); //printf("%s:%d bta=%lu size=%lu\n", __FILE__, __LINE__, h->block_translation_address_on_disk, 4 + 16*h->translated_blocknum_limit);
wbuf_ulonglong(wbuf, h->translated_blocknum_limit); toku_block_wbuf_translated_blocknum_limit_unlocked(h->blocktable, wbuf);
wbuf_DISKOFF(wbuf, h->block_translation_address_on_disk); toku_block_wbuf_block_translation_address_on_disk_unlocked(h->blocktable, wbuf);
//TODO: #1463 END
if (h->n_named_roots>=0) { if (h->n_named_roots>=0) {
int i; int i;
for (i=0; i<h->n_named_roots; i++) { for (i=0; i<h->n_named_roots; i++) {
...@@ -746,18 +742,31 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) { ...@@ -746,18 +742,31 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
int rr = 0; int rr = 0;
if (h->panic) return h->panic; if (h->panic) return h->panic;
lock_for_pwrite(); lock_for_pwrite();
toku_block_lock_for_multiple_operations();
struct wbuf w_main;
unsigned int size_main = toku_serialize_brt_header_size (h);
{ {
struct wbuf w; wbuf_init(&w_main, toku_malloc(size_main), size_main);
unsigned int size = toku_serialize_brt_header_size (h);
wbuf_init(&w, toku_malloc(size), size);
{ {
int r=toku_serialize_brt_header_to_wbuf(&w, h); int r=toku_serialize_brt_header_to_wbuf(&w_main, h);
assert(r==0); assert(r==0);
} }
assert(w.ndone==size); assert(w_main.ndone==size_main);
}
struct wbuf w_translation;
u_int64_t size_translation;
u_int64_t address_translation;
{
toku_block_wbuf_init_and_fill_unlocked(h->blocktable, &w_translation,
&size_translation, &address_translation);
size_translation = w_translation.size;
}
toku_block_unlock_for_multiple_operations();
{
//Actual Write main header
ssize_t nwrote; ssize_t nwrote;
rr = toku_pwrite_extend(fd, w.buf, w.ndone, 0, &nwrote); rr = toku_pwrite_extend(fd, w_main.buf, w_main.ndone, 0, &nwrote);
toku_free(w.buf); toku_free(w_main.buf);
if (rr) { if (rr) {
if (h->panic==0) { if (h->panic==0) {
char s[200]; char s[200];
...@@ -767,31 +776,21 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) { ...@@ -767,31 +776,21 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
} }
goto finish; goto finish;
} }
assert((u_int64_t)nwrote==size); assert((u_int64_t)nwrote==size_main);
} }
{ {
struct wbuf w; //Actual Write translation table
u_int64_t size = 4 + h->translated_blocknum_limit * 16; // 4 for the checksum
//printf("%s:%d writing translation table of size %ld at %ld\n", __FILE__, __LINE__, size, h->block_translation_address_on_disk);
wbuf_init(&w, toku_malloc(size), size);
u_int64_t i;
for (i=0; i<h->translated_blocknum_limit; i++) {
//printf("%s:%d %ld,%ld\n", __FILE__, __LINE__, h->block_translation[i].diskoff, h->block_translation[i].size);
wbuf_ulonglong(&w, h->block_translation[i].diskoff);
wbuf_ulonglong(&w, h->block_translation[i].size);
}
u_int32_t checksum = x1764_finish(&w.checksum);
wbuf_int(&w, checksum);
ssize_t nwrote; ssize_t nwrote;
rr = toku_pwrite_extend(fd, w.buf, size, h->block_translation_address_on_disk, &nwrote); rr = toku_pwrite_extend(fd, w_translation.buf,
toku_free(w.buf); size_translation, address_translation, &nwrote);
if (rr) { if (rr) {
//fprintf(stderr, "%s:%d: Error writing data to file. errno=%d (%s)\n", __FILE__, __LINE__, rr, strerror(rr)); //fprintf(stderr, "%s:%d: Error writing data to file. errno=%d (%s)\n", __FILE__, __LINE__, rr, strerror(rr));
goto finish; goto finish;
} }
assert((u_int64_t)nwrote==size); assert((u_int64_t)nwrote==size_translation);
} }
finish: finish:
toku_free(w_translation.buf);
unlock_for_pwrite(); unlock_for_pwrite();
return rr; return rr;
} }
...@@ -820,49 +819,48 @@ deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_header ** ...@@ -820,49 +819,48 @@ deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_header **
h->layout_version = rbuf_int(&rc); h->layout_version = rbuf_int(&rc);
h->nodesize = rbuf_int(&rc); h->nodesize = rbuf_int(&rc);
assert(h->layout_version==BRT_LAYOUT_VERSION_9); assert(h->layout_version==BRT_LAYOUT_VERSION_9);
h->free_blocks = rbuf_blocknum(&rc); BLOCKNUM free_blocks = rbuf_blocknum(&rc);
h->unused_blocks = rbuf_blocknum(&rc); BLOCKNUM unused_blocks = rbuf_blocknum(&rc);
h->n_named_roots = rbuf_int(&rc); h->n_named_roots = rbuf_int(&rc);
h->translated_blocknum_limit = rbuf_diskoff(&rc); u_int64_t translated_blocknum_limit = rbuf_diskoff(&rc);
h->block_translation_size_on_disk = 4 + 16 * h->translated_blocknum_limit; u_int64_t block_translation_address_on_disk = rbuf_diskoff(&rc);
h->block_translation_address_on_disk = rbuf_diskoff(&rc); u_int64_t block_translation_size_on_disk = 4 +//4 for checksum
// Set up the the block translation buffer. 16*translated_blocknum_limit;
create_block_allocator(&h->block_allocator, BLOCK_ALLOCATOR_HEADER_RESERVE, BLOCK_ALLOCATOR_ALIGNMENT);
// printf("%s:%d translated_blocknum_limit=%ld, block_translation_address_on_disk=%ld\n", __FILE__, __LINE__, h->translated_blocknum_limit, h->block_translation_address_on_disk); // printf("%s:%d translated_blocknum_limit=%ld, block_translation_address_on_disk=%ld\n", __FILE__, __LINE__, h->translated_blocknum_limit, h->block_translation_address_on_disk);
if (h->block_translation_address_on_disk == 0) { if (block_translation_address_on_disk == 0) {
h->block_translation = 0; //There is no data on the disk.
} else { //Create empty translation table.
toku_blocktable_create(&h->blocktable,
free_blocks, unused_blocks,
translated_blocknum_limit,
block_translation_address_on_disk,
block_translation_size_on_disk, NULL);
}
else {
//Load translation table if it exists on disk.
lock_for_pwrite(); lock_for_pwrite();
block_allocator_alloc_block_at(h->block_allocator, h->block_translation_size_on_disk, h->block_translation_address_on_disk); //TODO: #1463 load!
XMALLOC_N(h->translated_blocknum_limit, h->block_translation); unsigned char *XMALLOC_N(block_translation_size_on_disk, tbuf);
unsigned char *XMALLOC_N(h->block_translation_size_on_disk, tbuf);
{ {
ssize_t r = pread(fd, tbuf, h->block_translation_size_on_disk, h->block_translation_address_on_disk); ssize_t r = pread(fd, tbuf, block_translation_size_on_disk, block_translation_address_on_disk);
// This cast is messed up in 32-bits if the block translation table is ever more than 4GB. But in that case, the translation table itself won't fit in main memory. // This cast is messed up in 32-bits if the block translation table is ever more than 4GB. But in that case, the translation table itself won't fit in main memory.
assert((u_int64_t)r==h->block_translation_size_on_disk); assert((u_int64_t)r==block_translation_size_on_disk);
} }
{ {
// check the checksum // check the checksum
u_int32_t x1764 = x1764_memory(tbuf, h->block_translation_size_on_disk - 4); u_int32_t x1764 = x1764_memory(tbuf, block_translation_size_on_disk - 4);
u_int64_t offset = h->block_translation_size_on_disk - 4; u_int64_t offset = block_translation_size_on_disk - 4;
//printf("%s:%d read from %ld (x1764 offset=%ld) size=%ld\n", __FILE__, __LINE__, h->block_translation_address_on_disk, offset, h->block_translation_size_on_disk); //printf("%s:%d read from %ld (x1764 offset=%ld) size=%ld\n", __FILE__, __LINE__, block_translation_address_on_disk, offset, block_translation_size_on_disk);
u_int32_t stored_x1764 = toku_ntohl(*(int*)(tbuf + offset)); u_int32_t stored_x1764 = toku_ntohl(*(int*)(tbuf + offset));
assert(x1764 == stored_x1764); assert(x1764 == stored_x1764);
} }
// now read all that data. // Create table and read in data.
u_int64_t i; toku_blocktable_create(&h->blocktable,
struct rbuf rt; free_blocks, unused_blocks,
rt.buf = tbuf; translated_blocknum_limit,
rt.ndone = 0; block_translation_address_on_disk,
rt.size = h->block_translation_size_on_disk-4; block_translation_size_on_disk,
assert(rt.size>0); tbuf);
for (i=0; i<h->translated_blocknum_limit; i++) {
h->block_translation[i].diskoff = rbuf_diskoff(&rt);
h->block_translation[i].size = rbuf_diskoff(&rt);
if (h->block_translation[i].size > 0)
block_allocator_alloc_block_at(h->block_allocator, h->block_translation[i].size, h->block_translation[i].diskoff);
//printf("%s:%d %ld %ld\n", __FILE__, __LINE__, h->block_translation[i].diskoff, h->block_translation[i].size);
}
unlock_for_pwrite(); unlock_for_pwrite();
toku_free(tbuf); toku_free(tbuf);
} }
...@@ -898,7 +896,7 @@ deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_header ** ...@@ -898,7 +896,7 @@ deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_header **
toku_free(rc.buf); toku_free(rc.buf);
{ {
int r; int r;
if ((r = deserialize_fifo_at(fd, block_allocator_allocated_limit(h->block_allocator), &h->fifo))) return r; if ((r = deserialize_fifo_at(fd, toku_block_allocator_allocated_limit(h->blocktable), &h->fifo))) return r;
} }
*brth = h; *brth = h;
return 0; return 0;
......
...@@ -557,10 +557,8 @@ brtheader_init(struct brt_header *h) { ...@@ -557,10 +557,8 @@ brtheader_init(struct brt_header *h) {
static void static void
brtheader_partial_destroy(struct brt_header *h) { brtheader_partial_destroy(struct brt_header *h) {
toku_free(h->block_translation); toku_blocktable_destroy(&h->blocktable);
h->block_translation = 0;
toku_fifo_free(&h->fifo); toku_fifo_free(&h->fifo);
destroy_block_allocator(&h->block_allocator);
} }
static void static void
...@@ -603,62 +601,6 @@ toku_brtheader_free (struct brt_header *h) { ...@@ -603,62 +601,6 @@ toku_brtheader_free (struct brt_header *h) {
brtheader_free(h); brtheader_free(h);
} }
void
extend_block_translation (BLOCKNUM blocknum, struct brt_header *h)
// Effect: Ensure that the translation table of H has an entry for BLOCKNUM.
//  If blocknum is already below translated_blocknum_limit, nothing happens.
//  Otherwise the table is reallocated to hold blocknum.b+1 entries, every newly added entry gets diskoff==0 and size==0,
//  and translated_blocknum_limit is raised to the new entry count.
{
    if ((u_int64_t)blocknum.b < h->translated_blocknum_limit) return; // The table already covers this block number.

    // A table that was never allocated must also report zero translated entries.
    assert(h->block_translation != 0 || h->translated_blocknum_limit == 0);

    u_int64_t grown_limit = blocknum.b + 1;
    XREALLOC_N(grown_limit, h->block_translation);

    // Zero only the tail that the reallocation added; the existing entries keep their contents.
    u_int64_t k = h->translated_blocknum_limit;
    while (k < grown_limit) {
        h->block_translation[k].diskoff = 0;
        h->block_translation[k].size    = 0;
        k++;
    }
    h->translated_blocknum_limit = grown_limit;
}
// Compared against the head of the free list (free_blocks.b) to detect an empty list.
const DISKOFF diskoff_is_null = (DISKOFF)-1; // in a freelist, this indicates end of list
// Stored in the size field of a translation entry when its block number is freed; the entry's diskoff field then holds the previous head of the free list.
const DISKOFF size_is_free = (DISKOFF)-1;
static int
allocate_diskblocknumber (BLOCKNUM *res, BRT brt, TOKULOGGER logger __attribute__((__unused__)))
// Effect: Hand out a block number in *RES and mark the header dirty.
//  If the free list is empty (free_blocks.b == diskoff_is_null), the next never-used block number is taken and unused_blocks is advanced.
//  Otherwise the head of the free list is popped: its translation entry must be marked free (size == size_is_free),
//  the entry's size is reset to 0, and free_blocks moves on to the block number stored in the entry's diskoff field.
// Returns: 0 always.
{
    BLOCKNUM result;
    if (brt->h->free_blocks.b == diskoff_is_null) {
        // no blocks in the free list
        result = brt->h->unused_blocks;
        brt->h->unused_blocks.b++;
    } else {
        result = brt->h->free_blocks;
        // This must be a comparison.  It used to read "size = size_is_free", which stored -1 into the entry
        // and was always true, so a free list pointing at a block that is not free went undetected.
        assert(brt->h->block_translation[result.b].size == size_is_free);
        brt->h->block_translation[result.b].size = 0;
        brt->h->free_blocks.b = brt->h->block_translation[result.b].diskoff; // pop the freelist
    }
    assert(result.b>0); // a block number that is handed out must be positive
    *res = result;
    brt->h->dirty = 1;
    return 0;
}
static int
free_diskblocknumber (BLOCKNUM *b, struct brt_header *h, TOKULOGGER logger __attribute__((__unused__)))
// Effect: Put block number *B on the free list of H, then set b->b to 0 and mark the header dirty.
//  The block may never have been written to disk, in which case it can lie beyond translated_blocknum_limit,
//  so the translation table is extended first to make sure it has an entry for the block.
// Returns: 0 always.
{
    extend_block_translation(*b, h);

    BLOCKNUM doomed = *b;
    assert((u_int64_t)doomed.b < h->translated_blocknum_limit); // as a "limit" it should be <
    assert(h->block_translation[doomed.b].size != size_is_free); // the block must not already be marked free

    // Push onto the free list: the entry remembers the old head in diskoff and is flagged free through its size.
    h->block_translation[doomed.b].diskoff = h->free_blocks.b;
    h->block_translation[doomed.b].size    = size_is_free;
    h->free_blocks.b = doomed.b;

    b->b = 0; // the caller's block number is zeroed
    h->dirty = 1;
    return 0;
}
static void static void
initialize_empty_brtnode (BRT t, BRTNODE n, BLOCKNUM nodename, int height) initialize_empty_brtnode (BRT t, BRTNODE n, BLOCKNUM nodename, int height)
// Effect: Fill in N as an empty brtnode. // Effect: Fill in N as an empty brtnode.
...@@ -712,7 +654,9 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r ...@@ -712,7 +654,9 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
int new_height = nodea->height+1; int new_height = nodea->height+1;
int new_nodesize = brt->h->nodesize; int new_nodesize = brt->h->nodesize;
BLOCKNUM newroot_diskoff; BLOCKNUM newroot_diskoff;
r = allocate_diskblocknumber(&newroot_diskoff, brt, logger); r = toku_allocate_diskblocknumber(brt->h->blocktable,
&newroot_diskoff,
&brt->h->dirty, logger);
assert(r==0); assert(r==0);
assert(newroot); assert(newroot);
newroot->ever_been_written = 0; newroot->ever_been_written = 0;
...@@ -780,7 +724,7 @@ int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logg ...@@ -780,7 +724,7 @@ int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logg
TAGMALLOC(BRTNODE, n); TAGMALLOC(BRTNODE, n);
int r; int r;
BLOCKNUM name; BLOCKNUM name;
r = allocate_diskblocknumber (&name, t, logger); r = toku_allocate_diskblocknumber(t->h->blocktable, &name, &t->h->dirty, logger);
assert(r==0); assert(r==0);
assert(n); assert(n);
assert(t->h->nodesize>0); assert(t->h->nodesize>0);
...@@ -2227,15 +2171,8 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL ...@@ -2227,15 +2171,8 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
if (did_merge) { if (did_merge) {
BLOCKNUM bn = childb->thisnodename; BLOCKNUM bn = childb->thisnodename;
rrb = toku_cachetable_unpin_and_remove(t->cf, bn); rrb = toku_cachetable_unpin_and_remove(t->cf, bn);
// If the block_translation indicates that the size is <=0 then there is no block allocated. rrb1 = toku_free_diskblocknumber(t->h->blocktable, &bn,
// The block translation might not be big enough, and that also indicates no block allocated. &t->h->dirty, logger);
assert(0 <= bn.b); // the blocknumber better be good
if ((unsigned)bn.b < t->h->translated_blocknum_limit) {
if (t->h->block_translation[bn.b].size > 0) {
block_allocator_free_block(t->h->block_allocator, t->h->block_translation[bn.b].diskoff);
}
}
rrb1 = free_diskblocknumber(&bn, t->h, logger);
} else { } else {
rrb = toku_unpin_brtnode(t, childb); rrb = toku_unpin_brtnode(t, childb);
} }
...@@ -2246,7 +2183,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL ...@@ -2246,7 +2183,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
} }
verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
return r; return r;
} }
static int static int
brt_handle_maybe_reactive_child(BRT t, BRTNODE node, int childnum, enum reactivity re, BOOL *did_io, TOKULOGGER logger, BOOL *did_react) { brt_handle_maybe_reactive_child(BRT t, BRTNODE node, int childnum, enum reactivity re, BOOL *did_io, TOKULOGGER logger, BOOL *did_react) {
...@@ -2315,7 +2252,8 @@ flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum rea ...@@ -2315,7 +2252,8 @@ flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum rea
{ {
assert(node->height>0); assert(node->height>0);
BLOCKNUM targetchild = BNC_BLOCKNUM(node, childnum); BLOCKNUM targetchild = BNC_BLOCKNUM(node, childnum);
assert(targetchild.b>=0 && targetchild.b<t->h->unused_blocks.b); // This assertion could fail in a concurrent setting since another process might have bumped unused memory. //TODO: #1463 This assert...
toku_verify_diskblocknumber_allocated(t->h->blocktable, targetchild);
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnum); u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnum);
BRTNODE child; BRTNODE child;
{ {
...@@ -2760,23 +2698,18 @@ static int brt_init_header(BRT t, TOKUTXN txn) { ...@@ -2760,23 +2698,18 @@ static int brt_init_header(BRT t, TOKUTXN txn) {
t->h->dirty=1; t->h->dirty=1;
t->h->flags_array[0] = t->flags; t->h->flags_array[0] = t->flags;
t->h->nodesize=t->nodesize; t->h->nodesize=t->nodesize;
t->h->free_blocks = make_blocknum(-1); toku_blocktable_create_new(&t->h->blocktable);
t->h->unused_blocks=make_blocknum(2);
t->h->translated_blocknum_limit = 0;
t->h->block_translation = 0;
t->h->block_translation_size_on_disk = 0;
t->h->block_translation_address_on_disk = 0;
// printf("%s:%d translated_blocknum_limit=%ld, block_translation_address_on_disk=%ld\n", __FILE__, __LINE__, t->h->translated_blocknum_limit, t->h->block_translation_address_on_disk);
create_block_allocator(&t->h->block_allocator, BLOCK_ALLOCATOR_HEADER_RESERVE, BLOCK_ALLOCATOR_ALIGNMENT);
toku_fifo_create(&t->h->fifo); toku_fifo_create(&t->h->fifo);
t->h->root_put_counter = global_root_put_counter++; t->h->root_put_counter = global_root_put_counter++;
{ {
BLOCKNUM free_blocks = toku_block_get_free_blocks(t->h->blocktable);
BLOCKNUM unused_blocks = toku_block_get_unused_blocks(t->h->blocktable);
LOGGEDBRTHEADER lh = {.size= toku_serialize_brt_header_size(t->h), LOGGEDBRTHEADER lh = {.size= toku_serialize_brt_header_size(t->h),
.flags = t->flags, .flags = t->flags,
.nodesize = t->h->nodesize, .nodesize = t->h->nodesize,
.free_blocks = t->h->free_blocks, .free_blocks = free_blocks,
.unused_blocks = t->h->unused_blocks, .unused_blocks = unused_blocks,
.n_named_roots = t->h->n_named_roots }; .n_named_roots = t->h->n_named_roots };
if (t->h->n_named_roots>=0) { if (t->h->n_named_roots>=0) {
lh.u.many.names = t->h->names; lh.u.many.names = t->h->names;
...@@ -2788,7 +2721,7 @@ static int brt_init_header(BRT t, TOKUTXN txn) { ...@@ -2788,7 +2721,7 @@ static int brt_init_header(BRT t, TOKUTXN txn) {
} }
if ((r=setup_initial_brt_root_node(t, root, toku_txn_logger(txn)))!=0) { return r; } if ((r=setup_initial_brt_root_node(t, root, toku_txn_logger(txn)))!=0) { return r; }
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0); //printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
assert(t->h->free_blocks.b==-1); toku_block_verify_no_free_blocks(t->h->blocktable);
toku_cachefile_set_userdata(t->cf, t->h, toku_brtheader_close, toku_brtheader_checkpoint); toku_cachefile_set_userdata(t->cf, t->h, toku_brtheader_close, toku_brtheader_checkpoint);
return r; return r;
...@@ -2940,7 +2873,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char ...@@ -2940,7 +2873,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
t->h->n_named_roots++; t->h->n_named_roots++;
if ((t->h->names[t->h->n_named_roots-1] = toku_strdup(dbname)) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; } if ((t->h->names[t->h->n_named_roots-1] = toku_strdup(dbname)) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
//printf("%s:%d t=%p\n", __FILE__, __LINE__, t); //printf("%s:%d t=%p\n", __FILE__, __LINE__, t);
r = allocate_diskblocknumber(&t->h->roots[t->h->n_named_roots-1], t, toku_txn_logger(txn)); r = toku_allocate_diskblocknumber(t->h->blocktable, &t->h->roots[t->h->n_named_roots-1], &t->h->dirty, toku_txn_logger(txn));
if (r!=0) goto died_after_read_and_pin; if (r!=0) goto died_after_read_and_pin;
t->h->dirty = 1; t->h->dirty = 1;
compute_and_fill_remembered_hash(t, t->h->n_named_roots-1); compute_and_fill_remembered_hash(t, t->h->n_named_roots-1);
...@@ -3074,7 +3007,9 @@ toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v) ...@@ -3074,7 +3007,9 @@ toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v)
int r = toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), h); int r = toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), h);
if (r) return r; if (r) return r;
} }
u_int64_t write_to = block_allocator_allocated_limit(h->block_allocator); // Must compute this after writing the header. //We would want retrieving 'write_to' and writing to that point to be
//atomic. This is only done during shutdown of a BRT, so we allow it.
u_int64_t write_to = toku_block_allocator_allocated_limit(h->blocktable); // Must compute this after writing the header.
//printf("%s:%d fifo written to %lu\n", __FILE__, __LINE__, write_to); //printf("%s:%d fifo written to %lu\n", __FILE__, __LINE__, write_to);
{ {
int r = toku_serialize_fifo_at(toku_cachefile_fd(cachefile), write_to, h->fifo); int r = toku_serialize_fifo_at(toku_cachefile_fd(cachefile), write_to, h->fifo);
...@@ -4360,12 +4295,7 @@ int toku_dump_brt (FILE *f, BRT brt) { ...@@ -4360,12 +4295,7 @@ int toku_dump_brt (FILE *f, BRT brt) {
CACHEKEY *rootp; CACHEKEY *rootp;
assert(brt->h); assert(brt->h);
u_int32_t fullhash; u_int32_t fullhash;
u_int64_t i; toku_block_dump_translation_table(f, brt->h->blocktable);
fprintf(f, "Block translation:");
for (i=0; i<brt->h->translated_blocknum_limit; i++) {
fprintf(f, " %"PRIu64": %"PRId64" %"PRId64"", i, brt->h->block_translation[i].diskoff, brt->h->block_translation[i].size);
}
fprintf(f, "\n");
rootp = toku_calculate_root_offset_pointer(brt, &fullhash); rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
return toku_dump_brtnode(f, brt, *rootp, 0, 0, 0, 0, 0); return toku_dump_brtnode(f, brt, *rootp, 0, 0, 0, 0, 0);
} }
...@@ -4396,12 +4326,14 @@ static void toku_brt_lock_init(void) { ...@@ -4396,12 +4326,14 @@ static void toku_brt_lock_init(void) {
toku_pwrite_lock_init(); toku_pwrite_lock_init();
toku_logger_lock_init(); toku_logger_lock_init();
toku_graceful_lock_init(); toku_graceful_lock_init();
toku_blocktable_lock_init();
} }
static void toku_brt_lock_destroy(void) { static void toku_brt_lock_destroy(void) {
toku_pwrite_lock_destroy(); toku_pwrite_lock_destroy();
toku_logger_lock_destroy(); toku_logger_lock_destroy();
toku_graceful_lock_destroy(); toku_graceful_lock_destroy();
toku_blocktable_lock_destroy();
} }
void toku_brt_init(void) { void toku_brt_init(void) {
......
...@@ -111,8 +111,6 @@ enum brt_header_flags { ...@@ -111,8 +111,6 @@ enum brt_header_flags {
int toku_brt_keyrange (BRT brt, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater); int toku_brt_keyrange (BRT brt, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater);
void extend_block_translation (BLOCKNUM blocknum, struct brt_header *h);
void toku_brt_init(void); void toku_brt_init(void);
void toku_brt_destroy(void); void toku_brt_destroy(void);
void toku_pwrite_lock_init(void); void toku_pwrite_lock_init(void);
......
...@@ -30,8 +30,10 @@ dump_header (int f, struct brt_header **header) { ...@@ -30,8 +30,10 @@ dump_header (int f, struct brt_header **header) {
else printf(" layout_version=%d\n", h->layout_version); else printf(" layout_version=%d\n", h->layout_version);
printf(" dirty=%d\n", h->dirty); printf(" dirty=%d\n", h->dirty);
printf(" nodesize=%u\n", h->nodesize); printf(" nodesize=%u\n", h->nodesize);
printf(" free_blocks=%" PRId64 "\n", h->free_blocks.b); BLOCKNUM free_blocks = toku_block_get_free_blocks(h->blocktable);
printf(" unused_memory=%" PRId64 "\n", h->unused_blocks.b); BLOCKNUM unused_blocks = toku_block_get_unused_blocks(h->blocktable);
printf(" free_blocks=%" PRId64 "\n", free_blocks.b);
printf(" unused_memory=%" PRId64 "\n", unused_blocks.b);
if (h->n_named_roots==-1) { if (h->n_named_roots==-1) {
printf(" unnamed_root=%" PRId64 "\n", h->roots[0].b); printf(" unnamed_root=%" PRId64 "\n", h->roots[0].b);
printf(" flags=%u\n", h->flags_array[0]); printf(" flags=%u\n", h->flags_array[0]);
...@@ -165,10 +167,7 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) { ...@@ -165,10 +167,7 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
static void static void
dump_block_translation(struct brt_header *h, u_int64_t offset) { dump_block_translation(struct brt_header *h, u_int64_t offset) {
if (offset < h->translated_blocknum_limit) { toku_block_dump_translation(h->blocktable, offset);
struct block_translation_pair *bx = &h->block_translation[offset];
printf("%" PRIu64 ": %" PRId64 " %" PRId64 "\n", offset, bx->diskoff, bx->size);
}
} }
static int static int
...@@ -187,28 +186,31 @@ dump_fragmentation(int f, struct brt_header *h) { ...@@ -187,28 +186,31 @@ dump_fragmentation(int f, struct brt_header *h) {
u_int64_t leafblocks = 0; u_int64_t leafblocks = 0;
u_int64_t fragsizes = 0; u_int64_t fragsizes = 0;
u_int64_t i; u_int64_t i;
for (i = 0; i < h->translated_blocknum_limit; i++) { u_int64_t limit = toku_block_get_translated_blocknum_limit(h->blocktable);
for (i = 0; i < limit; i++) {
BRTNODE n; BRTNODE n;
BLOCKNUM blocknum = make_blocknum(i); BLOCKNUM blocknum = make_blocknum(i);
int r = toku_deserialize_brtnode_from (f, blocknum, 0 /*pass zero for hash, it doesn't matter*/, &n, h); int r = toku_deserialize_brtnode_from (f, blocknum, 0 /*pass zero for hash, it doesn't matter*/, &n, h);
if (r != 0) continue; if (r != 0) continue;
blocksizes += h->block_translation[i].size;
DISKOFF size = toku_block_get_size(h->blocktable, blocknum);
blocksizes += size;
if (n->height == 0) { if (n->height == 0) {
leafsizes += h->block_translation[i].size; leafsizes += size;
leafblocks += 1; leafblocks += 1;
} }
toku_brtnode_free(&n); toku_brtnode_free(&n);
} }
size_t n = h->translated_blocknum_limit * sizeof (struct block_translation_pair); size_t n = limit * sizeof (struct block_translation_pair);
struct block_translation_pair *bx = toku_malloc(n); struct block_translation_pair *bx = toku_malloc(n);
memcpy(bx, h->block_translation, n); toku_block_memcpy_translation_table(h->blocktable, n, bx);
qsort(bx, h->translated_blocknum_limit, sizeof (struct block_translation_pair), bxpcmp); qsort(bx, limit, sizeof (struct block_translation_pair), bxpcmp);
for (i = 0; i < h->translated_blocknum_limit - 1; i++) { for (i = 0; i < limit - 1; i++) {
// printf("%lu %lu %lu\n", i, bx[i].diskoff, bx[i].size); // printf("%lu %lu %lu\n", i, bx[i].diskoff, bx[i].size);
fragsizes += bx[i+1].diskoff - (bx[i].diskoff + bx[i].size); fragsizes += bx[i+1].diskoff - (bx[i].diskoff + bx[i].size);
} }
toku_free(bx); toku_free(bx);
printf("translated_blocknum_limit: %" PRIu64 "\n", h->translated_blocknum_limit); printf("translated_blocknum_limit: %" PRIu64 "\n", limit);
printf("leafblocks: %" PRIu64 "\n", leafblocks); printf("leafblocks: %" PRIu64 "\n", leafblocks);
printf("blocksizes: %" PRIu64 "\n", blocksizes); printf("blocksizes: %" PRIu64 "\n", blocksizes);
printf("leafsizes: %" PRIu64 "\n", leafsizes); printf("leafsizes: %" PRIu64 "\n", leafsizes);
...@@ -299,15 +301,24 @@ main (int argc, const char *argv[]) { ...@@ -299,15 +301,24 @@ main (int argc, const char *argv[]) {
} else { } else {
BLOCKNUM blocknum; BLOCKNUM blocknum;
printf("Block translation:"); printf("Block translation:");
for (blocknum.b=0; blocknum.b<h->unused_blocks.b; blocknum.b++) {
u_int64_t limit = toku_block_get_translated_blocknum_limit(h->blocktable);
BLOCKNUM unused_blocks = toku_block_get_unused_blocks(h->blocktable);
size_t bx_size = limit * sizeof (struct block_translation_pair);
struct block_translation_pair *bx = toku_malloc(bx_size);
toku_block_memcpy_translation_table(h->blocktable, bx_size, bx);
for (blocknum.b=0; blocknum.b< unused_blocks.b; blocknum.b++) {
printf(" %" PRId64 ":", blocknum.b); printf(" %" PRId64 ":", blocknum.b);
if (h->block_translation[blocknum.b].size == -1) printf("free"); if (bx[blocknum.b].size == -1) printf("free");
else printf("%" PRId64 ":%" PRId64, h->block_translation[blocknum.b].diskoff, h->block_translation[blocknum.b].size); else printf("%" PRId64 ":%" PRId64, bx[blocknum.b].diskoff, bx[blocknum.b].size);
} }
for (blocknum.b=1; blocknum.b<h->unused_blocks.b; blocknum.b++) { for (blocknum.b=1; blocknum.b<unused_blocks.b; blocknum.b++) {
if (h->block_translation[blocknum.b].size != -1) if (bx[blocknum.b].size != -1)
dump_node(f, blocknum, h); dump_node(f, blocknum, h);
} }
toku_free(bx);
} }
toku_brtheader_free(h); toku_brtheader_free(h);
toku_malloc_cleanup(); toku_malloc_cleanup();
......
...@@ -137,8 +137,9 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L ...@@ -137,8 +137,9 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L
XMALLOC(h->flags_array); XMALLOC(h->flags_array);
h->flags_array[0] = header.flags; h->flags_array[0] = header.flags;
h->nodesize = header.nodesize; h->nodesize = header.nodesize;
h->free_blocks = header.free_blocks; assert(h->blocktable /* Not initialized. Is this used? */);
h->unused_blocks = header.unused_blocks; toku_block_recovery_set_free_blocks(h->blocktable, header.free_blocks);
toku_block_recovery_set_unused_blocks(h->blocktable, header.unused_blocks);
h->n_named_roots = header.n_named_roots; h->n_named_roots = header.n_named_roots;
r=toku_fifo_create(&h->fifo); r=toku_fifo_create(&h->fifo);
assert(r==0); assert(r==0);
...@@ -687,7 +688,7 @@ toku_recover_changeunusedmemory (LSN UU(lsn), FILENUM filenum, BLOCKNUM UU(oldun ...@@ -687,7 +688,7 @@ toku_recover_changeunusedmemory (LSN UU(lsn), FILENUM filenum, BLOCKNUM UU(oldun
assert(r==0); assert(r==0);
assert(pair->brt); assert(pair->brt);
assert(pair->brt->h); assert(pair->brt->h);
pair->brt->h->unused_blocks = newunused; toku_block_recovery_set_unused_blocks(pair->brt->h->blocktable, newunused);
} }
static int toku_recover_checkpoint (LSN UU(lsn)) { static int toku_recover_checkpoint (LSN UU(lsn)) {
......
...@@ -84,8 +84,6 @@ REGRESSION_TESTS_RAW = \ ...@@ -84,8 +84,6 @@ REGRESSION_TESTS_RAW = \
omt-cursor-test \ omt-cursor-test \
omt-test \ omt-test \
shortcut \ shortcut \
test1305 \
test1308a \
test-assert \ test-assert \
test-brt-delete-both \ test-brt-delete-both \
test-brt-overflow \ test-brt-overflow \
......
...@@ -53,14 +53,13 @@ static void test_serialize(void) { ...@@ -53,14 +53,13 @@ static void test_serialize(void) {
memset(btps, 0, sizeof(btps)); memset(btps, 0, sizeof(btps));
brt->h = brt_h; brt->h = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->translated_blocknum_limit = 1; toku_blocktable_create_new(&brt_h->blocktable);
brt_h->block_translation = btps; toku_blocktable_debug_set_translation(brt_h->blocktable, 1, btps);
brt_h->block_translation[20].diskoff = 4096; btps[20].diskoff = 4096;
brt_h->block_translation[20].size = 100; btps[20].size = 100;
create_block_allocator(&brt_h->block_allocator, 4096, BLOCK_ALLOCATOR_ALIGNMENT);
{ {
u_int64_t b; u_int64_t b;
block_allocator_alloc_block(brt_h->block_allocator, 100, &b); toku_block_alloc(brt_h->blocktable, 100, &b);
assert(b==4096); assert(b==4096);
} }
...@@ -120,9 +119,8 @@ static void test_serialize(void) { ...@@ -120,9 +119,8 @@ static void test_serialize(void) {
toku_free(sn.u.n.childinfos); toku_free(sn.u.n.childinfos);
toku_free(sn.u.n.childkeys); toku_free(sn.u.n.childkeys);
block_allocator_free_block(brt_h->block_allocator, 4096); toku_block_free(brt_h->blocktable, 4096);
destroy_block_allocator(&brt_h->block_allocator); toku_blocktable_destroy(&brt_h->blocktable);
toku_free(brt_h->block_translation);
toku_free(brt_h); toku_free(brt_h);
toku_free(brt); toku_free(brt);
} }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
/* This code requires that the buffer be big enough to hold whatever you put into it. */ /* This code requires that the buffer be big enough to hold whatever you put into it. */
/* This abstraction doesn't do a good job of hiding its internals. /* This abstraction doesn't do a good job of hiding its internals.
* Why? The performance of this code is important, and we want to inline stuff */ * Why? The performance of this code is important, and we want to inline stuff */
//Why is size here an int instead of DISKOFF like in the initializer?
struct wbuf { struct wbuf {
unsigned char *buf; unsigned char *buf;
unsigned int size; unsigned int size;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment