Commit b938b597 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

If fsync fails on a log commit, panic the DB. (We don't know if the...

If fsync fails on a log commit, panic the DB.  (We don't know if the transaction committed or failed without doing recovery.)
Start work on saving the undo records in main memory for rollback.
Addresses #27 (recovery) 
Addresses #253 (rollback)


git-svn-id: file:///svn/tokudb@1561 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3a29cece
......@@ -9,6 +9,9 @@
#define LOGGER_BUF_SIZE (1<<24)
struct tokulogger {
int is_open;
int is_panicked;
int panic_errno;
enum typ_tag tag;
char *directory;
int fd;
......@@ -40,6 +43,7 @@ struct tokutxn {
TOKULOGGER logger;
TOKUTXN parent;
LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */
struct log_entry *oldest_logentry,*newest_logentry;
};
int toku_logger_finish (TOKULOGGER logger, struct wbuf *wbuf);
......
......@@ -64,33 +64,50 @@ int toku_logger_find_logfiles (const char *directory, int *n_resultsp, char ***r
return closedir(d);
}
int toku_logger_create_and_open_logger (const char *directory, TOKULOGGER *resultp) {
int toku_logger_create (TOKULOGGER *resultp) {
TAGMALLOC(TOKULOGGER, result);
if (result==0) return -1;
if (result==0) return errno;
result->is_open=0;
result->is_panicked=0;
*resultp=result;
return 0;
}
int toku_logger_open (const char *directory, TOKULOGGER logger) {
if (logger->is_open) return EINVAL;
if (logger->is_panicked) return EINVAL;
int r;
long long nexti;
r = toku_logger_find_next_unused_log_file(directory, &nexti);
if (r!=0) {
died0:
toku_free(result);
return r;
}
result->directory = toku_strdup(directory);
if (result->directory==0) goto died0;
result->fd = -1;
result->next_log_file_number = nexti;
result->n_in_buf = 0;
logger->directory = toku_strdup(directory);
if (logger->directory==0) goto died0;
logger->fd = -1;
logger->next_log_file_number = nexti;
logger->n_in_buf = 0;
result->lsn.lsn = 0; // WRONG!!! This should actually be calculated by looking at the log file.
logger->lsn.lsn = 0; // WRONG!!! This should actually be calculated by looking at the log file.
*resultp=result;
return toku_logger_log_bytes(result, 0, "");
return toku_logger_log_bytes(logger, 0, "");
}
void toku_logger_panic (TOKULOGGER logger, int err) {
logger->panic_errno=err;
logger->is_panicked=1;
}
int toku_logger_panicked(TOKULOGGER logger) {
if (logger==0) return 0;
return logger->is_panicked;
}
static int log_format_version=0;
int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) {
int r;
if (logger->is_panicked) return EINVAL;
//fprintf(stderr, "%s:%d logging %d bytes\n", __FILE__, __LINE__, nbytes);
if (logger->fd==-1) {
int fnamelen = strlen(logger->directory)+50;
......@@ -129,9 +146,11 @@ int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) {
return 0;
}
int toku_logger_log_close(TOKULOGGER *loggerp) {
int toku_logger_close(TOKULOGGER *loggerp) {
TOKULOGGER logger = *loggerp;
if (logger->is_panicked) return EINVAL;
int r = 0;
if (!logger->is_open) goto is_closed;
if (logger->fd!=-1) {
//printf("%s:%d closing log: n_in_buf=%d\n", __FILE__, __LINE__, logger->n_in_buf);
if (logger->n_in_buf>0) {
......@@ -141,6 +160,8 @@ int toku_logger_log_close(TOKULOGGER *loggerp) {
r = close(logger->fd);
}
toku_free(logger->directory);
is_closed:
logger->is_panicked=1; // Just in case this might help.
toku_free(logger);
*loggerp=0;
return r;
......@@ -160,6 +181,7 @@ n
int toku_logger_fsync (TOKULOGGER logger) {
//return 0;/// NO TXN
//fprintf(stderr, "%s:%d syncing log\n", __FILE__, __LINE__);
if (logger->is_panicked) return EINVAL;
if (logger->n_in_buf>0) {
int r = write(logger->fd, logger->buf, logger->n_in_buf);
if (r==-1) return errno;
......@@ -173,6 +195,7 @@ int toku_logger_fsync (TOKULOGGER logger) {
}
int toku_logger_finish (TOKULOGGER logger, struct wbuf *wbuf) {
if (logger->is_panicked) return EINVAL;
wbuf_int(wbuf, toku_crc32(0, wbuf->buf, wbuf->ndone));
wbuf_int(wbuf, 4+wbuf->ndone);
return toku_logger_log_bytes(logger, wbuf->ndone, wbuf->buf);
......@@ -187,6 +210,7 @@ int toku_logger_log_brt_insert_with_no_overwrite (TOKULOGGER logger,
int keylen,
unsigned char *val,
int vallen) {
if (logger->is_panicked) return EINVAL;
printf("%s:%d\n", __FILE__, __LINE__);
return 0;
int buflen=(keylen+vallen+4+4 // key and value
......@@ -213,6 +237,7 @@ int toku_logger_log_brt_insert_with_no_overwrite (TOKULOGGER logger,
int toku_logger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF diskoff, int is_add, const struct kv_pair *pair) {
assert(is_add==0);
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
assert(db);
int keylen = pair->keylen;
int vallen = pair->vallen;
......@@ -239,12 +264,14 @@ int toku_logger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF dis
}
int toku_logger_commit (TOKUTXN txn, int nosync) {
// panic handled in log_commit
int r = toku_log_commit(txn, txn->txnid64, nosync);
toku_free(txn);
return r;
}
int toku_logger_log_checkpoint (TOKULOGGER logger, LSN *lsn) {
if (logger->is_panicked) return EINVAL;
struct wbuf wbuf;
const int buflen =10;
unsigned char buf[buflen];
......@@ -258,6 +285,7 @@ int toku_logger_log_checkpoint (TOKULOGGER logger, LSN *lsn) {
}
int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid64, TOKULOGGER logger) {
if (logger->is_panicked) return EINVAL;
TAGMALLOC(TOKUTXN, result);
if (result==0) return errno;
result->txnid64 = txnid64;
......@@ -268,6 +296,7 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid
}
int toku_logger_log_block_rename (TOKULOGGER logger, FILENUM fileid, DISKOFF olddiskoff, DISKOFF newdiskoff, DISKOFF parentdiskoff, int childnum) {
if (logger->is_panicked) return EINVAL;
const int buflen=(+1 // log command
+8 // lsn
+8 // fileid
......@@ -292,6 +321,8 @@ int toku_logger_log_block_rename (TOKULOGGER logger, FILENUM fileid, DISKOFF old
}
int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
BYTESTRING bs;
bs.len = strlen(fname);
bs.data = (char*)fname;
......@@ -300,6 +331,8 @@ int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
/* fopen isn't really an action. It's just for bookkeeping. We need to know the filename that goes with a filenum. */
int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
BYTESTRING bs;
bs.len = strlen(fname);
bs.data = (char*)fname;
......@@ -310,6 +343,7 @@ int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum) {
int toku_logger_log_unlink (TOKUTXN txn, const char *fname) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
const int fnamelen = strlen(fname);
const int buflen = (+1 // log command
+4 // length of fname
......@@ -325,6 +359,8 @@ int toku_logger_log_unlink (TOKUTXN txn, const char *fname) {
};
int toku_logger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
#if 0
LOGGEDBRTHEADER lh;
lh.size = toku_serialize_brt_header_size(h);
......@@ -348,7 +384,6 @@ int toku_logger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h)
toku_free(all_that_stuff);
return r;
#else
if (txn==0) return 0;
int subsize=toku_serialize_brt_header_size(h);
int buflen = (4 // firstlen
+ 1 //cmd
......
......@@ -6,10 +6,13 @@
#include "../include/db.h"
#include "brttypes.h"
#include "kv-pair.h"
int toku_logger_create_and_open_logger (const char *directory, TOKULOGGER *resultp);
int toku_logger_create(TOKULOGGER */*resultp*/);
int toku_logger_open(const char */*directory*/, TOKULOGGER);
int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes);
int toku_logger_log_close(TOKULOGGER *logger);
int toku_logger_close(TOKULOGGER *logger);
int toku_logger_log_checkpoint (TOKULOGGER, LSN*);
void toku_logger_panic(TOKULOGGER, int/*err*/);
int toku_logger_panicked(TOKULOGGER /*logger*/);
int toku_logger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF diskoff, int is_add, const struct kv_pair *pair);
......
......@@ -138,6 +138,7 @@ void generate_log_struct (void) {
fprintf(hf, " union {\n");
DO_LOGTYPES(lt, fprintf(hf," struct logtype_%s %s;\n", lt->name, lt->name));
fprintf(hf, " } u;\n");
fprintf(hf, " struct log_entry *next; /* for in-memory list of log entries */\n");
fprintf(hf, "};\n");
}
......@@ -183,8 +184,18 @@ void generate_log_writer (void) {
if (lt->command=='C') {
fprintf(cf, " if (r!=0) return r;\n");
fprintf(cf, " // commit has some extra work to do.\n");
fprintf(cf, " if (txn->parent || nosync) return 0; // don't fsync if there is a parent.\n");
fprintf(cf, " else return toku_logger_fsync(txn->logger);\n");
fprintf(cf, " if (nosync) return 0;\n");
fprintf(cf, " if (txn->parent) { // do not fsync if there is a parent. Instead append the log entries onto the parent.\n");
fprintf(cf, " if (txn->parent->oldest_logentry) txn->parent->newest_logentry->next = txn->oldest_logentry;\n");
fprintf(cf, " else txn->parent->oldest_logentry = txn->oldest_logentry;\n");
fprintf(cf, " if (txn->newest_logentry) txn->parent->newest_logentry = txn->newest_logentry;\n");
fprintf(cf, " txn->newest_logentry = txn->oldest_logentry = 0;\n");
fprintf(cf, " } else {\n");
fprintf(cf, " while (txn->newest_logentry) { struct log_entry *next=txn->newest_logentry->next; toku_free(txn->newest_logentry); txn->newest_logentry=next; }\n");
fprintf(cf, " r = toku_logger_fsync(txn->logger);\n");
fprintf(cf, " if (r!=0) toku_logger_panic(txn->logger, r);\n");
fprintf(cf, " }\n");
fprintf(cf, " return 0;\n");
} else {
fprintf(cf, " return r;\n");
}
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment