Commit c9e96a87 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

refs #5500 fsyncs do not fail, so the userdata callbacks do not fail


git-svn-id: file:///svn/toku/tokudb@48236 c7de825b-a66e-492c-adef-691d508d4ae1
parent c652e648
...@@ -110,9 +110,9 @@ struct cachefile { ...@@ -110,9 +110,9 @@ struct cachefile {
void *userdata; void *userdata;
void (*log_fassociate_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log all open files. void (*log_fassociate_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log all open files.
void (*log_suppress_rollback_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log which files need rollbacks suppressed void (*log_suppress_rollback_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log which files need rollbacks suppressed
int (*close_userdata)(CACHEFILE cf, int fd, void *userdata, bool lsnvalid, LSN); // when closing the last reference to a cachefile, first call this function. void (*close_userdata)(CACHEFILE cf, int fd, void *userdata, bool lsnvalid, LSN); // when closing the last reference to a cachefile, first call this function.
void (*begin_checkpoint_userdata)(LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function. void (*begin_checkpoint_userdata)(LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function.
int (*checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // when checkpointing a cachefile, call this function. void (*checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // when checkpointing a cachefile, call this function.
void (*end_checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // after checkpointing cachefiles call this function. void (*end_checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // after checkpointing cachefiles call this function.
void (*note_pin_by_checkpoint)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory void (*note_pin_by_checkpoint)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory
void (*note_unpin_by_checkpoint)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory void (*note_unpin_by_checkpoint)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory
......
...@@ -316,13 +316,14 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd ...@@ -316,13 +316,14 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
const char *fname_in_env, const char *fname_in_env,
FILENUM filenum) { FILENUM filenum) {
int r; int r;
CACHEFILE extant; CACHEFILE extant, newcf;
struct fileid fileid; struct fileid fileid;
assert(filenum.fileid != FILENUM_NONE.fileid); assert(filenum.fileid != FILENUM_NONE.fileid);
r = toku_os_get_unique_file_id(fd, &fileid); r = toku_os_get_unique_file_id(fd, &fileid);
if (r != 0) { if (r != 0) {
r=get_error_errno(); close(fd); // no change for t:2444 r = get_error_errno();
close(fd);
return r; return r;
} }
ct->cf_list.write_lock(); ct->cf_list.write_lock();
...@@ -348,20 +349,17 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd ...@@ -348,20 +349,17 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
invariant(extant->filenum.fileid != filenum.fileid); invariant(extant->filenum.fileid != filenum.fileid);
} }
//File is not open. Make a new cachefile. // File is not open. Make a new cachefile.
{ XCALLOC(newcf);
// create a new cachefile entry in the cachetable newcf->cachetable = ct;
CACHEFILE XCALLOC(newcf); newcf->filenum = filenum;
newcf->cachetable = ct; cachefile_init_filenum(newcf, fd, fname_in_env, fileid);
newcf->filenum = filenum; newcf->next = ct->cf_list.m_head;
cachefile_init_filenum(newcf, fd, fname_in_env, fileid); ct->cf_list.m_head = newcf;
newcf->next = ct->cf_list.m_head;
ct->cf_list.m_head = newcf; bjm_init(&newcf->bjm);
*cfptr = newcf;
bjm_init(&newcf->bjm); r = 0;
*cfptr = newcf;
r = 0;
}
exit: exit:
ct->cf_list.write_unlock(); ct->cf_list.write_unlock();
return r; return r;
...@@ -374,30 +372,34 @@ int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname_in ...@@ -374,30 +372,34 @@ int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname_in
char *fname_in_cwd = toku_construct_full_name(2, ct->env_dir, fname_in_env); char *fname_in_cwd = toku_construct_full_name(2, ct->env_dir, fname_in_env);
int fd = open(fname_in_cwd, flags+O_BINARY, mode); int fd = open(fname_in_cwd, flags+O_BINARY, mode);
int r; int r;
if (fd<0) r = get_error_errno(); if (fd < 0) {
else r = toku_cachetable_openfd (cfptr, ct, fd, fname_in_env); r = get_error_errno();
} else {
r = toku_cachetable_openfd (cfptr, ct, fd, fname_in_env);
}
toku_free(fname_in_cwd); toku_free(fname_in_cwd);
return r; return r;
} }
//Test-only function //Test-only function
int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname_in_env) { int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname_in_env) {
int r;
struct fileid fileid; struct fileid fileid;
r=toku_os_get_unique_file_id(fd, &fileid); int r = toku_os_get_unique_file_id(fd, &fileid);
if (r != 0) { if (r != 0) {
r=get_error_errno(); close(fd); goto cleanup; // no change for t:2444 r = get_error_errno();
} close(fd);
if (cf->close_userdata && (r = cf->close_userdata(cf, cf->fd, cf->userdata, false, ZERO_LSN))) {
goto cleanup; goto cleanup;
} }
if (cf->close_userdata) {
cf->close_userdata(cf, cf->fd, cf->userdata, false, ZERO_LSN);
}
cf->close_userdata = NULL; cf->close_userdata = NULL;
cf->checkpoint_userdata = NULL; cf->checkpoint_userdata = NULL;
cf->begin_checkpoint_userdata = NULL; cf->begin_checkpoint_userdata = NULL;
cf->end_checkpoint_userdata = NULL; cf->end_checkpoint_userdata = NULL;
cf->userdata = NULL; cf->userdata = NULL;
close(cf->fd); // no change for t:2444 close(cf->fd);
cf->fd = -1; cf->fd = -1;
if (cf->fname_in_env) { if (cf->fname_in_env) {
toku_free(cf->fname_in_env); toku_free(cf->fname_in_env);
...@@ -441,7 +443,6 @@ static void remove_cf_from_cachefiles_list (CACHEFILE cf) { ...@@ -441,7 +443,6 @@ static void remove_cf_from_cachefiles_list (CACHEFILE cf) {
// TODO: (Zardosht) review locking of this function carefully in code review // TODO: (Zardosht) review locking of this function carefully in code review
int int
toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) { toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) {
int r, close_error = 0;
CACHEFILE cf = *cfp; CACHEFILE cf = *cfp;
CACHETABLE ct = cf->cachetable; CACHETABLE ct = cf->cachetable;
...@@ -458,7 +459,7 @@ toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) { ...@@ -458,7 +459,7 @@ toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) {
// Call the close userdata callback to notify the client this cachefile // Call the close userdata callback to notify the client this cachefile
// and its underlying file are going to be closed // and its underlying file are going to be closed
if (cf->close_userdata) { if (cf->close_userdata) {
close_error = cf->close_userdata(cf, cf->fd, cf->userdata, oplsn_valid, oplsn); cf->close_userdata(cf, cf->fd, cf->userdata, oplsn_valid, oplsn);
} }
remove_cf_from_cachefiles_list(cf); remove_cf_from_cachefiles_list(cf);
...@@ -467,7 +468,7 @@ toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) { ...@@ -467,7 +468,7 @@ toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) {
// fsync and close the fd. // fsync and close the fd.
toku_file_fsync_without_accounting(cf->fd); toku_file_fsync_without_accounting(cf->fd);
r = close(cf->fd); int r = close(cf->fd);
assert(r == 0); assert(r == 0);
// Unlink the file if the bit was set // Unlink the file if the bit was set
...@@ -480,11 +481,8 @@ toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) { ...@@ -480,11 +481,8 @@ toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) {
toku_free(cf->fname_in_env); toku_free(cf->fname_in_env);
toku_free(cf); toku_free(cf);
// If close userdata returned nonzero, pass that error code to the caller // TODO: can't fail
if (close_error != 0) { return 0;
r = close_error;
}
return r;
} }
// //
...@@ -2920,8 +2918,8 @@ toku_cachefile_set_userdata (CACHEFILE cf, ...@@ -2920,8 +2918,8 @@ toku_cachefile_set_userdata (CACHEFILE cf,
void *userdata, void *userdata,
void (*log_fassociate_during_checkpoint)(CACHEFILE, void*), void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
void (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*), void (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*),
int (*close_userdata)(CACHEFILE, int, void*, bool, LSN), void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
int (*checkpoint_userdata)(CACHEFILE, int, void*), void (*checkpoint_userdata)(CACHEFILE, int, void*),
void (*begin_checkpoint_userdata)(LSN, void*), void (*begin_checkpoint_userdata)(LSN, void*),
void (*end_checkpoint_userdata)(CACHEFILE, int, void*), void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
void (*note_pin_by_checkpoint)(CACHEFILE, void*), void (*note_pin_by_checkpoint)(CACHEFILE, void*),
...@@ -4397,9 +4395,8 @@ void checkpointer::checkpoint_userdata(CACHEFILE* checkpoint_cfs) { ...@@ -4397,9 +4395,8 @@ void checkpointer::checkpoint_userdata(CACHEFILE* checkpoint_cfs) {
assert(cf->for_checkpoint); assert(cf->for_checkpoint);
assert(cf->checkpoint_userdata); assert(cf->checkpoint_userdata);
toku_cachetable_set_checkpointing_user_data_status(1); toku_cachetable_set_checkpointing_user_data_status(1);
int r = cf->checkpoint_userdata(cf, cf->fd, cf->userdata); cf->checkpoint_userdata(cf, cf->fd, cf->userdata);
toku_cachetable_set_checkpointing_user_data_status(0); toku_cachetable_set_checkpointing_user_data_status(0);
assert(r==0);
} }
} }
......
...@@ -189,8 +189,8 @@ typedef void (*CACHETABLE_REMOVE_KEY)(CACHEKEY* cachekey, bool for_checkpoint, v ...@@ -189,8 +189,8 @@ typedef void (*CACHETABLE_REMOVE_KEY)(CACHEKEY* cachekey, bool for_checkpoint, v
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
void (*log_fassociate_during_checkpoint)(CACHEFILE, void*), void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
void (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*), void (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*),
int (*close_userdata)(CACHEFILE, int, void*, bool, LSN), void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
int (*checkpoint_userdata)(CACHEFILE, int, void*), void (*checkpoint_userdata)(CACHEFILE, int, void*),
void (*begin_checkpoint_userdata)(LSN, void*), void (*begin_checkpoint_userdata)(LSN, void*),
void (*end_checkpoint_userdata)(CACHEFILE, int, void*), void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
void (*note_pin_by_checkpoint)(CACHEFILE, void*), void (*note_pin_by_checkpoint)(CACHEFILE, void*),
......
...@@ -164,8 +164,7 @@ ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(FT ft) { ...@@ -164,8 +164,7 @@ ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(FT ft) {
// checkpoints hold references to FTs and so they cannot be closed during a checkpoint. // checkpoints hold references to FTs and so they cannot be closed during a checkpoint.
// ft_close is not reentrant for a single FT // ft_close is not reentrant for a single FT
// end_checkpoint is not reentrant period // end_checkpoint is not reentrant period
static int static void ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
FT ft = (FT) header_v; FT ft = (FT) header_v;
FT_HEADER ch = ft->checkpoint_header; FT_HEADER ch = ft->checkpoint_header;
//printf("%s:%d allocated_limit=%lu writing queue to %lu\n", __FILE__, __LINE__, //printf("%s:%d allocated_limit=%lu writing queue to %lu\n", __FILE__, __LINE__,
...@@ -195,8 +194,6 @@ ft_checkpoint (CACHEFILE cf, int fd, void *header_v) { ...@@ -195,8 +194,6 @@ ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
else { else {
toku_block_translation_note_skipped_checkpoint(ft->blocktable); toku_block_translation_note_skipped_checkpoint(ft->blocktable);
} }
// TODO can't fail
return 0;
} }
// maps to cf->end_checkpoint_userdata // maps to cf->end_checkpoint_userdata
...@@ -215,8 +212,7 @@ static void ft_end_checkpoint (CACHEFILE UU(cachefile), int fd, void *header_v) ...@@ -215,8 +212,7 @@ static void ft_end_checkpoint (CACHEFILE UU(cachefile), int fd, void *header_v)
// maps to cf->close_userdata // maps to cf->close_userdata
// Has access to fd (it is protected). // Has access to fd (it is protected).
static int static void ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_valid, LSN oplsn) {
ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_valid, LSN oplsn) {
FT ft = (FT) header_v; FT ft = (FT) header_v;
assert(ft->h->type == FT_CURRENT); assert(ft->h->type == FT_CURRENT);
// We already have exclusive access to this field already, so skip the locking. // We already have exclusive access to this field already, so skip the locking.
...@@ -250,15 +246,11 @@ ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_valid, LSN opls ...@@ -250,15 +246,11 @@ ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_valid, LSN opls
assert(logger->rollback_cachefile != cachefile); assert(logger->rollback_cachefile != cachefile);
} }
ft_begin_checkpoint(lsn, header_v); ft_begin_checkpoint(lsn, header_v);
// TODO: can't fail ft_checkpoint(cachefile, fd, ft);
int r = ft_checkpoint(cachefile, fd, ft);
invariant(r == 0);
ft_end_checkpoint(cachefile, fd, header_v); ft_end_checkpoint(cachefile, fd, header_v);
assert(!ft->h->dirty); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary) assert(!ft->h->dirty); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary)
} }
toku_ft_free(ft); toku_ft_free(ft);
// TODO: can't fail
return 0;
} }
// maps to cf->note_pin_by_checkpoint // maps to cf->note_pin_by_checkpoint
......
...@@ -426,11 +426,6 @@ test_begin_checkpoint ( ...@@ -426,11 +426,6 @@ test_begin_checkpoint (
memcpy(checkpointed_data, data, sizeof(int64_t)*NUM_ELEMENTS); memcpy(checkpointed_data, data, sizeof(int64_t)*NUM_ELEMENTS);
} }
static int
dummy_int_checkpoint_userdata(CACHEFILE UU(cf), int UU(n), void* UU(extra)) {
return 0;
}
static void sum_vals(void) { static void sum_vals(void) {
int64_t sum = 0; int64_t sum = 0;
for (int i = 0; i < NUM_ELEMENTS; i++) { for (int i = 0; i < NUM_ELEMENTS; i++) {
...@@ -478,7 +473,7 @@ cachetable_test (void) { ...@@ -478,7 +473,7 @@ cachetable_test (void) {
&dummy_log_fassociate, &dummy_log_fassociate,
&dummy_log_rollback, &dummy_log_rollback,
&dummy_close_usr, &dummy_close_usr,
dummy_int_checkpoint_userdata, &dummy_chckpnt_usr,
test_begin_checkpoint, // called in begin_checkpoint test_begin_checkpoint, // called in begin_checkpoint
&dummy_end, &dummy_end,
&dummy_note_pin, &dummy_note_pin,
......
...@@ -9,22 +9,14 @@ ...@@ -9,22 +9,14 @@
// //
// Dummy callbacks for checkpointing // Dummy callbacks for checkpointing
// //
static void dummy_log_fassociate(CACHEFILE UU(cf), void* UU(p)) static void dummy_log_fassociate(CACHEFILE UU(cf), void* UU(p)) { }
{ return; } static void dummy_log_rollback(CACHEFILE UU(cf), void* UU(p)) { }
static void dummy_log_rollback(CACHEFILE UU(cf), void* UU(p)) static void dummy_close_usr(CACHEFILE UU(cf), int UU(i), void* UU(p), bool UU(b), LSN UU(lsn)) { }
{ return; } static void dummy_chckpnt_usr(CACHEFILE UU(cf), int UU(i), void* UU(p)) { }
static int dummy_close_usr(CACHEFILE UU(cf), int UU(i), void* UU(p), bool UU(b), LSN UU(lsn)) static void dummy_begin(LSN UU(lsn), void* UU(p)) { }
{ return 0; } static void dummy_end(CACHEFILE UU(cf), int UU(i), void* UU(p)) { }
static int dummy_chckpnt_usr(CACHEFILE UU(cf), int UU(i), void* UU(p)) static void dummy_note_pin(CACHEFILE UU(cf), void* UU(p)) { }
{ return 0; } static void dummy_note_unpin(CACHEFILE UU(cf), void* UU(p)) { }
static void dummy_begin(LSN UU(lsn), void* UU(p))
{ return; }
static void dummy_end(CACHEFILE UU(cf), int UU(i), void* UU(p))
{ return; }
static void dummy_note_pin(CACHEFILE UU(cf), void* UU(p))
{ return; }
static void dummy_note_unpin(CACHEFILE UU(cf), void* UU(p))
{ return; }
// //
// Helper function to set dummy functions in given cachefile. // Helper function to set dummy functions in given cachefile.
...@@ -33,14 +25,14 @@ static UU() void ...@@ -33,14 +25,14 @@ static UU() void
create_dummy_functions(CACHEFILE cf) create_dummy_functions(CACHEFILE cf)
{ {
void *ud = NULL; void *ud = NULL;
toku_cachefile_set_userdata (cf, toku_cachefile_set_userdata(cf,
ud, ud,
&dummy_log_fassociate, &dummy_log_fassociate,
&dummy_log_rollback, &dummy_log_rollback,
&dummy_close_usr, &dummy_close_usr,
&dummy_chckpnt_usr, &dummy_chckpnt_usr,
&dummy_begin, &dummy_begin,
&dummy_end, &dummy_end,
&dummy_note_pin, &dummy_note_pin,
&dummy_note_unpin); &dummy_note_unpin);
}; };
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment