Commit 96741a6d authored by Dave Wells's avatar Dave Wells Committed by Yoni Fogel

fix logfile trimming code, change ydb.c to exclusively use toku_checkpoint

git-svn-id: file:///svn/toku/tokudb@14522 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7c208708
...@@ -50,6 +50,8 @@ ...@@ -50,6 +50,8 @@
#include <toku_portability.h> #include <toku_portability.h>
#include "brttypes.h" #include "brttypes.h"
#include "cachetable.h" #include "cachetable.h"
#include "log-internal.h"
#include "logger.h"
#include "checkpoint.h" #include "checkpoint.h"
// footprint for debugging only // footprint for debugging only
...@@ -193,6 +195,7 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string, ...@@ -193,6 +195,7 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string,
checkpoint_footprint = 4; checkpoint_footprint = 4;
r = toku_cachetable_begin_checkpoint(ct, logger); r = toku_cachetable_begin_checkpoint(ct, logger);
LSN oldest_live_lsn = toku_logger_get_oldest_living_lsn(logger);
multi_operation_checkpoint_unlock(); multi_operation_checkpoint_unlock();
ydb_unlock(); ydb_unlock();
...@@ -203,6 +206,10 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string, ...@@ -203,6 +206,10 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string,
callback_f(extra); // callback is called with checkpoint_safe_lock still held callback_f(extra); // callback is called with checkpoint_safe_lock still held
r = toku_cachetable_end_checkpoint(ct, logger, error_string, callback2_f, extra2); r = toku_cachetable_end_checkpoint(ct, logger, error_string, callback2_f, extra2);
} }
if (r==0) {
LSN trim_lsn = (oldest_live_lsn.lsn < logger->checkpoint_lsn.lsn) ? oldest_live_lsn : logger->checkpoint_lsn;
r = toku_logger_maybe_trim_log(logger, trim_lsn);
}
checkpoint_footprint = 6; checkpoint_footprint = 6;
checkpoint_safe_checkpoint_unlock(); checkpoint_safe_checkpoint_unlock();
......
...@@ -84,10 +84,16 @@ int toku_logfilemgr_init(TOKULOGFILEMGR lfm, const char *log_dir) { ...@@ -84,10 +84,16 @@ int toku_logfilemgr_init(TOKULOGFILEMGR lfm, const char *log_dir) {
lf_info->index = index; lf_info->index = index;
// find last LSN // find last LSN
r = toku_logcursor_create_for_file(&cursor, log_dir, basename); r = toku_logcursor_create_for_file(&cursor, log_dir, basename);
assert(r == 0); if (r!=0)
return r;
r = toku_logcursor_last(cursor, &entry); r = toku_logcursor_last(cursor, &entry);
assert(r == 0); if ( r == 0 ) {
lf_info->maxlsn = toku_log_entry_get_lsn(entry); lf_info->maxlsn = toku_log_entry_get_lsn(entry);
}
else {
lf_info->maxlsn.lsn = 0;
}
// add to logfilemgr // add to logfilemgr
toku_logfilemgr_add_logfile_info(lfm, lf_info); toku_logfilemgr_add_logfile_info(lfm, lf_info);
toku_logcursor_destroy(&cursor); toku_logcursor_destroy(&cursor);
......
...@@ -350,12 +350,14 @@ static int open_logfile (TOKULOGGER logger) { ...@@ -350,12 +350,14 @@ static int open_logfile (TOKULOGGER logger) {
r = write_it(logger->fd, "tokulogg", 8); if (r!=8) return errno; r = write_it(logger->fd, "tokulogg", 8); if (r!=8) return errno;
int version_l = toku_htonl(log_format_version); //version MUST be in network byte order regardless of disk order int version_l = toku_htonl(log_format_version); //version MUST be in network byte order regardless of disk order
r = write_it(logger->fd, &version_l, 4); if (r!=4) return errno; r = write_it(logger->fd, &version_l, 4); if (r!=4) return errno;
TOKULOGFILEINFO lf_info = toku_malloc(sizeof(struct toku_logfile_info)); if ( logger->write_log_files ) {
if (lf_info == NULL) TOKULOGFILEINFO lf_info = toku_malloc(sizeof(struct toku_logfile_info));
return ENOMEM; if (lf_info == NULL)
lf_info->index = index; return ENOMEM;
lf_info->maxlsn = logger->written_lsn; // ?? not sure this is right, but better than 0 - DSW lf_info->index = index;
toku_logfilemgr_add_logfile_info(logger->logfilemgr, lf_info); lf_info->maxlsn = logger->written_lsn; // ?? not sure this is right, but better than 0 - DSW
toku_logfilemgr_add_logfile_info(logger->logfilemgr, lf_info);
}
logger->fsynced_lsn = logger->written_lsn; logger->fsynced_lsn = logger->written_lsn;
logger->n_in_file = 12; logger->n_in_file = 12;
return 0; return 0;
...@@ -369,25 +371,28 @@ static int delete_logfile(TOKULOGGER logger, long long index) { ...@@ -369,25 +371,28 @@ static int delete_logfile(TOKULOGGER logger, long long index) {
return r; return r;
} }
int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN oldest_open_lsn) { int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN trim_lsn) {
int r=0; int r=0;
TOKULOGFILEMGR lfm = logger->logfilemgr; TOKULOGFILEMGR lfm = logger->logfilemgr;
int n_logfiles = toku_logfilemgr_num_logfiles(lfm); int n_logfiles = toku_logfilemgr_num_logfiles(lfm);
TOKULOGFILEINFO lf_info = NULL; TOKULOGFILEINFO lf_info = NULL;
while ( n_logfiles > 1 ) { // don't delete current logfile
lf_info = toku_logfilemgr_get_oldest_logfile_info(lfm); if ( logger->write_log_files ) {
if ( lf_info->maxlsn.lsn > oldest_open_lsn.lsn ) { while ( n_logfiles > 1 ) { // don't delete current logfile
// file contains an open LSN, can't delete this or any newer log files lf_info = toku_logfilemgr_get_oldest_logfile_info(lfm);
break; if ( lf_info->maxlsn.lsn > trim_lsn.lsn ) {
} // file contains an open LSN, can't delete this or any newer log files
// need to save copy - toku_logfilemgr_delete_oldest_logfile_info free's the lf_info break;
long long index = lf_info->index; }
toku_logfilemgr_delete_oldest_logfile_info(lfm); // need to save copy - toku_logfilemgr_delete_oldest_logfile_info free's the lf_info
n_logfiles--; long index = lf_info->index;
r = delete_logfile(logger, index); toku_logfilemgr_delete_oldest_logfile_info(lfm);
if (r!=0) { n_logfiles--;
return r; r = delete_logfile(logger, index);
if (r!=0) {
return r;
}
} }
} }
return r; return r;
...@@ -458,7 +463,8 @@ static int do_write (TOKULOGGER logger, int do_fsync) { ...@@ -458,7 +463,8 @@ static int do_write (TOKULOGGER logger, int do_fsync) {
} }
logger->fsynced_lsn = logger->written_lsn; logger->fsynced_lsn = logger->written_lsn;
} }
toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn); if ( logger->write_log_files )
toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn);
return 0; return 0;
panic: panic:
toku_logger_panic(logger, r); toku_logger_panic(logger, r);
...@@ -976,3 +982,14 @@ TXNID toku_logger_get_oldest_living_xid(TOKULOGGER logger) { ...@@ -976,3 +982,14 @@ TXNID toku_logger_get_oldest_living_xid(TOKULOGGER logger) {
return rval; return rval;
} }
LSN toku_logger_get_oldest_living_lsn(TOKULOGGER logger) {
LSN lsn = {0};
if (logger) {
if (logger->oldest_living_xid == TXNID_NONE_LIVING)
lsn = MAX_LSN;
else
lsn.lsn = logger->oldest_living_xid;
}
return lsn;
}
...@@ -38,7 +38,7 @@ int toku_logger_restart(TOKULOGGER logger, LSN lastlsn); ...@@ -38,7 +38,7 @@ int toku_logger_restart(TOKULOGGER logger, LSN lastlsn);
// Effect: find all of the log files whose largest LSN is smaller than the // Effect: find all of the log files whose largest LSN is smaller than the
// given LSN and delete them. // given LSN and delete them.
// Returns: 0 if success // Returns: 0 if success
int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN lsn); int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN oldest_open_lsn);
int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, u_int32_t mode, u_int32_t flags); int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, u_int32_t mode, u_int32_t flags);
int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum); int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum);
...@@ -85,5 +85,6 @@ TOKUTXN toku_logger_txn_parent (TOKUTXN txn); ...@@ -85,5 +85,6 @@ TOKUTXN toku_logger_txn_parent (TOKUTXN txn);
void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn); void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn);
TXNID toku_logger_get_oldest_living_xid(TOKULOGGER logger); TXNID toku_logger_get_oldest_living_xid(TOKULOGGER logger);
LSN toku_logger_get_oldest_living_lsn(TOKULOGGER logger);
#endif #endif
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include "includes.h" #include "includes.h"
#include "log_header.h" #include "log_header.h"
#include "varray.h" #include "varray.h"
#include "checkpoint.h"
static int toku_recover_trace = 0; static int toku_recover_trace = 0;
...@@ -760,12 +761,17 @@ static int do_recovery(RECOVER_ENV renv, const char *data_dir, const char *log_d ...@@ -760,12 +761,17 @@ static int do_recovery(RECOVER_ENV renv, const char *data_dir, const char *log_d
assert(r == 0); assert(r == 0);
// checkpoint // checkpoint
#if 1
r = toku_checkpoint(renv->ct, renv->logger, NULL, NULL, NULL, NULL, NULL);
assert(r == 0);
#else
// TODO: checkpoint locks needed here? // TODO: checkpoint locks needed here?
r = toku_cachetable_begin_checkpoint(renv->ct, renv->logger); r = toku_cachetable_begin_checkpoint(renv->ct, renv->logger);
assert(r == 0); assert(r == 0);
// TODO: what about the error_string? // TODO: what about the error_string?
r = toku_cachetable_end_checkpoint(renv->ct, renv->logger, NULL, NULL, NULL); r = toku_cachetable_end_checkpoint(renv->ct, renv->logger, NULL, NULL, NULL);
assert(r == 0); assert(r == 0);
#endif
r = chdir(org_wd); r = chdir(org_wd);
assert(r == 0); assert(r == 0);
......
...@@ -315,7 +315,9 @@ static int do_recovery (DB_ENV *env) { ...@@ -315,7 +315,9 @@ static int do_recovery (DB_ENV *env) {
} else { } else {
logdir = toku_strdup(env->i->dir); logdir = toku_strdup(env->i->dir);
} }
toku_ydb_unlock();
int r = tokudb_recover(datadir, logdir, env->i->bt_compare, env->i->dup_compare); int r = tokudb_recover(datadir, logdir, env->i->bt_compare, env->i->dup_compare);
toku_ydb_lock();
toku_free(logdir); toku_free(logdir);
return r; return r;
} }
...@@ -467,8 +469,16 @@ static int toku_env_close(DB_ENV * env, u_int32_t flags) { ...@@ -467,8 +469,16 @@ static int toku_env_close(DB_ENV * env, u_int32_t flags) {
toku_ydb_unlock(); // ydb lock must not be held when shutting down minicron toku_ydb_unlock(); // ydb lock must not be held when shutting down minicron
toku_cachetable_minicron_shutdown(env->i->cachetable); toku_cachetable_minicron_shutdown(env->i->cachetable);
if (env->i->logger) { if (env->i->logger) {
#if 1
r0 = toku_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL, NULL, NULL); r0 = toku_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL, NULL, NULL);
assert(r0 == 0); assert(r0 == 0);
#else
// TODO locks?
r0 = toku_cachetable_begin_checkpoint(env->i->cachetable, env->i->logger);
if (r0 == 0)
toku_cachetable_end_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL);
assert(r0 == 0);
#endif
r0 = toku_logger_shutdown(env->i->logger); assert(r0 == 0); r0 = toku_logger_shutdown(env->i->logger); assert(r0 == 0);
} }
toku_ydb_lock(); toku_ydb_lock();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment