diff --git a/newbrt/brt-internal.h b/newbrt/brt-internal.h index 933e2ec29cb747864851820c73bd3bf89bc151b8..9b7f6771592dc22367615dad86d333b20160b4c9 100644 --- a/newbrt/brt-internal.h +++ b/newbrt/brt-internal.h @@ -753,7 +753,7 @@ int toku_verify_brtnode (BRT brt, MSN rootmsn, MSN parentmsn, void toku_brtheader_free (struct brt_header *h); int toku_brtheader_close (CACHEFILE cachefile, int fd, void *header_v, char **error_string, BOOL oplsn_valid, LSN oplsn) __attribute__((__warn_unused_result__)); -int toku_brtheader_begin_checkpoint (CACHEFILE cachefile, int fd, LSN checkpoint_lsn, void *header_v) __attribute__((__warn_unused_result__)); +int toku_brtheader_begin_checkpoint (LSN checkpoint_lsn, void *header_v) __attribute__((__warn_unused_result__)); int toku_brtheader_checkpoint (CACHEFILE cachefile, int fd, void *header_v) __attribute__((__warn_unused_result__)); int toku_brtheader_end_checkpoint (CACHEFILE cachefile, int fd, void *header_v) __attribute__((__warn_unused_result__)); int toku_maybe_upgrade_brt(BRT t) __attribute__((__warn_unused_result__)); diff --git a/newbrt/brt.c b/newbrt/brt.c index 80c0bd78d048f19358779903ad418ad16319a478..856c99a187cef35153f408c1234e4d5a261dc462 100644 --- a/newbrt/brt.c +++ b/newbrt/brt.c @@ -3667,7 +3667,7 @@ int toku_brt_create_cachetable(CACHETABLE *ct, long cachesize, LSN initial_lsn, // Create checkpoint-in-progress versions of header and translation (btt) (and fifo for now...). //Has access to fd (it is protected). int -toku_brtheader_begin_checkpoint (CACHEFILE UU(cachefile), int UU(fd), LSN checkpoint_lsn, void *header_v) { +toku_brtheader_begin_checkpoint (LSN checkpoint_lsn, void *header_v) { struct brt_header *h = header_v; int r = h->panic; if (r==0) { @@ -3897,7 +3897,7 @@ toku_brtheader_close (CACHEFILE cachefile, int fd, void *header_v, char **malloc } int r2; //assert(lsn.lsn!=0); - r2 = toku_brtheader_begin_checkpoint(cachefile, fd, lsn, header_v); + r2 = toku_brtheader_begin_checkpoint(lsn, header_v); if (r==0) r = r2; r2 = toku_brtheader_checkpoint(cachefile, fd, h); if (r==0) r = r2; diff --git a/newbrt/cachetable.c b/newbrt/cachetable.c index 04645cfc9898179640adafcd50c258e2550365e4..be34b730a29fe203fe737a8a26ce29c34ede301d 100644 --- a/newbrt/cachetable.c +++ b/newbrt/cachetable.c @@ -279,7 +279,7 @@ struct cachefile { int (*log_fassociate_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log all open files. int (*log_suppress_rollback_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log which files need rollbacks suppressed int (*close_userdata)(CACHEFILE cf, int fd, void *userdata, char **error_string, BOOL lsnvalid, LSN); // when closing the last reference to a cachefile, first call this function. - int (*begin_checkpoint_userdata)(CACHEFILE cf, int fd, LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function. + int (*begin_checkpoint_userdata)(LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function. int (*checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // when checkpointing a cachefile, call this function. int (*end_checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // after checkpointing cachefiles call this function. int (*note_pin_by_checkpoint)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory @@ -287,11 +287,9 @@ struct cachefile { toku_pthread_cond_t openfd_wait; // openfd must wait until file is fully closed (purged from cachetable) if file is opened and closed simultaneously toku_pthread_cond_t closefd_wait; // toku_cachefile_of_iname_and_add_reference() must wait until file is fully closed (purged from cachetable) if run while file is being closed. u_int32_t closefd_waiting; // Number of threads waiting on closefd_wait (0 or 1, error otherwise). - struct rwlock checkpoint_lock; //protects checkpoint callback functions - //acts as fast mutex by only using 'write-lock' LSN most_recent_global_checkpoint_that_finished_early; LSN for_local_checkpoint; - enum cachefile_checkpoint_state checkpoint_state; //Protected by checkpoint_lock + enum cachefile_checkpoint_state checkpoint_state; int n_background_jobs; // how many jobs in the cachetable's kibbutz or // on the cleaner thread (anything @@ -758,7 +756,6 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd ct->cachefiles = newcf; rwlock_init(&newcf->fdlock); - rwlock_init(&newcf->checkpoint_lock); newcf->most_recent_global_checkpoint_that_finished_early = ZERO_LSN; newcf->for_local_checkpoint = ZERO_LSN; newcf->checkpoint_state = CS_NOT_IN_PROGRESS; @@ -956,9 +953,6 @@ int toku_cachefile_close (CACHEFILE *cfp, char **error_string, BOOL oplsn_valid, //assert(r == 0); rwlock_write_unlock(&cf->fdlock); rwlock_destroy(&cf->fdlock); - rwlock_write_lock(&cf->checkpoint_lock, ct->mutex); //Just to make sure we can get it - rwlock_write_unlock(&cf->checkpoint_lock); - rwlock_destroy(&cf->checkpoint_lock); assert(toku_list_empty(&cf->pairs_for_cachefile)); toku_free(cf); *cfp = NULL; @@ -1010,9 +1004,6 @@ int toku_cachefile_close (CACHEFILE *cfp, char **error_string, BOOL oplsn_valid, cachetable_lock(ct); rwlock_write_unlock(&cf->fdlock); rwlock_destroy(&cf->fdlock); - rwlock_write_lock(&cf->checkpoint_lock, ct->mutex); //Just to make sure we can get it - rwlock_write_unlock(&cf->checkpoint_lock); - rwlock_destroy(&cf->checkpoint_lock); assert(toku_list_empty(&cf->pairs_for_cachefile)); cachetable_unlock(ct); @@ -3313,14 +3304,10 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) { cachefiles_lock(ct); for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) { if (cf->begin_checkpoint_userdata) { - rwlock_prefer_read_lock(&cf->fdlock, ct->mutex); - rwlock_write_lock(&cf->checkpoint_lock, ct->mutex); assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS); - int r = cf->begin_checkpoint_userdata(cf, cf->fd, ct->lsn_of_checkpoint_in_progress, cf->userdata); + int r = cf->begin_checkpoint_userdata(ct->lsn_of_checkpoint_in_progress, cf->userdata); assert(r==0); cf->checkpoint_state = CS_CALLED_BEGIN_CHECKPOINT; - rwlock_write_unlock(&cf->checkpoint_lock); - rwlock_read_unlock(&cf->fdlock); } } cachefiles_unlock(ct); @@ -3374,7 +3361,6 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) { if (cf->checkpoint_userdata) { rwlock_prefer_read_lock(&cf->fdlock, ct->mutex); - rwlock_write_lock(&cf->checkpoint_lock, ct->mutex); if (!logger || ct->lsn_of_checkpoint_in_progress.lsn != cf->most_recent_global_checkpoint_that_finished_early.lsn) { assert(ct->lsn_of_checkpoint_in_progress.lsn >= cf->most_recent_global_checkpoint_that_finished_early.lsn); cachetable_unlock(ct); @@ -3389,7 +3375,6 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, else { assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS); } - rwlock_write_unlock(&cf->checkpoint_lock); rwlock_read_unlock(&cf->fdlock); } } @@ -3403,7 +3388,6 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) { if (cf->end_checkpoint_userdata) { rwlock_prefer_read_lock(&cf->fdlock, ct->mutex); - rwlock_write_lock(&cf->checkpoint_lock, ct->mutex); if (!logger || ct->lsn_of_checkpoint_in_progress.lsn != cf->most_recent_global_checkpoint_that_finished_early.lsn) { assert(ct->lsn_of_checkpoint_in_progress.lsn >= cf->most_recent_global_checkpoint_that_finished_early.lsn); cachetable_unlock(ct); @@ -3415,7 +3399,6 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, cachetable_lock(ct); } assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS); - rwlock_write_unlock(&cf->checkpoint_lock); rwlock_read_unlock(&cf->fdlock); } } @@ -3618,7 +3601,7 @@ toku_cachefile_set_userdata (CACHEFILE cf, int (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*), int (*close_userdata)(CACHEFILE, int, void*, char**, BOOL, LSN), int (*checkpoint_userdata)(CACHEFILE, int, void*), - int (*begin_checkpoint_userdata)(CACHEFILE, int, LSN, void*), + int (*begin_checkpoint_userdata)(LSN, void*), int (*end_checkpoint_userdata)(CACHEFILE, int, void*), int (*note_pin_by_checkpoint)(CACHEFILE, void*), int (*note_unpin_by_checkpoint)(CACHEFILE, void*)) { @@ -3841,133 +3824,3 @@ cleaner_thread (void *cachetable_v) return 0; } - -#if 0 -int -toku_cachetable_local_checkpoint_for_commit (CACHETABLE ct, TOKUTXN txn, uint32_t n, CACHEFILE cachefiles[n]) { - cachetable_lock(ct); - local_checkpoint++; - local_checkpoint_files += n; - - LSN begin_checkpoint_lsn = ZERO_LSN; - uint32_t i; - TOKULOGGER logger = txn->logger; - CACHEFILE cf; - assert(logger); //Need transaction, so there must be a logger - { - int r = toku_log_local_txn_checkpoint(logger, &begin_checkpoint_lsn, 0, txn->txnid64); - assert(r==0); - } - for (i = 0; i < n; i++) { - cf = cachefiles[i]; - assert(cf->for_local_checkpoint.lsn == ZERO_LSN.lsn); - cf->for_local_checkpoint = begin_checkpoint_lsn; - } - - //Write out all dirty pairs. - { - uint32_t num_pairs = 0; - uint32_t list_size = 256; - PAIR *list = NULL; - XMALLOC_N(list_size, list); - PAIR p; - - //TODO: Determine if we can get rid of this use of pending_lock - rwlock_write_lock(&ct->pending_lock, ct->mutex); - for (i=0; i < ct->table_size; i++) { - for (p = ct->table[i]; p; p=p->hash_chain) { - //Only include pairs belonging to cachefiles in the checkpoint - if (p->cachefile->for_local_checkpoint.lsn != begin_checkpoint_lsn.lsn) continue; - if (p->state == CTPAIR_READING) - continue; // skip pairs being read as they will be clean - else if (p->state == CTPAIR_IDLE || p->state == CTPAIR_WRITING) { - if (p->dirty) { - ctpair_add_ref(p); - list[num_pairs] = p; - num_pairs++; - if (num_pairs == list_size) { - list_size *= 2; - XREALLOC_N(list_size, list); - } - } - } else - assert(0); - } - } - rwlock_write_unlock(&ct->pending_lock); - - for (i = 0; i < num_pairs; i++) { - p = list[i]; - if (!p->already_removed) { - write_pair_for_checkpoint(ct, p, TRUE); - } - ctpair_destroy(p); //Release our reference - // Don't need to unlock and lock cachetable, - // because the cachetable was unlocked and locked while the flush callback ran. - } - toku_free(list); - } - - for (i = 0; i < n; i++) { - int r; - cf = cachefiles[i]; - rwlock_prefer_read_lock(&cf->fdlock, ct->mutex); - rwlock_write_lock(&cf->checkpoint_lock, ct->mutex); - BOOL own_cachetable_lock = TRUE; - switch (cf->checkpoint_state) { - case CS_NOT_IN_PROGRESS: - break; - case CS_CALLED_BEGIN_CHECKPOINT: - cachetable_unlock(ct); - own_cachetable_lock = FALSE; - assert(cf->checkpoint_state == CS_CALLED_BEGIN_CHECKPOINT); - r = cf->checkpoint_userdata(cf, cf->fd, cf->userdata); - assert(r==0); - cf->checkpoint_state = CS_CALLED_CHECKPOINT; - //FALL THROUGH ON PURPOSE. - case CS_CALLED_CHECKPOINT: - if (own_cachetable_lock) - cachetable_unlock(ct); - //end_checkpoint fsyncs the fd, which needs the fdlock - assert(cf->checkpoint_state == CS_CALLED_CHECKPOINT); - r = cf->end_checkpoint_userdata(cf, cf->fd, cf->userdata); - assert(r==0); - cf->checkpoint_state = CS_NOT_IN_PROGRESS; - cachetable_lock(ct); - assert(cf->most_recent_global_checkpoint_that_finished_early.lsn < ct->lsn_of_checkpoint_in_progress.lsn); - cf->most_recent_global_checkpoint_that_finished_early = ct->lsn_of_checkpoint_in_progress; - local_checkpoint_during_checkpoint++; - break; - default: - assert(FALSE); - } - { //Begin - assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS); - r = cf->begin_checkpoint_userdata(cf, cf->fd, begin_checkpoint_lsn, cf->userdata); - assert(r==0); - cf->checkpoint_state = CS_CALLED_BEGIN_CHECKPOINT; - } - { //Middle - assert(cf->checkpoint_state == CS_CALLED_BEGIN_CHECKPOINT); - r = cf->checkpoint_userdata(cf, cf->fd, cf->userdata); - assert(r==0); - cf->checkpoint_state = CS_CALLED_CHECKPOINT; - } - { //End - assert(cf->checkpoint_state == CS_CALLED_CHECKPOINT); - r = cf->end_checkpoint_userdata(cf, cf->fd, cf->userdata); - assert(r==0); - cf->checkpoint_state = CS_NOT_IN_PROGRESS; - } - assert(cf->for_local_checkpoint.lsn == begin_checkpoint_lsn.lsn); - cf->for_local_checkpoint = ZERO_LSN; - - rwlock_write_unlock(&cf->checkpoint_lock); - rwlock_read_unlock(&cf->fdlock); - } - - cachetable_unlock(ct); - - return 0; -} -#endif diff --git a/newbrt/cachetable.h b/newbrt/cachetable.h index d34482669e889b4f3b4c90c73d87318c112ad633..24497a8efa19407dde2cb44c042d73b02452c731 100644 --- a/newbrt/cachetable.h +++ b/newbrt/cachetable.h @@ -189,7 +189,7 @@ void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, int (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*), int (*close_userdata)(CACHEFILE, int, void*, char **/*error_string*/, BOOL, LSN), int (*checkpoint_userdata)(CACHEFILE, int, void*), - int (*begin_checkpoint_userdata)(CACHEFILE, int, LSN, void*), + int (*begin_checkpoint_userdata)(LSN, void*), int (*end_checkpoint_userdata)(CACHEFILE, int, void*), int (*note_pin_by_checkpoint)(CACHEFILE, void*), int (*note_unpin_by_checkpoint)(CACHEFILE, void*)); @@ -544,10 +544,6 @@ void toku_cachetable_set_lock_unlock_for_io (CACHETABLE ct, void (*ydb_lock_call // Effect: When we do I/O we may need to release locks (e.g., the ydb lock). These functions release the lock acquire the lock. -#if 0 -int toku_cachetable_local_checkpoint_for_commit(CACHETABLE ct, TOKUTXN txn, uint32_t n, CACHEFILE cachefiles[]); -#endif - void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra); // Effect: Add a job to the cachetable's collection of work to do. Note that function f must call remove_background_job() diff --git a/newbrt/tests/cachetable-pin-checkpoint.c b/newbrt/tests/cachetable-pin-checkpoint.c index c77b53114efc6e394679b901f78cacbdd3e312e7..72f158fed67270362c5f7ab95794491e381fbc2d 100644 --- a/newbrt/tests/cachetable-pin-checkpoint.c +++ b/newbrt/tests/cachetable-pin-checkpoint.c @@ -266,8 +266,6 @@ static void *checkpoints(void *arg) { static int test_begin_checkpoint ( - CACHEFILE UU(cachefile), - int UU(fd), LSN UU(checkpoint_lsn), void* UU(header_v)) { diff --git a/newbrt/txn.c b/newbrt/txn.c index 48c9f717d96539e15077d528d0b183061a11099d..42a22748ef9c80984a3e5e8392dc9ff4e4487b12 100644 --- a/newbrt/txn.c +++ b/newbrt/txn.c @@ -338,38 +338,9 @@ BOOL toku_txn_requires_checkpoint(TOKUTXN txn) { //Called during a yield (ydb lock NOT held). static void -local_checkpoints_and_log_xcommit(void *thunk) { +log_xcommit(void *thunk) { struct xcommit_info *info = thunk; TOKUTXN txn = info->txn; - -#if 0 - if (!txn->parent && !toku_list_empty(&txn->checkpoint_before_commit)) { - toku_poll_txn_progress_function(txn, TRUE, TRUE); - //Do local checkpoints that must happen BEFORE logging xcommit - uint32_t num_cachefiles = 0; - uint32_t list_size = 16; - CACHEFILE *cachefiles= NULL; - XMALLOC_N(list_size, cachefiles); - while (!toku_list_empty(&txn->checkpoint_before_commit)) { - struct toku_list *list = toku_list_pop(&txn->checkpoint_before_commit); - struct brt_header *h = toku_list_struct(list, - struct brt_header, - checkpoint_before_commit_link); - cachefiles[num_cachefiles++] = h->cf; - if (num_cachefiles == list_size) { - list_size *= 2; - XREALLOC_N(list_size, cachefiles); - } - } - assert(num_cachefiles); - CACHETABLE ct = toku_cachefile_get_cachetable(cachefiles[0]); - - int r = toku_cachetable_local_checkpoint_for_commit(ct, txn, num_cachefiles, cachefiles); - assert_zero(r); - toku_free(cachefiles); - toku_poll_txn_progress_function(txn, TRUE, FALSE); - } -#endif // not sure how the elements in the list are getting freed, so I am doing this if (!txn->parent && !toku_list_empty(&txn->checkpoint_before_commit)) { while (!toku_list_empty(&txn->checkpoint_before_commit)) { @@ -400,7 +371,7 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv .r = 0, .txn = txn, }; - yield(local_checkpoints_and_log_xcommit, &info, yieldv); + yield(log_xcommit, &info, yieldv); r = info.r; } if (r!=0)