Commit 8e25de39 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1510 Merge 1510 up to and including [11022]

Fixes windows build issues
Add checkpoint-safe fast truncate

git-svn-id: file:///svn/toku/tokudb@11023 c7de825b-a66e-492c-adef-691d508d4ae1
parent 1401e27d
...@@ -128,9 +128,7 @@ toku_block_translation_note_start_checkpoint_unlocked (BLOCK_TABLE bt) { ...@@ -128,9 +128,7 @@ toku_block_translation_note_start_checkpoint_unlocked (BLOCK_TABLE bt) {
// Copy current translation to inprogress translation. // Copy current translation to inprogress translation.
assert(bt->inprogress.block_translation == NULL); assert(bt->inprogress.block_translation == NULL);
copy_translation(&bt->inprogress, &bt->current, TRANSLATION_INPROGRESS); copy_translation(&bt->inprogress, &bt->current, TRANSLATION_INPROGRESS);
// don't yet have a block allocated for the inprogress btt.
bt->inprogress.block_translation[RESERVED_BLOCKNUM_TRANSLATION].size = 0;
bt->inprogress.block_translation[RESERVED_BLOCKNUM_TRANSLATION].u.diskoff = diskoff_unused;
bt->checkpoint_skipped = FALSE; bt->checkpoint_skipped = FALSE;
bt->checkpoint_failed = FALSE; bt->checkpoint_failed = FALSE;
} }
...@@ -477,8 +475,8 @@ maybe_expand_translation (struct translation *t) { ...@@ -477,8 +475,8 @@ maybe_expand_translation (struct translation *t) {
} }
void void
toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h) { toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h) {
lock_for_blocktable(bt); assert(bt->is_locked);
BLOCKNUM result; BLOCKNUM result;
struct translation * t = &bt->current; struct translation * t = &bt->current;
if (t->blocknum_freelist_head.b == freelist_null.b) { if (t->blocknum_freelist_head.b == freelist_null.b) {
...@@ -501,14 +499,20 @@ toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h) { ...@@ -501,14 +499,20 @@ toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h) {
verify_valid_freeable_blocknum(t, result); verify_valid_freeable_blocknum(t, result);
*res = result; *res = result;
brtheader_set_dirty(h, FALSE); brtheader_set_dirty(h, FALSE);
unlock_for_blocktable(bt);
} }
void void
toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *bp, struct brt_header * h) { toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h) {
lock_for_blocktable(bt);
toku_allocate_blocknum_unlocked(bt, res, h);
unlock_for_blocktable(bt);
}
static void
free_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *bp, struct brt_header * h) {
// Effect: Free a blocknum. // Effect: Free a blocknum.
// If the blocknum holds the only reference to a block on disk, free that block // If the blocknum holds the only reference to a block on disk, free that block
lock_for_blocktable(bt); assert(bt->is_locked);
BLOCKNUM b = *bp; BLOCKNUM b = *bp;
bp->b = 0; //Remove caller's reference. bp->b = 0; //Remove caller's reference.
struct translation *t = &bt->current; struct translation *t = &bt->current;
...@@ -533,8 +537,29 @@ PRNTF("free_blocknum_free", b.b, old_pair.size, old_pair.u.diskoff, bt); ...@@ -533,8 +537,29 @@ PRNTF("free_blocknum_free", b.b, old_pair.size, old_pair.u.diskoff, bt);
} }
else assert(old_pair.size==0 && old_pair.u.diskoff == diskoff_unused); else assert(old_pair.size==0 && old_pair.u.diskoff == diskoff_unused);
brtheader_set_dirty(h, FALSE); brtheader_set_dirty(h, FALSE);
}
void
toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *bp, struct brt_header * h) {
lock_for_blocktable(bt);
free_blocknum_unlocked(bt, bp, h);
unlock_for_blocktable(bt); unlock_for_blocktable(bt);
} }
void
toku_block_translation_truncate_unlocked(BLOCK_TABLE bt, struct brt_header *h) {
assert(bt->is_locked);
brtheader_set_dirty(h, FALSE);
//Free all used blocks except descriptor
BLOCKNUM keep_only = h->descriptor.b;
struct translation *t = &bt->current;
int64_t i;
for (i=RESERVED_BLOCKNUMS; i<t->smallest_never_used_blocknum.b; i++) {
if (i==keep_only.b) continue;
BLOCKNUM b = make_blocknum(i);
if (t->block_translation[i].size > 0) free_blocknum_unlocked(bt, &b, h);
}
}
//Verify there are no free blocks. //Verify there are no free blocks.
void void
......
...@@ -25,9 +25,11 @@ void toku_block_translation_note_start_checkpoint_unlocked(BLOCK_TABLE bt); ...@@ -25,9 +25,11 @@ void toku_block_translation_note_start_checkpoint_unlocked(BLOCK_TABLE bt);
void toku_block_translation_note_end_checkpoint(BLOCK_TABLE bt); void toku_block_translation_note_end_checkpoint(BLOCK_TABLE bt);
void toku_block_translation_note_failed_checkpoint(BLOCK_TABLE bt); void toku_block_translation_note_failed_checkpoint(BLOCK_TABLE bt);
void toku_block_translation_note_skipped_checkpoint(BLOCK_TABLE bt); void toku_block_translation_note_skipped_checkpoint(BLOCK_TABLE bt);
void toku_block_translation_truncate_unlocked(BLOCK_TABLE bt, struct brt_header *h);
//Blocknums //Blocknums
void toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h); void toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h);
void toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h);
void toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *b, struct brt_header * h); void toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *b, struct brt_header * h);
void toku_verify_blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b); void toku_verify_blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b);
void toku_block_verify_no_free_blocknums(BLOCK_TABLE bt); void toku_block_verify_no_free_blocknums(BLOCK_TABLE bt);
......
...@@ -593,23 +593,14 @@ void toku_brtnode_free (BRTNODE *nodep) { ...@@ -593,23 +593,14 @@ void toku_brtnode_free (BRTNODE *nodep) {
*nodep=0; *nodep=0;
} }
static void
brtheader_init(struct brt_header *h) {
memset(h, 0, sizeof *h);
}
static void static void
brtheader_partial_destroy(struct brt_header *h) { brtheader_partial_destroy(struct brt_header *h) {
if (h->type == BRTHEADER_CHECKPOINT_INPROGRESS) { if (h->type == BRTHEADER_CHECKPOINT_INPROGRESS) {
//header and checkpoint_header have same Blocktable pointer
//cannot destroy since it is still in use by CURRENT
h->blocktable = NULL;
//Share fifo till #1603 //Share fifo till #1603
h->fifo = NULL; h->fifo = NULL;
} }
else { else {
assert(h->type == BRTHEADER_CURRENT); assert(h->type == BRTHEADER_CURRENT);
toku_blocktable_destroy(&h->blocktable);
toku_fifo_free(&h->fifo); //TODO: #1603 delete toku_fifo_free(&h->fifo); //TODO: #1603 delete
} }
} }
...@@ -619,18 +610,23 @@ brtheader_destroy(struct brt_header *h) { ...@@ -619,18 +610,23 @@ brtheader_destroy(struct brt_header *h) {
if (!h->panic) assert(!h->checkpoint_header); if (!h->panic) assert(!h->checkpoint_header);
brtheader_partial_destroy(h); brtheader_partial_destroy(h);
if (h->type == BRTHEADER_CURRENT && h->descriptor.sdbt.data) toku_free(h->descriptor.sdbt.data);
//header and checkpoint_header have same Blocktable pointer
//cannot destroy since it is still in use by CURRENT
if (h->type == BRTHEADER_CHECKPOINT_INPROGRESS) h->blocktable = NULL;
else {
assert(h->type == BRTHEADER_CURRENT);
toku_blocktable_destroy(&h->blocktable);
if (h->descriptor.sdbt.data) toku_free(h->descriptor.sdbt.data);
}
} }
static int static int
brtheader_alloc(struct brt_header **hh) { brtheader_alloc(struct brt_header **hh) {
int r; int r = 0;
if ((MALLOC(*hh))==0) { if ((CALLOC(*hh))==0) {
assert(errno==ENOMEM); assert(errno==ENOMEM);
r = ENOMEM; r = ENOMEM;
} else {
brtheader_init(*hh);
r = 0;
} }
return r; return r;
} }
...@@ -2862,18 +2858,11 @@ static int brt_open_file(BRT brt, const char *fname, int is_create, int *fdp, BO ...@@ -2862,18 +2858,11 @@ static int brt_open_file(BRT brt, const char *fname, int is_create, int *fdp, BO
} }
static int static int
brt_init_header (BRT t) { brt_init_header_partial (BRT t) {
int r; int r;
t->h->type = BRTHEADER_CURRENT;
t->h->checkpoint_header = NULL;
t->h->flags = t->flags; t->h->flags = t->flags;
t->h->nodesize=t->nodesize; t->h->nodesize=t->nodesize;
toku_blocktable_create_new(&t->h->blocktable);
BLOCKNUM root;
//Assign blocknum for root block, also dirty the header
toku_allocate_blocknum(t->h->blocktable, &root, t->h);
t->h->root = root;
compute_and_fill_remembered_hash(t); compute_and_fill_remembered_hash(t);
toku_fifo_create(&t->h->fifo); toku_fifo_create(&t->h->fifo);
...@@ -2902,6 +2891,7 @@ brt_init_header (BRT t) { ...@@ -2902,6 +2891,7 @@ brt_init_header (BRT t) {
//if ((r=toku_log_fheader(toku_txn_logger(txn), (LSN*)0, 0, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), lh))) { return r; } //if ((r=toku_log_fheader(toku_txn_logger(txn), (LSN*)0, 0, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), lh))) { return r; }
} }
#endif #endif
BLOCKNUM root = t->h->root;
if ((r=setup_initial_brt_root_node(t, root))!=0) { return r; } if ((r=setup_initial_brt_root_node(t, root))!=0) { return r; }
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0); //printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
toku_block_verify_no_free_blocknums(t->h->blocktable); toku_block_verify_no_free_blocknums(t->h->blocktable);
...@@ -2910,6 +2900,21 @@ brt_init_header (BRT t) { ...@@ -2910,6 +2900,21 @@ brt_init_header (BRT t) {
return r; return r;
} }
static int
brt_init_header (BRT t) {
t->h->type = BRTHEADER_CURRENT;
t->h->checkpoint_header = NULL;
toku_blocktable_create_new(&t->h->blocktable);
BLOCKNUM root;
//Assign blocknum for root block, also dirty the header
toku_allocate_blocknum(t->h->blocktable, &root, t->h);
t->h->root = root;
int r = brt_init_header_partial(t);
return r;
}
// allocate and initialize a brt header. // allocate and initialize a brt header.
// t->cf is not set to anything. // t->cf is not set to anything.
int toku_brt_alloc_init_header(BRT t) { int toku_brt_alloc_init_header(BRT t) {
...@@ -4689,48 +4694,25 @@ int toku_dump_brt (FILE *f, BRT brt) { ...@@ -4689,48 +4694,25 @@ int toku_dump_brt (FILE *f, BRT brt) {
int toku_brt_truncate (BRT brt) { int toku_brt_truncate (BRT brt) {
int r; int r;
//save information about the descriptor.
DISKOFF descriptor_size;
DISKOFF descriptor_location_ignore;
BLOCKNUM b = brt->h->descriptor.b;
if (b.b > 0) {
toku_translate_blocknum_to_offset_size(brt->h->blocktable, b,
&descriptor_location_ignore,
&descriptor_size);
}
// flush the cached tree blocks // flush the cached tree blocks
r = toku_brt_flush(brt); r = toku_brt_flush(brt);
if (r != 0)
return r;
// truncate the underlying file
r = toku_cachefile_truncate0(brt->cf);
if (r != 0)
return r;
// TODO log the truncate? // TODO log the truncate?
// reinit the header toku_block_lock_for_multiple_operations(brt->h->blocktable);
brtheader_partial_destroy(brt->h); if (r==0) {
r = brt_init_header(brt); // reinit the header
toku_block_translation_truncate_unlocked(brt->h->blocktable, brt->h);
brt->h->descriptor.b.b = 0; //Assign blocknum for root block, also dirty the header
if (b.b > 0) { //There was a descriptor. toku_allocate_blocknum_unlocked(brt->h->blocktable, &brt->h->root, brt->h);
//Write the db descriptor to file
toku_allocate_blocknum(brt->h->blocktable, &brt->h->descriptor.b, brt->h); brtheader_partial_destroy(brt->h);
DISKOFF offset; r = brt_init_header_partial(brt);
//4 for checksum
toku_blocknum_realloc_on_disk(brt->h->blocktable, brt->h->descriptor.b,
descriptor_size, &offset,
brt->h, FALSE);
DBT dbt_descriptor;
toku_fill_dbt(&dbt_descriptor, brt->h->descriptor.sdbt.data, brt->h->descriptor.sdbt.len);
r = toku_serialize_descriptor_contents_to_fd(toku_cachefile_fd(brt->cf),
&dbt_descriptor, offset);
assert(r==0);
} }
toku_block_unlock_for_multiple_operations(brt->h->blocktable);
return r; return r;
} }
......
...@@ -91,14 +91,26 @@ struct ctpair { ...@@ -91,14 +91,26 @@ struct ctpair {
struct rwlock rwlock; // multiple get's, single writer struct rwlock rwlock; // multiple get's, single writer
struct workqueue *cq; // writers sometimes return ctpair's using this queue struct workqueue *cq; // writers sometimes return ctpair's using this queue
struct workitem asyncwork; // work item for the worker threads struct workitem asyncwork; // work item for the worker threads
u_int32_t refs; //References that prevent descruction
int already_removed; //If a pair is removed from the cachetable, but cannot be freed because refs>0, this is set.
}; };
static void * const zero_value = 0; static void * const zero_value = 0;
static int const zero_size = 0; static int const zero_size = 0;
static inline void
ctpair_add_ref(PAIR p) {
assert(!p->already_removed);
p->refs++;
}
static inline void ctpair_destroy(PAIR p) { static inline void ctpair_destroy(PAIR p) {
rwlock_destroy(&p->rwlock); assert(p->refs>0);
toku_free(p); p->refs--;
if (p->refs==0) {
rwlock_destroy(&p->rwlock);
toku_free(p);
}
} }
// The cachetable is as close to an ENV as we get. // The cachetable is as close to an ENV as we get.
...@@ -519,6 +531,7 @@ static void cachetable_remove_pair (CACHETABLE ct, PAIR p) { ...@@ -519,6 +531,7 @@ static void cachetable_remove_pair (CACHETABLE ct, PAIR p) {
ct->table[h] = remove_from_hash_chain (p, ct->table[h]); ct->table[h] = remove_from_hash_chain (p, ct->table[h]);
} }
ct->size_current -= p->size; assert(ct->size_current >= 0); ct->size_current -= p->size; assert(ct->size_current >= 0);
p->already_removed = TRUE;
} }
// Maybe remove a pair from the cachetable and free it, depending on whether // Maybe remove a pair from the cachetable and free it, depending on whether
...@@ -756,6 +769,7 @@ static PAIR cachetable_insert_at(CACHETABLE ct, ...@@ -756,6 +769,7 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
TAGMALLOC(PAIR, p); TAGMALLOC(PAIR, p);
assert(p); assert(p);
memset(p, 0, sizeof *p); memset(p, 0, sizeof *p);
ctpair_add_ref(p);
p->cachefile = cachefile; p->cachefile = cachefile;
p->key = key; p->key = key;
p->value = value; p->value = value;
...@@ -854,7 +868,12 @@ write_pair_for_checkpoint (CACHETABLE ct, PAIR p) ...@@ -854,7 +868,12 @@ write_pair_for_checkpoint (CACHETABLE ct, PAIR p)
// this is essentially a flush_and_maybe_remove except that // this is essentially a flush_and_maybe_remove except that
// we already have p->rwlock and we just do the write in our own thread. // we already have p->rwlock and we just do the write in our own thread.
assert(p->dirty); // it must be dirty if its pending. assert(p->dirty); // it must be dirty if its pending.
p->cq = 0; // I don't want any delay, just do it. #if 0
// TODO: Determine if this is legal, and/or required. Commented out for now
// I believe if it has a queue, removing it it will break whatever's waiting for it.
// p->cq = 0; // I don't want any delay, just do it.
#endif
p->state = CTPAIR_WRITING; //most of this code should run only if NOT ALREADY CTPAIR_WRITING p->state = CTPAIR_WRITING; //most of this code should run only if NOT ALREADY CTPAIR_WRITING
assert(ct->size_writing>=0); assert(ct->size_writing>=0);
ct->size_writing += p->size; ct->size_writing += p->size;
...@@ -1168,17 +1187,48 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) { ...@@ -1168,17 +1187,48 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
unsigned i; unsigned i;
//THIS LOOP IS NOT THREAD SAFE! Has race condition since flush_and_maybe_remove releases cachetable lock //THIS LOOP IS NOT THREAD SAFE! Has race condition since flush_and_maybe_remove releases cachetable lock
unsigned num_pairs = 0;
unsigned list_size = 16;
PAIR *list = NULL;
XMALLOC_N(list_size, list);
//It is not safe to loop through the table (and hash chains) if you can
//release the cachetable lock at any point within.
//Make a list of pairs that belong to this cachefile.
//Add a reference to them.
for (i=0; i < ct->table_size; i++) { for (i=0; i < ct->table_size; i++) {
PAIR p; PAIR p;
for (p = ct->table[i]; p; p = p->hash_chain) { for (p = ct->table[i]; p; p = p->hash_chain) {
if (cf == 0 || p->cachefile==cf) { if (cf == 0 || p->cachefile==cf) {
nfound++; ctpair_add_ref(p);
p->cq = &cq; list[num_pairs] = p;
if (p->state == CTPAIR_IDLE) num_pairs++;
flush_and_maybe_remove(ct, p, TRUE); if (num_pairs == list_size) {
} list_size *= 2;
XREALLOC_N(list_size, list);
}
}
} }
} }
//Loop through the list.
//It is safe to access the memory (will not have been freed).
//If 'already_removed' is set, then we should release our reference
//and go to the next entry.
for (i=0; i < num_pairs; i++) {
PAIR p = list[i];
if (p->already_removed) {
ctpair_destroy(p); //Release our reference
continue;
}
assert(cf == 0 || p->cachefile==cf);
nfound++;
p->cq = &cq;
if (p->state == CTPAIR_IDLE)
flush_and_maybe_remove(ct, p, TRUE);
ctpair_destroy(p); //Release our reference
}
toku_free(list);
// wait for all of the pairs in the work queue to complete // wait for all of the pairs in the work queue to complete
for (i=0; i<nfound; i++) { for (i=0; i<nfound; i++) {
......
...@@ -53,8 +53,8 @@ ...@@ -53,8 +53,8 @@
#include <stdio.h> #include <stdio.h>
#include "brttypes.h"
#include "toku_portability.h" #include "toku_portability.h"
#include "brttypes.h"
#include "cachetable.h" #include "cachetable.h"
#include "checkpoint.h" #include "checkpoint.h"
......
...@@ -18,7 +18,7 @@ toku_pthread_rwlock_destroy(toku_pthread_rwlock_t *rwlock) { ...@@ -18,7 +18,7 @@ toku_pthread_rwlock_destroy(toku_pthread_rwlock_t *rwlock) {
rwlock->initialized = FALSE; rwlock->initialized = FALSE;
//Windows does not have a cleanup function for SRWLocks. //Windows does not have a cleanup function for SRWLocks.
//You just stop using them. //You just stop using them.
//return 0; return 0;
} }
int int
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment