Commit 3523924d authored by Dave Wells's avatar Dave Wells Committed by Yoni Fogel

logcursor changes, fix LSNs, make progress on tests [t:1926]

git-svn-id: file:///svn/toku/tokudb@13970 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9197483c
...@@ -141,6 +141,7 @@ static inline int toku_logsizeof_BYTESTRING (BYTESTRING bs) { ...@@ -141,6 +141,7 @@ static inline int toku_logsizeof_BYTESTRING (BYTESTRING bs) {
return 4+bs.len; return 4+bs.len;
} }
#if 0
static inline int toku_logsizeof_LOGGEDBRTHEADER (LOGGEDBRTHEADER bs) { static inline int toku_logsizeof_LOGGEDBRTHEADER (LOGGEDBRTHEADER bs) {
int in_both = 4+4+4+8+8+4+8; int in_both = 4+4+4+8+8+4+8;
in_both += 8; // for the number of block headers in_both += 8; // for the number of block headers
...@@ -152,6 +153,7 @@ static inline int toku_logsizeof_LOGGEDBRTHEADER (LOGGEDBRTHEADER bs) { ...@@ -152,6 +153,7 @@ static inline int toku_logsizeof_LOGGEDBRTHEADER (LOGGEDBRTHEADER bs) {
static inline int toku_logsizeof_INTPAIRARRAY (INTPAIRARRAY pa) { static inline int toku_logsizeof_INTPAIRARRAY (INTPAIRARRAY pa) {
return 4+(4+4)*pa.size; return 4+(4+4)*pa.size;
} }
#endif
static inline char *fixup_fname(BYTESTRING *f) { static inline char *fixup_fname(BYTESTRING *f) {
assert(f->len>0); assert(f->len>0);
......
...@@ -5,8 +5,10 @@ ...@@ -5,8 +5,10 @@
#include "includes.h" #include "includes.h"
enum lc_direction { LC_FORWARD, LC_BACKWARD, LC_FIRST, LC_LAST };
struct toku_logcursor { struct toku_logcursor {
char *logdir; char *logdir; // absolute directory name
char **logfiles; char **logfiles;
int n_logfiles; int n_logfiles;
int cur_logfiles_index; int cur_logfiles_index;
...@@ -14,8 +16,27 @@ struct toku_logcursor { ...@@ -14,8 +16,27 @@ struct toku_logcursor {
BOOL is_open; BOOL is_open;
struct log_entry entry; struct log_entry entry;
BOOL entry_valid; BOOL entry_valid;
LSN cur_lsn;
enum lc_direction last_direction;
}; };
#define LC_LSN_ERROR (-1)
static void lc_print_logcursor (TOKULOGCURSOR lc) __attribute__((unused));
static void lc_print_logcursor (TOKULOGCURSOR lc) {
printf("lc = %p\n", lc);
printf(" logdir = %s\n", lc->logdir);
printf(" logfiles = %p\n", lc->logfiles);
for (int lf=0;lf<lc->n_logfiles;lf++) {
printf(" logfile[%d] = %p (%s)\n", lf, lc->logfiles[lf], lc->logfiles[lf]);
}
printf(" n_logfiles = %d\n", lc->n_logfiles);
printf(" cur_logfiles_index = %d\n", lc->cur_logfiles_index);
printf(" cur_fp = %p\n", lc->cur_fp);
printf(" cur_lsn = %lu\n", lc->cur_lsn.lsn);
printf(" last_direction = %d\n", lc->last_direction);
}
static int lc_close_cur_logfile(TOKULOGCURSOR lc) { static int lc_close_cur_logfile(TOKULOGCURSOR lc) {
int r=0; int r=0;
if ( lc->is_open ) { if ( lc->is_open ) {
...@@ -28,6 +49,7 @@ static int lc_close_cur_logfile(TOKULOGCURSOR lc) { ...@@ -28,6 +49,7 @@ static int lc_close_cur_logfile(TOKULOGCURSOR lc) {
static int lc_open_logfile(TOKULOGCURSOR lc, int index) { static int lc_open_logfile(TOKULOGCURSOR lc, int index) {
int r=0; int r=0;
assert( !lc->is_open ); assert( !lc->is_open );
if( index == -1 || index >= lc->n_logfiles) return DB_NOTFOUND;
lc->cur_fp = fopen(lc->logfiles[index], "r"); lc->cur_fp = fopen(lc->logfiles[index], "r");
if ( lc->cur_fp == NULL ) if ( lc->cur_fp == NULL )
return DB_NOTFOUND; return DB_NOTFOUND;
...@@ -36,13 +58,25 @@ static int lc_open_logfile(TOKULOGCURSOR lc, int index) { ...@@ -36,13 +58,25 @@ static int lc_open_logfile(TOKULOGCURSOR lc, int index) {
r = toku_read_logmagic(lc->cur_fp, &version); r = toku_read_logmagic(lc->cur_fp, &version);
if (r!=0) if (r!=0)
return DB_BADFORMAT; return DB_BADFORMAT;
if (version != 1) if (version != TOKU_LOG_VERSION)
return DB_BADFORMAT; return DB_BADFORMAT;
// mark as open // mark as open
lc->is_open = TRUE; lc->is_open = TRUE;
return r; return r;
} }
static int lc_check_lsn(TOKULOGCURSOR lc, int dir) {
int r=0;
LSN lsn = toku_log_entry_get_lsn(&(lc->entry));
if (((dir == LC_FORWARD) && ( lsn.lsn != lc->cur_lsn.lsn + 1 )) ||
((dir == LC_BACKWARD) && ( lsn.lsn != lc->cur_lsn.lsn - 1 ))) {
fprintf(stderr, "Bad LSN : direction = %d, lsn.lsn = %"PRIu64", cur_lsn.lsn=%"PRIu64"\n", dir, lsn.lsn, lc->cur_lsn.lsn);
return LC_LSN_ERROR;
}
lc->cur_lsn.lsn = lsn.lsn;
return r;
}
// toku_logcursor_create() // toku_logcursor_create()
// - returns a pointer to a logcursor // - returns a pointer to a logcursor
int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) { int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) {
...@@ -51,16 +85,38 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) { ...@@ -51,16 +85,38 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) {
// malloc a cursor // malloc a cursor
TOKULOGCURSOR cursor = (TOKULOGCURSOR) toku_malloc(sizeof(struct toku_logcursor)); TOKULOGCURSOR cursor = (TOKULOGCURSOR) toku_malloc(sizeof(struct toku_logcursor));
if ( NULL==cursor ) if ( cursor == NULL ) {
return ENOMEM; failresult = ENOMEM;
goto fail;
}
// find logfiles in logdir // find logfiles in logdir
cursor->is_open = FALSE; cursor->is_open = FALSE;
cursor->cur_logfiles_index = 0; cursor->cur_logfiles_index = 0;
cursor->entry_valid = FALSE; cursor->entry_valid = FALSE;
// cursor->logdir must be an absolute path
if ( log_dir[0]=='/' ) {
cursor->logdir = (char *) toku_malloc(strlen(log_dir)+1); cursor->logdir = (char *) toku_malloc(strlen(log_dir)+1);
if ( NULL==cursor->logdir ) if ( cursor->logdir == NULL ) {
return ENOMEM; failresult = ENOMEM;
strcpy(cursor->logdir, log_dir); goto fail;
}
sprintf(cursor->logdir, "%s", log_dir);
}
else {
char *cwd = getcwd(NULL, 0);
if ( cwd == NULL ) {
failresult = -1;
goto fail;
}
cursor->logdir = (char *) toku_malloc(strlen(cwd)+strlen(log_dir)+2);
if ( cursor->logdir == NULL ) {
toku_free(cwd);
failresult = ENOMEM;
goto fail;
}
sprintf(cursor->logdir, "%s/%s", cwd, log_dir);
toku_free(cwd);
}
cursor->logfiles = NULL; cursor->logfiles = NULL;
cursor->n_logfiles = 0; cursor->n_logfiles = 0;
r = toku_logger_find_logfiles(cursor->logdir, &(cursor->logfiles), &(cursor->n_logfiles)); r = toku_logger_find_logfiles(cursor->logdir, &(cursor->logfiles), &(cursor->n_logfiles));
...@@ -68,6 +124,8 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) { ...@@ -68,6 +124,8 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) {
failresult=r; failresult=r;
goto fail; goto fail;
} }
cursor->cur_lsn.lsn=0;
cursor->last_direction=LC_FIRST;
*lc = cursor; *lc = cursor;
return r; return r;
fail: fail:
...@@ -76,6 +134,50 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) { ...@@ -76,6 +134,50 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) {
return failresult; return failresult;
} }
int toku_logcursor_create_for_file(TOKULOGCURSOR *lc, const char *log_dir, const char *log_file) {
int r=0;
int failresult=0;
TOKULOGCURSOR cursor;
r = toku_logcursor_create(&cursor, log_dir);
if (r!=0)
return r;
int idx;
int found_it=0;
int fullnamelen = strlen(cursor->logdir) + strlen(log_file) + 3;
char *log_file_fullname = toku_malloc(fullnamelen);
if ( log_file_fullname == NULL ) {
failresult = ENOMEM;
goto fail;
}
sprintf(log_file_fullname, "%s/%s", cursor->logdir, log_file);
for(idx=0;idx<cursor->n_logfiles;idx++) {
if ( strcmp(log_file_fullname, cursor->logfiles[idx]) == 0 ) {
found_it = 1;
break;
}
}
if (found_it==0) {
failresult = DB_NOTFOUND;
goto fail;
}
// replace old logfile structure with single file version
int lf;
for(lf=0;lf<cursor->n_logfiles;lf++) {
toku_free(cursor->logfiles[lf]);
}
cursor->n_logfiles=1;
cursor->logfiles[0] = log_file_fullname;
*lc = cursor;
return r;
fail:
toku_free(log_file_fullname);
toku_logcursor_destroy(&cursor);
*lc = NULL;
return failresult;
}
int toku_logcursor_destroy(TOKULOGCURSOR *lc) { int toku_logcursor_destroy(TOKULOGCURSOR *lc) {
int r=0; int r=0;
if ( (*lc)->entry_valid ) { if ( (*lc)->entry_valid ) {
...@@ -99,7 +201,13 @@ int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry **le) { ...@@ -99,7 +201,13 @@ int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry **le) {
if ( lc->entry_valid ) { if ( lc->entry_valid ) {
toku_log_free_log_entry_resources(&(lc->entry)); toku_log_free_log_entry_resources(&(lc->entry));
lc->entry_valid = FALSE; lc->entry_valid = FALSE;
} else if ( !lc->entry_valid ) { if (lc->last_direction == LC_BACKWARD) {
struct log_entry junk;
r = toku_log_fread(lc->cur_fp, &junk);
assert(r == 0);
toku_log_free_log_entry_resources(&junk);
}
} else {
r = toku_logcursor_first(lc, le); r = toku_logcursor_first(lc, le);
return r; return r;
} }
...@@ -126,6 +234,10 @@ int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry **le) { ...@@ -126,6 +234,10 @@ int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry **le) {
return r; return r;
} }
} }
r = lc_check_lsn(lc, LC_FORWARD);
if (r!=0)
return r;
lc->last_direction = LC_FORWARD;
lc->entry_valid = TRUE; lc->entry_valid = TRUE;
*le = &(lc->entry); *le = &(lc->entry);
return r; return r;
...@@ -136,7 +248,13 @@ int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry **le) { ...@@ -136,7 +248,13 @@ int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry **le) {
if ( lc->entry_valid ) { if ( lc->entry_valid ) {
toku_log_free_log_entry_resources(&(lc->entry)); toku_log_free_log_entry_resources(&(lc->entry));
lc->entry_valid = FALSE; lc->entry_valid = FALSE;
} else if ( !lc->entry_valid ) { if (lc->last_direction == LC_FORWARD) {
struct log_entry junk;
r = toku_log_fread_backward(lc->cur_fp, &junk);
assert(r == 0);
toku_log_free_log_entry_resources(&junk);
}
} else {
r = toku_logcursor_last(lc, le); r = toku_logcursor_last(lc, le);
return r; return r;
} }
...@@ -163,6 +281,10 @@ int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry **le) { ...@@ -163,6 +281,10 @@ int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry **le) {
return r; return r;
} }
} }
r = lc_check_lsn(lc, LC_BACKWARD);
if (r!=0)
return r;
lc->last_direction = LC_BACKWARD;
lc->entry_valid = TRUE; lc->entry_valid = TRUE;
*le = &(lc->entry); *le = &(lc->entry);
return r; return r;
...@@ -188,6 +310,10 @@ int toku_logcursor_first(TOKULOGCURSOR lc, struct log_entry **le) { ...@@ -188,6 +310,10 @@ int toku_logcursor_first(TOKULOGCURSOR lc, struct log_entry **le) {
r = toku_log_fread(lc->cur_fp, &(lc->entry)); r = toku_log_fread(lc->cur_fp, &(lc->entry));
if (r!=0) if (r!=0)
return r; return r;
r = lc_check_lsn(lc, LC_FIRST);
if (r!=0)
return r;
lc->last_direction = LC_FIRST;
lc->entry_valid = TRUE; lc->entry_valid = TRUE;
*le = &(lc->entry); *le = &(lc->entry);
return r; return r;
...@@ -217,6 +343,10 @@ int toku_logcursor_last(TOKULOGCURSOR lc, struct log_entry **le) { ...@@ -217,6 +343,10 @@ int toku_logcursor_last(TOKULOGCURSOR lc, struct log_entry **le) {
r = toku_log_fread_backward(lc->cur_fp, &(lc->entry)); r = toku_log_fread_backward(lc->cur_fp, &(lc->entry));
if (r!=0) if (r!=0)
return r; return r;
r = lc_check_lsn(lc, LC_LAST);
if (r!=0)
return r;
lc->last_direction = LC_LAST;
lc->entry_valid = TRUE; lc->entry_valid = TRUE;
*le = &(lc->entry); *le = &(lc->entry);
return r; return r;
......
...@@ -13,11 +13,14 @@ typedef struct toku_logcursor *TOKULOGCURSOR; ...@@ -13,11 +13,14 @@ typedef struct toku_logcursor *TOKULOGCURSOR;
// All routines return 0 on success // All routines return 0 on success
// toku_logcursor_create() // toku_logcursor_create()
// - returns a pointer to a logcursor // - creates a logcursor (lc)
// - following toku_logcursor_create() // - following toku_logcursor_create()
// if toku_logcursor_next() is called, it returns the first entry in the log // if toku_logcursor_next() is called, it returns the first entry in the log
// if toku_logcursor_prev() is called, it returns the last entry in the log // if toku_logcursor_prev() is called, it returns the last entry in the log
int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir); int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir);
// toku_logcursor_create_for_file()
// - creates a logcusor (lc) that only knows about the file log_file
int toku_logcursor_create_for_file(TOKULOGCURSOR *lc, const char *log_dir, const char *log_file);
// toku_logcursor_destroy() // toku_logcursor_destroy()
// - frees all resources associated with the logcursor, including the log_entry // - frees all resources associated with the logcursor, including the log_entry
// associated with the latest cursor action // associated with the latest cursor action
......
...@@ -48,7 +48,25 @@ int toku_logger_create (TOKULOGGER *resultp) { ...@@ -48,7 +48,25 @@ int toku_logger_create (TOKULOGGER *resultp) {
int toku_logger_open (const char *directory, TOKULOGGER logger) { int toku_logger_open (const char *directory, TOKULOGGER logger) {
if (logger->is_open) return EINVAL; if (logger->is_open) return EINVAL;
if (logger->is_panicked) return EINVAL; if (logger->is_panicked) return EINVAL;
int r; int r;
TOKULOGCURSOR logcursor;
struct log_entry *le;
logger->lsn.lsn=0;
// In searching existing logfiles for last LSN, have chosen to
// ignore errors that may occur. In the event of a return error,
// revert to using LSN=0
r = toku_logcursor_create(&logcursor, directory);
if (r==0) {
r = toku_logcursor_last(logcursor, &le);
if (r==0)
logger->lsn = toku_log_entry_get_lsn(le);
toku_logcursor_destroy(&logcursor);
}
//printf("starting after LSN=%lu\n", logger->lsn.lsn);
logger->written_lsn = logger->lsn;
logger->fsynced_lsn = logger->lsn;
long long nexti; long long nexti;
r = toku_logger_find_next_unused_log_file(directory, &nexti); r = toku_logger_find_next_unused_log_file(directory, &nexti);
if (r!=0) return r; if (r!=0) return r;
...@@ -57,10 +75,6 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) { ...@@ -57,10 +75,6 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) {
logger->next_log_file_number = nexti; logger->next_log_file_number = nexti;
open_logfile(logger); open_logfile(logger);
logger->lsn.lsn = 0; // WRONG!!! This should actually be calculated by looking at the log file.
logger->written_lsn.lsn = 0;
logger->fsynced_lsn.lsn = 0;
logger->is_open = 1; logger->is_open = 1;
if (!logger->write_log_files) if (!logger->write_log_files)
toku_set_func_fsync(toku_logger_fsync_null); toku_set_func_fsync(toku_logger_fsync_null);
......
...@@ -58,6 +58,7 @@ struct backward_scan_state { ...@@ -58,6 +58,7 @@ struct backward_scan_state {
int n_live_txns; int n_live_txns;
LSN min_live_txn; LSN min_live_txn;
}; };
static struct backward_scan_state initial_bss = {BS_INIT,{0},0,{0}}; static struct backward_scan_state initial_bss = {BS_INIT,{0},0,{0}};
static void static void
...@@ -176,12 +177,10 @@ internal_toku_recover_fopen_or_fcreate (int flags, int mode, char *fixedfname, F ...@@ -176,12 +177,10 @@ internal_toku_recover_fopen_or_fcreate (int flags, int mode, char *fixedfname, F
static void static void
toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM filenum) { toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM filenum) {
char *fixedfname = fixup_fname(&fname); char *fixedfname = fixup_fname(&fname);
toku_free_BYTESTRING(fname);
internal_toku_recover_fopen_or_fcreate(0, 0, fixedfname, filenum); internal_toku_recover_fopen_or_fcreate(0, 0, fixedfname, filenum);
} }
static int toku_recover_backward_fopen (struct logtype_fopen *l, struct backward_scan_state *UU(bs)) { static int toku_recover_backward_fopen (struct logtype_fopen *UU(l), struct backward_scan_state *UU(bs)) {
toku_free_BYTESTRING(l->fname);
return 0; return 0;
} }
...@@ -190,44 +189,14 @@ static int toku_recover_backward_fopen (struct logtype_fopen *l, struct backward ...@@ -190,44 +189,14 @@ static int toku_recover_backward_fopen (struct logtype_fopen *l, struct backward
static void static void
toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid), FILENUM filenum, BYTESTRING fname,u_int32_t mode) { toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid), FILENUM filenum, BYTESTRING fname,u_int32_t mode) {
char *fixedfname = fixup_fname(&fname); char *fixedfname = fixup_fname(&fname);
toku_free_BYTESTRING(fname);
create_dir_from_file(fixedfname); create_dir_from_file(fixedfname);
internal_toku_recover_fopen_or_fcreate(O_CREAT|O_TRUNC, mode, fixedfname, filenum); internal_toku_recover_fopen_or_fcreate(O_CREAT|O_TRUNC, mode, fixedfname, filenum);
} }
static int toku_recover_backward_fcreate (struct logtype_fcreate *l, struct backward_scan_state *UU(bs)) { static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), struct backward_scan_state *UU(bs)) {
toku_free_BYTESTRING(l->fname);
return 0; return 0;
} }
#if 0
static void
toku_recover_enqrootentry (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, u_int32_t typ, BYTESTRING key, BYTESTRING val) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
struct brt_cmd cmd;
DBT keydbt, valdbt;
cmd.type=(enum brt_cmd_type) typ;
cmd.xid =xid;
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0);
toku_free(key.data);
toku_free(val.data);
}
static int toku_recover_backward_enqrootentry (struct logtype_enqrootentry *l, struct backward_scan_state *UU(bs)) {
toku_free_BYTESTRING(l->key);
toku_free_BYTESTRING(l->data);
return 0;
}
#endif
static void static void
toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val) { toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val) {
struct cf_pair *pair = NULL; struct cf_pair *pair = NULL;
...@@ -250,13 +219,9 @@ toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, T ...@@ -250,13 +219,9 @@ toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, T
assert(r==0); assert(r==0);
xids_destroy(&cmd.xids); xids_destroy(&cmd.xids);
xids_destroy(&root); xids_destroy(&root);
toku_free(key.data);
toku_free(val.data);
} }
static int toku_recover_backward_enq_insert (struct logtype_enq_insert *l, struct backward_scan_state *UU(bs)) { static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), struct backward_scan_state *UU(bs)) {
toku_free_BYTESTRING(l->key);
toku_free_BYTESTRING(l->value);
return 0; return 0;
} }
...@@ -282,13 +247,9 @@ toku_recover_enq_delete_both (LSN lsn __attribute__((__unused__)), FILENUM filen ...@@ -282,13 +247,9 @@ toku_recover_enq_delete_both (LSN lsn __attribute__((__unused__)), FILENUM filen
assert(r==0); assert(r==0);
xids_destroy(&cmd.xids); xids_destroy(&cmd.xids);
xids_destroy(&root); xids_destroy(&root);
toku_free(key.data);
toku_free(val.data);
} }
static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both *l, struct backward_scan_state *UU(bs)) { static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both *UU(l), struct backward_scan_state *UU(bs)) {
toku_free_BYTESTRING(l->key);
toku_free_BYTESTRING(l->value);
return 0; return 0;
} }
...@@ -314,13 +275,9 @@ toku_recover_enq_delete_any (LSN lsn __attribute__((__unused__)), FILENUM filenu ...@@ -314,13 +275,9 @@ toku_recover_enq_delete_any (LSN lsn __attribute__((__unused__)), FILENUM filenu
assert(r==0); assert(r==0);
xids_destroy(&cmd.xids); xids_destroy(&cmd.xids);
xids_destroy(&root); xids_destroy(&root);
toku_free(key.data);
toku_free(val.data);
} }
static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *l, struct backward_scan_state *UU(bs)) { static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *UU(l), struct backward_scan_state *UU(bs)) {
toku_free_BYTESTRING(l->key);
toku_free_BYTESTRING(l->value);
return 0; return 0;
} }
...@@ -332,11 +289,9 @@ toku_recover_fclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) { ...@@ -332,11 +289,9 @@ toku_recover_fclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
r = toku_close_brt(pair->brt, 0, 0); r = toku_close_brt(pair->brt, 0, 0);
assert(r==0); assert(r==0);
pair->brt=0; pair->brt=0;
toku_free_BYTESTRING(fname);
} }
static int toku_recover_backward_fclose (struct logtype_fclose *l, struct backward_scan_state *UU(bs)) { static int toku_recover_backward_fclose (struct logtype_fclose *UU(l), struct backward_scan_state *UU(bs)) {
toku_free_BYTESTRING(l->fname);
return 0; return 0;
} }
...@@ -363,7 +318,6 @@ static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoi ...@@ -363,7 +318,6 @@ static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoi
abort(); abort();
} }
static int toku_recover_end_checkpoint (LSN UU(lsn), TXNID UU(txnid)) { static int toku_recover_end_checkpoint (LSN UU(lsn), TXNID UU(txnid)) {
return 0; return 0;
} }
...@@ -391,7 +345,6 @@ static int toku_recover_fassociate (LSN UU(lsn), FILENUM UU(filenum), BYTESTRING ...@@ -391,7 +345,6 @@ static int toku_recover_fassociate (LSN UU(lsn), FILENUM UU(filenum), BYTESTRING
static int toku_recover_backward_fassociate (struct logtype_fassociate *l, struct backward_scan_state *UU(bs)) { static int toku_recover_backward_fassociate (struct logtype_fassociate *l, struct backward_scan_state *UU(bs)) {
char *fixedfname = fixup_fname(&l->fname); char *fixedfname = fixup_fname(&l->fname);
toku_free_BYTESTRING(l->fname);
internal_toku_recover_fopen_or_fcreate(0, 0, fixedfname, l->filenum); internal_toku_recover_fopen_or_fcreate(0, 0, fixedfname, l->filenum);
return 0; return 0;
} }
...@@ -445,13 +398,11 @@ static int toku_recover_backward_xbegin (struct logtype_xbegin *l, struct backwa ...@@ -445,13 +398,11 @@ static int toku_recover_backward_xbegin (struct logtype_xbegin *l, struct backwa
abort(); abort();
} }
static int toku_recover_timestamp (LSN UU(lsn), u_int64_t UU(timestamp), BYTESTRING comment) { static int toku_recover_timestamp (LSN UU(lsn), u_int64_t UU(timestamp), BYTESTRING UU(comment)) {
toku_free_BYTESTRING(comment);
return 0; return 0;
} }
static int toku_recover_backward_timestamp (struct logtype_timestamp *l, struct backward_scan_state *UU(bs)) { static int toku_recover_backward_timestamp (struct logtype_timestamp *UU(l), struct backward_scan_state *UU(bs)) {
toku_free_BYTESTRING(l->comment);
return 0; return 0;
} }
...@@ -495,9 +446,6 @@ static int toku_delete_rolltmp_files (const char *log_dir) { ...@@ -495,9 +446,6 @@ static int toku_delete_rolltmp_files (const char *log_dir) {
int tokudb_recover(const char *data_dir, const char *log_dir) { int tokudb_recover(const char *data_dir, const char *log_dir) {
int failresult = 0; int failresult = 0;
int r; int r;
int entrycount=0;
char **logfiles;
int lockfd; int lockfd;
{ {
...@@ -514,117 +462,61 @@ int tokudb_recover(const char *data_dir, const char *log_dir) { ...@@ -514,117 +462,61 @@ int tokudb_recover(const char *data_dir, const char *log_dir) {
} }
} }
r = toku_delete_rolltmp_files(log_dir);
if (r!=0) { failresult=r; goto fail; }
int n_logfiles;
r = toku_logger_find_logfiles(log_dir, &logfiles, &n_logfiles);
if (r!=0) { failresult=r; goto fail; }
int i;
toku_recover_init();
char org_wd[1000]; char org_wd[1000];
{ {
char *wd=getcwd(org_wd, sizeof(org_wd)); char *wd=getcwd(org_wd, sizeof(org_wd));
assert(wd!=0); assert(wd!=0);
//printf("%s:%d org_wd=\"%s\"\n", __FILE__, __LINE__, org_wd); //printf("%s:%d org_wd=\"%s\"\n", __FILE__, __LINE__, org_wd);
} }
char data_wd[1000];
{
r=chdir(data_dir); assert(r==0);
char *wd=getcwd(data_wd, sizeof(data_wd));
assert(wd!=0);
//printf("%s:%d data_wd=\"%s\"\n", __FILE__, __LINE__, data_wd);
}
LSN lastlsn = ZERO_LSN; r = toku_delete_rolltmp_files(log_dir);
FILE *f = NULL; if (r!=0) { failresult=r; goto fail; }
for (i=0; i<n_logfiles; i++) {
if (f) fclose(f); TOKULOGCURSOR logcursor;
r=chdir(org_wd); r = toku_logcursor_create(&logcursor, log_dir);
assert(r==0); assert(r == 0);
char *logfile = logfiles[n_logfiles-i-1];
f = fopen(logfile, "r"); toku_recover_init();
assert(f);
printf("Opened %s\n", logfiles[n_logfiles-i-1]); r = chdir(data_dir);
r = fseek(f, 0, SEEK_END); assert(r==0); assert(r == 0);
struct log_entry le;
struct log_entry *le;
struct backward_scan_state bs = initial_bss; struct backward_scan_state bs = initial_bss;
r=chdir(data_wd);
assert(r==0);
while (1) { while (1) {
r = toku_log_fread_backward(f, &le); r = toku_logcursor_prev(logcursor, &le);
if (r==-1) break; // Ran out of file if (0) printf("%s:%d r=%d cmd=%c\n", __FUNCTION__, __LINE__, r, r==0 ? le->cmd:'?');
LSN thislsn = toku_log_entry_get_lsn(&le); if (r != 0)
if (lastlsn.lsn != 0) { break;
if (thislsn.lsn != lastlsn.lsn - 1) logtype_dispatch_assign(le, toku_recover_backward_, r, &bs);
printf("bw lastlsn=%"PRIu64" lsn=%"PRIu64"\n", lastlsn.lsn, thislsn.lsn); if (r != 0)
//assert(thislsn.lsn == lastlsn.lsn - 1); break;
}
lastlsn = thislsn;
logtype_dispatch_assign(&le, toku_recover_backward_, r, &bs);
if (r!=0) goto go_forward;
}
}
i--;
// We got to the end of the last log
if (f) { r=fclose(f); assert(r==0); }
// Now we go forward from this point
while (n_logfiles-i-1<n_logfiles) {
//fprintf(stderr, "Opening %s\n", logfiles[i]);
int j = n_logfiles-i-1;
r=chdir(org_wd);
assert(r==0);
f = fopen(logfiles[j], "r");
assert(f);
struct log_entry le;
u_int32_t version;
//printf("Reading file %d: %s\n", j, logfiles[j]);
r=toku_read_logmagic(f, &version);
assert(r==0 && version==TOKU_LOG_VERSION);
go_forward: // we have an open file, so go forward.
//printf("Going forward\n");
r=chdir(data_wd);
assert(r==0);
while ((r = toku_log_fread(f, &le))==0) {
//printf("doing %c\n", le.cmd);
LSN thislsn = toku_log_entry_get_lsn(&le);
if (lastlsn.lsn != thislsn.lsn) {
printf("fw expectlsn=%"PRIu64" lsn=%"PRIu64"\n", lastlsn.lsn, thislsn.lsn);
} }
// assert(lastlsn.lsn == thislsn.lsn);
lastlsn.lsn += 1;
logtype_dispatch_args(&le, toku_recover_); while (1) {
entrycount++; r = toku_logcursor_next(logcursor, &le);
} if (0) printf("%s:%d r=%d cmd=%c\n", __FUNCTION__, __LINE__, r, r==0 ? le->cmd:'?');
if (r!=EOF) { if (r != 0)
if (r==DB_BADFORMAT) { break;
fprintf(stderr, "Bad log format at record %d\n", entrycount); logtype_dispatch_args(le, toku_recover_);
return r;
} else {
fprintf(stderr, "Huh? %s\n", strerror(r));
return r;
}
}
fclose(f);
i--;
} }
toku_recover_cleanup(); toku_recover_cleanup();
for (i=0; logfiles[i]; i++) {
toku_free(logfiles[i]);
}
toku_free(logfiles);
r=toku_os_unlock_file(lockfd); r = chdir(org_wd);
if (r!=0) return errno; assert(r == 0);
r=chdir(org_wd); r = toku_logcursor_destroy(&logcursor);
assert(r == 0);
r=toku_os_unlock_file(lockfd);
if (r!=0) return errno; if (r!=0) return errno;
//printf("%s:%d recovery successful! ls -l says\n", __FILE__, __LINE__); //printf("%s:%d recovery successful! ls -l says\n", __FILE__, __LINE__);
//system("ls -l"); //system("ls -l");
return 0; return 0;
fail: fail:
toku_os_unlock_file(lockfd); toku_os_unlock_file(lockfd);
chdir(org_wd); chdir(org_wd);
......
...@@ -83,7 +83,8 @@ REGRESSION_TESTS_RAW = \ ...@@ -83,7 +83,8 @@ REGRESSION_TESTS_RAW = \
log-test5 \ log-test5 \
log-test6 \ log-test6 \
log-test7 \ log-test7 \
log-test-timestamp \ logcursor-timestamp \
logcursor-empty-logdir \
memtest \ memtest \
minicron-test \ minicron-test \
omt-cursor-test \ omt-cursor-test \
...@@ -97,6 +98,7 @@ REGRESSION_TESTS_RAW = \ ...@@ -97,6 +98,7 @@ REGRESSION_TESTS_RAW = \
test-inc-split \ test-inc-split \
test-leafentry10 \ test-leafentry10 \
test-leafentry-nested \ test-leafentry-nested \
test_logcursor \
test_oexcl \ test_oexcl \
test_toku_malloc_plain_free \ test_toku_malloc_plain_free \
threadpool-test \ threadpool-test \
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#include "test.h"
#include "includes.h"
#define dname __FILE__ ".dir"
#define rmrf "rm -rf " dname "/"
// a logcursor in an empty directory should not find any log entries
int
test_main (int argc, const char *argv[]) {
default_parse_args(argc, argv);
int r;
system(rmrf);
r = toku_os_mkdir(dname, S_IRWXU); assert(r==0);
// verify the log is empty
TOKULOGCURSOR lc = NULL;
struct log_entry *le;
r = toku_logcursor_create(&lc, dname);
assert(r == 0 && lc != NULL);
r = toku_logcursor_next(lc, &le);
assert(r != 0);
r = toku_logcursor_prev(lc, &le);
assert(r != 0);
r = toku_logcursor_destroy(&lc);
assert(r == 0 && lc == NULL);
return 0;
}
...@@ -19,8 +19,9 @@ static u_int64_t now(void) { ...@@ -19,8 +19,9 @@ static u_int64_t now(void) {
// a cursor through the log entries // a cursor through the log entries
int int
test_main (int argc __attribute__((__unused__)), test_main (int argc, const char *argv[]) {
const char *argv[] __attribute__((__unused__))) { default_parse_args(argc, argv);
int r; int r;
system(rmrf); system(rmrf);
r = toku_os_mkdir(dname, S_IRWXU); assert(r==0); r = toku_os_mkdir(dname, S_IRWXU); assert(r==0);
...@@ -48,12 +49,13 @@ test_main (int argc __attribute__((__unused__)), ...@@ -48,12 +49,13 @@ test_main (int argc __attribute__((__unused__)),
r = toku_logger_close(&logger); r = toku_logger_close(&logger);
assert(r == 0); assert(r == 0);
// TODO verify the log // verify the log forwards
TOKULOGCURSOR lc = NULL; TOKULOGCURSOR lc = NULL;
struct log_entry *le;
r = toku_logcursor_create(&lc, dname); r = toku_logcursor_create(&lc, dname);
assert(r == 0 && lc != NULL); assert(r == 0 && lc != NULL);
struct log_entry *le;
r = toku_logcursor_next(lc, &le); r = toku_logcursor_next(lc, &le);
assert(r == 0 && le->cmd == LT_timestamp); assert(r == 0 && le->cmd == LT_timestamp);
assert(le->u.timestamp.comment.len == 5 && memcmp(le->u.timestamp.comment.data, "hello", 5) == 0); assert(le->u.timestamp.comment.len == 5 && memcmp(le->u.timestamp.comment.data, "hello", 5) == 0);
...@@ -62,7 +64,9 @@ test_main (int argc __attribute__((__unused__)), ...@@ -62,7 +64,9 @@ test_main (int argc __attribute__((__unused__)),
r = toku_logcursor_next(lc, &le); r = toku_logcursor_next(lc, &le);
assert(r == 0 && le->cmd == LT_timestamp); assert(r == 0 && le->cmd == LT_timestamp);
assert(le->u.timestamp.comment.len == 5 && memcmp(le->u.timestamp.comment.data, "world", 5) == 0); assert(le->u.timestamp.comment.len == 5 && memcmp(le->u.timestamp.comment.data, "world", 5) == 0);
if (verbose)
printf("%"PRId64"\n", le->u.timestamp.timestamp - t); printf("%"PRId64"\n", le->u.timestamp.timestamp - t);
assert(le->u.timestamp.timestamp - t >= 10*1000000);
r = toku_logcursor_next(lc, &le); r = toku_logcursor_next(lc, &le);
assert(r != 0); assert(r != 0);
...@@ -70,5 +74,27 @@ test_main (int argc __attribute__((__unused__)), ...@@ -70,5 +74,27 @@ test_main (int argc __attribute__((__unused__)),
r = toku_logcursor_destroy(&lc); r = toku_logcursor_destroy(&lc);
assert(r == 0 && lc == NULL); assert(r == 0 && lc == NULL);
// verify the log backwards
r = toku_logcursor_create(&lc, dname);
assert(r == 0 && lc != NULL);
r = toku_logcursor_prev(lc, &le);
assert(r == 0 && le->cmd == LT_timestamp);
assert(le->u.timestamp.comment.len == 5 && memcmp(le->u.timestamp.comment.data, "world", 5) == 0);
t = le->u.timestamp.timestamp;
r = toku_logcursor_prev(lc, &le);
assert(r == 0 && le->cmd == LT_timestamp);
assert(le->u.timestamp.comment.len == 5 && memcmp(le->u.timestamp.comment.data, "hello", 5) == 0);
if (verbose)
printf("%"PRId64"\n", t - le->u.timestamp.timestamp);
assert(t - le->u.timestamp.timestamp >= 10*1000000);
r = toku_logcursor_prev(lc, &le);
assert(r != 0);
r = toku_logcursor_destroy(&lc);
assert(r == 0 && lc == NULL);
return 0; return 0;
} }
...@@ -6,50 +6,116 @@ ...@@ -6,50 +6,116 @@
#include "includes.h" #include "includes.h"
int test_0 (); int test_0 ();
int test_1 ();
static void usage() {
printf("test_logcursors [OPTIONS]\n");
printf("[-v]\n");
printf("[-q]\n");
}
int test_main(int argc __attribute__((unused)), const char *argv[] __attribute__((unused))) { int test_main(int argc, const char *argv[]) {
int r = 0; int i;
for (i=1; i<argc; i++) {
r = test_0(); const char *arg = argv[i];
if (arg[0] != '-')
break;
if (strcmp(arg, "-v")==0) {
verbose++;
} else if (strcmp(arg, "-q")==0) {
verbose = 0;
} else {
usage();
return 1;
}
}
return r int r = 0;
if ( (r=test_0()) !=0 ) return r;
if ( (r=test_1()) !=0 ) return r;
return r;
} }
int test_0 () { int test_0 () {
int r=0; int r=0;
char dbdir[100] = "./dir.test_logcursor.tdb"; char dbdir[100] = "./dir.test_logcursor";
struct toku_logcursor *cursor; struct toku_logcursor *cursor;
struct log_entry *entry; struct log_entry *entry;
r = toku_logcursor_create(&cursor, dbdir); r = toku_logcursor_create(&cursor, dbdir); if (verbose) printf("create returns %d\n", r); assert(r==0);
if ( r!=0 ) return r; r = toku_logcursor_next(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry->cmd); r = toku_logcursor_next(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry->cmd); r = toku_logcursor_next(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry->cmd); r = toku_logcursor_destroy(&cursor); if (verbose) printf("destroy returns %d\n", r); assert(r==0);
printf("r=%d\n", r);
r = toku_logcursor_destroy(&cursor);
r = toku_logcursor_create(&cursor, dbdir); r = toku_logcursor_create(&cursor, dbdir); if (verbose) printf("create returns %d\n", r); assert(r==0);
if ( r!=0 ) return r; r = toku_logcursor_first(cursor, &entry); if (verbose) printf("First Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd); r = toku_logcursor_next(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd); r = toku_logcursor_next(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd); r = toku_logcursor_destroy(&cursor); if (verbose) printf("destroy returns %d\n", r); assert(r==0);
printf("r=%d\n", r);
r = toku_logcursor_destroy(&cursor);
r = toku_logcursor_create(&cursor, dbdir); r = toku_logcursor_create(&cursor, dbdir); if (verbose) printf("create returns %d\n", r); assert(r==0);
if ( r!=0 ) return r; r = toku_logcursor_prev(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry->cmd); r = toku_logcursor_prev(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry->cmd); r = toku_logcursor_prev(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry->cmd); r = toku_logcursor_destroy(&cursor); if (verbose) printf("destroy returns %d\n", r); assert(r==0);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd);
if ( r == DB_NOTFOUND ) printf("PASS\n"); else printf("FAIL\n");
r = toku_logcursor_destroy(&cursor); r = toku_logcursor_create(&cursor, dbdir); if (verbose) printf("create returns %d\n", r); assert(r==0);
r = toku_logcursor_last(cursor, &entry); if (verbose) printf("Last Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_prev(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_prev(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_destroy(&cursor); if (verbose) printf("destroy returns %d\n", r); assert(r==0);
r = toku_logcursor_create(&cursor, dbdir); if (verbose) printf("create returns %d\n", r); assert(r==0);
r = toku_logcursor_prev(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_prev(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_next(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_next(cursor, &entry); assert(r==DB_NOTFOUND);
r = toku_logcursor_destroy(&cursor); if (verbose) printf("destroy returns %d\n", r); assert(r==0);
r = toku_logcursor_create(&cursor, dbdir); if (verbose) printf("create returns %d\n", r); assert(r==0);
r = toku_logcursor_next(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_next(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_prev(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_prev(cursor, &entry); assert(r==DB_NOTFOUND);
r = toku_logcursor_destroy(&cursor); if (verbose) printf("destroy returns %d\n", r); assert(r==0);
r = toku_logcursor_create(&cursor, dbdir); if (verbose) printf("create returns %d\n", r); assert(r==0);
r = toku_logcursor_next(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_next(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_next(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_prev(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_prev(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd); assert(r==0);
r = toku_logcursor_prev(cursor, &entry); if (verbose) printf("Entry = %c\n", entry->cmd);
if ( verbose) {
if ( r == DB_NOTFOUND ) printf("PASS\n");
else printf("FAIL\n");
}
assert(r==DB_NOTFOUND);
r = toku_logcursor_destroy(&cursor); if (verbose) printf("destroy returns %d\n", r); assert(r==0);
return 0; return 0;
} }
// test per-file version
int test_1 () {
int r=0;
char dbdir[100] = "./dir.test_logcursor";
char logfile[100] = "log000000000000.tokulog";
struct toku_logcursor *cursor;
struct log_entry *entry;
r = toku_logcursor_create_for_file(&cursor, dbdir, logfile);
if (verbose) printf("create returns %d\n", r);
assert(r==0);
r = toku_logcursor_last(cursor, &entry);
if (verbose) printf("entry = %c\n", entry->cmd);
assert(r==0);
assert(entry->cmd =='C');
r = toku_logcursor_destroy(&cursor);
if (verbose) printf("destroy returns %d\n", r);
assert(r==0);
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment