Commit 7c0b052f authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Separate the code for generating rollbacks and recover log entries. Addresses #27.

git-svn-id: file:///svn/tokudb@2420 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7d811fd9
...@@ -46,7 +46,7 @@ struct tokutxn { ...@@ -46,7 +46,7 @@ struct tokutxn {
TOKULOGGER logger; TOKULOGGER logger;
TOKUTXN parent; TOKUTXN parent;
LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */ LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */
struct log_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */ struct roll_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */
struct list live_txns_link; struct list live_txns_link;
}; };
......
...@@ -236,10 +236,10 @@ int toku_logger_commit (TOKUTXN txn, int nosync) { ...@@ -236,10 +236,10 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
txn->newest_logentry = txn->oldest_logentry = 0; txn->newest_logentry = txn->oldest_logentry = 0;
} }
free_and_return: /*nothing*/; free_and_return: /*nothing*/;
struct log_entry *item; struct roll_entry *item;
while ((item=txn->newest_logentry)) { while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev; txn->newest_logentry = item->prev;
logtype_dispatch(item, toku_free_logtype_); rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item); toku_free(item);
} }
list_remove(&txn->live_txns_link); list_remove(&txn->live_txns_link);
...@@ -280,7 +280,7 @@ int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) { ...@@ -280,7 +280,7 @@ int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
BYTESTRING bs = { .len=strlen(fname), .data = strdup(fname) }; BYTESTRING bs = { .len=strlen(fname), .data = strdup(fname) };
int r = toku_log_fcreate (txn->logger, toku_txn_get_txnid(txn), bs, mode); int r = toku_log_fcreate (txn->logger, toku_txn_get_txnid(txn), bs, mode);
if (r!=0) return r; if (r!=0) return r;
r = toku_logger_save_rollback_fcreate(txn, toku_txn_get_txnid(txn), bs, mode); r = toku_logger_save_rollback_fcreate(txn, bs);
return r; return r;
} }
...@@ -608,13 +608,13 @@ int toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unuse ...@@ -608,13 +608,13 @@ int toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unuse
int toku_logger_abort(TOKUTXN txn) { int toku_logger_abort(TOKUTXN txn) {
// Must undo everything. Must undo it all in reverse order. // Must undo everything. Must undo it all in reverse order.
// Build the reverse list // Build the reverse list
struct log_entry *item; struct roll_entry *item;
while ((item=txn->newest_logentry)) { while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev; txn->newest_logentry = item->prev;
int r; int r;
logtype_dispatch_assign_rollback(item, toku_rollback_, r, txn); rolltype_dispatch_assign(item, toku_rollback_, r, txn);
if (r!=0) return r; if (r!=0) return r;
logtype_dispatch(item, toku_free_logtype_); rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item); toku_free(item);
} }
list_remove(&txn->live_txns_link); list_remove(&txn->live_txns_link);
......
...@@ -34,12 +34,19 @@ struct logtype { ...@@ -34,12 +34,19 @@ struct logtype {
struct field *fields; struct field *fields;
}; };
#define GEN_ROLLBACK (1<<9)
// In the fields, don't mention the command, the LSN, the CRC or the trailing LEN. // In the fields, don't mention the command, the LSN, the CRC or the trailing LEN.
int logformat_version_number = 0; int logformat_version_number = 0;
const struct logtype rollbacks[] = {
{"fcreate", 'F', FA{{"BYTESTRING", "fname", 0},
NULLFIELD}},
{"delete", 'K', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
};
const struct logtype logtypes[] = { const struct logtype logtypes[] = {
{"commit", 'C', FA{{"TXNID", "txnid", 0},NULLFIELD}}, {"commit", 'C', FA{{"TXNID", "txnid", 0},NULLFIELD}},
{"delete", 'D', FA{{"FILENUM", "filenum", 0}, {"delete", 'D', FA{{"FILENUM", "filenum", 0},
...@@ -47,7 +54,7 @@ const struct logtype logtypes[] = { ...@@ -47,7 +54,7 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0}, {"BYTESTRING", "data", 0},
NULLFIELD}}, NULLFIELD}},
{"fcreate", 'F'+GEN_ROLLBACK, FA{{"TXNID", "txnid", 0}, {"fcreate", 'F', FA{{"TXNID", "txnid", 0},
{"BYTESTRING", "fname", 0}, {"BYTESTRING", "fname", 0},
{"u_int32_t", "mode", "0%o"}, {"u_int32_t", "mode", "0%o"},
NULLFIELD}}, NULLFIELD}},
...@@ -136,10 +143,6 @@ const struct logtype logtypes[] = { ...@@ -136,10 +143,6 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0}, {"BYTESTRING", "data", 0},
NULLFIELD}}, NULLFIELD}},
{"tl_delete", 'K'+GEN_ROLLBACK, FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
{"deleteinleaf", 'd', FA{{"TXNID", "txnid", 0}, {"deleteinleaf", 'd', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0}, {"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0}, {"DISKOFF", "diskoff", 0},
...@@ -160,18 +163,26 @@ const struct logtype logtypes[] = { ...@@ -160,18 +163,26 @@ const struct logtype logtypes[] = {
{0,0,FA{NULLFIELD}} {0,0,FA{NULLFIELD}}
}; };
#define DO_LOGTYPES(lt, body) ({ \
#define DO_STRUCTS(lt, array, body) ({ \
const struct logtype *lt; \ const struct logtype *lt; \
for (lt=&logtypes[0]; lt->name; lt++) { \ for (lt=&array[0]; lt->name; lt++) { \
body; \ body; \
} }) } })
#define DO_ROLLBACKS(lt, body) DO_STRUCTS(lt, rollbacks, body)
#define DO_LOGTYPES(lt, body) DO_STRUCTS(lt, logtypes, body)
#define DO_LOGTYPES_AND_ROLLBACKS(lt, body) (DO_ROLLBACKS(lt,body), DO_LOGTYPES(lt, body))
#define DO_FIELDS(fld, lt, body) ({ \ #define DO_FIELDS(fld, lt, body) ({ \
struct field *fld; \ struct field *fld; \
for (fld=lt->fields; fld->type; fld++) { \ for (fld=lt->fields; fld->type; fld++) { \
body; \ body; \
} }) } })
void fprintf2 (FILE *f1, FILE *f2, const char *format, ...) { void fprintf2 (FILE *f1, FILE *f2, const char *format, ...) {
va_list ap; va_list ap;
int r; int r;
...@@ -185,22 +196,28 @@ void fprintf2 (FILE *f1, FILE *f2, const char *format, ...) { ...@@ -185,22 +196,28 @@ void fprintf2 (FILE *f1, FILE *f2, const char *format, ...) {
FILE *hf=0, *cf=0; FILE *hf=0, *cf=0;
void generate_lt_enum (void) { void generate_enum_internal (char *enum_name, char *enum_prefix, const struct logtype *lts) {
char used_cmds[256]; char used_cmds[256];
int count=0; int count=0;
memset(used_cmds, 0, 256); memset(used_cmds, 0, 256);
fprintf(hf, "enum lt_cmd {"); fprintf(hf, "enum %s {", enum_name);
DO_LOGTYPES(lt, DO_STRUCTS(lt, lts,
({ ({
unsigned char cmd = lt->command_and_flags&0xff; unsigned char cmd = lt->command_and_flags&0xff;
if (count!=0) fprintf(hf, ","); if (count!=0) fprintf(hf, ",");
count++; count++;
fprintf(hf, "\n"); fprintf(hf, "\n");
fprintf(hf," LT_%-16s = '%c'", lt->name, cmd); fprintf(hf," %s_%-16s = '%c'", enum_prefix, lt->name, cmd);
if (used_cmds[cmd]!=0) { fprintf(stderr, "%s:%d Command %d (%c) was used twice\n", __FILE__, __LINE__, cmd, cmd); abort(); } if (used_cmds[cmd]!=0) { fprintf(stderr, "%s:%d Command %d (%c) was used twice\n", __FILE__, __LINE__, cmd, cmd); abort(); }
used_cmds[cmd]=1; used_cmds[cmd]=1;
})); }));
fprintf(hf, "\n};\n\n"); fprintf(hf, "\n};\n\n");
}
void generate_enum (void) {
generate_enum_internal("lt_cmd", "LT", logtypes);
generate_enum_internal("rt_cmd", "RT", rollbacks);
} }
void generate_log_struct (void) { void generate_log_struct (void) {
...@@ -215,35 +232,46 @@ void generate_log_struct (void) { ...@@ -215,35 +232,46 @@ void generate_log_struct (void) {
fprintf(hf, "void toku_recover_%s (LSN lsn", lt->name); fprintf(hf, "void toku_recover_%s (LSN lsn", lt->name);
DO_FIELDS(ft, lt, fprintf(hf, ", %s %s", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf(hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n"); fprintf(hf, ");\n");
if (lt->command_and_flags & GEN_ROLLBACK) { }));
DO_ROLLBACKS(lt,
({ fprintf(hf, "struct rolltype_%s {\n", lt->name);
DO_FIELDS(ft, lt,
fprintf(hf, " %-16s %s;\n", ft->type, ft->name));
fprintf(hf, "};\n");
fprintf(hf, "int toku_rollback_%s (", lt->name); fprintf(hf, "int toku_rollback_%s (", lt->name);
DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name));
fprintf(hf, "TOKUTXN txn);\n"); fprintf(hf, "TOKUTXN txn);\n");
}
})); }));
fprintf(hf, "struct log_entry {\n"); fprintf(hf, "struct log_entry {\n");
fprintf(hf, " enum lt_cmd cmd;\n"); fprintf(hf, " enum lt_cmd cmd;\n");
fprintf(hf, " union {\n"); fprintf(hf, " union {\n");
DO_LOGTYPES(lt, fprintf(hf," struct logtype_%s %s;\n", lt->name, lt->name)); DO_LOGTYPES(lt, fprintf(hf," struct logtype_%s %s;\n", lt->name, lt->name));
fprintf(hf, " } u;\n"); fprintf(hf, " } u;\n");
fprintf(hf, " struct log_entry *prev; /* for in-memory list of log entries. Threads from newest to oldest. */\n");
fprintf(hf, "};\n"); fprintf(hf, "};\n");
fprintf(hf, "struct roll_entry {\n");
fprintf(hf, " enum rt_cmd cmd;\n");
fprintf(hf, " union {\n");
DO_ROLLBACKS(lt, fprintf(hf," struct rolltype_%s %s;\n", lt->name, lt->name));
fprintf(hf, " } u;\n");
fprintf(hf, " struct roll_entry *prev; /* for in-memory list of log entries. Threads from newest to oldest. */\n");
fprintf(hf, "};\n");
} }
void generate_dispatch (void) { void generate_dispatch (void) {
fprintf(hf, "#define logtype_dispatch(s, funprefix) ({ switch((s)->cmd) {\\\n"); fprintf(hf, "#define rolltype_dispatch(s, funprefix) ({ switch((s)->cmd) {\\\n");
DO_LOGTYPES(lt, fprintf(hf, " case LT_%s: funprefix ## %s (&(s)->u.%s); break;\\\n", lt->name, lt->name, lt->name)); DO_ROLLBACKS(lt, fprintf(hf, " case RT_%s: funprefix ## %s (&(s)->u.%s); break;\\\n", lt->name, lt->name, lt->name));
fprintf(hf, " }})\n"); fprintf(hf, " }})\n");
fprintf(hf, "#define logtype_dispatch_assign(s, funprefix, var, args...) ({ switch((s)->cmd) {\\\n"); fprintf(hf, "#define logtype_dispatch_assign(s, funprefix, var, args...) ({ switch((s)->cmd) {\\\n");
DO_LOGTYPES(lt, fprintf(hf, " case LT_%s: var = funprefix ## %s (&(s)->u.%s, ## args); break;\\\n", lt->name, lt->name, lt->name)); DO_LOGTYPES(lt, fprintf(hf, " case LT_%s: var = funprefix ## %s (&(s)->u.%s, ## args); break;\\\n", lt->name, lt->name, lt->name));
fprintf(hf, " }})\n"); fprintf(hf, " }})\n");
fprintf(hf, "#define logtype_dispatch_assign_rollback(s, funprefix, var, args...) ({ \\\n"); fprintf(hf, "#define rolltype_dispatch_assign(s, funprefix, var, args...) ({ \\\n");
fprintf(hf, " switch((s)->cmd) {\\\n"); fprintf(hf, " switch((s)->cmd) {\\\n");
DO_LOGTYPES(lt, ({ DO_ROLLBACKS(lt, ({
if (lt->command_and_flags & GEN_ROLLBACK) { fprintf(hf, " case RT_%s: var = funprefix ## %s (", lt->name, lt->name);
fprintf(hf, " case LT_%s: var = funprefix ## %s (", lt->name, lt->name);
int fieldcount=0; int fieldcount=0;
DO_FIELDS(ft, lt, ({ DO_FIELDS(ft, lt, ({
if (fieldcount>0) fprintf(hf, ","); if (fieldcount>0) fprintf(hf, ",");
...@@ -251,7 +279,7 @@ void generate_dispatch (void) { ...@@ -251,7 +279,7 @@ void generate_dispatch (void) {
fieldcount++; fieldcount++;
})); }));
fprintf(hf, ",## args); break;\\\n"); fprintf(hf, ",## args); break;\\\n");
}})); }));
fprintf(hf, " default: assert(0);} })\n"); fprintf(hf, " default: assert(0);} })\n");
fprintf(hf, "#define logtype_dispatch_args(s, funprefix) ({ switch((s)->cmd) {\\\n"); fprintf(hf, "#define logtype_dispatch_args(s, funprefix) ({ switch((s)->cmd) {\\\n");
...@@ -265,8 +293,8 @@ void generate_dispatch (void) { ...@@ -265,8 +293,8 @@ void generate_dispatch (void) {
} }
void generate_log_free(void) { void generate_log_free(void) {
DO_LOGTYPES(lt, ({ DO_ROLLBACKS(lt, ({
fprintf2(cf, hf, "void toku_free_logtype_%s(struct logtype_%s *e)", lt->name, lt->name); fprintf2(cf, hf, "void toku_free_rolltype_%s(struct rolltype_%s *e)", lt->name, lt->name);
fprintf(hf, ";\n"); fprintf(hf, ";\n");
fprintf(cf, " {\n"); fprintf(cf, " {\n");
DO_FIELDS(ft, lt, fprintf(cf, " toku_free_%s(e->%s);\n", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf(cf, " toku_free_%s(e->%s);\n", ft->type, ft->name));
...@@ -392,13 +420,12 @@ void generate_logprint (void) { ...@@ -392,13 +420,12 @@ void generate_logprint (void) {
} }
void generate_rollbacks (void) { void generate_rollbacks (void) {
DO_LOGTYPES(lt, ({ DO_ROLLBACKS(lt, ({
if (lt->command_and_flags & GEN_ROLLBACK) {
fprintf2(cf, hf, "int toku_logger_save_rollback_%s (TOKUTXN txn", lt->name); fprintf2(cf, hf, "int toku_logger_save_rollback_%s (TOKUTXN txn", lt->name);
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n"); fprintf(hf, ");\n");
fprintf(cf, ") {\n"); fprintf(cf, ") {\n");
fprintf(cf, " struct log_entry *v = toku_malloc(sizeof(*v));\n"); fprintf(cf, " struct roll_entry *v = toku_malloc(sizeof(*v));\n");
fprintf(cf, " if (v==0) return errno;\n"); fprintf(cf, " if (v==0) return errno;\n");
fprintf(cf, " v->cmd = %d;\n", lt->command_and_flags&0xff); fprintf(cf, " v->cmd = %d;\n", lt->command_and_flags&0xff);
DO_FIELDS(ft, lt, fprintf(cf, " v->u.%s.%s = %s;\n", lt->name, ft->name, ft->name)); DO_FIELDS(ft, lt, fprintf(cf, " v->u.%s.%s = %s;\n", lt->name, ft->name, ft->name));
...@@ -406,7 +433,6 @@ void generate_rollbacks (void) { ...@@ -406,7 +433,6 @@ void generate_rollbacks (void) {
fprintf(cf, " if (txn->oldest_logentry==0) txn->oldest_logentry=v;\n"); fprintf(cf, " if (txn->oldest_logentry==0) txn->oldest_logentry=v;\n");
fprintf(cf, " txn->newest_logentry = v;\n"); fprintf(cf, " txn->newest_logentry = v;\n");
fprintf(cf, " return 0;\n}\n"); fprintf(cf, " return 0;\n}\n");
}
})); }));
} }
...@@ -425,7 +451,7 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__u ...@@ -425,7 +451,7 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__u
fprintf(cf, "#include \"log_header.h\"\n"); fprintf(cf, "#include \"log_header.h\"\n");
fprintf(cf, "#include \"wbuf.h\"\n"); fprintf(cf, "#include \"wbuf.h\"\n");
fprintf(cf, "#include \"log-internal.h\"\n"); fprintf(cf, "#include \"log-internal.h\"\n");
generate_lt_enum(); generate_enum();
generate_log_struct(); generate_log_struct();
generate_dispatch(); generate_dispatch();
generate_log_writer(); generate_log_writer();
......
...@@ -870,7 +870,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v, ...@@ -870,7 +870,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
const BYTESTRING deleteddata = { kv->vallen, toku_memdup(kv_pair_val(kv), kv->vallen) }; const BYTESTRING deleteddata = { kv->vallen, toku_memdup(kv_pair_val(kv), kv->vallen) };
TOKUTXN txn; TOKUTXN txn;
if (0!=toku_txnid2txn(logger, xid, &txn)) return -1; if (0!=toku_txnid2txn(logger, xid, &txn)) return -1;
r=toku_logger_save_rollback_tl_delete(txn, pma->filenum, deletedkey, deleteddata); r=toku_logger_save_rollback_delete(txn, pma->filenum, deletedkey, deleteddata);
if (r!=0) { if (r!=0) {
toku_free(deletedkey.data); toku_free(deletedkey.data);
toku_free(deleteddata.data); toku_free(deleteddata.data);
......
...@@ -94,13 +94,8 @@ static char *fixup_fname(BYTESTRING *f) { ...@@ -94,13 +94,8 @@ static char *fixup_fname(BYTESTRING *f) {
void toku_recover_commit (LSN UU(lsn), TXNID UU(txnid)) { void toku_recover_commit (LSN UU(lsn), TXNID UU(txnid)) {
} }
// Rolling back a commit doesn't make much sense. It won't happen during an abort. But it doesn't hurt to ignore it.
int toku_rollback_commit (struct logtype_commit *le __attribute__((__unused__)), TOKUTXN txn __attribute__((__unused__))) {
return 0;
}
#define ABORTIT { le=le; txn=txn; fprintf(stderr, "%s:%d (%s) not ready to go\n", __FILE__, __LINE__, __func__); abort(); } #define ABORTIT { le=le; txn=txn; fprintf(stderr, "%s:%d (%s) not ready to go\n", __FILE__, __LINE__, __func__); abort(); }
int toku_rollback_delete (struct logtype_delete *le, TOKUTXN txn) ABORTIT
void toku_recover_delete (LSN UU(lsn), FILENUM UU(filenum),DISKOFF UU(diskoff),BYTESTRING UU(key),BYTESTRING UU(data)) { assert(0); } void toku_recover_delete (LSN UU(lsn), FILENUM UU(filenum),DISKOFF UU(diskoff),BYTESTRING UU(key),BYTESTRING UU(data)) { assert(0); }
void toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid),BYTESTRING fname,u_int32_t mode) { void toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid),BYTESTRING fname,u_int32_t mode) {
...@@ -111,9 +106,7 @@ void toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid),BYTESTRING fname,u_int32 ...@@ -111,9 +106,7 @@ void toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid),BYTESTRING fname,u_int32
toku_free_BYTESTRING(fname); toku_free_BYTESTRING(fname);
} }
int toku_rollback_fcreate (TXNID txnid __attribute__((__unused__)), int toku_rollback_fcreate (BYTESTRING bs_fname,
BYTESTRING bs_fname,
u_int32_t mode __attribute__((__unused__)),
TOKUTXN txn __attribute__((__unused__))) { TOKUTXN txn __attribute__((__unused__))) {
char *fname = fixup_fname(&bs_fname); char *fname = fixup_fname(&bs_fname);
char *directory = txn->logger->directory; char *directory = txn->logger->directory;
...@@ -203,14 +196,14 @@ void toku_recover_newbrtnode (LSN lsn, FILENUM filenum,DISKOFF diskoff,u_int32_t ...@@ -203,14 +196,14 @@ void toku_recover_newbrtnode (LSN lsn, FILENUM filenum,DISKOFF diskoff,u_int32_t
assert(r==0); assert(r==0);
} }
int toku_rollback_newbrtnode (struct logtype_newbrtnode *le, TOKUTXN txn) { //int toku_rollback_newbrtnode (struct logtype_newbrtnode *le, TOKUTXN txn) {
// All that must be done is to put the node on the freelist. // // All that must be done is to put the node on the freelist.
// Since we don't have a freelist right now, we don't have anything to do. // // Since we don't have a freelist right now, we don't have anything to do.
// We'll fix this later (See #264) // // We'll fix this later (See #264)
le=le; // le=le;
txn=txn; // txn=txn;
return 0; // return 0;
} //}
static void recover_setup_node (FILENUM filenum, DISKOFF diskoff, CACHEFILE *cf, BRTNODE *resultnode) { static void recover_setup_node (FILENUM filenum, DISKOFF diskoff, CACHEFILE *cf, BRTNODE *resultnode) {
...@@ -226,13 +219,10 @@ static void recover_setup_node (FILENUM filenum, DISKOFF diskoff, CACHEFILE *cf, ...@@ -226,13 +219,10 @@ static void recover_setup_node (FILENUM filenum, DISKOFF diskoff, CACHEFILE *cf,
*cf = pair->cf; *cf = pair->cf;
} }
int toku_rollback_brtdeq (struct logtype_brtdeq * le, TOKUTXN txn) ABORTIT
void toku_recover_brtdeq (LSN UU(lsn), FILENUM UU(filenum), DISKOFF UU(diskoff), u_int32_t UU(childnum), TXNID UU(xid), u_int32_t UU(typ), BYTESTRING UU(key), BYTESTRING UU(data), u_int32_t UU(oldfingerprint), u_int32_t UU(newfingerprint)) { assert(0); } void toku_recover_brtdeq (LSN UU(lsn), FILENUM UU(filenum), DISKOFF UU(diskoff), u_int32_t UU(childnum), TXNID UU(xid), u_int32_t UU(typ), BYTESTRING UU(key), BYTESTRING UU(data), u_int32_t UU(oldfingerprint), u_int32_t UU(newfingerprint)) { assert(0); }
int toku_rollback_brtenq (struct logtype_brtenq * le, TOKUTXN txn) ABORTIT
void toku_recover_brtenq (LSN UU(lsn), FILENUM UU(filenum), DISKOFF UU(diskoff), u_int32_t UU(childnum), TXNID UU(xid), u_int32_t UU(typ), BYTESTRING UU(key), BYTESTRING UU(data), u_int32_t UU(oldfingerprint), u_int32_t UU(newfingerprint)) { assert(0); } void toku_recover_brtenq (LSN UU(lsn), FILENUM UU(filenum), DISKOFF UU(diskoff), u_int32_t UU(childnum), TXNID UU(xid), u_int32_t UU(typ), BYTESTRING UU(key), BYTESTRING UU(data), u_int32_t UU(oldfingerprint), u_int32_t UU(newfingerprint)) { assert(0); }
int toku_rollback_addchild (struct logtype_addchild *le, TOKUTXN txn) ABORTIT
void toku_recover_addchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, DISKOFF child, u_int32_t childfingerprint) { void toku_recover_addchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, DISKOFF child, u_int32_t childfingerprint) {
CACHEFILE cf; CACHEFILE cf;
BRTNODE node; BRTNODE node;
...@@ -259,7 +249,6 @@ void toku_recover_addchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t ...@@ -259,7 +249,6 @@ void toku_recover_addchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t
assert(r==0); assert(r==0);
} }
int toku_rollback_delchild (struct logtype_delchild * le, TOKUTXN txn) ABORTIT
void toku_recover_delchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, DISKOFF child, u_int32_t childfingerprint, BYTESTRING pivotkey) { void toku_recover_delchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, DISKOFF child, u_int32_t childfingerprint, BYTESTRING pivotkey) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(filenum, &pair); int r = find_cachefile(filenum, &pair);
...@@ -295,7 +284,6 @@ void toku_recover_delchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t ...@@ -295,7 +284,6 @@ void toku_recover_delchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t
toku_free(pivotkey.data); toku_free(pivotkey.data);
} }
int toku_rollback_setchild (struct logtype_setchild *le, TOKUTXN txn) ABORTIT
void toku_recover_setchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, DISKOFF UU(oldchild), DISKOFF newchild) { void toku_recover_setchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, DISKOFF UU(oldchild), DISKOFF newchild) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(filenum, &pair); int r = find_cachefile(filenum, &pair);
...@@ -312,7 +300,6 @@ void toku_recover_setchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t ...@@ -312,7 +300,6 @@ void toku_recover_setchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t
r = toku_cachetable_unpin(pair->cf, diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
} }
int toku_rollback_setpivot (struct logtype_setpivot *le, TOKUTXN txn) ABORTIT
void toku_recover_setpivot (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, BYTESTRING pivotkey) { void toku_recover_setpivot (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, BYTESTRING pivotkey) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(filenum, &pair); int r = find_cachefile(filenum, &pair);
...@@ -353,7 +340,6 @@ void toku_recover_changechildfingerprint (LSN lsn, FILENUM filenum, DISKOFF disk ...@@ -353,7 +340,6 @@ void toku_recover_changechildfingerprint (LSN lsn, FILENUM filenum, DISKOFF disk
assert(r==0); assert(r==0);
} }
int toku_rollback_changechildfingerprint (struct logtype_changechildfingerprint *le, TOKUTXN txn) ABORTIT
void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM filenum) { void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM filenum) {
char *fixedfname = fixup_fname(&fname); char *fixedfname = fixup_fname(&fname);
...@@ -367,11 +353,6 @@ void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM ...@@ -367,11 +353,6 @@ void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM
toku_free_BYTESTRING(fname); toku_free_BYTESTRING(fname);
} }
int toku_rollback_fopen (struct logtype_fopen *le __attribute__((__unused__)), TOKUTXN txn __attribute__((__unused__))) {
// Nothing needs to be done to undo an fopen.
return 0;
}
void toku_recover_insertinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKOFF diskoff, u_int32_t pmaidx, BYTESTRING keybs, BYTESTRING databs) { void toku_recover_insertinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKOFF diskoff, u_int32_t pmaidx, BYTESTRING keybs, BYTESTRING databs) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(filenum, &pair); int r = find_cachefile(filenum, &pair);
...@@ -398,28 +379,6 @@ void toku_recover_insertinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKO ...@@ -398,28 +379,6 @@ void toku_recover_insertinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKO
toku_free_BYTESTRING(databs); toku_free_BYTESTRING(databs);
} }
int toku_rollback_insertinleaf (TXNID txnid __attribute__((__unused__)), FILENUM filenum, DISKOFF diskoff, u_int32_t pmaidx, BYTESTRING key, BYTESTRING data, TOKUTXN txn) {
CACHEFILE cf;
BRT brt;
void *node_v;
printf("Rollback insertinleaf\n");
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
assert(r==0);
r = toku_cachetable_get_and_pin(cf, diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
BRTNODE node = node_v;
r = toku_pma_clear_at_index(node->u.l.buffer, pmaidx);
if (r == 0) {
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(key.data, key.len, data.data, data.len);
node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + key.len + data.len;
VERIFY_COUNTS(node);
//node->log_lsn = c->lsn;
}
int r2 = toku_cachetable_unpin(cf, diskoff, 1, toku_serialize_brtnode_size(node));
return r == 0 ? r2 : r;
}
void toku_recover_deleteinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKOFF diskoff, u_int32_t pmaidx, BYTESTRING keybs, BYTESTRING databs) { void toku_recover_deleteinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKOFF diskoff, u_int32_t pmaidx, BYTESTRING keybs, BYTESTRING databs) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(filenum, &pair); int r = find_cachefile(filenum, &pair);
...@@ -443,13 +402,7 @@ void toku_recover_deleteinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKO ...@@ -443,13 +402,7 @@ void toku_recover_deleteinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKO
toku_free_BYTESTRING(databs); toku_free_BYTESTRING(databs);
} }
void toku_recover_tl_delete (LSN lsn __attribute__((__unused__)), FILENUM filenum __attribute__((__unused__)), BYTESTRING key __attribute__((__unused__)), BYTESTRING data __attribute__((__unused__))) { int toku_rollback_delete (FILENUM filenum,
return; // tl_delete should not appear in the log.
}
int toku_rollback_tl_delete (FILENUM filenum,
BYTESTRING key,BYTESTRING data,TOKUTXN txn) { BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
CACHEFILE cf; CACHEFILE cf;
BRT brt; BRT brt;
...@@ -524,44 +477,6 @@ void toku_recover_pmadistribute (LSN lsn, FILENUM filenum, DISKOFF old_diskoff, ...@@ -524,44 +477,6 @@ void toku_recover_pmadistribute (LSN lsn, FILENUM filenum, DISKOFF old_diskoff,
toku_free_INTPAIRARRAY(fromto); toku_free_INTPAIRARRAY(fromto);
} }
int toku_rollback_pmadistribute (struct logtype_pmadistribute *le, TOKUTXN txn) {
CACHEFILE cf;
BRT brt;
int r = toku_cachefile_of_filenum(txn->logger->ct, le->filenum, &cf, &brt);
if (r!=0) return r;
void *node_va, *node_vb;
r = toku_cachetable_get_and_pin(cf, le->old_diskoff, &node_va, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
if (0) {
died0: toku_cachetable_unpin(cf, le->old_diskoff, 1, 0);
return r;
}
r = toku_cachetable_get_and_pin(cf, le->new_diskoff, &node_vb, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) goto died0;
if (0) {
died1:
toku_cachetable_unpin(cf, le->new_diskoff, 1, 0);
goto died0;
}
BRTNODE nodea = node_va;
BRTNODE nodeb = node_vb;
r = toku_pma_move_indices_back(nodea->u.l.buffer, nodeb->u.l.buffer, le->fromto,
nodeb->rand4fingerprint, &nodeb->local_fingerprint,
nodea->rand4fingerprint, &nodea->local_fingerprint,
&nodeb->u.l.n_bytes_in_buffer, &nodea->u.l.n_bytes_in_buffer
);
if (r!=0) goto died1;
nodea->log_lsn = le->lsn;
nodeb->log_lsn = le->lsn;
r = toku_cachetable_unpin(cf, le->old_diskoff, 1, toku_serialize_brtnode_size(nodea));
r = toku_cachetable_unpin(cf, le->new_diskoff, 1, toku_serialize_brtnode_size(nodeb));
return r;
}
int toku_rollback_fheader (struct logtype_fheader *le, TOKUTXN txn) ABORTIT
int toku_rollback_resizepma (struct logtype_resizepma *le, TOKUTXN txn) ABORTIT
void toku_recover_changeunnamedroot (LSN UU(lsn), FILENUM filenum, DISKOFF UU(oldroot), DISKOFF newroot) { void toku_recover_changeunnamedroot (LSN UU(lsn), FILENUM filenum, DISKOFF UU(oldroot), DISKOFF newroot) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(filenum, &pair); int r = find_cachefile(filenum, &pair);
...@@ -573,8 +488,6 @@ void toku_recover_changeunnamedroot (LSN UU(lsn), FILENUM filenum, DISKOFF UU(ol ...@@ -573,8 +488,6 @@ void toku_recover_changeunnamedroot (LSN UU(lsn), FILENUM filenum, DISKOFF UU(ol
r = toku_unpin_brt_header(pair->brt); r = toku_unpin_brt_header(pair->brt);
} }
void toku_recover_changenamedroot (LSN UU(lsn), FILENUM UU(filenum), BYTESTRING UU(name), DISKOFF UU(oldroot), DISKOFF UU(newroot)) { assert(0); } void toku_recover_changenamedroot (LSN UU(lsn), FILENUM UU(filenum), BYTESTRING UU(name), DISKOFF UU(oldroot), DISKOFF UU(newroot)) { assert(0); }
int toku_rollback_changeunnamedroot (struct logtype_changeunnamedroot *le, TOKUTXN txn) ABORTIT
int toku_rollback_changenamedroot (struct logtype_changenamedroot *le, TOKUTXN txn) ABORTIT
void toku_recover_changeunusedmemory (LSN UU(lsn), FILENUM filenum, DISKOFF UU(oldunused), DISKOFF newunused) { void toku_recover_changeunusedmemory (LSN UU(lsn), FILENUM filenum, DISKOFF UU(oldunused), DISKOFF newunused) {
struct cf_pair *pair; struct cf_pair *pair;
...@@ -586,5 +499,4 @@ void toku_recover_changeunusedmemory (LSN UU(lsn), FILENUM filenum, DISKOFF UU(o ...@@ -586,5 +499,4 @@ void toku_recover_changeunusedmemory (LSN UU(lsn), FILENUM filenum, DISKOFF UU(o
pair->brt->h->unused_memory = newunused; pair->brt->h->unused_memory = newunused;
r = toku_unpin_brt_header(pair->brt); r = toku_unpin_brt_header(pair->brt);
} }
int toku_rollback_changeunusedmemory (struct logtype_changeunusedmemory *le, TOKUTXN txn) ABORTIT
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment