Commit 443c2202 authored by Yoni Fogel's avatar Yoni Fogel

Closes #1557 Blocktable now has 1 lock per brt

git-svn-id: file:///svn/toku/tokudb@10254 c7de825b-a66e-492c-adef-691d508d4ae1
parent ef041cd3
......@@ -30,6 +30,8 @@ struct block_table {
// The in-memory data structure for block allocation
BLOCK_ALLOCATOR block_allocator;
toku_pthread_mutex_t mutex;
int is_locked;
};
static const DISKOFF diskoff_is_null = (DISKOFF)-1; // in a freelist, this indicates end of list
......@@ -61,29 +63,28 @@ verify(BLOCK_TABLE bt, BLOCKNUM b) {
assert((u_int64_t)b.b < bt->translated_blocknum_limit);
}
static toku_pthread_mutex_t blocktable_mutex = TOKU_PTHREAD_MUTEX_INITIALIZER;
static int blocktable_is_locked=0;
void toku_blocktable_lock_init(void) {
int r = toku_pthread_mutex_init(&blocktable_mutex, NULL); assert(r == 0);
void blocktable_lock_init(BLOCK_TABLE bt) {
memset(&bt->mutex, 0, sizeof(bt->mutex));
int r = toku_pthread_mutex_init(&bt->mutex, NULL); assert(r == 0);
bt->is_locked = 0;
}
void toku_blocktable_lock_destroy(void) {
int r = toku_pthread_mutex_destroy(&blocktable_mutex); assert(r == 0);
void blocktable_lock_destroy(BLOCK_TABLE bt) {
int r = toku_pthread_mutex_destroy(&bt->mutex); assert(r == 0);
}
static inline void
lock_for_blocktable (void) {
lock_for_blocktable(BLOCK_TABLE bt) {
// Locks the blocktable_mutex.
int r = toku_pthread_mutex_lock(&blocktable_mutex);
int r = toku_pthread_mutex_lock(&bt->mutex);
assert(r==0);
blocktable_is_locked = 1;
bt->is_locked = 1;
}
static inline void
unlock_for_blocktable (void) {
blocktable_is_locked = 0;
int r = toku_pthread_mutex_unlock(&blocktable_mutex);
unlock_for_blocktable(BLOCK_TABLE bt) {
bt->is_locked = 0;
int r = toku_pthread_mutex_unlock(&bt->mutex);
assert(r==0);
}
......@@ -117,16 +118,16 @@ block_alloc_and_set_translation(BLOCK_TABLE bt, BLOCKNUM b, u_int64_t size, u_in
void
toku_block_alloc(BLOCK_TABLE bt, u_int64_t size, u_int64_t *offset) {
lock_for_blocktable();
lock_for_blocktable(bt);
block_alloc(bt, size, offset);
unlock_for_blocktable();
unlock_for_blocktable(bt);
}
void
toku_block_free(BLOCK_TABLE bt, u_int64_t offset) {
lock_for_blocktable();
lock_for_blocktable(bt);
block_free(bt, offset);
unlock_for_blocktable();
unlock_for_blocktable(bt);
}
static void
......@@ -137,29 +138,29 @@ update_size_on_disk(BLOCK_TABLE bt) {
void
toku_block_realloc(BLOCK_TABLE bt, BLOCKNUM b, u_int64_t size, u_int64_t *offset, int *dirty) {
lock_for_blocktable();
lock_for_blocktable(bt);
*dirty = 1;
extend_block_translation(bt, b);
block_free_blocknum(bt, b);
block_alloc_and_set_translation(bt, b, size, offset);
unlock_for_blocktable();
unlock_for_blocktable(bt);
}
void
toku_block_lock_for_multiple_operations(void) {
lock_for_blocktable();
toku_block_lock_for_multiple_operations(BLOCK_TABLE bt) {
lock_for_blocktable(bt);
}
void
toku_block_unlock_for_multiple_operations(void) {
assert(blocktable_is_locked);
unlock_for_blocktable();
toku_block_unlock_for_multiple_operations(BLOCK_TABLE bt) {
assert(bt->is_locked);
unlock_for_blocktable(bt);
}
void
toku_block_realloc_translation_unlocked(BLOCK_TABLE bt) {
assert(blocktable_is_locked);
assert(bt->is_locked);
if (bt->block_translation_address_on_disk != 0) {
block_allocator_free_block(bt->block_allocator, bt->block_translation_address_on_disk);
}
......@@ -171,32 +172,32 @@ toku_block_realloc_translation_unlocked(BLOCK_TABLE bt) {
void
toku_block_wbuf_free_blocks_unlocked(BLOCK_TABLE bt, struct wbuf *wbuf) {
assert(blocktable_is_locked);
assert(bt->is_locked);
wbuf_BLOCKNUM(wbuf, bt->free_blocks);
}
void
toku_block_wbuf_unused_blocks_unlocked(BLOCK_TABLE bt, struct wbuf *wbuf) {
assert(blocktable_is_locked);
assert(bt->is_locked);
wbuf_BLOCKNUM(wbuf, bt->unused_blocks);
}
void
toku_block_wbuf_translated_blocknum_limit_unlocked(BLOCK_TABLE bt, struct wbuf *wbuf) {
assert(blocktable_is_locked);
assert(bt->is_locked);
wbuf_ulonglong(wbuf, bt->translated_blocknum_limit);
}
void
toku_block_wbuf_block_translation_address_on_disk_unlocked(BLOCK_TABLE bt, struct wbuf *wbuf) {
assert(blocktable_is_locked);
assert(bt->is_locked);
wbuf_DISKOFF(wbuf, bt->block_translation_address_on_disk);
}
void
toku_block_wbuf_init_and_fill_unlocked(BLOCK_TABLE bt, struct wbuf *w,
u_int64_t *size, u_int64_t *address) {
assert(blocktable_is_locked);
assert(bt->is_locked);
update_size_on_disk(bt);
u_int64_t size_translation = bt->block_translation_size_on_disk;
//printf("%s:%d writing translation table of size_translation %ld at %ld\n", __FILE__, __LINE__, size_translation, bt->block_translation_address_on_disk);
......@@ -216,34 +217,34 @@ toku_block_wbuf_init_and_fill_unlocked(BLOCK_TABLE bt, struct wbuf *w,
DISKOFF
toku_block_get_offset(BLOCK_TABLE bt, BLOCKNUM b) {
lock_for_blocktable();
lock_for_blocktable(bt);
verify(bt, b);
DISKOFF r = bt->block_translation[b.b].diskoff;
unlock_for_blocktable();
unlock_for_blocktable(bt);
return r;
}
DISKOFF
toku_block_get_size(BLOCK_TABLE bt, BLOCKNUM b) {
lock_for_blocktable();
lock_for_blocktable(bt);
verify(bt, b);
DISKOFF r = bt->block_translation[b.b].size;
unlock_for_blocktable();
unlock_for_blocktable(bt);
return r;
}
void
toku_block_get_offset_size(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF *offset, DISKOFF *size) {
lock_for_blocktable();
lock_for_blocktable(bt);
verify(bt, b);
*offset = bt->block_translation[b.b].diskoff;
*size = bt->block_translation[b.b].size;
unlock_for_blocktable();
unlock_for_blocktable(bt);
}
int
toku_allocate_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *res, int *dirty) {
lock_for_blocktable();
lock_for_blocktable(bt);
BLOCKNUM result;
if (bt->free_blocks.b == diskoff_is_null) {
// no blocks in the free list
......@@ -258,7 +259,7 @@ toku_allocate_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *res, int *dirty) {
assert(result.b>0);
*res = result;
*dirty = 1;
unlock_for_blocktable();
unlock_for_blocktable(bt);
return 0;
}
////CONVERTED above already
......@@ -270,7 +271,7 @@ toku_free_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *b, int *dirty)
// Effect: Free a diskblock
// Watch out for the case where the disk block was never yet written to disk
{
lock_for_blocktable();
lock_for_blocktable(bt);
extend_block_translation(bt, *b);
// If the block_translation indicates that the size is <=0
// then there is no disk block allocated.
......@@ -285,7 +286,7 @@ toku_free_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *b, int *dirty)
bt->free_blocks.b = b->b;
b->b = 0;
*dirty = 1;
unlock_for_blocktable();
unlock_for_blocktable(bt);
return 0;
}
......@@ -298,107 +299,108 @@ toku_block_verify_no_free_blocks(BLOCK_TABLE bt) {
//Verify a block has been allocated at least once.
void
toku_verify_diskblocknumber_allocated(BLOCK_TABLE bt, BLOCKNUM b) {
lock_for_blocktable();
lock_for_blocktable(bt);
assert(0 <= b.b);
assert( b.b < bt->unused_blocks.b);
unlock_for_blocktable();
unlock_for_blocktable(bt);
}
u_int64_t toku_block_allocator_allocated_limit(BLOCK_TABLE bt) {
lock_for_blocktable();
lock_for_blocktable(bt);
u_int64_t r = block_allocator_allocated_limit(bt->block_allocator);
unlock_for_blocktable();
unlock_for_blocktable(bt);
return r;
}
void
toku_block_dump_translation_table(FILE *f, BLOCK_TABLE bt) {
lock_for_blocktable();
lock_for_blocktable(bt);
u_int64_t i;
fprintf(f, "Block translation:");
for (i=0; i<bt->translated_blocknum_limit; i++) {
fprintf(f, " %"PRIu64": %"PRId64" %"PRId64"", i, bt->block_translation[i].diskoff, bt->block_translation[i].size);
}
fprintf(f, "\n");
unlock_for_blocktable();
unlock_for_blocktable(bt);
}
void
toku_block_dump_translation(BLOCK_TABLE bt, u_int64_t offset) {
lock_for_blocktable();
lock_for_blocktable(bt);
if (offset < bt->translated_blocknum_limit) {
struct block_translation_pair *bx = &bt->block_translation[offset];
printf("%" PRIu64 ": %" PRId64 " %" PRId64 "\n", offset, bx->diskoff, bx->size);
}
unlock_for_blocktable();
unlock_for_blocktable(bt);
}
void
toku_block_recovery_set_unused_blocks(BLOCK_TABLE bt, BLOCKNUM newunused) {
lock_for_blocktable();
lock_for_blocktable(bt);
bt->unused_blocks = newunused;
unlock_for_blocktable();
unlock_for_blocktable(bt);
}
void
toku_block_recovery_set_free_blocks(BLOCK_TABLE bt, BLOCKNUM newfree) {
lock_for_blocktable();
lock_for_blocktable(bt);
bt->free_blocks = newfree;
unlock_for_blocktable();
unlock_for_blocktable(bt);
}
void
toku_block_memcpy_translation_table(BLOCK_TABLE bt, size_t n, void *p) {
lock_for_blocktable();
lock_for_blocktable(bt);
memcpy(p, bt->block_translation, n);
unlock_for_blocktable();
unlock_for_blocktable(bt);
}
u_int64_t
toku_block_get_translated_blocknum_limit(BLOCK_TABLE bt) {
lock_for_blocktable();
lock_for_blocktable(bt);
u_int64_t r = bt->translated_blocknum_limit;
unlock_for_blocktable();
unlock_for_blocktable(bt);
return r;
}
BLOCKNUM
toku_block_get_free_blocks(BLOCK_TABLE bt) {
lock_for_blocktable();
lock_for_blocktable(bt);
BLOCKNUM r = bt->free_blocks;
unlock_for_blocktable();
unlock_for_blocktable(bt);
return r;
}
BLOCKNUM
toku_block_get_unused_blocks(BLOCK_TABLE bt) {
lock_for_blocktable();
lock_for_blocktable(bt);
BLOCKNUM r = bt->unused_blocks;
unlock_for_blocktable();
unlock_for_blocktable(bt);
return r;
}
//Must not call this function when anything else is using the blocktable.
//No one may use the blocktable afterwards.
void
toku_blocktable_destroy(BLOCK_TABLE *btp) {
lock_for_blocktable();
BLOCK_TABLE bt = *btp;
*btp = NULL;
toku_free(bt->block_translation);
bt->block_translation = NULL;
destroy_block_allocator(&bt->block_allocator);
blocktable_lock_destroy(bt);
toku_free(bt);
unlock_for_blocktable();
}
void
toku_blocktable_debug_set_translation(BLOCK_TABLE bt,
u_int64_t limit,
struct block_translation_pair *table) {
lock_for_blocktable();
lock_for_blocktable(bt);
if (bt->block_translation) toku_free(bt->block_translation);
bt->translated_blocknum_limit = limit;
bt->block_translation = table;
unlock_for_blocktable();
unlock_for_blocktable(bt);
}
void
......@@ -409,10 +411,10 @@ toku_blocktable_create(BLOCK_TABLE *btp,
u_int64_t block_translation_address_on_disk,
u_int64_t block_translation_size_on_disk,
unsigned char *buffer) {
lock_for_blocktable();
BLOCK_TABLE bt;
XMALLOC(bt);
blocktable_lock_init(bt);
bt->free_blocks = free_blocks;
bt->unused_blocks = unused_blocks;
......@@ -455,7 +457,6 @@ toku_blocktable_create(BLOCK_TABLE *btp,
// printf("%s:%d translated_blocknum_limit=%ld, block_translation_address_on_disk=%ld\n", __FILE__, __LINE__, bt->translated_blocknum_limit, bt->block_translation_address_on_disk);
*btp = bt;
unlock_for_blocktable();
}
void
......
......@@ -11,9 +11,6 @@ struct block_translation_pair {
DISKOFF size; // set to 0xFFFFFFFFFFFFFFFF for free
};
void toku_blocktable_lock_init(void);
void toku_blocktable_lock_destroy(void);
void toku_block_realloc(BLOCK_TABLE bt, BLOCKNUM b, u_int64_t size, u_int64_t *offset, int *dirty);
void toku_block_alloc(BLOCK_TABLE bt, u_int64_t size, u_int64_t *offset);
void toku_block_free(BLOCK_TABLE bt, u_int64_t offset);
......@@ -50,8 +47,8 @@ u_int64_t toku_block_get_translated_blocknum_limit(BLOCK_TABLE bt);
void toku_block_memcpy_translation_table(BLOCK_TABLE bt, size_t n, void *p);
//Unlocked/multi ops
void toku_block_lock_for_multiple_operations(void);
void toku_block_unlock_for_multiple_operations(void);
void toku_block_lock_for_multiple_operations(BLOCK_TABLE bt);
void toku_block_unlock_for_multiple_operations(BLOCK_TABLE bt);
void toku_block_realloc_translation_unlocked(BLOCK_TABLE bt);
void toku_block_wbuf_free_blocks_unlocked(BLOCK_TABLE bt, struct wbuf *wbuf);
......
......@@ -731,7 +731,7 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
int rr = 0;
if (h->panic) return h->panic;
lock_for_pwrite();
toku_block_lock_for_multiple_operations();
toku_block_lock_for_multiple_operations(h->blocktable);
struct wbuf w_main;
unsigned int size_main = toku_serialize_brt_header_size (h);
{
......@@ -750,7 +750,7 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
&size_translation, &address_translation);
size_translation = w_translation.size;
}
toku_block_unlock_for_multiple_operations();
toku_block_unlock_for_multiple_operations(h->blocktable);
{
//Actual Write main header
ssize_t nwrote;
......
......@@ -4426,7 +4426,6 @@ static void toku_brt_lock_init(void) {
toku_pwrite_lock_init();
toku_logger_lock_init();
toku_graceful_lock_init();
toku_blocktable_lock_init();
toku_leaflock_init();
}
......@@ -4434,7 +4433,6 @@ static void toku_brt_lock_destroy(void) {
toku_pwrite_lock_destroy();
toku_logger_lock_destroy();
toku_graceful_lock_destroy();
toku_blocktable_lock_destroy();
toku_leaflock_destroy();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment