Commit 8813e3eb authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Store the rollbacks on disk. Fixes #711. Addresses #698 (maybe fixes it).


git-svn-id: file:///svn/tokudb@3528 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0ebdc145
......@@ -130,7 +130,7 @@ tdb-recover: $(OFILES)
roll.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h cachetable.h gpma.h
log_code.o: log_header.h wbuf.h log-internal.h
log_code.o: log_header.h wbuf.h log-internal.h rbuf.h
log_header.h: log_code.c
@echo generated log_code.c so log_header.c was also generated
log_code.c: logformat
......
......@@ -82,6 +82,10 @@ struct tokutxn {
LSN first_lsn; /* The first lsn in the transaction. */
struct roll_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */
struct list live_txns_link;
size_t rollentry_resident_bytecount; // How many bytes for the rollentries that are stored in main memory.
char *rollentry_filename;
int rollentry_fd; // If we spill the roll_entries, we write them into this fd.
off_t rollentry_filesize; // How many bytes are in the rollentry.
};
int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync);
......@@ -136,3 +140,5 @@ static inline char *fixup_fname(BYTESTRING *f) {
fname[f->len]=0;
return fname;
}
int toku_read_rollback_backwards(int fd, off_t at, struct roll_entry **item, off_t *new_at);
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#define _XOPEN_SOURCE 500
#include <arpa/inet.h>
#include <ctype.h>
#include <dirent.h>
......@@ -342,13 +343,8 @@ int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbu
return toku_logger_log_bytes(logger, logbytes, do_fsync);
}
int toku_logger_commit (TOKUTXN txn, int nosync) {
// printf("%s:%d committing\n", __FILE__, __LINE__);
// panic handled in log_commit
int r = toku_log_commit(txn->logger, (LSN*)0, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
if (r!=0) {
static void cleanup_txn (TOKUTXN txn) {
struct roll_entry *item;
broken:
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
rolltype_dispatch(item, toku_free_rolltype_);
......@@ -356,7 +352,52 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
toku_logger_rollback_malloc_size -= sizeof(*item);
toku_logger_rollback_malloc_count --;
}
r = 0;
if (txn->rollentry_filename!=0) {
int r = close(txn->rollentry_fd);
assert(r==0);
r = unlink(txn->rollentry_filename);
assert(r==0);
toku_free(txn->rollentry_filename);
}
list_remove(&txn->live_txns_link);
toku_free(txn);
return;
}
static int commit_rollback_item (TOKUTXN txn, struct roll_entry *item) {
int r=0;
rolltype_dispatch_assign(item, toku_commit_, r, txn);
if (r!=0) return r;
u_int32_t rollback_fsize = toku_logger_rollback_fsize(item);
toku_logger_rollback_malloc_size -= rollback_fsize;
toku_logger_rollback_malloc_count --;
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
return 0;
}
static int abort_rollback_item (TOKUTXN txn, struct roll_entry *item) {
int r=0;
rolltype_dispatch_assign(item, toku_rollback_, r, txn);
if (r!=0) return r;
u_int32_t rollback_fsize = toku_logger_rollback_fsize(item);
toku_logger_rollback_malloc_size -= rollback_fsize;
toku_logger_rollback_malloc_count --;
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
return 0;
}
int toku_logger_commit (TOKUTXN txn, int nosync) {
// printf("%s:%d committing\n", __FILE__, __LINE__);
// panic handled in log_commit
int r = toku_log_commit(txn->logger, (LSN*)0, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
if (r!=0) {
cleanup_txn(txn);
return r;
} else if (txn->parent!=0) {
// Append the list to the front.
if (txn->oldest_logentry) {
......@@ -368,25 +409,32 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
txn->parent->oldest_logentry = txn->oldest_logentry;
}
txn->newest_logentry = txn->oldest_logentry = 0;
r = 0;
assert(txn->rollentry_filename==0); // This code isn't ready for this case. ??? When committing a child, we have to get the child's records into the parent.
} else {
// do the commit calls and free everything
// we do the commit calls in reverse order too.
{
struct roll_entry *item;
//printf("%s:%d abort\n", __FILE__, __LINE__);
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
rolltype_dispatch_assign(item, toku_commit_, r, txn);
if (r!=0) goto broken;
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
toku_logger_rollback_malloc_size -= sizeof(*item);
toku_logger_rollback_malloc_count --;
r = commit_rollback_item(txn, item);
if (r!=0) { cleanup_txn(txn); return r; }
}
r = 0;
}
list_remove(&txn->live_txns_link);
toku_free(txn);
// Read stuff out of the file and execute it.
if (txn->rollentry_filename) {
while (txn->rollentry_filesize>0) {
struct roll_entry *item;
r = toku_read_rollback_backwards(txn->rollentry_fd, txn->rollentry_filesize, &item, &txn->rollentry_filesize);
if (r!=0) { cleanup_txn(txn); return r; }
r = commit_rollback_item(txn, item);
if (r!=0) { cleanup_txn(txn); return r; }
}
}
}
cleanup_txn(txn);
return r;
}
......@@ -410,6 +458,10 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER
result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0;
list_push(&logger->live_txns, &result->live_txns_link);
result->rollentry_resident_bytecount=0;
result->rollentry_filename = 0;
result->rollentry_fd = -1;
result->rollentry_filesize = 0;
*tokutxn = result;
return 0;
}
......@@ -507,6 +559,7 @@ int toku_fread_DISKOFF (FILE *f, DISKOFF *diskoff, u_int32_t *crc, u_int32_t *le
int toku_fread_TXNID (FILE *f, TXNID *txnid, u_int32_t *crc, u_int32_t *len) {
return toku_fread_u_int64_t (f, txnid, crc, len);
}
// fills in the bs with malloced data.
int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, u_int32_t *crc, u_int32_t *len) {
int r=toku_fread_u_int32_t(f, (u_int32_t*)&bs->len, crc, len);
......@@ -699,20 +752,31 @@ int toku_logger_abort(TOKUTXN txn) {
//printf("%s:%d aborting\n", __FILE__, __LINE__);
// Must undo everything. Must undo it all in reverse order.
// Build the reverse list
struct roll_entry *item;
//printf("%s:%d abort\n", __FILE__, __LINE__);
{
int r = toku_log_xabort(txn->logger, (LSN*)0, 0, txn->txnid64);
if (r!=0) return r;
}
{
struct roll_entry *item;
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
int r;
rolltype_dispatch_assign(item, toku_rollback_, r, txn);
if (r!=0) return r;
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
toku_logger_rollback_malloc_size -= sizeof(*item);
toku_logger_rollback_malloc_count --;
int r=abort_rollback_item(txn, item);
if (r!=0) { cleanup_txn(txn); return r; }
}
}
list_remove(&txn->live_txns_link);
toku_free(txn);
// Read stuff out of the file and roll it back.
if (txn->rollentry_filename) {
while (txn->rollentry_filesize>0) {
struct roll_entry *item;
int r = toku_read_rollback_backwards(txn->rollentry_fd, txn->rollentry_filesize, &item, &txn->rollentry_filesize);
if (r!=0) { cleanup_txn(txn); return r; }
r=abort_rollback_item(txn, item);
if (r!=0) { cleanup_txn(txn); return r; }
}
}
cleanup_txn(txn);
return 0;
}
......@@ -832,3 +896,59 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
*logs_p = result;
return 0;
}
int toku_maybe_spill_rollbacks (TOKUTXN txn) {
if (txn->rollentry_resident_bytecount>1<<20) {
struct roll_entry *item;
ssize_t bufsize = txn->rollentry_resident_bytecount;
char *MALLOC_N(bufsize, buf);
if (bufsize==0) return errno;
struct wbuf w;
wbuf_init(&w, buf, bufsize);
while ((item=txn->oldest_logentry)) {
assert(item->prev==0);
u_int32_t rollback_fsize = toku_logger_rollback_fsize(item);
txn->rollentry_resident_bytecount -= rollback_fsize;
txn->oldest_logentry = item->next;
if (item->next) { item->next->prev=0; }
toku_logger_rollback_wbufwrite(&w, item);
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
}
assert(txn->rollentry_resident_bytecount==0);
assert(w.ndone==bufsize);
txn->oldest_logentry = txn->newest_logentry = 0;
if (txn->rollentry_fd<0) {
const char filenamepart[] = "/__rolltmp.XXXXXX";
char fnamelen = strlen(txn->logger->directory)+sizeof(filenamepart);
assert(txn->rollentry_filename==0);
txn->rollentry_filename = toku_malloc(fnamelen);
if (txn->rollentry_filename==0) return errno;
snprintf(txn->rollentry_filename, fnamelen, "%s/__rolltmp.XXXXXX", txn->logger->directory);
txn->rollentry_fd = mkstemp(txn->rollentry_filename);
if (txn->rollentry_fd==-1) return errno;
}
ssize_t r = write_it(txn->rollentry_fd, buf, w.ndone);
if (r<0) return r;
assert(r==w.ndone);
txn->rollentry_filesize+=w.ndone;
toku_free(buf);
}
return 0;
}
int toku_read_rollback_backwards(int fd, off_t at, struct roll_entry **item, off_t *new_at) {
if (at==0) return -1;
u_int32_t nbytes_n; ssize_t sr;
if ((sr=pread(fd, &nbytes_n, 4, at-4))!=4) { assert(sr<0); return errno; }
u_int32_t n_bytes=ntohl(nbytes_n);
assert(at>=n_bytes);
unsigned char *buf = toku_malloc(n_bytes);
if (buf==0) return errno;
if ((sr=pread(fd, buf, n_bytes, at-n_bytes))!=n_bytes) { assert(sr<0); return errno; }
int r = toku_parse_rollback(buf, n_bytes, item);
if (r!=0) return r;
(*new_at) -= n_bytes;
toku_free(buf);
return 0;
}
......@@ -157,4 +157,6 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result);
int tokudb_recover(const char *datadir, const char *logdir);
int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags);
int toku_maybe_spill_rollbacks (TOKUTXN txn);
#endif
......@@ -77,6 +77,7 @@ const struct logtype rollbacks[] = {
const struct logtype logtypes[] = {
{"checkpoint", 'x', FA{NULLFIELD}},
{"commit", 'C', FA{{"TXNID", "txnid", 0},NULLFIELD}},
{"xabort", 'q', FA{{"TXNID", "txnid", 0},NULLFIELD}},
{"xbegin", 'b', FA{{"TXNID", "parenttxnid", 0},NULLFIELD}},
#if 0
{"tl_delete", 'D', FA{{"FILENUM", "filenum", 0}, // tl logentries can be used, by themselves, to rebuild the whole DB from scratch.
......@@ -325,6 +326,7 @@ void generate_log_struct (void) {
DO_ROLLBACKS(lt, fprintf(hf," struct rolltype_%s %s;\n", lt->name, lt->name));
fprintf(hf, " } u;\n");
fprintf(hf, " struct roll_entry *prev; /* for in-memory list of log entries. Threads from newest to oldest. */\n");
fprintf(hf, " struct roll_entry *next; /* Points to a newer logentry. Needed for flushing to disk, since we want to write the oldest one first. */\n");
fprintf(hf, "};\n");
}
......@@ -492,34 +494,91 @@ void generate_logprint (void) {
}
void generate_rollbacks (void) {
fprintf(cf, " u_int64_t toku_logger_rollback_malloc_size=0, toku_logger_rollback_malloc_count=0;\n");
fprintf(cf, "u_int64_t toku_logger_rollback_malloc_size=0, toku_logger_rollback_malloc_count=0;\n");
fprintf(hf, "extern u_int64_t toku_logger_rollback_malloc_size, toku_logger_rollback_malloc_count;\n");
DO_ROLLBACKS(lt, ({
fprintf2(cf, hf, "int toku_logger_save_rollback_%s (TOKUTXN txn", lt->name);
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n");
fprintf(cf, ") {\n");
{
int count=0;
fprintf(cf, " u_int32_t rollback_fsize = toku_logger_rollback_fsize_%s(", lt->name);
DO_FIELDS(ft, lt, fprintf(cf, "%s%s", (count++>0)?", ":"", ft->name));
fprintf(cf, ");\n");
}
fprintf(cf, " struct roll_entry *v = toku_malloc(sizeof(*v));\n");
fprintf(cf, " toku_logger_rollback_malloc_count++; toku_logger_rollback_malloc_size+=sizeof(*v);\n");
fprintf(cf, " toku_logger_rollback_malloc_count++; toku_logger_rollback_malloc_size+=rollback_fsize;\n");
fprintf(cf, " if (v==0) return errno;\n");
fprintf(cf, " v->cmd = %d;\n", lt->command_and_flags&0xff);
DO_FIELDS(ft, lt, fprintf(cf, " v->u.%s.%s = %s;\n", lt->name, ft->name, ft->name));
fprintf(cf, " v->prev = txn->newest_logentry;\n");
fprintf(cf, " v->next = 0;\n");
fprintf(cf, " if (txn->oldest_logentry==0) txn->oldest_logentry=v;\n");
fprintf(cf, " else txn->newest_logentry->next = v;\n");
fprintf(cf, " txn->newest_logentry = v;\n");
fprintf(cf, " return 0;\n}\n");
fprintf(cf, " txn->rollentry_resident_bytecount += rollback_fsize;\n");
fprintf(cf, " return toku_maybe_spill_rollbacks(txn);\n}\n");
}));
DO_ROLLBACKS(lt, ({
fprintf2(cf, hf, "void toku_logger_rollback_wbufwrite_%s (struct wbuf *wbuf", lt->name);
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf2(cf, hf, ")");
fprintf(hf, ";\n");
fprintf(cf, " {\n");
fprintf(cf, " u_int32_t ndone_at_start = wbuf->ndone;\n");
fprintf(cf, " wbuf_char(wbuf, '%c');\n", 0xff&lt->command_and_flags);
DO_FIELDS(ft, lt, fprintf(cf, " wbuf_%s(wbuf, %s);\n", ft->type, ft->name));
fprintf(cf, " wbuf_int(wbuf, 4+wbuf->ndone - ndone_at_start);\n");
fprintf(cf, "}\n");
}));
fprintf2(cf, hf, "void toku_logger_rollback_wbufwrite (struct wbuf *wbuf, struct roll_entry *r)");
fprintf(hf, ";\n");
fprintf(cf, " {\n switch (r->cmd) {\n");
DO_ROLLBACKS(lt, ({
fprintf(cf, " case RT_%s: toku_logger_rollback_wbufwrite_%s(wbuf", lt->name, lt->name);
DO_FIELDS(ft, lt, fprintf(cf, ", r->u.%s.%s", lt->name, ft->name));
fprintf(cf, "); return;\n");
}));
fprintf(cf, " }\n assert(0);\n");
fprintf(cf, "}\n");
DO_ROLLBACKS(lt, ({
fprintf2(cf, hf, "int toku_logger_rollback_fsize_%s (", lt->name);
fprintf2(cf, hf, "u_int32_t toku_logger_rollback_fsize_%s (", lt->name);
int count=0;
DO_FIELDS(ft, lt, fprintf2(cf, hf, "%s%s %s", (count++>0)?", ":"", ft->type, ft->name));
fprintf(hf, ");\n");
fprintf(cf, ") {\n");
fprintf(cf, " return 1 // the cmd");
fprintf(cf, " return 1 /* the cmd*/\n");
fprintf(cf, " + 4 /* the int at the end saying the size */");
DO_FIELDS(ft, lt,
fprintf(cf, "\n + toku_logsizeof_%s(%s)", ft->type, ft->name));
fprintf(cf, ";\n}\n");
}));
fprintf2(cf, hf, "u_int32_t toku_logger_rollback_fsize(struct roll_entry *item)");
fprintf(hf, ";\n");
fprintf(cf, "{\n switch(item->cmd) {\n");
DO_ROLLBACKS(lt, ({
fprintf(cf, " case RT_%s: return toku_logger_rollback_fsize_%s(", lt->name, lt->name);
int count=0;
DO_FIELDS(ft, lt, fprintf(cf, "%sitem->u.%s.%s", (count++>0)?", ":"", lt->name, ft->name));
fprintf(cf, ");\n");
}));
fprintf(cf, " }\n assert(0);\n return 0;\n");
fprintf(cf, "}\n");
fprintf2(cf, hf, "int toku_parse_rollback(unsigned char *buf, u_int32_t n_bytes, struct roll_entry **itemp)");
fprintf(hf, ";\n");
fprintf(cf, " {\n assert(n_bytes>0);\n struct roll_entry *item = toku_malloc(sizeof(*item));\n item->cmd=buf[0];\n");
fprintf(cf, " struct rbuf rc = {buf, n_bytes, 1};\n");
fprintf(cf, " switch(item->cmd) {\n");
DO_ROLLBACKS(lt, ({
fprintf(cf, " case RT_%s:\n", lt->name);
DO_FIELDS(ft, lt, fprintf(cf, " rbuf_%s(&rc, &item->u.%s.%s);\n", ft->type, lt->name, ft->name));
fprintf(cf, " *itemp = item;\n");
fprintf(cf, " return 0;\n");
}));
fprintf(cf, " }\n return EINVAL;\n}\n");
}
......
......@@ -4,6 +4,7 @@
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include "toku_assert.h"
#include "memory.h"
struct rbuf {
unsigned char *buf;
......@@ -51,4 +52,23 @@ static inline DISKOFF rbuf_diskoff (struct rbuf *r) {
unsigned i1 = rbuf_int(r);
return ((unsigned long long)(i0)<<32) | ((unsigned long long)(i1));
}
static inline void rbuf_TXNID (struct rbuf *r, TXNID *txnid) {
*txnid = rbuf_ulonglong(r);
}
static inline void rbuf_FILENUM (struct rbuf *r, FILENUM *filenum) {
filenum->fileid = rbuf_int(r);
}
// Don't try to use the same space, malloc it
static inline void rbuf_BYTESTRING (struct rbuf *r, BYTESTRING *bs) {
bs->len = rbuf_int(r);
u_int32_t newndone = r->ndone + bs->len;
assert(newndone < r->size);
bs->data = toku_memdup(&r->buf[r->ndone], (size_t)bs->len);
assert(bs->data);
r->ndone = newndone;
}
#endif
......@@ -62,6 +62,8 @@ void toku_recover_cleanup (void) {
void toku_recover_commit (LSN UU(lsn), TXNID UU(txnid)) {
}
void toku_recover_xabort (LSN UU(lsn), TXNID UU(txnid)) {
}
void create_dir_from_file (const char *fname) {
int i;
......
......@@ -74,7 +74,7 @@ foo:
echo ALL_TESTS: $(ALL_TESTS)
.PHONY: check check.bdb check.tdb
check: check.bdb check.tdb test_db_assoc3.tdbrun_wasbad
check: check.tdb test_db_assoc3.tdbrun_wasbad check.bdb
@ echo ok $@
tests: tests.bdb tests.tdb
@ echo ok $@
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment