Commit 8c34dda4 authored by Yoni Fogel

closes[t:2440] Merge #2440 to main.

closes[t:2512] merge #2512 to main
Delete dev branch

git-svn-id: file:///svn/toku/tokudb@19439 c7de825b-a66e-492c-adef-691d508d4ae1
parent 02f3b827
......@@ -435,6 +435,7 @@ deserialize_brtheader_10 (int fd, struct rbuf *rb, struct brt_header **brth) {
h->panic_string = 0;
toku_list_init(&h->live_brts);
toku_list_init(&h->zombie_brts);
toku_list_init(&h->checkpoint_before_commit_link);
//version MUST be in network order on disk regardless of disk order
h->layout_version = rbuf_network_int(&rc);
assert(h->layout_version==BRT_LAYOUT_VERSION_10);
......
......@@ -180,8 +180,10 @@ struct brt_header {
// If a transaction locked the BRT when it was empty, which transaction? (Only the latest one matters)
// 0 if no such transaction
TXNID txnid_that_created_or_locked_when_empty;
TXNID txnid_that_suppressed_recovery_logs;
struct toku_list live_brts;
struct toku_list zombie_brts;
struct toku_list checkpoint_before_commit_link;
};
struct brt {
......
......@@ -1372,6 +1372,7 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) {
h->panic_string = 0;
toku_list_init(&h->live_brts);
toku_list_init(&h->zombie_brts);
toku_list_init(&h->checkpoint_before_commit_link);
//version MUST be in network order on disk regardless of disk order
h->layout_version = rbuf_network_int(&rc);
//TODO: #1924
......
......@@ -2686,16 +2686,22 @@ toku_brt_log_put_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, co
TOKULOGGER logger = toku_txn_logger(txn);
if (logger) {
FILENUM fnums[num_brts];
FILENUMS filenums = {.num = num_brts, .filenums = fnums};
int i;
int num_unsuppressed_brts = 0;
for (i = 0; i < num_brts; i++) {
fnums[i] = toku_cachefile_filenum(brts[i]->cf);
if (brts[i]->h->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
//Logging not suppressed for this brt.
fnums[num_unsuppressed_brts++] = toku_cachefile_filenum(brts[i]->cf);
}
}
if (num_unsuppressed_brts) {
FILENUMS filenums = {.num = num_unsuppressed_brts, .filenums = fnums};
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
TXNID xid = toku_txn_get_txnid(txn);
FILENUM src_filenum = src_brt ? toku_cachefile_filenum(src_brt->cf) : FILENUM_NONE;
r = toku_log_enq_insert_multiple(logger, (LSN*)0, 0, src_filenum, filenums, xid, keybs, valbs);
}
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
TXNID xid = toku_txn_get_txnid(txn);
FILENUM src_filenum = src_brt ? toku_cachefile_filenum(src_brt->cf) : FILENUM_NONE;
r = toku_log_enq_insert_multiple(logger, (LSN*)0, 0, src_filenum, filenums, xid, keybs, valbs);
}
return r;
}
......@@ -2725,7 +2731,8 @@ int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_
message_xids = xids_get_root_xids();
}
TOKULOGGER logger = toku_txn_logger(txn);
if (do_logging && logger) {
if (do_logging && logger &&
brt->h->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
if (type == BRT_INSERT) {
......@@ -2759,16 +2766,22 @@ toku_brt_log_del_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, co
TOKULOGGER logger = toku_txn_logger(txn);
if (logger) {
FILENUM fnums[num_brts];
FILENUMS filenums = {.num = num_brts, .filenums = fnums};
int i;
int num_unsuppressed_brts = 0;
for (i = 0; i < num_brts; i++) {
fnums[i] = toku_cachefile_filenum(brts[i]->cf);
if (brts[i]->h->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
//Logging not suppressed for this brt.
fnums[num_unsuppressed_brts++] = toku_cachefile_filenum(brts[i]->cf);
}
}
if (num_unsuppressed_brts) {
FILENUMS filenums = {.num = num_unsuppressed_brts, .filenums = fnums};
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
TXNID xid = toku_txn_get_txnid(txn);
FILENUM src_filenum = src_brt ? toku_cachefile_filenum(src_brt->cf) : FILENUM_NONE;
r = toku_log_enq_delete_multiple(logger, (LSN*)0, 0, src_filenum, filenums, xid, keybs, valbs);
}
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
TXNID xid = toku_txn_get_txnid(txn);
FILENUM src_filenum = src_brt ? toku_cachefile_filenum(src_brt->cf) : FILENUM_NONE;
r = toku_log_enq_delete_multiple(logger, (LSN*)0, 0, src_filenum, filenums, xid, keybs, valbs);
}
return r;
}
......@@ -2791,7 +2804,8 @@ int toku_brt_maybe_delete(BRT brt, DBT *key, TOKUTXN txn, BOOL oplsn_valid, LSN
message_xids = xids_get_root_xids();
}
TOKULOGGER logger = toku_txn_logger(txn);
if (do_logging && logger) {
if (do_logging && logger &&
brt->h->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
r = toku_log_enq_delete_any(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), xid, keybs);
if (r!=0) return r;
......@@ -3022,6 +3036,7 @@ brt_init_header (BRT t) {
toku_list_init(&t->h->live_brts);
toku_list_init(&t->h->zombie_brts);
toku_list_init(&t->h->checkpoint_before_commit_link);
int r = brt_init_header_partial(t);
if (r==0) toku_block_verify_no_free_blocknums(t->h->blocktable);
return r;
......@@ -3782,7 +3797,7 @@ brtheader_note_unpin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v)
// Write checkpoint-in-progress versions of header and translation to disk (really to OS internal buffer).
// Must have access to fd (protected)
int
toku_brtheader_checkpoint (CACHEFILE UU(cachefile), int fd, void *header_v)
toku_brtheader_checkpoint (CACHEFILE cf, int fd, void *header_v)
{
struct brt_header *h = header_v;
struct brt_header *ch = h->checkpoint_header;
......@@ -3794,6 +3809,11 @@ toku_brtheader_checkpoint (CACHEFILE UU(cachefile), int fd, void *header_v)
if (ch->panic!=0) goto handle_error;
assert(ch->type == BRTHEADER_CHECKPOINT_INPROGRESS);
if (ch->dirty) { // this is only place this bit is tested (in checkpoint_header)
TOKULOGGER logger = toku_cachefile_logger(cf);
if (logger) {
r = toku_logger_fsync_if_lsn_not_fsynced(logger, ch->checkpoint_lsn);
if (r!=0) goto handle_error;
}
{
ch->checkpoint_count++;
// write translation and header to disk (or at least to OS internal buffer)
......@@ -5338,7 +5358,7 @@ int toku_brt_maybe_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn, BOOL op
message_xids = xids_get_root_xids();
}
TOKULOGGER logger = toku_txn_logger(txn);
if (logger) {
if (logger && brt->h->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
r = toku_log_enq_delete_both(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), xid, keybs, valbs);
......@@ -5693,25 +5713,18 @@ toku_brt_is_empty (BRT brt) {
return is_empty;
}
int
toku_brt_note_table_lock (BRT brt, TOKUTXN txn, BOOL ignore_not_empty) {
int r = 0;
if (brt->h->txnid_that_created_or_locked_when_empty != toku_txn_get_txnid(txn) &&
(ignore_not_empty || toku_brt_is_empty(brt)) &&
brt->h->txnid_that_created_or_locked_when_empty == TXNID_NONE)
{
brt->h->txnid_that_created_or_locked_when_empty = toku_txn_get_txnid(txn);
r = toku_txn_note_brt(txn, brt);
assert(r==0);
r = toku_logger_save_rollback_tablelock_on_empty_table(txn, toku_cachefile_filenum(brt->cf));
if (r==0) {
TOKULOGGER logger = toku_txn_logger(txn);
TXNID xid = toku_txn_get_txnid(txn);
r = toku_log_tablelock_on_empty_table(logger, (LSN*)NULL,
0, toku_cachefile_filenum(brt->cf), xid);
}
}
return r;
//Suppress both rollback and recovery logs.
// Effect: Records TXN as the transaction that suppressed recovery logging for
//   BRT's header, and links the header onto the transaction's
//   checkpoint_before_commit list.
//   The insert/delete logging paths in this file only write a recovery log
//   entry when txnid_that_suppressed_recovery_logs == TXNID_NONE, so after this
//   call those entries are skipped for this brt.
// Requires (asserted):
//   - TXN is the transaction recorded in txnid_that_created_or_locked_when_empty.
//   - No transaction has already suppressed recovery logs for this header.
// NOTE(review): presumably the checkpoint_before_commit list is what drives
//   toku_cachetable_local_checkpoint_for_commit() at commit time -- confirm
//   against the commit path, which is not visible here.
void
toku_brt_suppress_recovery_logs (BRT brt, TOKUTXN txn) {
assert(brt->h->txnid_that_created_or_locked_when_empty == toku_txn_get_txnid(txn));
assert(brt->h->txnid_that_suppressed_recovery_logs == TXNID_NONE);
brt->h->txnid_that_suppressed_recovery_logs = toku_txn_get_txnid(txn);
// Remember this header on the txn so it can be found again later.
toku_list_push(&txn->checkpoint_before_commit, &brt->h->checkpoint_before_commit_link);
}
// Returns: whether some transaction has suppressed recovery logging for this
//   brt, i.e. whether the header's txnid_that_suppressed_recovery_logs is
//   anything other than TXNID_NONE.
BOOL
toku_brt_is_recovery_logging_suppressed (BRT brt) {
    TXNID suppressor = brt->h->txnid_that_suppressed_recovery_logs;
    return suppressor != TXNID_NONE;
}
LSN toku_brt_checkpoint_lsn(BRT brt) {
......
......@@ -206,8 +206,11 @@ void toku_maybe_truncate_cachefile (CACHEFILE cf, int fd, u_int64_t size_used);
int maybe_preallocate_in_file (int fd, u_int64_t size);
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less.
int toku_brt_note_table_lock (BRT brt, TOKUTXN txn, BOOL ignore_not_empty);
// Effect: Record the fact that the BRT has a table lock (and thus no other txn will modify it until this txn completes. As a result, we can limit the amount of information in the rollback data structure.
void toku_brt_suppress_recovery_logs (BRT brt, TOKUTXN txn);
// Effect: suppresses recovery logs
// Requires: this is a (target) redirected brt
// implies: txnid_that_created_or_locked_when_empty matches txn
// implies: toku_txn_note_brt(brt, txn) has been called
int toku_brt_zombie_needed (BRT brt);
......@@ -217,6 +220,7 @@ BOOL toku_brt_is_empty (BRT brt);
double get_tdiff(void) __attribute__((__visibility__("default")));
BOOL toku_brt_is_recovery_logging_suppressed (BRT);
//TODO: #1485 once we have multiple main threads, restore this code, analyze performance.
#ifndef TOKU_MULTIPLE_MAIN_THREADS
#define TOKU_MULTIPLE_MAIN_THREADS 0
......
......@@ -201,6 +201,13 @@ static inline void cachetable_wait_write(CACHETABLE ct) {
}
}
// Tracks how far a cachefile has progressed through the three checkpoint
// callbacks (begin_checkpoint_userdata, checkpoint_userdata,
// end_checkpoint_userdata). Stored in cachefile.checkpoint_state.
enum cachefile_checkpoint_state {
CS_INVALID = 0,             // Never assigned in the code visible here; presumably catches zero-filled/uninitialized cachefiles -- confirm.
CS_NOT_IN_PROGRESS,         // Set when the cachefile is opened and again after end_checkpoint_userdata returns.
CS_CALLED_BEGIN_CHECKPOINT, // Set after begin_checkpoint_userdata returns.
CS_CALLED_CHECKPOINT        // Set after checkpoint_userdata returns.
};
struct cachefile {
CACHEFILE next;
CACHEFILE next_in_checkpoint;
......@@ -233,6 +240,11 @@ struct cachefile {
toku_pthread_cond_t openfd_wait; // openfd must wait until file is fully closed (purged from cachetable) if file is opened and closed simultaneously
toku_pthread_cond_t closefd_wait; // toku_cachefile_of_iname_and_add_reference() must wait until file is fully closed (purged from cachetable) if run while file is being closed.
u_int32_t closefd_waiting; // Number of threads waiting on closefd_wait (0 or 1, error otherwise).
struct rwlock checkpoint_lock; //protects checkpoint callback functions
//acts as fast mutex by only using 'write-lock'
LSN most_recent_global_checkpoint_that_finished_early;
LSN for_local_checkpoint;
enum cachefile_checkpoint_state checkpoint_state; //Protected by checkpoint_lock
};
static int
......@@ -567,6 +579,11 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
ct->cachefiles = newcf;
rwlock_init(&newcf->fdlock);
rwlock_init(&newcf->checkpoint_lock);
newcf->most_recent_global_checkpoint_that_finished_early = ZERO_LSN;
newcf->for_local_checkpoint = ZERO_LSN;
newcf->checkpoint_state = CS_NOT_IN_PROGRESS;
r = toku_pthread_cond_init(&newcf->openfd_wait, NULL); assert(r == 0);
r = toku_pthread_cond_init(&newcf->closefd_wait, NULL); assert(r == 0);
*cfptr = newcf;
......@@ -794,6 +811,9 @@ int toku_cachefile_close (CACHEFILE *cfp, char **error_string, BOOL oplsn_valid,
//assert(r == 0);
rwlock_write_unlock(&cf->fdlock);
rwlock_destroy(&cf->fdlock);
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex); //Just to make sure we can get it
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_destroy(&cf->checkpoint_lock);
toku_free(cf);
*cfp = NULL;
cachetable_unlock(ct);
......@@ -842,6 +862,9 @@ int toku_cachefile_close (CACHEFILE *cfp, char **error_string, BOOL oplsn_valid,
cachetable_lock(ct);
rwlock_write_unlock(&cf->fdlock);
rwlock_destroy(&cf->fdlock);
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex); //Just to make sure we can get it
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_destroy(&cf->checkpoint_lock);
cachetable_unlock(ct);
r = close(cf->fd);
......@@ -1338,36 +1361,37 @@ static PAIR write_for_checkpoint_pair = NULL;
// On entry: hold the ct lock
// On exit: the node is written out
// Method: take write lock
// if still pending write out the node
// maybe write out the node
// if p->cq, put on completion queue. Else release write lock
static void
write_pair_for_checkpoint (CACHETABLE ct, PAIR p)
write_pair_for_checkpoint (CACHETABLE ct, PAIR p, BOOL write_if_dirty)
{
write_for_checkpoint_pair = p;
rwlock_write_lock(&p->rwlock, ct->mutex); // grab an exclusive lock on the pair
assert(p->state!=CTPAIR_WRITING); // if we have the write lock, no one else should be writing out the node
if (p->checkpoint_pending) {
// this is essentially a flush_and_maybe_remove except that
// we already have p->rwlock and we just do the write in our own thread.
assert(p->dirty); // it must be dirty if its pending.
p->state = CTPAIR_WRITING; //most of this code should run only if NOT ALREADY CTPAIR_WRITING
assert(ct->size_writing>=0);
ct->size_writing += p->size;
assert(ct->size_writing>=0);
p->write_me = TRUE;
p->remove_me = FALSE;
workitem_init(&p->asyncwork, NULL, p);
cachetable_write_pair(ct, p); // releases the write lock on the pair
}
else if (p->cq) {
assert(ct->size_writing>=0);
ct->size_writing += p->size; //cachetable_complete_write_pair will reduce by p->size
workitem_init(&p->asyncwork, NULL, p);
workqueue_enq(p->cq, &p->asyncwork, 1);
if (p->dirty) {
write_for_checkpoint_pair = p;
rwlock_write_lock(&p->rwlock, ct->mutex); // grab an exclusive lock on the pair
assert(p->state!=CTPAIR_WRITING); // if we have the write lock, no one else should be writing out the node
if (p->dirty && (write_if_dirty || p->checkpoint_pending)) {
// this is essentially a flush_and_maybe_remove except that
// we already have p->rwlock and we just do the write in our own thread.
p->state = CTPAIR_WRITING; //most of this code should run only if NOT ALREADY CTPAIR_WRITING
assert(ct->size_writing>=0);
ct->size_writing += p->size;
assert(ct->size_writing>=0);
p->write_me = TRUE;
p->remove_me = FALSE;
workitem_init(&p->asyncwork, NULL, p);
cachetable_write_pair(ct, p); // releases the write lock on the pair
}
else if (p->cq) {
assert(ct->size_writing>=0);
ct->size_writing += p->size; //cachetable_complete_write_pair will reduce by p->size
workitem_init(&p->asyncwork, NULL, p);
workqueue_enq(p->cq, &p->asyncwork, 1);
}
else
rwlock_write_unlock(&p->rwlock); // didn't call cachetable_write_pair so we have to unlock it ourselves.
write_for_checkpoint_pair = NULL;
}
else
rwlock_write_unlock(&p->rwlock); // didn't call cachetable_write_pair so we have to unlock it ourselves.
write_for_checkpoint_pair = NULL;
}
// for debugging
......@@ -1418,7 +1442,7 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t ful
if (p->checkpoint_pending) {
get_and_pin_footprint = 4;
write_pair_for_checkpoint(ct, p);
write_pair_for_checkpoint(ct, p, FALSE);
}
// still have the cachetable lock
// TODO: #1398 kill this hack before it multiplies further
......@@ -1755,7 +1779,7 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
unsigned i;
unsigned num_pairs = 0;
unsigned list_size = 16;
unsigned list_size = 256;
PAIR *list = NULL;
XMALLOC_N(list_size, list);
//It is not safe to loop through the table (and hash chains) if you can
......@@ -1788,8 +1812,7 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
nfound++;
p->cq = &cq;
if (p->state == CTPAIR_IDLE)
flush_and_maybe_remove(ct, p, TRUE); //TODO: 1485 If this is being removed, why is it counted in nfound?
//TODO: 1485 How are things being added to the queue?
flush_and_maybe_remove(ct, p, TRUE);
}
ctpair_destroy(p); //Release our reference
}
......@@ -1805,14 +1828,13 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
cachetable_lock(ct);
PAIR p = workitem_arg(wi);
p->cq = 0;
if (p->state == CTPAIR_READING) { //TODO: 1485 Doesn't this mean someone ELSE is holding a lock?
rwlock_write_unlock(&p->rwlock); //TODO: 1485 When did we grab a write lock? (the other person who grabbed the read (write) lock?
//Does this mean no one has a pin? since it never finished...
// POSSIBLE CAUSE
cachetable_maybe_remove_and_free_pair(ct, p); //TODO: 1485 MUST be removed. Can't be 'maybe_remove'
} else if (p->state == CTPAIR_WRITING) { //TODO: 1485 This could mean WE or SOMEONE ELSE is holding a lock, right? Can't be both.
//Someone else could have a PIN! This could be the cause.
cachetable_complete_write_pair(ct, p, TRUE); //TODO: 1485 MUST be removed. Can't be 'maybe_remove'
if (p->state == CTPAIR_READING) { //Some other thread owned the lock, but transferred ownership to the thread executing this function
rwlock_write_unlock(&p->rwlock); //Release the lock
//No one has a pin. This was being read because of a prefetch.
cachetable_maybe_remove_and_free_pair(ct, p);
} else if (p->state == CTPAIR_WRITING) { //Some other thread (or this thread) owned the lock and transferred ownership to the thread executing this function
//No one has a pin. This was written because of an eviction.
cachetable_complete_write_pair(ct, p, TRUE);
} else if (p->state == CTPAIR_INVALID) {
abort_fetch_pair(p);
} else
......@@ -1888,36 +1910,39 @@ int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key) {
if (p->key.b==key.b && p->cachefile==cachefile) {
p->dirty = CACHETABLE_CLEAN; // clear the dirty bit. We're just supposed to remove it.
assert(rwlock_readers(&p->rwlock)==1);
assert(rwlock_blocked_readers(&p->rwlock)==0);
rwlock_read_unlock(&p->rwlock);
if (rwlock_blocked_writers(&p->rwlock)>0) {
//Someone (checkpoint thread) is waiting for a write lock on this
//pair.
//They are still blocked because we have not released the
//cachetable lock.
//If we freed the memory for the pair we would have dangling
//pointers. We need to let the checkpoint thread finish up with
//this pair.
assert(rwlock_blocked_writers(&p->rwlock)==1); //Only one checkpoint thread.
struct workqueue cq;
workqueue_init(&cq);
p->cq = &cq;
// If anyone is waiting on write lock, let them finish.
cachetable_unlock(ct);
while (rwlock_blocked_writers(&p->rwlock)>0) {
//Someone (one or more checkpoint threads) is waiting for a write lock
//on this pair.
//They are still blocked because we have not released the
//cachetable lock.
//If we freed the memory for the pair we would have dangling
//pointers. We need to let the checkpoint thread finish up with
//this pair.
p->cq = &cq;
// If anyone is waiting on write lock, let them finish.
cachetable_unlock(ct);
WORKITEM wi = 0;
r = workqueue_deq(&cq, &wi, 1);
//Writer is now done.
assert(r == 0);
PAIR pp = workitem_arg(wi);
assert(pp == p);
//We are holding the write lock on the pair
cachetable_lock(ct);
assert(rwlock_writers(&p->rwlock) == 1);
assert(rwlock_users(&p->rwlock) == 1);
cachetable_complete_write_pair(ct, p, TRUE);
WORKITEM wi = NULL;
r = workqueue_deq(&cq, &wi, 1);
//Writer is now done.
assert(r == 0);
PAIR pp = workitem_arg(wi);
assert(pp == p);
//We are holding the write lock on the pair
cachetable_lock(ct);
assert(rwlock_writers(&p->rwlock) == 1);
assert(rwlock_readers(&p->rwlock) == 0);
assert(rwlock_blocked_readers(&p->rwlock) == 0);
cachetable_complete_write_pair(ct, p, TRUE);
}
workqueue_destroy(&cq);
}
else {
......@@ -2104,8 +2129,12 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) {
if (cf->begin_checkpoint_userdata) {
rwlock_prefer_read_lock(&cf->fdlock, ct->mutex);
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex);
assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS);
int r = cf->begin_checkpoint_userdata(cf, cf->fd, ct->lsn_of_checkpoint_in_progress, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_CALLED_BEGIN_CHECKPOINT;
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_read_unlock(&cf->fdlock);
}
}
......@@ -2138,24 +2167,34 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger,
while ((p = ct->pending_head)!=0) {
ct->pending_head = ct->pending_head->pending_next;
pending_pairs_remove(ct, p);
write_pair_for_checkpoint(ct, p); // if still pending, clear the pending bit and write out the node
write_pair_for_checkpoint(ct, p, FALSE); // if still pending, clear the pending bit and write out the node
// Don't need to unlock and lock cachetable, because the cachetable was unlocked and locked while the flush callback ran.
}
}
assert(!ct->pending_head);
cachetable_unlock(ct);
{ // have just written data blocks, so next write the translation and header for each open dictionary
CACHEFILE cf;
//cachefiles_in_checkpoint is protected by the checkpoint_safe_lock
for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) {
if (cf->checkpoint_userdata) {
//Cachetable lock is not held, so use wrapper functions to get fd and lock.
int fd = toku_cachefile_get_and_pin_fd(cf);
int r = cf->checkpoint_userdata(cf, fd, cf->userdata);
toku_cachefile_unpin_fd(cf);
assert(r==0);
rwlock_prefer_read_lock(&cf->fdlock, ct->mutex);
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex);
if (!logger || ct->lsn_of_checkpoint_in_progress.lsn != cf->most_recent_global_checkpoint_that_finished_early.lsn) {
assert(ct->lsn_of_checkpoint_in_progress.lsn >= cf->most_recent_global_checkpoint_that_finished_early.lsn);
cachetable_unlock(ct);
assert(cf->checkpoint_state == CS_CALLED_BEGIN_CHECKPOINT);
int r = cf->checkpoint_userdata(cf, cf->fd, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_CALLED_CHECKPOINT;
cachetable_lock(ct);
}
else {
assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS);
}
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_read_unlock(&cf->fdlock);
}
}
}
......@@ -2167,15 +2206,25 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger,
//cachefiles_in_checkpoint is protected by the checkpoint_safe_lock
for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) {
if (cf->end_checkpoint_userdata) {
//Cachetable lock is not held, so use wrapper functions to get fd and lock.
int fd = toku_cachefile_get_and_pin_fd(cf);
//end_checkpoint fsyncs the fd, which needs the fdlock
int r = cf->end_checkpoint_userdata(cf, fd, cf->userdata);
toku_cachefile_unpin_fd(cf);
assert(r==0);
rwlock_prefer_read_lock(&cf->fdlock, ct->mutex);
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex);
if (!logger || ct->lsn_of_checkpoint_in_progress.lsn != cf->most_recent_global_checkpoint_that_finished_early.lsn) {
assert(ct->lsn_of_checkpoint_in_progress.lsn >= cf->most_recent_global_checkpoint_that_finished_early.lsn);
cachetable_unlock(ct);
//end_checkpoint fsyncs the fd, which needs the fdlock
assert(cf->checkpoint_state == CS_CALLED_CHECKPOINT);
int r = cf->end_checkpoint_userdata(cf, cf->fd, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_NOT_IN_PROGRESS;
cachetable_lock(ct);
}
assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS);
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_read_unlock(&cf->fdlock);
}
}
}
cachetable_unlock(ct);
{
//Delete list of cachefiles in the checkpoint,
......@@ -2488,3 +2537,129 @@ toku_cachetable_get_fname_in_cwd(CACHETABLE ct, const char * fname_in_env) {
uint64_t toku_cachetable_get_size_limit(CACHETABLE ct) {
return ct->size_limit;
}
// Effect: Performs a "local" checkpoint of the N given cachefiles on behalf of TXN:
//   1. Logs a local_txn_checkpoint record for the transaction; the LSN returned
//      for that record (begin_checkpoint_lsn) identifies this local checkpoint.
//   2. Tags every cachefile in CACHEFILES with that LSN (for_local_checkpoint).
//   3. Writes out every dirty pair that belongs to a tagged cachefile.
//   4. For each cachefile: if a global checkpoint has already run
//      begin_checkpoint_userdata (and possibly checkpoint_userdata) on it,
//      finishes that checkpoint's remaining callbacks first and records in
//      most_recent_global_checkpoint_that_finished_early that this was done;
//      then runs the begin / checkpoint / end callbacks for the local checkpoint
//      and clears the tag.
// Parameters:
//   ct         - the cachetable; its lock is taken on entry and released on exit.
//   txn        - transaction requesting the checkpoint; must have a logger (asserted).
//   n          - number of entries in CACHEFILES.
//   cachefiles - the cachefiles to checkpoint; each must not already be tagged
//                for a local checkpoint (asserted).
// Returns: 0 always. Every callback/log failure is an assertion failure.
int
toku_cachetable_local_checkpoint_for_commit (CACHETABLE ct, TOKUTXN txn, uint32_t n, CACHEFILE cachefiles[n]) {
cachetable_lock(ct);
LSN begin_checkpoint_lsn = ZERO_LSN;
uint32_t i;
TOKULOGGER logger = txn->logger;
CACHEFILE cf;
assert(logger); //Need transaction, so there must be a logger
{
// Log the local checkpoint; begin_checkpoint_lsn receives the LSN output of the log call.
int r = toku_log_local_txn_checkpoint(logger, &begin_checkpoint_lsn, 0, txn->txnid64);
assert(r==0);
}
// Tag each participating cachefile so its pairs can be recognized below.
for (i = 0; i < n; i++) {
cf = cachefiles[i];
assert(cf->for_local_checkpoint.lsn == ZERO_LSN.lsn);
cf->for_local_checkpoint = begin_checkpoint_lsn;
}
//Write out all dirty pairs.
{
// First collect (and take a reference on) the dirty pairs while holding
// pending_lock; the writes themselves happen after the scan.
uint32_t num_pairs = 0;
uint32_t list_size = 256;
PAIR *list = NULL;
XMALLOC_N(list_size, list);
PAIR p;
//TODO: Determine if we can get rid of this use of pending_lock
rwlock_write_lock(&ct->pending_lock, ct->mutex);
for (i=0; i < ct->table_size; i++) {
for (p = ct->table[i]; p; p=p->hash_chain) {
//Only include pairs belonging to cachefiles in the checkpoint
if (p->cachefile->for_local_checkpoint.lsn != begin_checkpoint_lsn.lsn) continue;
if (p->state == CTPAIR_READING)
continue; // skip pairs being read as they will be clean
else if (p->state == CTPAIR_IDLE || p->state == CTPAIR_WRITING) {
if (p->dirty) {
ctpair_add_ref(p);
list[num_pairs] = p;
num_pairs++;
// Grow the list as soon as it becomes full so the next store is always in bounds.
if (num_pairs == list_size) {
list_size *= 2;
XREALLOC_N(list_size, list);
}
}
} else
assert(0);
}
}
rwlock_write_unlock(&ct->pending_lock);
for (i = 0; i < num_pairs; i++) {
p = list[i];
// A pair may have been removed since it was collected; only our reference keeps it alive.
if (!p->already_removed) {
// TRUE: write the pair if it is dirty even when it is not checkpoint_pending.
write_pair_for_checkpoint(ct, p, TRUE);
}
ctpair_destroy(p); //Release our reference
// Don't need to unlock and lock cachetable,
// because the cachetable was unlocked and locked while the flush callback ran.
}
toku_free(list);
}
for (i = 0; i < n; i++) {
int r;
cf = cachefiles[i];
rwlock_prefer_read_lock(&cf->fdlock, ct->mutex);
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex);
// Tracks whether the cachetable lock is still held on entry to the
// CS_CALLED_CHECKPOINT case (it is not when we fall through from the case above).
BOOL own_cachetable_lock = TRUE;
// Finish whatever part of a global checkpoint is outstanding on this cachefile.
switch (cf->checkpoint_state) {
case CS_NOT_IN_PROGRESS:
break;
case CS_CALLED_BEGIN_CHECKPOINT:
cachetable_unlock(ct);
own_cachetable_lock = FALSE;
assert(cf->checkpoint_state == CS_CALLED_BEGIN_CHECKPOINT);
r = cf->checkpoint_userdata(cf, cf->fd, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_CALLED_CHECKPOINT;
//FALL THROUGH ON PURPOSE.
case CS_CALLED_CHECKPOINT:
if (own_cachetable_lock)
cachetable_unlock(ct);
//end_checkpoint fsyncs the fd, which needs the fdlock
assert(cf->checkpoint_state == CS_CALLED_CHECKPOINT);
r = cf->end_checkpoint_userdata(cf, cf->fd, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_NOT_IN_PROGRESS;
cachetable_lock(ct);
// Record that the in-progress global checkpoint has already been completed
// for this cachefile; toku_cachetable_end_checkpoint compares against this
// LSN before invoking the checkpoint/end callbacks itself.
assert(cf->most_recent_global_checkpoint_that_finished_early.lsn < ct->lsn_of_checkpoint_in_progress.lsn);
cf->most_recent_global_checkpoint_that_finished_early = ct->lsn_of_checkpoint_in_progress;
break;
default:
assert(FALSE);
}
// NOTE(review): on every path the cachetable lock is held from here on, so the
// three callbacks below run with the cachetable lock held, unlike the callbacks
// in the switch above, which run with it released.
{ //Begin
assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS);
r = cf->begin_checkpoint_userdata(cf, cf->fd, begin_checkpoint_lsn, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_CALLED_BEGIN_CHECKPOINT;
}
{ //Middle
assert(cf->checkpoint_state == CS_CALLED_BEGIN_CHECKPOINT);
r = cf->checkpoint_userdata(cf, cf->fd, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_CALLED_CHECKPOINT;
}
{ //End
assert(cf->checkpoint_state == CS_CALLED_CHECKPOINT);
r = cf->end_checkpoint_userdata(cf, cf->fd, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_NOT_IN_PROGRESS;
}
// Clear the tag so the cachefile can take part in a later local checkpoint.
assert(cf->for_local_checkpoint.lsn == begin_checkpoint_lsn.lsn);
cf->for_local_checkpoint = ZERO_LSN;
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_read_unlock(&cf->fdlock);
}
cachetable_unlock(ct);
return 0;
}
......@@ -322,4 +322,5 @@ void toku_cachetable_set_env_dir(CACHETABLE ct, char *env_dir);
char * toku_construct_full_name(int count, ...);
char * toku_cachetable_get_fname_in_cwd(CACHETABLE ct, const char * fname_in_env);
int toku_cachetable_local_checkpoint_for_commit(CACHETABLE ct, TOKUTXN txn, uint32_t n, CACHEFILE cachefiles[n]);
#endif
......@@ -135,6 +135,7 @@ struct tokutxn {
uint32_t current_rollback_hash;
BOOL recovered_from_checkpoint;
ROLLBACK_LOG_NODE pinned_inprogress_rollback_log;
struct toku_list checkpoint_before_commit;
};
struct txninfo {
......
......@@ -13,8 +13,8 @@
#include "memory.h"
#include "x1764.h"
typedef void(*voidfp)(void);
typedef void(*YIELDF)(voidfp, void*);
typedef void(*voidfp)(void *thunk);
typedef void(*YIELDF)(voidfp, void *fpthunk, void *yieldthunk);
struct roll_entry;
#include "logger.h"
......
......@@ -77,8 +77,6 @@ const struct logtype rollbacks[] = {
{"BLOCKNUM", "spilled_tail", 0},
{"u_int32_t", "spilled_tail_hash", 0},
NULLFIELD}},
{"tablelock_on_empty_table", 'L', FA{{"FILENUM", "filenum", 0},
NULLFIELD}},
{"load", 'l', FA{{"BYTESTRING", "old_iname", 0},
{"BYTESTRING", "new_iname", 0},
NULLFIELD}},
......@@ -90,6 +88,7 @@ const struct logtype rollbacks[] = {
const struct logtype logtypes[] = {
// Records produced by checkpoints
{"local_txn_checkpoint", 'c', FA{{"TXNID", "xid", 0}, NULLFIELD}},
{"begin_checkpoint", 'x', FA{{"u_int64_t", "timestamp", 0}, NULLFIELD}},
{"end_checkpoint", 'X', FA{{"TXNID", "xid", 0}, // xid is LSN of begin_checkpoint
{"u_int64_t", "timestamp", 0},
......@@ -143,9 +142,6 @@ const struct logtype logtypes[] = {
{"fdelete", 'U', FA{{"TXNID", "xid", 0},
{"BYTESTRING", "iname", 0},
NULLFIELD}},
{"tablelock_on_empty_table", 'L', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
NULLFIELD}},
{"enq_insert", 'I', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
......@@ -432,6 +428,7 @@ generate_log_reader (void) {
fprintf2(cf, hf, "int toku_log_fread_backward (FILE *infile, struct log_entry *le)");
fprintf(hf, ";\n");
fprintf(cf, "{\n");
fprintf(cf, " memset(le, 0, sizeof(*le));\n");
fprintf(cf, " {\n long pos = ftell(infile);\n if (pos<=12) return -1;\n }\n");
fprintf(cf, " int r = fseek(infile, -4, SEEK_CUR); \n");// assert(r==0);\n");
fprintf(cf, " if (r!=0) return errno;\n");
......
......@@ -458,6 +458,24 @@ int toku_logger_fsync (TOKULOGGER logger)
return r;
}
// Effect: If the logger writes log files and its recorded fsynced_lsn is older
//   than LSN, asks the logger to fsync up to LSN.
// Returns: EINVAL if the logger is already panicked; 0 if nothing needed to be
//   done or the fsync succeeded; otherwise the error from
//   toku_logger_maybe_fsync (the logger is panicked with that error first).
// NOTE(review): the input lock taken here is not released in this function;
//   presumably toku_logger_maybe_fsync releases it -- confirm.
int
toku_logger_fsync_if_lsn_not_fsynced (TOKULOGGER logger, LSN lsn) {
    if (logger->is_panicked) {
        return EINVAL;
    }
    // Nothing to do when no log files are written or LSN is already durable.
    if (!logger->write_log_files || logger->fsynced_lsn.lsn >= lsn.lsn) {
        return 0;
    }

    int rc = ml_lock(&logger->input_lock);
    assert(rc==0);
    logger->input_lock_ctr++;

    rc = toku_logger_maybe_fsync(logger, lsn, TRUE);
    if (rc == 0) {
        // On success the logger must now report LSN as fsynced.
        assert(logger->fsynced_lsn.lsn >= lsn.lsn);
        return 0;
    }
    toku_logger_panic(logger, rc);
    return rc;
}
void toku_logger_panic (TOKULOGGER logger, int err) {
logger->panic_errno=err;
logger->is_panicked=TRUE;
......
......@@ -21,6 +21,7 @@ int toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, BOOL cre
int toku_logger_close_rollback(TOKULOGGER logger, BOOL recovery_failed);
int toku_logger_fsync (TOKULOGGER logger);
int toku_logger_fsync_if_lsn_not_fsynced(TOKULOGGER logger, LSN lsn);
void toku_logger_panic (TOKULOGGER logger, int err);
int toku_logger_panicked(TOKULOGGER logger);
int toku_logger_is_open(TOKULOGGER logger);
......
......@@ -231,8 +231,8 @@ static const char *recover_state(RECOVER_ENV renv) {
// function supplied to transaction commit and abort
// No yielding is necessary, but it must call the f function if provided.
static void recover_yield(voidfp f, void *UU(extra)) {
if (f) f();
static void recover_yield(voidfp f, void *fpthunk, void *UU(yieldthunk)) {
if (f) f(fpthunk);
}
static int
......@@ -290,6 +290,30 @@ static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, BOOL must_create
return 0;
}
// Forward-recovery handler for a local_txn_checkpoint log record.
// Effect: Only verifies state; it changes nothing. The record is legal only
//   while recovery is scanning forward (FORWARD_BETWEEN_CHECKPOINT_BEGIN_END or
//   FORWARD_NEWER_CHECKPOINT_END), and the transaction named by l->xid must be
//   known to the logger at that point (both asserted).
// Parameters:
//   l    - the decoded log record; only l->xid is read.
//   renv - recovery environment; supplies the scan state and the logger.
//          (It is read, so it is not wrapped in UU(), matching
//          toku_recover_begin_checkpoint.)
// Returns: 0.
static int toku_recover_local_txn_checkpoint (struct logtype_local_txn_checkpoint *l, RECOVER_ENV renv) {
    int r;
    switch (renv->ss.ss) {
    case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
    case FORWARD_NEWER_CHECKPOINT_END: {
        // assert that the transaction exists
        TOKUTXN txn = NULL;
        r = toku_txnid2txn(renv->logger, l->xid, &txn);
        assert(r == 0 && txn != NULL);
        r = 0;
        break;
    }
    default:
        // A local checkpoint record must never be seen in any other scan state.
        assert(0);
        return 0;
    }
    return r;
}
// Backward-scan handler for a local_txn_checkpoint log record.
// Neither argument is used; the handler does no work and reports success.
static int
toku_recover_backward_local_txn_checkpoint (struct logtype_local_txn_checkpoint *UU(l),
                                            RECOVER_ENV UU(renv))
{
    return 0;
}
static int toku_recover_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
int r;
switch (renv->ss.ss) {
......@@ -549,9 +573,7 @@ static int toku_recover_xcommit (struct logtype_xcommit *l, RECOVER_ENV renv) {
// commit the transaction
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn,
NULL, NULL,
// No need to release locks during recovery.
NULL, NULL, NULL);
NULL, NULL);
assert(r == 0);
// close the transaction
......@@ -709,26 +731,6 @@ static int toku_recover_backward_fdelete (struct logtype_fdelete *UU(l), RECOVER
return 0;
}
// Forward-recovery handler for a tablelock_on_empty_table log record.
// Effect: If the file named by l->filenum is present in the recovery file map,
//   looks up the transaction l->xid (which must exist, asserted) and re-notes
//   the table lock on that file's brt via toku_brt_note_table_lock, passing
//   TRUE for its ignore_not_empty argument.
//   If the file is not in the map, nothing is done.
// Returns: 0 always; a failed file_map_find is not reported to the caller.
static int toku_recover_tablelock_on_empty_table(struct logtype_tablelock_on_empty_table *l, RECOVER_ENV renv) {
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r==0) {
//Our work is only if it is open
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0);
assert(txn != NULL);
r = toku_brt_note_table_lock(tuple->brt, txn, TRUE);
assert(r == 0);
}
return 0;
}
// Backward-scan handler for a tablelock_on_empty_table log record.
// Neither argument is used; the handler does no work and reports success.
static int
toku_recover_backward_tablelock_on_empty_table (struct logtype_tablelock_on_empty_table *UU(l),
                                                RECOVER_ENV UU(renv))
{
    return 0;
}
static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV renv) {
int r;
TOKUTXN txn = NULL;
......
......@@ -315,7 +315,7 @@ toku_apply_rollinclude (TXNID xid,
r = func(txn, item, yield, yieldv, oplsn);
if (r!=0) return r;
count++;
if (count%2 == 0) yield(NULL, yieldv);
if (count%2 == 0) yield(NULL, NULL, yieldv);
}
if (next_log.b == spilled_head.b) {
assert(!found_head);
......@@ -382,55 +382,6 @@ toku_rollback_rollinclude (TXNID xid,
return r;
}
// Rollback handler for a "table lock on empty table" rollback entry.
// The table was empty when it was locked, so undoing the transaction means
// making the file empty again (see the comments in the body).
//   filenum - identifies the file whose table lock is being rolled back
//   txn     - the transaction being rolled back
//   yield, yield_v - yield callback and its extra argument; used here to run
//                    toku_checkpoint_safe_client_lock
//   oplsn   - compared against the tree's checkpoint LSN to decide whether the
//             truncate may be skipped; 0 disables that check
// Returns 0 on every path that does not assert.
int
toku_rollback_tablelock_on_empty_table (FILENUM filenum,
TOKUTXN txn,
YIELDF yield,
void* yield_v,
LSN oplsn)
{
//TODO: Replace truncate function with something that doesn't need to mess with checkpoints.
// on rollback we have to make the file be empty, since we locked an empty table, and then may have done things to it.
CACHEFILE cf;
// Map the filenum to its cachefile via the logger's cachetable.
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
if (r==ENOENT) { //Missing file on recovered transaction is not an error
assert(txn->recovered_from_checkpoint);
r = 0;
goto done;
}
assert(r==0);
// Find the BRT for this filenum among the BRTs this transaction has open;
// it is asserted to be there.
OMTVALUE brtv=NULL;
r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL);
assert(r==0);
BRT brt = brtv;
{ //Do NOT truncate the file if
//the file already survived the truncate and was checkpointed.
LSN treelsn = toku_brt_checkpoint_lsn(brt);
if (oplsn.lsn != 0 && oplsn.lsn <= treelsn.lsn) {
r = 0;
goto done;
}
}
// Progress is polled before and after the yield that runs
// toku_checkpoint_safe_client_lock.
// NOTE(review): the two BOOL arguments look like (is_commit, stalled_on_checkpoint) -- confirm.
toku_poll_txn_progress_function(txn, FALSE, TRUE);
yield(toku_checkpoint_safe_client_lock, yield_v);
toku_poll_txn_progress_function(txn, FALSE, FALSE);
// The truncate runs between the checkpoint-safe client lock and unlock calls.
r = toku_brt_truncate(brt);
assert(r==0);
toku_checkpoint_safe_client_unlock();
done:
return r;
}
// Commit handler for a "table lock on empty table" rollback entry.
// Only filenum and txn are used; the call is forwarded to
// do_nothing_with_filenum and its result is returned unchanged.
int
toku_commit_tablelock_on_empty_table (FILENUM filenum,
                                      TOKUTXN txn,
                                      YIELDF UU(yield),
                                      void* UU(yield_v),
                                      LSN UU(oplsn))
{
    int r = do_nothing_with_filenum(txn, filenum);
    return r;
}
int
toku_commit_load (BYTESTRING old_iname,
BYTESTRING UU(new_iname),
......
......@@ -104,7 +104,7 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
r = func(txn, item, yield, yieldv, lsn);
if (r!=0) return r;
count++;
if (count%2 == 0) yield(NULL, yieldv);
if (count%2 == 0) yield(NULL, NULL, yieldv);
}
}
if (next_log.b == txn->spilled_rollback_head.b) {
......@@ -206,6 +206,11 @@ static int note_brt_used_in_txns_parent(OMTVALUE brtv, u_int32_t UU(index), void
//Pass magic "no rollback needed" flag to parent.
brt->h->txnid_that_created_or_locked_when_empty = toku_txn_get_txnid(parent);
}
if (r==0 &&
brt->h->txnid_that_suppressed_recovery_logs == toku_txn_get_txnid(child)) {
//Pass magic "no recovery needed" flag to parent.
brt->h->txnid_that_suppressed_recovery_logs = toku_txn_get_txnid(parent);
}
return r;
}
......@@ -279,6 +284,12 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
r = toku_omt_iterate(txn->open_brts, note_brt_used_in_txns_parent, txn);
assert(r==0);
// Merge the list of headers that must be checkpointed before commit
while (!toku_list_empty(&txn->checkpoint_before_commit)) {
struct toku_list *list = toku_list_pop(&txn->checkpoint_before_commit);
toku_list_push(&txn->parent->checkpoint_before_commit, list);
}
//If this transaction needs an fsync (if it commits)
//save that in the parent. Since the commit really happens in the root txn.
txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
......@@ -293,6 +304,11 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
int r;
//Empty the list
while (!toku_list_empty(&txn->checkpoint_before_commit)) {
toku_list_pop(&txn->checkpoint_before_commit);
}
r = toku_apply_txn(txn, yield, yieldv, lsn, toku_abort_rollback_item);
assert(r==0);
return r;
......@@ -557,7 +573,10 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) {
r = toku_omt_delete_at(brt->txns, index);
assert(r==0);
if (txn->txnid64==brt->h->txnid_that_created_or_locked_when_empty) {
brt->h->txnid_that_created_or_locked_when_empty = 0;
brt->h->txnid_that_created_or_locked_when_empty = TXNID_NONE;
}
if (txn->txnid64==brt->h->txnid_that_suppressed_recovery_logs) {
brt->h->txnid_that_suppressed_recovery_logs = TXNID_NONE;
}
if (!toku_brt_zombie_needed(brt) && brt->was_closed) {
//Close immediately.
......
......@@ -147,7 +147,13 @@ static inline int rwlock_readers(RWLOCK rwlock) {
return rwlock->reader;
}
// returns: the number of writers
// returns: how many readers are currently waiting for the lock
//          (the rwlock's want_read counter)
static inline int rwlock_blocked_readers(RWLOCK rwlock) {
    int waiting_readers = rwlock->want_read;
    return waiting_readers;
}
// returns: the number of writers who are waiting for the lock
static inline int rwlock_blocked_writers(RWLOCK rwlock) {
return rwlock->want_write;
......
......@@ -5,6 +5,7 @@
#include "includes.h"
#include "txn.h"
#include "checkpoint.h"
int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger) {
return toku_txn_begin_with_xid(parent_tokutxn, tokutxn, logger, 0);
......@@ -70,6 +71,7 @@ int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGE
result->rollentry_raw_count = 0;
result->force_fsync_on_commit = FALSE;
result->recovered_from_checkpoint = FALSE;
toku_list_init(&result->checkpoint_before_commit);
*tokutxn = result;
return 0;
......@@ -112,16 +114,52 @@ toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) {
// Doesn't close the txn, just performs the commit operations.
int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
void (*release_locks)(void*), void(*reacquire_locks)(void*), void *locks_thunk) {
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) {
return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN,
poll, poll_extra,
release_locks, reacquire_locks, locks_thunk);
poll, poll_extra);
}
// Argument bundle passed (as a void* thunk) to
// local_checkpoints_and_log_xcommit by toku_txn_commit_with_lsn.
struct xcommit_info {
// out: result of toku_log_xcommit, read back by the caller after the yield
int r;
// in: the transaction being committed
TOKUTXN txn;
// in: forwarded to toku_log_xcommit as its fsync argument
int do_fsync;
};
//Called during a yield (ydb lock NOT held).
// thunk points to a struct xcommit_info.
// For a root transaction (no parent) with a non-empty checkpoint_before_commit
// list, this first drains the list, collects the cachefile of every listed
// brt_header and runs a local checkpoint over them.  It then logs the xcommit
// entry and stores the logging result in info->r.
static void
local_checkpoints_and_log_xcommit(void *thunk) {
    struct xcommit_info *info = thunk;
    TOKUTXN txn = info->txn;

    if (!txn->parent && !toku_list_empty(&txn->checkpoint_before_commit)) {
        //Do local checkpoints that must happen BEFORE logging xcommit
        uint32_t num_cachefiles = 0;
        uint32_t list_size = 16;
        CACHEFILE *cachefiles= NULL;
        XMALLOC_N(list_size, cachefiles);
        while (!toku_list_empty(&txn->checkpoint_before_commit)) {
            struct toku_list *list = toku_list_pop(&txn->checkpoint_before_commit);
            struct brt_header *h = toku_list_struct(list,
                                                    struct brt_header,
                                                    checkpoint_before_commit_link);
            cachefiles[num_cachefiles++] = h->cf;
            // Grow as soon as the array is full so the next store always fits.
            if (num_cachefiles == list_size) {
                list_size *= 2;
                XREALLOC_N(list_size, cachefiles);
            }
        }
        assert(num_cachefiles);
        // The cachetable is taken from the first cachefile.
        CACHETABLE ct = toku_cachefile_get_cachetable(cachefiles[0]);

        int r = toku_cachetable_local_checkpoint_for_commit(ct, txn, num_cachefiles, cachefiles);
        assert(r==0);
        // The array is scratch space owned by this function; without this
        // free it leaked on every such commit.
        // NOTE(review): assumes the checkpoint call does not keep the array pointer -- confirm.
        toku_free(cachefiles);
    }

    info->r = toku_log_xcommit(txn->logger, (LSN*)0, info->do_fsync, txn->txnid64); // exits holding neither of the tokulogger locks.
}
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
void (*release_locks)(void*), void(*reacquire_locks)(void*), void *locks_thunk) {
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) {
int r;
// panic handled in log_commit
......@@ -131,9 +169,15 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv
txn->progress_poll_fun = poll;
txn->progress_poll_fun_extra = poll_extra;
if (release_locks) release_locks(locks_thunk);
r = toku_log_xcommit(txn->logger, (LSN*)0, do_fsync, txn->txnid64); // exits holding neither of the tokulogger locks.
if (reacquire_locks) reacquire_locks(locks_thunk);
{
struct xcommit_info info = {
.r = 0,
.txn = txn,
.do_fsync = do_fsync
};
yield(local_checkpoints_and_log_xcommit, &info, yieldv);
r = info.r;
}
if (r!=0)
return r;
r = toku_rollback_commit(txn, yield, yieldv, oplsn);
......@@ -152,6 +196,7 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
// Must undo everything. Must undo it all in reverse order.
// Build the reverse list
//printf("%s:%d abort\n", __FILE__, __LINE__);
txn->progress_poll_fun = poll;
txn->progress_poll_fun_extra = poll_extra;
int r=0;
......
......@@ -10,11 +10,9 @@ int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGE
int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
void (*release_locks)(void*), void(*reacquire_locks)(void*), void *locks_thunk);
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
void (*release_locks)(void*), void(*reacquire_locks)(void*), void *locks_thunk);
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
......
......@@ -70,15 +70,16 @@ static void free_loader_resources(DB_LOADER *loader)
}
if (loader->i->ekeys) toku_free(loader->i->ekeys);
if (loader->i->evals) toku_free(loader->i->evals);
for (int i=0; i<loader->i->N; i++) {
if (loader->i->inames_in_env[i]) toku_free(loader->i->inames_in_env[i]);
}
if (loader->i->err_key.data) toku_free(loader->i->err_key.data);
if (loader->i->err_val.data) toku_free(loader->i->err_val.data);
if (loader->i->inames_in_env) toku_free(loader->i->inames_in_env);
if (loader->i->inames_in_env) {
for (int i=0; i<loader->i->N; i++) {
if (loader->i->inames_in_env[i]) toku_free(loader->i->inames_in_env[i]);
}
toku_free(loader->i->inames_in_env);
}
if (loader->i->temp_file_template) toku_free(loader->i->temp_file_template);
if (loader->i->brt_loader) toku_free(loader->i->brt_loader);
......@@ -162,7 +163,7 @@ int toku_loader_create_loader(DB_ENV *env,
int r;
// lock tables and check empty
for(int i=0;i<N;i++) {
r = toku_db_pre_acquire_table_lock(dbs[i], txn);
r = toku_db_pre_acquire_table_lock(dbs[i], txn, TRUE);
if ( r!=0 ) {
free_loader(loader);
return -1;
......@@ -200,9 +201,10 @@ int toku_loader_create_loader(DB_ENV *env,
LSN load_lsn;
r = locked_ydb_load_inames (env, txn, N, dbs, new_inames_in_env, &load_lsn);
if ( r!=0 ) {
toku_free(new_inames_in_env);
toku_free(descriptors);
free_loader(loader);
return -1;
return r;
}
toku_brt_loader_open(&loader->i->brt_loader,
loader->i->env->i->cachetable,
......
......@@ -66,6 +66,7 @@
static DB_ENV *env = NULL;
static DB_TXN *txn_parent = NULL;
static DB_TXN *txn_child = NULL;
static DB_TXN *txn_hold_dname_lock = NULL;
static DB *db;
static char *dname = DICT_0;
static DBT key;
......@@ -108,6 +109,37 @@ end_env(void) {
env = NULL;
}
// Test helper: begins a separate top-level transaction (stored in
// txn_hold_dname_lock), opens dname under it with DB_CREATE, and closes that
// handle again.  The transaction is left open; commit_txn_prevent_dname_lock
// finishes it.
// NOTE(review): judging by the name, the open transaction is meant to keep a
// later table lock on dname from being granted -- confirm.
static void
start_txn_prevent_dname_lock(void) {
    assert(env!=NULL);
    assert(txn_hold_dname_lock==NULL);
    int r;
    r=env->txn_begin(env, 0, &txn_hold_dname_lock, 0);
    CKERR(r);
    DB *db2;

    r = db_create(&db2, env, 0);
    CKERR(r);

    r=db2->open(db2, txn_hold_dname_lock, dname, 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO);
    CKERR(r);
    r = db2->close(db2, 0);
    // The close result used to be dropped; check it like every other call here.
    CKERR(r);
}
// Progress callback for operations that must not report any progress:
// simply being invoked fails the assertion.  Both arguments are ignored.
static void
nopoll(TOKU_TXN_PROGRESS UU(progress), void *UU(extra))
{
    assert(FALSE);
}
static void
commit_txn_prevent_dname_lock(void) {
assert(env!=NULL);
assert(txn_hold_dname_lock!=NULL);
int r;
r = txn_hold_dname_lock->commit_with_progress(txn_hold_dname_lock, 0, nopoll, NULL);
CKERR(r);
txn_hold_dname_lock = NULL;
}
static void
start_txn(void) {
assert(env!=NULL);
......@@ -301,43 +333,29 @@ progress_test_1(int n, int commit) {
end_env();
}
// State shared with the stall_poll progress callback through its extra pointer.
struct progress_stall_expect {
// number of times stall_poll has been invoked so far (it asserts at most 2)
int num_calls;
// set to TRUE by stall_poll on its first call
BOOL has_been_stalled;
};
// Progress callback that expects exactly this call sequence for an abort
// (is_commit is asserted FALSE on every call):
//   call 1: progress->stalled_on_checkpoint is set
//   call 2: progress->stalled_on_checkpoint is clear
// extra points to a struct progress_stall_expect that tracks the sequence.
static void stall_poll(TOKU_TXN_PROGRESS progress, void *extra) {
    struct progress_stall_expect *info = extra;
    info->num_calls += 1;
    assert(info->num_calls <= 2);
    assert(progress->is_commit == FALSE);

    if (info->has_been_stalled) {
        // The stall was already reported, so this must be the second call.
        assert(info->num_calls==2);
        assert(!progress->stalled_on_checkpoint);
        return;
    }

    // First call: must report the stall; remember that it was seen.
    assert(info->num_calls==1);
    assert(progress->stalled_on_checkpoint);
    info->has_been_stalled = TRUE;
}
// Test helper: aborts txn_parent (there must be no child transaction) and
// clears the global.  nopoll is the progress callback, so any progress report
// during the abort fails the test.
static void
abort_txn_stall_checkpoint(void) {
    //We have disabled the norollback log fallback optimization.
    //Checkpoint will not stall
    assert(env!=NULL);
    assert(txn_parent);
    assert(!txn_child);

    // The former progress_stall_expect local was initialized but never
    // passed anywhere (the callback is nopoll with NULL), so it is gone.
    int r;
    r=txn_parent->abort_with_progress(txn_parent, nopoll, NULL);
    CKERR(r);
    txn_parent = NULL;
}
// Test helper: aborts txn_parent (there must be no child transaction) and
// clears the global.  nopoll is the progress callback, so any progress report
// during the abort fails the test.
static void
abort_txn_nostall_checkpoint(void) {
    assert(env!=NULL);
    assert(txn_parent);
    assert(!txn_child);

    // Abort exactly once.  The earlier text also aborted with stall_poll and
    // &extra and then asserted on extra.num_calls, but no `extra` exists in
    // this function, and a transaction cannot be aborted twice.
    int r;
    r=txn_parent->abort_with_progress(txn_parent, nopoll, NULL);
    CKERR(r);
    txn_parent = NULL;
}
......@@ -358,12 +376,25 @@ progress_test_2(void) {
start_env();
open_db();
start_txn();
start_txn_prevent_dname_lock();
lock();
commit_txn_prevent_dname_lock();
abort_txn_stall_checkpoint();
close_db();
end_env();
}
// Scenario: in a fresh environment, open the db, start a transaction, call
// lock(), then abort through abort_txn_nostall_checkpoint (whose callback
// asserts if any progress is reported), and tear everything down.
static void
progress_test_3(void)
{
    // set up
    start_env();
    open_db();
    start_txn();

    // exercise
    lock();
    abort_txn_nostall_checkpoint();

    // tear down
    close_db();
    end_env();
}
int
test_main (int argc, char * const argv[])
{
......@@ -373,5 +404,6 @@ test_main (int argc, char * const argv[])
progress_test_1(4, commit);
}
progress_test_2();
progress_test_3();
return 0;
}
......@@ -40,8 +40,8 @@ do_x1_shutdown (void) {
{
DB_LOADER *loader;
DB *dbs[1] = {db};
uint32_t db_flags[1] = {DB_NOOVERWRITE};
uint32_t dbt_flags[1];
uint32_t db_flags[1] = {DB_NOOVERWRITE};
uint32_t dbt_flags[1] = {0};
uint32_t loader_flags = 0;
r = env->create_loader(env, tid, &loader, NULL, 1, dbs, db_flags, dbt_flags, loader_flags);
......
......@@ -197,6 +197,6 @@ struct __toku_dbc_internal {
struct simple_dbt *skey,*sval;
};
int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn);
int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock);
#endif
......@@ -1830,21 +1830,12 @@ static int toku_txn_release_locks(DB_TXN* txn) {
// Yield the lock so someone else can work, and then reacquire the lock.
// Useful while processing commit or rollback logs, to allow others to access the system.
static void ydb_yield (voidfp f, void *UU(v)) {
static void ydb_yield (voidfp f, void *fv, void *UU(v)) {
toku_ydb_unlock();
if (f) f();
if (f) f(fv);
toku_ydb_lock();
}
// Callback of shape void(*)(void*) that releases the ydb lock.
// The argument is unused.
static void
release_ydb_lock_callback (void *ignore __attribute__((__unused__)))
{
    toku_ydb_unlock();
}
// Callback of shape void(*)(void*) that takes the ydb lock again.
// The argument is unused.
static void
reacquire_ydb_lock_callback (void *ignore __attribute__((__unused__)))
{
    toku_ydb_lock();
}
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
if (!txn) return EINVAL;
......@@ -1886,8 +1877,7 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags,
// Calls ydb_yield(NULL) occasionally
//r = toku_logger_commit(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL);
r = toku_txn_commit_txn(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL,
poll, poll_extra,
release_ydb_lock_callback, reacquire_ydb_lock_callback, NULL);
poll, poll_extra);
if (r!=0 && !toku_env_is_panicked(txn->mgrp)) {
txn->mgrp->i->is_panicked = r;
......@@ -4729,7 +4719,7 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna
if (r==0) {
DB* zombie = env_get_zombie_db_with_dname(env, dname);
if (zombie)
r = toku_db_pre_acquire_table_lock(zombie, child);
r = toku_db_pre_acquire_table_lock(zombie, child, TRUE);
if (r!=0)
toku_ydb_do_error(env, r, "Cannot remove dictionary.\n");
}
......@@ -4846,7 +4836,7 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
if (r==0) {
zombie = env_get_zombie_db_with_dname(env, dname);
if (zombie)
r = toku_db_pre_acquire_table_lock(zombie, child);
r = toku_db_pre_acquire_table_lock(zombie, child, TRUE);
if (r!=0)
toku_ydb_do_error(env, r, "Cannot rename dictionary.\n");
}
......@@ -5041,7 +5031,7 @@ static int toku_db_pre_acquire_read_lock(DB *db, DB_TXN *txn, const DBT *key_lef
//static int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
// needed by loader.c
int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) {
HANDLE_PANICKED_DB(db);
if (!db->i->lt || !txn) return EINVAL;
......@@ -5053,8 +5043,40 @@ int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
r = toku_lt_acquire_range_write_lock(db->i->lt, db, id_anc,
toku_lt_neg_infinity, toku_lt_neg_infinity,
toku_lt_infinity, toku_lt_infinity);
if (r==0) {
r = toku_brt_note_table_lock(db->i->brt, db_txn_struct_i(txn)->tokutxn, FALSE); // tell the BRT layer that the table is locked (so that it can reduce the amount of rollback data.
if (r==0 && !just_lock &&
!toku_brt_is_recovery_logging_suppressed(db->i->brt) &&
toku_brt_is_empty(db->i->brt) &&
db_is_nodup(db) //TODO: Remove this check once we kill dupsort support.
) {
//Try to suppress both rollback and recovery logs
DB_LOADER *loader;
DB *dbs[1] = {db};
uint32_t db_flags[1] = {DB_NOOVERWRITE};
uint32_t dbt_flags[1] = {0};
uint32_t loader_flags = 0;
DB_ENV *env = db->dbenv;
toku_ydb_unlock(); //Cannot hold ydb lock when creating loader
int r_loader;
r_loader = env->create_loader(env, txn, &loader, NULL, 1, dbs, db_flags, dbt_flags, loader_flags);
if (r_loader==0) {
int r2;
r2 = loader->set_error_callback(loader, NULL, NULL);
assert(r2==0);
r2 = loader->set_poll_function(loader, NULL, NULL);
assert(r2==0);
// close the loader
r2 = loader->close(loader);
assert(r2==0);
toku_brt_suppress_recovery_logs(db->i->brt, db_txn_struct_i(txn)->tokutxn);
}
else if (r_loader != DB_LOCK_NOTGRANTED) {
//Lock not granted is not an error.
//It just means we cannot use the loader optimization.
assert(r==0);
r = r_loader;
}
toku_ydb_lock(); //Reaquire ydb lock.
}
return r;
......@@ -5174,7 +5196,7 @@ static int locked_db_pre_acquire_read_lock(DB *db, DB_TXN *txn, const DBT *key_l
// Wrapper that takes the ydb lock around toku_db_pre_acquire_table_lock and
// returns its result.
// just_lock is passed as FALSE, so the callee may also attempt its
// loader-based log suppression; on that path the callee itself drops and
// re-takes the ydb lock.
static int locked_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
    toku_ydb_lock();
    // Single call, using the three-argument signature; the earlier text also
    // declared r a second time from a stale two-argument call.
    int r = toku_db_pre_acquire_table_lock(db, txn, FALSE);
    toku_ydb_unlock();
    return r;
}
......@@ -5202,7 +5224,7 @@ static int toku_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t
// acquire a table lock
if (txn) {
r = toku_db_pre_acquire_table_lock(db, txn);
r = toku_db_pre_acquire_table_lock(db, txn, TRUE);
if (r != 0)
return r;
}
......@@ -5601,10 +5623,10 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname
char hint[strlen(dname) + 1];
create_iname_hint(dname, hint);
char * new_iname = create_iname(env, xid, hint, i); // allocates memory for iname_in_env
new_inames_in_env[i] = new_iname;
toku_fill_dbt(&iname_dbt, new_iname, strlen(new_iname) + 1); // iname_in_env goes in directory
rval = toku_db_put(env->i->directory, child, &dname_dbt, &iname_dbt, DB_YESOVERWRITE); // DB_YESOVERWRITE necessary
if (rval) break;
new_inames_in_env[i] = new_iname;
}
// Generate load log entries.
......
......@@ -360,17 +360,23 @@ else
endif
.PHONY: tags
tags: cscope.out TAGS
tags: $(TOKUROOT)cscope.out $(TOKUROOT)TAGS
ifneq ($(TOKUROOT),./)
TAGS: $(TOKUROOT)*/*.[ch] $(TOKUROOT)*/*/*.[ch] $(TOKUROOT)*/*/*/*.[ch]
rm -f `find $(TOKUROOT) -type f -name TAGS` #Delete all other tag files
etags $(TOKUROOT)*/*.[ch] $(TOKUROOT)*/*/*.[ch] $(TOKUROOT)*/*/*/*.[ch]
endif
$(TOKUROOT)TAGS: $(TOKUROOT)*/*.[ch] $(TOKUROOT)*/*/*.[ch] $(TOKUROOT)*/*/*/*.[ch]
cd $(TOKUROOT) && etags */*.[ch] */*/*.[ch] */*/*/*.[ch]
$(TOKUROOT)cscope.files: $(TOKUROOT)*/*.[ch] $(TOKUROOT)*/*/*.[ch] $(TOKUROOT)*/*/*/*.[ch]
cd $(TOKUROOT) && (echo */*.[ch] */*/*.[ch] */*/*/*.[ch] | tr " " "\n") > $(notdir $@)
cscope.files: $(TOKUROOT)*/*.[ch] $(TOKUROOT)*/*/*.[ch] $(TOKUROOT)*/*/*/*.[ch]
(echo $(TOKUROOT)*/*.[ch] $(TOKUROOT)*/*/*.[ch] $(TOKUROOT)*/*/*/*.[ch] | tr " " "\n") > $@
$(TOKUROOT)cscope.in.out $(TOKUROOT)cscope.po.out: $(TOKUROOT)cscope.out;
cscope.out: cscope.files $(TOKUROOT)*/*.[ch] $(TOKUROOT)*/*/*.[ch] $(TOKUROOT)*/*/*/*.[ch]
$(CSCOPE) -qb
$(TOKUROOT)cscope.out: $(TOKUROOT)cscope.files $(TOKUROOT)*/*.[ch] $(TOKUROOT)*/*/*.[ch] $(TOKUROOT)*/*/*/*.[ch]
cd $(TOKUROOT) && $(CSCOPE) -qb
.PHONY: clean clean-default %.dir.clean
clean: clean-default
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment