Commit 33cb0047 authored by Zardosht Kasheff, committed by Yoni Fogel

[t:4097], [t:3971], merge to main

git-svn-id: file:///svn/toku/tokudb@38043 c7de825b-a66e-492c-adef-691d508d4ae1
parent 5d9eaf8e
......@@ -753,7 +753,7 @@ int toku_verify_brtnode (BRT brt, MSN rootmsn, MSN parentmsn,
void toku_brtheader_free (struct brt_header *h);
int toku_brtheader_close (CACHEFILE cachefile, int fd, void *header_v, char **error_string, BOOL oplsn_valid, LSN oplsn) __attribute__((__warn_unused_result__));
int toku_brtheader_begin_checkpoint (CACHEFILE cachefile, int fd, LSN checkpoint_lsn, void *header_v) __attribute__((__warn_unused_result__));
int toku_brtheader_begin_checkpoint (LSN checkpoint_lsn, void *header_v) __attribute__((__warn_unused_result__));
int toku_brtheader_checkpoint (CACHEFILE cachefile, int fd, void *header_v) __attribute__((__warn_unused_result__));
int toku_brtheader_end_checkpoint (CACHEFILE cachefile, int fd, void *header_v) __attribute__((__warn_unused_result__));
int toku_maybe_upgrade_brt(BRT t) __attribute__((__warn_unused_result__));
......
......@@ -3667,7 +3667,7 @@ int toku_brt_create_cachetable(CACHETABLE *ct, long cachesize, LSN initial_lsn,
// Create checkpoint-in-progress versions of header and translation (btt) (and fifo for now...).
//Has access to fd (it is protected).
int
toku_brtheader_begin_checkpoint (CACHEFILE UU(cachefile), int UU(fd), LSN checkpoint_lsn, void *header_v) {
toku_brtheader_begin_checkpoint (LSN checkpoint_lsn, void *header_v) {
struct brt_header *h = header_v;
int r = h->panic;
if (r==0) {
......@@ -3897,7 +3897,7 @@ toku_brtheader_close (CACHEFILE cachefile, int fd, void *header_v, char **malloc
}
int r2;
//assert(lsn.lsn!=0);
r2 = toku_brtheader_begin_checkpoint(cachefile, fd, lsn, header_v);
r2 = toku_brtheader_begin_checkpoint(lsn, header_v);
if (r==0) r = r2;
r2 = toku_brtheader_checkpoint(cachefile, fd, h);
if (r==0) r = r2;
......
......@@ -279,7 +279,7 @@ struct cachefile {
int (*log_fassociate_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log all open files.
int (*log_suppress_rollback_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log which files need rollbacks suppressed
int (*close_userdata)(CACHEFILE cf, int fd, void *userdata, char **error_string, BOOL lsnvalid, LSN); // when closing the last reference to a cachefile, first call this function.
int (*begin_checkpoint_userdata)(CACHEFILE cf, int fd, LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function.
int (*begin_checkpoint_userdata)(LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function.
int (*checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // when checkpointing a cachefile, call this function.
int (*end_checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // after checkpointing cachefiles call this function.
int (*note_pin_by_checkpoint)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory
......@@ -287,11 +287,9 @@ struct cachefile {
toku_pthread_cond_t openfd_wait; // openfd must wait until file is fully closed (purged from cachetable) if file is opened and closed simultaneously
toku_pthread_cond_t closefd_wait; // toku_cachefile_of_iname_and_add_reference() must wait until file is fully closed (purged from cachetable) if run while file is being closed.
u_int32_t closefd_waiting; // Number of threads waiting on closefd_wait (0 or 1, error otherwise).
struct rwlock checkpoint_lock; //protects checkpoint callback functions
//acts as fast mutex by only using 'write-lock'
LSN most_recent_global_checkpoint_that_finished_early;
LSN for_local_checkpoint;
enum cachefile_checkpoint_state checkpoint_state; //Protected by checkpoint_lock
enum cachefile_checkpoint_state checkpoint_state;
int n_background_jobs; // how many jobs in the cachetable's kibbutz or
// on the cleaner thread (anything
......@@ -758,7 +756,6 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
ct->cachefiles = newcf;
rwlock_init(&newcf->fdlock);
rwlock_init(&newcf->checkpoint_lock);
newcf->most_recent_global_checkpoint_that_finished_early = ZERO_LSN;
newcf->for_local_checkpoint = ZERO_LSN;
newcf->checkpoint_state = CS_NOT_IN_PROGRESS;
......@@ -956,9 +953,6 @@ int toku_cachefile_close (CACHEFILE *cfp, char **error_string, BOOL oplsn_valid,
//assert(r == 0);
rwlock_write_unlock(&cf->fdlock);
rwlock_destroy(&cf->fdlock);
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex); //Just to make sure we can get it
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_destroy(&cf->checkpoint_lock);
assert(toku_list_empty(&cf->pairs_for_cachefile));
toku_free(cf);
*cfp = NULL;
......@@ -1010,9 +1004,6 @@ int toku_cachefile_close (CACHEFILE *cfp, char **error_string, BOOL oplsn_valid,
cachetable_lock(ct);
rwlock_write_unlock(&cf->fdlock);
rwlock_destroy(&cf->fdlock);
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex); //Just to make sure we can get it
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_destroy(&cf->checkpoint_lock);
assert(toku_list_empty(&cf->pairs_for_cachefile));
cachetable_unlock(ct);
......@@ -3313,14 +3304,10 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
cachefiles_lock(ct);
for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) {
if (cf->begin_checkpoint_userdata) {
rwlock_prefer_read_lock(&cf->fdlock, ct->mutex);
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex);
assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS);
int r = cf->begin_checkpoint_userdata(cf, cf->fd, ct->lsn_of_checkpoint_in_progress, cf->userdata);
int r = cf->begin_checkpoint_userdata(ct->lsn_of_checkpoint_in_progress, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_CALLED_BEGIN_CHECKPOINT;
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_read_unlock(&cf->fdlock);
}
}
cachefiles_unlock(ct);
......@@ -3374,7 +3361,6 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger,
for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) {
if (cf->checkpoint_userdata) {
rwlock_prefer_read_lock(&cf->fdlock, ct->mutex);
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex);
if (!logger || ct->lsn_of_checkpoint_in_progress.lsn != cf->most_recent_global_checkpoint_that_finished_early.lsn) {
assert(ct->lsn_of_checkpoint_in_progress.lsn >= cf->most_recent_global_checkpoint_that_finished_early.lsn);
cachetable_unlock(ct);
......@@ -3389,7 +3375,6 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger,
else {
assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS);
}
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_read_unlock(&cf->fdlock);
}
}
......@@ -3403,7 +3388,6 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger,
for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) {
if (cf->end_checkpoint_userdata) {
rwlock_prefer_read_lock(&cf->fdlock, ct->mutex);
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex);
if (!logger || ct->lsn_of_checkpoint_in_progress.lsn != cf->most_recent_global_checkpoint_that_finished_early.lsn) {
assert(ct->lsn_of_checkpoint_in_progress.lsn >= cf->most_recent_global_checkpoint_that_finished_early.lsn);
cachetable_unlock(ct);
......@@ -3415,7 +3399,6 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger,
cachetable_lock(ct);
}
assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS);
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_read_unlock(&cf->fdlock);
}
}
......@@ -3618,7 +3601,7 @@ toku_cachefile_set_userdata (CACHEFILE cf,
int (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*),
int (*close_userdata)(CACHEFILE, int, void*, char**, BOOL, LSN),
int (*checkpoint_userdata)(CACHEFILE, int, void*),
int (*begin_checkpoint_userdata)(CACHEFILE, int, LSN, void*),
int (*begin_checkpoint_userdata)(LSN, void*),
int (*end_checkpoint_userdata)(CACHEFILE, int, void*),
int (*note_pin_by_checkpoint)(CACHEFILE, void*),
int (*note_unpin_by_checkpoint)(CACHEFILE, void*)) {
......@@ -3841,133 +3824,3 @@ cleaner_thread (void *cachetable_v)
return 0;
}
#if 0
// NOTE(review): this function sits inside an #if 0 block — it is dead code kept
// for reference after the local-checkpoint-on-commit feature was removed by
// this commit.  Comments below describe what the retained text does.
//
// Perform a "local" checkpoint restricted to the n given cachefiles, on behalf
// of committing transaction txn: log a local_txn_checkpoint record, flush every
// dirty PAIR belonging to those cachefiles, then run each cachefile's
// begin/checkpoint/end userdata callbacks.  Returns 0.
// Requires: txn->logger is non-NULL, and no listed cachefile is already
// tagged for a local checkpoint (for_local_checkpoint == ZERO_LSN).
int
toku_cachetable_local_checkpoint_for_commit (CACHETABLE ct, TOKUTXN txn, uint32_t n, CACHEFILE cachefiles[n]) {
cachetable_lock(ct);
// Bump statistics counters (file-scope; not visible in this view).
local_checkpoint++;
local_checkpoint_files += n;
LSN begin_checkpoint_lsn = ZERO_LSN;
uint32_t i;
TOKULOGGER logger = txn->logger;
CACHEFILE cf;
assert(logger); //Need transaction, so there must be a logger
{
// Log the checkpoint record first; its LSN tags the pairs to flush below.
int r = toku_log_local_txn_checkpoint(logger, &begin_checkpoint_lsn, 0, txn->txnid64);
assert(r==0);
}
// Tag each participating cachefile with this checkpoint's LSN so the pair
// scan below can recognize which pairs belong to the local checkpoint.
for (i = 0; i < n; i++) {
cf = cachefiles[i];
assert(cf->for_local_checkpoint.lsn == ZERO_LSN.lsn);
cf->for_local_checkpoint = begin_checkpoint_lsn;
}
//Write out all dirty pairs.
{
// Collect dirty pairs into a growable list while holding pending_lock,
// then flush them after releasing it (flush callbacks drop ct->mutex).
uint32_t num_pairs = 0;
uint32_t list_size = 256;
PAIR *list = NULL;
XMALLOC_N(list_size, list);
PAIR p;
//TODO: Determine if we can get rid of this use of pending_lock
rwlock_write_lock(&ct->pending_lock, ct->mutex);
for (i=0; i < ct->table_size; i++) {
for (p = ct->table[i]; p; p=p->hash_chain) {
//Only include pairs belonging to cachefiles in the checkpoint
if (p->cachefile->for_local_checkpoint.lsn != begin_checkpoint_lsn.lsn) continue;
if (p->state == CTPAIR_READING)
continue; // skip pairs being read as they will be clean
else if (p->state == CTPAIR_IDLE || p->state == CTPAIR_WRITING) {
if (p->dirty) {
// Pin the pair so it cannot be destroyed before we flush it.
ctpair_add_ref(p);
list[num_pairs] = p;
num_pairs++;
if (num_pairs == list_size) {
// Amortized doubling of the collection buffer.
list_size *= 2;
XREALLOC_N(list_size, list);
}
}
} else
assert(0);
}
}
rwlock_write_unlock(&ct->pending_lock);
for (i = 0; i < num_pairs; i++) {
p = list[i];
// A pair may have been removed while we held no lock on it; only
// flush it if it is still live, then drop our reference either way.
if (!p->already_removed) {
write_pair_for_checkpoint(ct, p, TRUE);
}
ctpair_destroy(p); //Release our reference
// Don't need to unlock and lock cachetable,
// because the cachetable was unlocked and locked while the flush callback ran.
}
toku_free(list);
}
// Drive each cachefile's checkpoint callbacks.  If a global checkpoint is
// already in progress on a cachefile, finish it early (checkpoint/end as
// needed) before starting the local begin/checkpoint/end sequence.
for (i = 0; i < n; i++) {
int r;
cf = cachefiles[i];
rwlock_prefer_read_lock(&cf->fdlock, ct->mutex);
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex);
BOOL own_cachetable_lock = TRUE;
switch (cf->checkpoint_state) {
case CS_NOT_IN_PROGRESS:
break;
case CS_CALLED_BEGIN_CHECKPOINT:
// Global checkpoint began but has not flushed this file yet: run the
// middle phase ourselves, then fall through to finish it.
cachetable_unlock(ct);
own_cachetable_lock = FALSE;
assert(cf->checkpoint_state == CS_CALLED_BEGIN_CHECKPOINT);
r = cf->checkpoint_userdata(cf, cf->fd, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_CALLED_CHECKPOINT;
//FALL THROUGH ON PURPOSE.
case CS_CALLED_CHECKPOINT:
if (own_cachetable_lock)
cachetable_unlock(ct);
//end_checkpoint fsyncs the fd, which needs the fdlock
assert(cf->checkpoint_state == CS_CALLED_CHECKPOINT);
r = cf->end_checkpoint_userdata(cf, cf->fd, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_NOT_IN_PROGRESS;
cachetable_lock(ct);
// Record that this file's part of the global checkpoint completed
// ahead of the global end_checkpoint pass, so that pass skips it.
assert(cf->most_recent_global_checkpoint_that_finished_early.lsn < ct->lsn_of_checkpoint_in_progress.lsn);
cf->most_recent_global_checkpoint_that_finished_early = ct->lsn_of_checkpoint_in_progress;
local_checkpoint_during_checkpoint++;
break;
default:
assert(FALSE);
}
{ //Begin
assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS);
r = cf->begin_checkpoint_userdata(cf, cf->fd, begin_checkpoint_lsn, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_CALLED_BEGIN_CHECKPOINT;
}
{ //Middle
assert(cf->checkpoint_state == CS_CALLED_BEGIN_CHECKPOINT);
r = cf->checkpoint_userdata(cf, cf->fd, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_CALLED_CHECKPOINT;
}
{ //End
assert(cf->checkpoint_state == CS_CALLED_CHECKPOINT);
r = cf->end_checkpoint_userdata(cf, cf->fd, cf->userdata);
assert(r==0);
cf->checkpoint_state = CS_NOT_IN_PROGRESS;
}
// Clear the per-file tag set at the top of the function.
assert(cf->for_local_checkpoint.lsn == begin_checkpoint_lsn.lsn);
cf->for_local_checkpoint = ZERO_LSN;
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_read_unlock(&cf->fdlock);
}
cachetable_unlock(ct);
return 0;
}
#endif
......@@ -189,7 +189,7 @@ void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
int (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*),
int (*close_userdata)(CACHEFILE, int, void*, char **/*error_string*/, BOOL, LSN),
int (*checkpoint_userdata)(CACHEFILE, int, void*),
int (*begin_checkpoint_userdata)(CACHEFILE, int, LSN, void*),
int (*begin_checkpoint_userdata)(LSN, void*),
int (*end_checkpoint_userdata)(CACHEFILE, int, void*),
int (*note_pin_by_checkpoint)(CACHEFILE, void*),
int (*note_unpin_by_checkpoint)(CACHEFILE, void*));
......@@ -544,10 +544,6 @@ void toku_cachetable_set_lock_unlock_for_io (CACHETABLE ct, void (*ydb_lock_call
// Effect: When we do I/O we may need to release locks (e.g., the ydb lock). These functions release the lock acquire the lock.
#if 0
int toku_cachetable_local_checkpoint_for_commit(CACHETABLE ct, TOKUTXN txn, uint32_t n, CACHEFILE cachefiles[]);
#endif
void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra);
// Effect: Add a job to the cachetable's collection of work to do. Note that function f must call remove_background_job()
......
......@@ -266,8 +266,6 @@ static void *checkpoints(void *arg) {
static int
test_begin_checkpoint (
CACHEFILE UU(cachefile),
int UU(fd),
LSN UU(checkpoint_lsn),
void* UU(header_v))
{
......
......@@ -338,38 +338,9 @@ BOOL toku_txn_requires_checkpoint(TOKUTXN txn) {
//Called during a yield (ydb lock NOT held).
static void
local_checkpoints_and_log_xcommit(void *thunk) {
log_xcommit(void *thunk) {
struct xcommit_info *info = thunk;
TOKUTXN txn = info->txn;
#if 0
if (!txn->parent && !toku_list_empty(&txn->checkpoint_before_commit)) {
toku_poll_txn_progress_function(txn, TRUE, TRUE);
//Do local checkpoints that must happen BEFORE logging xcommit
uint32_t num_cachefiles = 0;
uint32_t list_size = 16;
CACHEFILE *cachefiles= NULL;
XMALLOC_N(list_size, cachefiles);
while (!toku_list_empty(&txn->checkpoint_before_commit)) {
struct toku_list *list = toku_list_pop(&txn->checkpoint_before_commit);
struct brt_header *h = toku_list_struct(list,
struct brt_header,
checkpoint_before_commit_link);
cachefiles[num_cachefiles++] = h->cf;
if (num_cachefiles == list_size) {
list_size *= 2;
XREALLOC_N(list_size, cachefiles);
}
}
assert(num_cachefiles);
CACHETABLE ct = toku_cachefile_get_cachetable(cachefiles[0]);
int r = toku_cachetable_local_checkpoint_for_commit(ct, txn, num_cachefiles, cachefiles);
assert_zero(r);
toku_free(cachefiles);
toku_poll_txn_progress_function(txn, TRUE, FALSE);
}
#endif
// not sure how the elements in the list are getting freed, so I am doing this
if (!txn->parent && !toku_list_empty(&txn->checkpoint_before_commit)) {
while (!toku_list_empty(&txn->checkpoint_before_commit)) {
......@@ -400,7 +371,7 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv
.r = 0,
.txn = txn,
};
yield(local_checkpoints_and_log_xcommit, &info, yieldv);
yield(log_xcommit, &info, yieldv);
r = info.r;
}
if (r!=0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment