Commit 07125ac2 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Store the rollbacks on disk. Fixes #711. Addresses #698 (maybe fixes it).


git-svn-id: file:///svn/tokudb@3528 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7ea8678f
...@@ -130,7 +130,7 @@ tdb-recover: $(OFILES) ...@@ -130,7 +130,7 @@ tdb-recover: $(OFILES)
roll.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h cachetable.h gpma.h roll.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h cachetable.h gpma.h
log_code.o: log_header.h wbuf.h log-internal.h log_code.o: log_header.h wbuf.h log-internal.h rbuf.h
log_header.h: log_code.c log_header.h: log_code.c
@echo generated log_code.c so log_header.c was also generated @echo generated log_code.c so log_header.c was also generated
log_code.c: logformat log_code.c: logformat
......
...@@ -82,6 +82,10 @@ struct tokutxn { ...@@ -82,6 +82,10 @@ struct tokutxn {
LSN first_lsn; /* The first lsn in the transaction. */ LSN first_lsn; /* The first lsn in the transaction. */
struct roll_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */ struct roll_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */
struct list live_txns_link; struct list live_txns_link;
size_t rollentry_resident_bytecount; // How many bytes for the rollentries that are stored in main memory.
char *rollentry_filename;
int rollentry_fd; // If we spill the roll_entries, we write them into this fd.
off_t rollentry_filesize; // How many bytes are in the rollentry.
}; };
int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync); int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync);
...@@ -136,3 +140,5 @@ static inline char *fixup_fname(BYTESTRING *f) { ...@@ -136,3 +140,5 @@ static inline char *fixup_fname(BYTESTRING *f) {
fname[f->len]=0; fname[f->len]=0;
return fname; return fname;
} }
int toku_read_rollback_backwards(int fd, off_t at, struct roll_entry **item, off_t *new_at);
/* -*- mode: C; c-basic-offset: 4 -*- */ /* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#define _XOPEN_SOURCE 500
#include <arpa/inet.h> #include <arpa/inet.h>
#include <ctype.h> #include <ctype.h>
#include <dirent.h> #include <dirent.h>
...@@ -342,21 +343,61 @@ int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbu ...@@ -342,21 +343,61 @@ int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbu
return toku_logger_log_bytes(logger, logbytes, do_fsync); return toku_logger_log_bytes(logger, logbytes, do_fsync);
} }
static void cleanup_txn (TOKUTXN txn) {
struct roll_entry *item;
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
toku_logger_rollback_malloc_size -= sizeof(*item);
toku_logger_rollback_malloc_count --;
}
if (txn->rollentry_filename!=0) {
int r = close(txn->rollentry_fd);
assert(r==0);
r = unlink(txn->rollentry_filename);
assert(r==0);
toku_free(txn->rollentry_filename);
}
list_remove(&txn->live_txns_link);
toku_free(txn);
return;
}
static int commit_rollback_item (TOKUTXN txn, struct roll_entry *item) {
int r=0;
rolltype_dispatch_assign(item, toku_commit_, r, txn);
if (r!=0) return r;
u_int32_t rollback_fsize = toku_logger_rollback_fsize(item);
toku_logger_rollback_malloc_size -= rollback_fsize;
toku_logger_rollback_malloc_count --;
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
return 0;
}
static int abort_rollback_item (TOKUTXN txn, struct roll_entry *item) {
int r=0;
rolltype_dispatch_assign(item, toku_rollback_, r, txn);
if (r!=0) return r;
u_int32_t rollback_fsize = toku_logger_rollback_fsize(item);
toku_logger_rollback_malloc_size -= rollback_fsize;
toku_logger_rollback_malloc_count --;
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
return 0;
}
int toku_logger_commit (TOKUTXN txn, int nosync) { int toku_logger_commit (TOKUTXN txn, int nosync) {
// printf("%s:%d committing\n", __FILE__, __LINE__); // printf("%s:%d committing\n", __FILE__, __LINE__);
// panic handled in log_commit // panic handled in log_commit
int r = toku_log_commit(txn->logger, (LSN*)0, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks. int r = toku_log_commit(txn->logger, (LSN*)0, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
if (r!=0) { if (r!=0) {
struct roll_entry *item; cleanup_txn(txn);
broken: return r;
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
toku_logger_rollback_malloc_size -= sizeof(*item);
toku_logger_rollback_malloc_count --;
}
r = 0;
} else if (txn->parent!=0) { } else if (txn->parent!=0) {
// Append the list to the front. // Append the list to the front.
if (txn->oldest_logentry) { if (txn->oldest_logentry) {
...@@ -368,25 +409,32 @@ int toku_logger_commit (TOKUTXN txn, int nosync) { ...@@ -368,25 +409,32 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
txn->parent->oldest_logentry = txn->oldest_logentry; txn->parent->oldest_logentry = txn->oldest_logentry;
} }
txn->newest_logentry = txn->oldest_logentry = 0; txn->newest_logentry = txn->oldest_logentry = 0;
r = 0; assert(txn->rollentry_filename==0); // This code isn't ready for this case. ??? When committing a child, we have to get the child's records into the parent.
} else { } else {
// do the commit calls and free everything // do the commit calls and free everything
// we do the commit calls in reverse order too. // we do the commit calls in reverse order too.
struct roll_entry *item; {
//printf("%s:%d abort\n", __FILE__, __LINE__); struct roll_entry *item;
while ((item=txn->newest_logentry)) { //printf("%s:%d abort\n", __FILE__, __LINE__);
txn->newest_logentry = item->prev; while ((item=txn->newest_logentry)) {
rolltype_dispatch_assign(item, toku_commit_, r, txn); txn->newest_logentry = item->prev;
if (r!=0) goto broken; r = commit_rollback_item(txn, item);
rolltype_dispatch(item, toku_free_rolltype_); if (r!=0) { cleanup_txn(txn); return r; }
toku_free(item); }
toku_logger_rollback_malloc_size -= sizeof(*item); }
toku_logger_rollback_malloc_count --;
// Read stuff out of the file and execute it.
if (txn->rollentry_filename) {
while (txn->rollentry_filesize>0) {
struct roll_entry *item;
r = toku_read_rollback_backwards(txn->rollentry_fd, txn->rollentry_filesize, &item, &txn->rollentry_filesize);
if (r!=0) { cleanup_txn(txn); return r; }
r = commit_rollback_item(txn, item);
if (r!=0) { cleanup_txn(txn); return r; }
}
} }
r = 0;
} }
list_remove(&txn->live_txns_link); cleanup_txn(txn);
toku_free(txn);
return r; return r;
} }
...@@ -410,6 +458,10 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER ...@@ -410,6 +458,10 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER
result->parent = parent_tokutxn; result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0; result->oldest_logentry = result->newest_logentry = 0;
list_push(&logger->live_txns, &result->live_txns_link); list_push(&logger->live_txns, &result->live_txns_link);
result->rollentry_resident_bytecount=0;
result->rollentry_filename = 0;
result->rollentry_fd = -1;
result->rollentry_filesize = 0;
*tokutxn = result; *tokutxn = result;
return 0; return 0;
} }
...@@ -507,6 +559,7 @@ int toku_fread_DISKOFF (FILE *f, DISKOFF *diskoff, u_int32_t *crc, u_int32_t *le ...@@ -507,6 +559,7 @@ int toku_fread_DISKOFF (FILE *f, DISKOFF *diskoff, u_int32_t *crc, u_int32_t *le
int toku_fread_TXNID (FILE *f, TXNID *txnid, u_int32_t *crc, u_int32_t *len) { int toku_fread_TXNID (FILE *f, TXNID *txnid, u_int32_t *crc, u_int32_t *len) {
return toku_fread_u_int64_t (f, txnid, crc, len); return toku_fread_u_int64_t (f, txnid, crc, len);
} }
// fills in the bs with malloced data. // fills in the bs with malloced data.
int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, u_int32_t *crc, u_int32_t *len) { int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, u_int32_t *crc, u_int32_t *len) {
int r=toku_fread_u_int32_t(f, (u_int32_t*)&bs->len, crc, len); int r=toku_fread_u_int32_t(f, (u_int32_t*)&bs->len, crc, len);
...@@ -699,20 +752,31 @@ int toku_logger_abort(TOKUTXN txn) { ...@@ -699,20 +752,31 @@ int toku_logger_abort(TOKUTXN txn) {
//printf("%s:%d aborting\n", __FILE__, __LINE__); //printf("%s:%d aborting\n", __FILE__, __LINE__);
// Must undo everything. Must undo it all in reverse order. // Must undo everything. Must undo it all in reverse order.
// Build the reverse list // Build the reverse list
struct roll_entry *item;
//printf("%s:%d abort\n", __FILE__, __LINE__); //printf("%s:%d abort\n", __FILE__, __LINE__);
while ((item=txn->newest_logentry)) { {
txn->newest_logentry = item->prev; int r = toku_log_xabort(txn->logger, (LSN*)0, 0, txn->txnid64);
int r;
rolltype_dispatch_assign(item, toku_rollback_, r, txn);
if (r!=0) return r; if (r!=0) return r;
rolltype_dispatch(item, toku_free_rolltype_); }
toku_free(item); {
toku_logger_rollback_malloc_size -= sizeof(*item); struct roll_entry *item;
toku_logger_rollback_malloc_count --; while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
int r=abort_rollback_item(txn, item);
if (r!=0) { cleanup_txn(txn); return r; }
}
} }
list_remove(&txn->live_txns_link); list_remove(&txn->live_txns_link);
toku_free(txn); // Read stuff out of the file and roll it back.
if (txn->rollentry_filename) {
while (txn->rollentry_filesize>0) {
struct roll_entry *item;
int r = toku_read_rollback_backwards(txn->rollentry_fd, txn->rollentry_filesize, &item, &txn->rollentry_filesize);
if (r!=0) { cleanup_txn(txn); return r; }
r=abort_rollback_item(txn, item);
if (r!=0) { cleanup_txn(txn); return r; }
}
}
cleanup_txn(txn);
return 0; return 0;
} }
...@@ -832,3 +896,59 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) { ...@@ -832,3 +896,59 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
*logs_p = result; *logs_p = result;
return 0; return 0;
} }
int toku_maybe_spill_rollbacks (TOKUTXN txn) {
if (txn->rollentry_resident_bytecount>1<<20) {
struct roll_entry *item;
ssize_t bufsize = txn->rollentry_resident_bytecount;
char *MALLOC_N(bufsize, buf);
if (bufsize==0) return errno;
struct wbuf w;
wbuf_init(&w, buf, bufsize);
while ((item=txn->oldest_logentry)) {
assert(item->prev==0);
u_int32_t rollback_fsize = toku_logger_rollback_fsize(item);
txn->rollentry_resident_bytecount -= rollback_fsize;
txn->oldest_logentry = item->next;
if (item->next) { item->next->prev=0; }
toku_logger_rollback_wbufwrite(&w, item);
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
}
assert(txn->rollentry_resident_bytecount==0);
assert(w.ndone==bufsize);
txn->oldest_logentry = txn->newest_logentry = 0;
if (txn->rollentry_fd<0) {
const char filenamepart[] = "/__rolltmp.XXXXXX";
char fnamelen = strlen(txn->logger->directory)+sizeof(filenamepart);
assert(txn->rollentry_filename==0);
txn->rollentry_filename = toku_malloc(fnamelen);
if (txn->rollentry_filename==0) return errno;
snprintf(txn->rollentry_filename, fnamelen, "%s/__rolltmp.XXXXXX", txn->logger->directory);
txn->rollentry_fd = mkstemp(txn->rollentry_filename);
if (txn->rollentry_fd==-1) return errno;
}
ssize_t r = write_it(txn->rollentry_fd, buf, w.ndone);
if (r<0) return r;
assert(r==w.ndone);
txn->rollentry_filesize+=w.ndone;
toku_free(buf);
}
return 0;
}
int toku_read_rollback_backwards(int fd, off_t at, struct roll_entry **item, off_t *new_at) {
if (at==0) return -1;
u_int32_t nbytes_n; ssize_t sr;
if ((sr=pread(fd, &nbytes_n, 4, at-4))!=4) { assert(sr<0); return errno; }
u_int32_t n_bytes=ntohl(nbytes_n);
assert(at>=n_bytes);
unsigned char *buf = toku_malloc(n_bytes);
if (buf==0) return errno;
if ((sr=pread(fd, buf, n_bytes, at-n_bytes))!=n_bytes) { assert(sr<0); return errno; }
int r = toku_parse_rollback(buf, n_bytes, item);
if (r!=0) return r;
(*new_at) -= n_bytes;
toku_free(buf);
return 0;
}
...@@ -157,4 +157,6 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result); ...@@ -157,4 +157,6 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result);
int tokudb_recover(const char *datadir, const char *logdir); int tokudb_recover(const char *datadir, const char *logdir);
int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags); int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags);
int toku_maybe_spill_rollbacks (TOKUTXN txn);
#endif #endif
...@@ -77,6 +77,7 @@ const struct logtype rollbacks[] = { ...@@ -77,6 +77,7 @@ const struct logtype rollbacks[] = {
const struct logtype logtypes[] = { const struct logtype logtypes[] = {
{"checkpoint", 'x', FA{NULLFIELD}}, {"checkpoint", 'x', FA{NULLFIELD}},
{"commit", 'C', FA{{"TXNID", "txnid", 0},NULLFIELD}}, {"commit", 'C', FA{{"TXNID", "txnid", 0},NULLFIELD}},
{"xabort", 'q', FA{{"TXNID", "txnid", 0},NULLFIELD}},
{"xbegin", 'b', FA{{"TXNID", "parenttxnid", 0},NULLFIELD}}, {"xbegin", 'b', FA{{"TXNID", "parenttxnid", 0},NULLFIELD}},
#if 0 #if 0
{"tl_delete", 'D', FA{{"FILENUM", "filenum", 0}, // tl logentries can be used, by themselves, to rebuild the whole DB from scratch. {"tl_delete", 'D', FA{{"FILENUM", "filenum", 0}, // tl logentries can be used, by themselves, to rebuild the whole DB from scratch.
...@@ -325,6 +326,7 @@ void generate_log_struct (void) { ...@@ -325,6 +326,7 @@ void generate_log_struct (void) {
DO_ROLLBACKS(lt, fprintf(hf," struct rolltype_%s %s;\n", lt->name, lt->name)); DO_ROLLBACKS(lt, fprintf(hf," struct rolltype_%s %s;\n", lt->name, lt->name));
fprintf(hf, " } u;\n"); fprintf(hf, " } u;\n");
fprintf(hf, " struct roll_entry *prev; /* for in-memory list of log entries. Threads from newest to oldest. */\n"); fprintf(hf, " struct roll_entry *prev; /* for in-memory list of log entries. Threads from newest to oldest. */\n");
fprintf(hf, " struct roll_entry *next; /* Points to a newer logentry. Needed for flushing to disk, since we want to write the oldest one first. */\n");
fprintf(hf, "};\n"); fprintf(hf, "};\n");
} }
...@@ -492,34 +494,91 @@ void generate_logprint (void) { ...@@ -492,34 +494,91 @@ void generate_logprint (void) {
} }
void generate_rollbacks (void) { void generate_rollbacks (void) {
fprintf(cf, " u_int64_t toku_logger_rollback_malloc_size=0, toku_logger_rollback_malloc_count=0;\n"); fprintf(cf, "u_int64_t toku_logger_rollback_malloc_size=0, toku_logger_rollback_malloc_count=0;\n");
fprintf(hf, "extern u_int64_t toku_logger_rollback_malloc_size, toku_logger_rollback_malloc_count;\n"); fprintf(hf, "extern u_int64_t toku_logger_rollback_malloc_size, toku_logger_rollback_malloc_count;\n");
DO_ROLLBACKS(lt, ({ DO_ROLLBACKS(lt, ({
fprintf2(cf, hf, "int toku_logger_save_rollback_%s (TOKUTXN txn", lt->name); fprintf2(cf, hf, "int toku_logger_save_rollback_%s (TOKUTXN txn", lt->name);
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n"); fprintf(hf, ");\n");
fprintf(cf, ") {\n"); fprintf(cf, ") {\n");
{
int count=0;
fprintf(cf, " u_int32_t rollback_fsize = toku_logger_rollback_fsize_%s(", lt->name);
DO_FIELDS(ft, lt, fprintf(cf, "%s%s", (count++>0)?", ":"", ft->name));
fprintf(cf, ");\n");
}
fprintf(cf, " struct roll_entry *v = toku_malloc(sizeof(*v));\n"); fprintf(cf, " struct roll_entry *v = toku_malloc(sizeof(*v));\n");
fprintf(cf, " toku_logger_rollback_malloc_count++; toku_logger_rollback_malloc_size+=sizeof(*v);\n"); fprintf(cf, " toku_logger_rollback_malloc_count++; toku_logger_rollback_malloc_size+=rollback_fsize;\n");
fprintf(cf, " if (v==0) return errno;\n"); fprintf(cf, " if (v==0) return errno;\n");
fprintf(cf, " v->cmd = %d;\n", lt->command_and_flags&0xff); fprintf(cf, " v->cmd = %d;\n", lt->command_and_flags&0xff);
DO_FIELDS(ft, lt, fprintf(cf, " v->u.%s.%s = %s;\n", lt->name, ft->name, ft->name)); DO_FIELDS(ft, lt, fprintf(cf, " v->u.%s.%s = %s;\n", lt->name, ft->name, ft->name));
fprintf(cf, " v->prev = txn->newest_logentry;\n"); fprintf(cf, " v->prev = txn->newest_logentry;\n");
fprintf(cf, " v->next = 0;\n");
fprintf(cf, " if (txn->oldest_logentry==0) txn->oldest_logentry=v;\n"); fprintf(cf, " if (txn->oldest_logentry==0) txn->oldest_logentry=v;\n");
fprintf(cf, " else txn->newest_logentry->next = v;\n");
fprintf(cf, " txn->newest_logentry = v;\n"); fprintf(cf, " txn->newest_logentry = v;\n");
fprintf(cf, " return 0;\n}\n"); fprintf(cf, " txn->rollentry_resident_bytecount += rollback_fsize;\n");
fprintf(cf, " return toku_maybe_spill_rollbacks(txn);\n}\n");
})); }));
DO_ROLLBACKS(lt, ({
fprintf2(cf, hf, "void toku_logger_rollback_wbufwrite_%s (struct wbuf *wbuf", lt->name);
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf2(cf, hf, ")");
fprintf(hf, ";\n");
fprintf(cf, " {\n");
fprintf(cf, " u_int32_t ndone_at_start = wbuf->ndone;\n");
fprintf(cf, " wbuf_char(wbuf, '%c');\n", 0xff&lt->command_and_flags);
DO_FIELDS(ft, lt, fprintf(cf, " wbuf_%s(wbuf, %s);\n", ft->type, ft->name));
fprintf(cf, " wbuf_int(wbuf, 4+wbuf->ndone - ndone_at_start);\n");
fprintf(cf, "}\n");
}));
fprintf2(cf, hf, "void toku_logger_rollback_wbufwrite (struct wbuf *wbuf, struct roll_entry *r)");
fprintf(hf, ";\n");
fprintf(cf, " {\n switch (r->cmd) {\n");
DO_ROLLBACKS(lt, ({ DO_ROLLBACKS(lt, ({
fprintf2(cf, hf, "int toku_logger_rollback_fsize_%s (", lt->name); fprintf(cf, " case RT_%s: toku_logger_rollback_wbufwrite_%s(wbuf", lt->name, lt->name);
DO_FIELDS(ft, lt, fprintf(cf, ", r->u.%s.%s", lt->name, ft->name));
fprintf(cf, "); return;\n");
}));
fprintf(cf, " }\n assert(0);\n");
fprintf(cf, "}\n");
DO_ROLLBACKS(lt, ({
fprintf2(cf, hf, "u_int32_t toku_logger_rollback_fsize_%s (", lt->name);
int count=0; int count=0;
DO_FIELDS(ft, lt, fprintf2(cf, hf, "%s%s %s", (count++>0)?", ":"", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf2(cf, hf, "%s%s %s", (count++>0)?", ":"", ft->type, ft->name));
fprintf(hf, ");\n"); fprintf(hf, ");\n");
fprintf(cf, ") {\n"); fprintf(cf, ") {\n");
fprintf(cf, " return 1 // the cmd"); fprintf(cf, " return 1 /* the cmd*/\n");
fprintf(cf, " + 4 /* the int at the end saying the size */");
DO_FIELDS(ft, lt, DO_FIELDS(ft, lt,
fprintf(cf, "\n + toku_logsizeof_%s(%s)", ft->type, ft->name)); fprintf(cf, "\n + toku_logsizeof_%s(%s)", ft->type, ft->name));
fprintf(cf, ";\n}\n"); fprintf(cf, ";\n}\n");
})); }));
fprintf2(cf, hf, "u_int32_t toku_logger_rollback_fsize(struct roll_entry *item)");
fprintf(hf, ";\n");
fprintf(cf, "{\n switch(item->cmd) {\n");
DO_ROLLBACKS(lt, ({
fprintf(cf, " case RT_%s: return toku_logger_rollback_fsize_%s(", lt->name, lt->name);
int count=0;
DO_FIELDS(ft, lt, fprintf(cf, "%sitem->u.%s.%s", (count++>0)?", ":"", lt->name, ft->name));
fprintf(cf, ");\n");
}));
fprintf(cf, " }\n assert(0);\n return 0;\n");
fprintf(cf, "}\n");
fprintf2(cf, hf, "int toku_parse_rollback(unsigned char *buf, u_int32_t n_bytes, struct roll_entry **itemp)");
fprintf(hf, ";\n");
fprintf(cf, " {\n assert(n_bytes>0);\n struct roll_entry *item = toku_malloc(sizeof(*item));\n item->cmd=buf[0];\n");
fprintf(cf, " struct rbuf rc = {buf, n_bytes, 1};\n");
fprintf(cf, " switch(item->cmd) {\n");
DO_ROLLBACKS(lt, ({
fprintf(cf, " case RT_%s:\n", lt->name);
DO_FIELDS(ft, lt, fprintf(cf, " rbuf_%s(&rc, &item->u.%s.%s);\n", ft->type, lt->name, ft->name));
fprintf(cf, " *itemp = item;\n");
fprintf(cf, " return 0;\n");
}));
fprintf(cf, " }\n return EINVAL;\n}\n");
} }
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include "toku_assert.h" #include "toku_assert.h"
#include "memory.h"
struct rbuf { struct rbuf {
unsigned char *buf; unsigned char *buf;
...@@ -51,4 +52,23 @@ static inline DISKOFF rbuf_diskoff (struct rbuf *r) { ...@@ -51,4 +52,23 @@ static inline DISKOFF rbuf_diskoff (struct rbuf *r) {
unsigned i1 = rbuf_int(r); unsigned i1 = rbuf_int(r);
return ((unsigned long long)(i0)<<32) | ((unsigned long long)(i1)); return ((unsigned long long)(i0)<<32) | ((unsigned long long)(i1));
} }
static inline void rbuf_TXNID (struct rbuf *r, TXNID *txnid) {
*txnid = rbuf_ulonglong(r);
}
static inline void rbuf_FILENUM (struct rbuf *r, FILENUM *filenum) {
filenum->fileid = rbuf_int(r);
}
// Don't try to use the same space, malloc it
static inline void rbuf_BYTESTRING (struct rbuf *r, BYTESTRING *bs) {
bs->len = rbuf_int(r);
u_int32_t newndone = r->ndone + bs->len;
assert(newndone < r->size);
bs->data = toku_memdup(&r->buf[r->ndone], (size_t)bs->len);
assert(bs->data);
r->ndone = newndone;
}
#endif #endif
...@@ -62,6 +62,8 @@ void toku_recover_cleanup (void) { ...@@ -62,6 +62,8 @@ void toku_recover_cleanup (void) {
void toku_recover_commit (LSN UU(lsn), TXNID UU(txnid)) { void toku_recover_commit (LSN UU(lsn), TXNID UU(txnid)) {
} }
void toku_recover_xabort (LSN UU(lsn), TXNID UU(txnid)) {
}
void create_dir_from_file (const char *fname) { void create_dir_from_file (const char *fname) {
int i; int i;
......
...@@ -74,7 +74,7 @@ foo: ...@@ -74,7 +74,7 @@ foo:
echo ALL_TESTS: $(ALL_TESTS) echo ALL_TESTS: $(ALL_TESTS)
.PHONY: check check.bdb check.tdb .PHONY: check check.bdb check.tdb
check: check.bdb check.tdb test_db_assoc3.tdbrun_wasbad check: check.tdb test_db_assoc3.tdbrun_wasbad check.bdb
@ echo ok $@ @ echo ok $@
tests: tests.bdb tests.tdb tests: tests.bdb tests.tdb
@ echo ok $@ @ echo ok $@
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment