Commit 2eaeb874 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Separate the code for generating rollbacks and recover log entries. Addresses #27.

git-svn-id: file:///svn/tokudb@2420 c7de825b-a66e-492c-adef-691d508d4ae1
parent d68e94a5
...@@ -46,7 +46,7 @@ struct tokutxn { ...@@ -46,7 +46,7 @@ struct tokutxn {
TOKULOGGER logger; TOKULOGGER logger;
TOKUTXN parent; TOKUTXN parent;
LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */ LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */
struct log_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */ struct roll_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */
struct list live_txns_link; struct list live_txns_link;
}; };
......
...@@ -236,10 +236,10 @@ int toku_logger_commit (TOKUTXN txn, int nosync) { ...@@ -236,10 +236,10 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
txn->newest_logentry = txn->oldest_logentry = 0; txn->newest_logentry = txn->oldest_logentry = 0;
} }
free_and_return: /*nothing*/; free_and_return: /*nothing*/;
struct log_entry *item; struct roll_entry *item;
while ((item=txn->newest_logentry)) { while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev; txn->newest_logentry = item->prev;
logtype_dispatch(item, toku_free_logtype_); rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item); toku_free(item);
} }
list_remove(&txn->live_txns_link); list_remove(&txn->live_txns_link);
...@@ -280,7 +280,7 @@ int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) { ...@@ -280,7 +280,7 @@ int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
BYTESTRING bs = { .len=strlen(fname), .data = strdup(fname) }; BYTESTRING bs = { .len=strlen(fname), .data = strdup(fname) };
int r = toku_log_fcreate (txn->logger, toku_txn_get_txnid(txn), bs, mode); int r = toku_log_fcreate (txn->logger, toku_txn_get_txnid(txn), bs, mode);
if (r!=0) return r; if (r!=0) return r;
r = toku_logger_save_rollback_fcreate(txn, toku_txn_get_txnid(txn), bs, mode); r = toku_logger_save_rollback_fcreate(txn, bs);
return r; return r;
} }
...@@ -608,13 +608,13 @@ int toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unuse ...@@ -608,13 +608,13 @@ int toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unuse
int toku_logger_abort(TOKUTXN txn) { int toku_logger_abort(TOKUTXN txn) {
// Must undo everything. Must undo it all in reverse order. // Must undo everything. Must undo it all in reverse order.
// Build the reverse list // Build the reverse list
struct log_entry *item; struct roll_entry *item;
while ((item=txn->newest_logentry)) { while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev; txn->newest_logentry = item->prev;
int r; int r;
logtype_dispatch_assign_rollback(item, toku_rollback_, r, txn); rolltype_dispatch_assign(item, toku_rollback_, r, txn);
if (r!=0) return r; if (r!=0) return r;
logtype_dispatch(item, toku_free_logtype_); rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item); toku_free(item);
} }
list_remove(&txn->live_txns_link); list_remove(&txn->live_txns_link);
......
...@@ -34,12 +34,19 @@ struct logtype { ...@@ -34,12 +34,19 @@ struct logtype {
struct field *fields; struct field *fields;
}; };
#define GEN_ROLLBACK (1<<9)
// In the fields, don't mention the command, the LSN, the CRC or the trailing LEN. // In the fields, don't mention the command, the LSN, the CRC or the trailing LEN.
int logformat_version_number = 0; int logformat_version_number = 0;
const struct logtype rollbacks[] = {
{"fcreate", 'F', FA{{"BYTESTRING", "fname", 0},
NULLFIELD}},
{"delete", 'K', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
};
const struct logtype logtypes[] = { const struct logtype logtypes[] = {
{"commit", 'C', FA{{"TXNID", "txnid", 0},NULLFIELD}}, {"commit", 'C', FA{{"TXNID", "txnid", 0},NULLFIELD}},
{"delete", 'D', FA{{"FILENUM", "filenum", 0}, {"delete", 'D', FA{{"FILENUM", "filenum", 0},
...@@ -47,7 +54,7 @@ const struct logtype logtypes[] = { ...@@ -47,7 +54,7 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0}, {"BYTESTRING", "data", 0},
NULLFIELD}}, NULLFIELD}},
{"fcreate", 'F'+GEN_ROLLBACK, FA{{"TXNID", "txnid", 0}, {"fcreate", 'F', FA{{"TXNID", "txnid", 0},
{"BYTESTRING", "fname", 0}, {"BYTESTRING", "fname", 0},
{"u_int32_t", "mode", "0%o"}, {"u_int32_t", "mode", "0%o"},
NULLFIELD}}, NULLFIELD}},
...@@ -136,10 +143,6 @@ const struct logtype logtypes[] = { ...@@ -136,10 +143,6 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0}, {"BYTESTRING", "data", 0},
NULLFIELD}}, NULLFIELD}},
{"tl_delete", 'K'+GEN_ROLLBACK, FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
{"deleteinleaf", 'd', FA{{"TXNID", "txnid", 0}, {"deleteinleaf", 'd', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0}, {"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0}, {"DISKOFF", "diskoff", 0},
...@@ -160,18 +163,26 @@ const struct logtype logtypes[] = { ...@@ -160,18 +163,26 @@ const struct logtype logtypes[] = {
{0,0,FA{NULLFIELD}} {0,0,FA{NULLFIELD}}
}; };
#define DO_LOGTYPES(lt, body) ({ \
#define DO_STRUCTS(lt, array, body) ({ \
const struct logtype *lt; \ const struct logtype *lt; \
for (lt=&logtypes[0]; lt->name; lt++) { \ for (lt=&array[0]; lt->name; lt++) { \
body; \ body; \
} }) } })
#define DO_ROLLBACKS(lt, body) DO_STRUCTS(lt, rollbacks, body)
#define DO_LOGTYPES(lt, body) DO_STRUCTS(lt, logtypes, body)
#define DO_LOGTYPES_AND_ROLLBACKS(lt, body) (DO_ROLLBACKS(lt,body), DO_LOGTYPES(lt, body))
#define DO_FIELDS(fld, lt, body) ({ \ #define DO_FIELDS(fld, lt, body) ({ \
struct field *fld; \ struct field *fld; \
for (fld=lt->fields; fld->type; fld++) { \ for (fld=lt->fields; fld->type; fld++) { \
body; \ body; \
} }) } })
void fprintf2 (FILE *f1, FILE *f2, const char *format, ...) { void fprintf2 (FILE *f1, FILE *f2, const char *format, ...) {
va_list ap; va_list ap;
int r; int r;
...@@ -185,22 +196,28 @@ void fprintf2 (FILE *f1, FILE *f2, const char *format, ...) { ...@@ -185,22 +196,28 @@ void fprintf2 (FILE *f1, FILE *f2, const char *format, ...) {
FILE *hf=0, *cf=0; FILE *hf=0, *cf=0;
void generate_lt_enum (void) { void generate_enum_internal (char *enum_name, char *enum_prefix, const struct logtype *lts) {
char used_cmds[256]; char used_cmds[256];
int count=0; int count=0;
memset(used_cmds, 0, 256); memset(used_cmds, 0, 256);
fprintf(hf, "enum lt_cmd {"); fprintf(hf, "enum %s {", enum_name);
DO_LOGTYPES(lt, DO_STRUCTS(lt, lts,
({ ({
unsigned char cmd = lt->command_and_flags&0xff; unsigned char cmd = lt->command_and_flags&0xff;
if (count!=0) fprintf(hf, ","); if (count!=0) fprintf(hf, ",");
count++; count++;
fprintf(hf, "\n"); fprintf(hf, "\n");
fprintf(hf," LT_%-16s = '%c'", lt->name, cmd); fprintf(hf," %s_%-16s = '%c'", enum_prefix, lt->name, cmd);
if (used_cmds[cmd]!=0) { fprintf(stderr, "%s:%d Command %d (%c) was used twice\n", __FILE__, __LINE__, cmd, cmd); abort(); } if (used_cmds[cmd]!=0) { fprintf(stderr, "%s:%d Command %d (%c) was used twice\n", __FILE__, __LINE__, cmd, cmd); abort(); }
used_cmds[cmd]=1; used_cmds[cmd]=1;
})); }));
fprintf(hf, "\n};\n\n"); fprintf(hf, "\n};\n\n");
}
void generate_enum (void) {
generate_enum_internal("lt_cmd", "LT", logtypes);
generate_enum_internal("rt_cmd", "RT", rollbacks);
} }
void generate_log_struct (void) { void generate_log_struct (void) {
...@@ -215,35 +232,46 @@ void generate_log_struct (void) { ...@@ -215,35 +232,46 @@ void generate_log_struct (void) {
fprintf(hf, "void toku_recover_%s (LSN lsn", lt->name); fprintf(hf, "void toku_recover_%s (LSN lsn", lt->name);
DO_FIELDS(ft, lt, fprintf(hf, ", %s %s", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf(hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n"); fprintf(hf, ");\n");
if (lt->command_and_flags & GEN_ROLLBACK) { }));
DO_ROLLBACKS(lt,
({ fprintf(hf, "struct rolltype_%s {\n", lt->name);
DO_FIELDS(ft, lt,
fprintf(hf, " %-16s %s;\n", ft->type, ft->name));
fprintf(hf, "};\n");
fprintf(hf, "int toku_rollback_%s (", lt->name); fprintf(hf, "int toku_rollback_%s (", lt->name);
DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name));
fprintf(hf, "TOKUTXN txn);\n"); fprintf(hf, "TOKUTXN txn);\n");
}
})); }));
fprintf(hf, "struct log_entry {\n"); fprintf(hf, "struct log_entry {\n");
fprintf(hf, " enum lt_cmd cmd;\n"); fprintf(hf, " enum lt_cmd cmd;\n");
fprintf(hf, " union {\n"); fprintf(hf, " union {\n");
DO_LOGTYPES(lt, fprintf(hf," struct logtype_%s %s;\n", lt->name, lt->name)); DO_LOGTYPES(lt, fprintf(hf," struct logtype_%s %s;\n", lt->name, lt->name));
fprintf(hf, " } u;\n"); fprintf(hf, " } u;\n");
fprintf(hf, " struct log_entry *prev; /* for in-memory list of log entries. Threads from newest to oldest. */\n");
fprintf(hf, "};\n"); fprintf(hf, "};\n");
fprintf(hf, "struct roll_entry {\n");
fprintf(hf, " enum rt_cmd cmd;\n");
fprintf(hf, " union {\n");
DO_ROLLBACKS(lt, fprintf(hf," struct rolltype_%s %s;\n", lt->name, lt->name));
fprintf(hf, " } u;\n");
fprintf(hf, " struct roll_entry *prev; /* for in-memory list of log entries. Threads from newest to oldest. */\n");
fprintf(hf, "};\n");
} }
void generate_dispatch (void) { void generate_dispatch (void) {
fprintf(hf, "#define logtype_dispatch(s, funprefix) ({ switch((s)->cmd) {\\\n"); fprintf(hf, "#define rolltype_dispatch(s, funprefix) ({ switch((s)->cmd) {\\\n");
DO_LOGTYPES(lt, fprintf(hf, " case LT_%s: funprefix ## %s (&(s)->u.%s); break;\\\n", lt->name, lt->name, lt->name)); DO_ROLLBACKS(lt, fprintf(hf, " case RT_%s: funprefix ## %s (&(s)->u.%s); break;\\\n", lt->name, lt->name, lt->name));
fprintf(hf, " }})\n"); fprintf(hf, " }})\n");
fprintf(hf, "#define logtype_dispatch_assign(s, funprefix, var, args...) ({ switch((s)->cmd) {\\\n"); fprintf(hf, "#define logtype_dispatch_assign(s, funprefix, var, args...) ({ switch((s)->cmd) {\\\n");
DO_LOGTYPES(lt, fprintf(hf, " case LT_%s: var = funprefix ## %s (&(s)->u.%s, ## args); break;\\\n", lt->name, lt->name, lt->name)); DO_LOGTYPES(lt, fprintf(hf, " case LT_%s: var = funprefix ## %s (&(s)->u.%s, ## args); break;\\\n", lt->name, lt->name, lt->name));
fprintf(hf, " }})\n"); fprintf(hf, " }})\n");
fprintf(hf, "#define logtype_dispatch_assign_rollback(s, funprefix, var, args...) ({ \\\n"); fprintf(hf, "#define rolltype_dispatch_assign(s, funprefix, var, args...) ({ \\\n");
fprintf(hf, " switch((s)->cmd) {\\\n"); fprintf(hf, " switch((s)->cmd) {\\\n");
DO_LOGTYPES(lt, ({ DO_ROLLBACKS(lt, ({
if (lt->command_and_flags & GEN_ROLLBACK) { fprintf(hf, " case RT_%s: var = funprefix ## %s (", lt->name, lt->name);
fprintf(hf, " case LT_%s: var = funprefix ## %s (", lt->name, lt->name);
int fieldcount=0; int fieldcount=0;
DO_FIELDS(ft, lt, ({ DO_FIELDS(ft, lt, ({
if (fieldcount>0) fprintf(hf, ","); if (fieldcount>0) fprintf(hf, ",");
...@@ -251,7 +279,7 @@ void generate_dispatch (void) { ...@@ -251,7 +279,7 @@ void generate_dispatch (void) {
fieldcount++; fieldcount++;
})); }));
fprintf(hf, ",## args); break;\\\n"); fprintf(hf, ",## args); break;\\\n");
}})); }));
fprintf(hf, " default: assert(0);} })\n"); fprintf(hf, " default: assert(0);} })\n");
fprintf(hf, "#define logtype_dispatch_args(s, funprefix) ({ switch((s)->cmd) {\\\n"); fprintf(hf, "#define logtype_dispatch_args(s, funprefix) ({ switch((s)->cmd) {\\\n");
...@@ -265,8 +293,8 @@ void generate_dispatch (void) { ...@@ -265,8 +293,8 @@ void generate_dispatch (void) {
} }
void generate_log_free(void) { void generate_log_free(void) {
DO_LOGTYPES(lt, ({ DO_ROLLBACKS(lt, ({
fprintf2(cf, hf, "void toku_free_logtype_%s(struct logtype_%s *e)", lt->name, lt->name); fprintf2(cf, hf, "void toku_free_rolltype_%s(struct rolltype_%s *e)", lt->name, lt->name);
fprintf(hf, ";\n"); fprintf(hf, ";\n");
fprintf(cf, " {\n"); fprintf(cf, " {\n");
DO_FIELDS(ft, lt, fprintf(cf, " toku_free_%s(e->%s);\n", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf(cf, " toku_free_%s(e->%s);\n", ft->type, ft->name));
...@@ -392,13 +420,12 @@ void generate_logprint (void) { ...@@ -392,13 +420,12 @@ void generate_logprint (void) {
} }
void generate_rollbacks (void) { void generate_rollbacks (void) {
DO_LOGTYPES(lt, ({ DO_ROLLBACKS(lt, ({
if (lt->command_and_flags & GEN_ROLLBACK) {
fprintf2(cf, hf, "int toku_logger_save_rollback_%s (TOKUTXN txn", lt->name); fprintf2(cf, hf, "int toku_logger_save_rollback_%s (TOKUTXN txn", lt->name);
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n"); fprintf(hf, ");\n");
fprintf(cf, ") {\n"); fprintf(cf, ") {\n");
fprintf(cf, " struct log_entry *v = toku_malloc(sizeof(*v));\n"); fprintf(cf, " struct roll_entry *v = toku_malloc(sizeof(*v));\n");
fprintf(cf, " if (v==0) return errno;\n"); fprintf(cf, " if (v==0) return errno;\n");
fprintf(cf, " v->cmd = %d;\n", lt->command_and_flags&0xff); fprintf(cf, " v->cmd = %d;\n", lt->command_and_flags&0xff);
DO_FIELDS(ft, lt, fprintf(cf, " v->u.%s.%s = %s;\n", lt->name, ft->name, ft->name)); DO_FIELDS(ft, lt, fprintf(cf, " v->u.%s.%s = %s;\n", lt->name, ft->name, ft->name));
...@@ -406,7 +433,6 @@ void generate_rollbacks (void) { ...@@ -406,7 +433,6 @@ void generate_rollbacks (void) {
fprintf(cf, " if (txn->oldest_logentry==0) txn->oldest_logentry=v;\n"); fprintf(cf, " if (txn->oldest_logentry==0) txn->oldest_logentry=v;\n");
fprintf(cf, " txn->newest_logentry = v;\n"); fprintf(cf, " txn->newest_logentry = v;\n");
fprintf(cf, " return 0;\n}\n"); fprintf(cf, " return 0;\n}\n");
}
})); }));
} }
...@@ -425,7 +451,7 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__u ...@@ -425,7 +451,7 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__u
fprintf(cf, "#include \"log_header.h\"\n"); fprintf(cf, "#include \"log_header.h\"\n");
fprintf(cf, "#include \"wbuf.h\"\n"); fprintf(cf, "#include \"wbuf.h\"\n");
fprintf(cf, "#include \"log-internal.h\"\n"); fprintf(cf, "#include \"log-internal.h\"\n");
generate_lt_enum(); generate_enum();
generate_log_struct(); generate_log_struct();
generate_dispatch(); generate_dispatch();
generate_log_writer(); generate_log_writer();
......
...@@ -870,7 +870,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v, ...@@ -870,7 +870,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
const BYTESTRING deleteddata = { kv->vallen, toku_memdup(kv_pair_val(kv), kv->vallen) }; const BYTESTRING deleteddata = { kv->vallen, toku_memdup(kv_pair_val(kv), kv->vallen) };
TOKUTXN txn; TOKUTXN txn;
if (0!=toku_txnid2txn(logger, xid, &txn)) return -1; if (0!=toku_txnid2txn(logger, xid, &txn)) return -1;
r=toku_logger_save_rollback_tl_delete(txn, pma->filenum, deletedkey, deleteddata); r=toku_logger_save_rollback_delete(txn, pma->filenum, deletedkey, deleteddata);
if (r!=0) { if (r!=0) {
toku_free(deletedkey.data); toku_free(deletedkey.data);
toku_free(deleteddata.data); toku_free(deleteddata.data);
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment