Commit 80154022 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

The recovered file is exactly the same as the original file for test_log2.tdb.

(The wrong LSN was being saved in the file.  The recovered version was right.)
Addresses #27.
(Also, it was broken, and that appears to be fixed.)


git-svn-id: file:///svn/tokudb@730 c7de825b-a66e-492c-adef-691d508d4ae1
parent b6c9d1bc
......@@ -94,7 +94,7 @@ brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o he
log.o: log_header.h log-internal.h log.h wbuf.h crc.h brttypes.h $(BRT_INTERNAL_H_INCLUDES)
brt-test.o brt.o: brt.h ../include/db.h hashtable.h pma.h brttypes.h cachetable.h
brt-serialize-test.o: $(BRT_INTERNAL_H_INCLUDES)
brt.o: $(BRT_INTERNAL_H_INCLUDES)
brt.o: $(BRT_INTERNAL_H_INCLUDES) key.h log_header.h
mdict.o: pma.h
hashtable.o: hashtable.h brttypes.h memory.h key.h yerror.h ../include/db.h hashfun.h
memory.o: memory.h
......
......@@ -28,7 +28,8 @@ struct brtnode {
// BRT brt; // The containing BRT
unsigned int nodesize;
DISKOFF thisnodename; // The size of the node allocated on disk. Not all is necessarily in use.
LSN lsn; // Need the LSN as of the most recent modification.
LSN disk_lsn; // The LSN as of the most recent version on disk.
LSN log_lsn; // The LSN as of the most recent log write.
int layout_version; // What version of the data structure?
BRTNODE parent_brtnode; /* Invariant: The parent of an in-memory node must be in main memory. This is so we can find and update the down pointer when we change the diskoff of a node. */
int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */
......
......@@ -20,7 +20,8 @@ void test_serialize(void) {
char *hello_string;
sn.nodesize = nodesize;
sn.thisnodename = sn.nodesize*20;
sn.lsn.lsn = 123456;
sn.disk_lsn.lsn = 789;
sn.log_lsn.lsn = 123456;
sn.layout_version = 0;
sn.height = 1;
sn.rand4fingerprint = randval;
......@@ -47,7 +48,7 @@ void test_serialize(void) {
assert(r==0);
assert(dn->thisnodename==nodesize*20);
assert(dn->lsn.lsn==123456);
assert(dn->disk_lsn.lsn==123456);
assert(dn->layout_version ==0);
assert(dn->height == 1);
assert(dn->rand4fingerprint==randval);
......
......@@ -99,7 +99,8 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node)
if (node->height==0) wbuf_literal_bytes(&w, "leaf", 4);
else wbuf_literal_bytes(&w, "node", 4);
wbuf_int(&w, node->layout_version);
wbuf_ulonglong(&w, node->lsn.lsn);
fprintf(stderr, "%s:%d lsn at file write = %llu\n", __FILE__, __LINE__, node->log_lsn.lsn);
wbuf_ulonglong(&w, node->log_lsn.lsn);
//printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size);
wbuf_int(&w, calculated_size);
wbuf_int(&w, node->height);
......@@ -241,7 +242,8 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
r=DB_BADFORMAT;
goto died1;
}
result->lsn.lsn = rbuf_ulonglong(&rc);
result->disk_lsn.lsn = rbuf_ulonglong(&rc);
result->log_lsn = result->disk_lsn;
{
unsigned int stored_size = rbuf_int(&rc);
if (stored_size!=datasize) { r=DB_BADFORMAT; goto died1; }
......
......@@ -24,6 +24,7 @@
#include "brt-internal.h"
#include "key.h"
#include "log_header.h"
#include <stdlib.h>
#include <assert.h>
......@@ -135,7 +136,7 @@ static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck, unsigned
}
void brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnode_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p __attribute__((__unused__))) {
void brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnode_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn __attribute__((__unused__)) , BOOL rename_p __attribute__((__unused__))) {
BRTNODE brtnode = brtnode_v;
// if ((write_me || keep_me) && (brtnode->height==0)) {
// toku_pma_verify_fingerprint(brtnode->u.l.buffer, brtnode->rand4fingerprint, brtnode->subtree_fingerprint);
......@@ -145,7 +146,7 @@ void brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnod
if (brtnode->height==0) printf(" pma=%p", brtnode->u.l.buffer);
printf("\n");
}
if (modified_lsn.lsn > brtnode->lsn.lsn) brtnode->lsn=modified_lsn;
//if (modified_lsn.lsn > brtnode->lsn.lsn) brtnode->lsn=modified_lsn;
fix_up_parent_pointers_of_children_now_that_parent_is_gone(cachefile, brtnode);
assert(brtnode->thisnodename==nodename);
{
......@@ -200,7 +201,7 @@ int brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnod
t->compare_fun, t->dup_compare);
if (r == 0)
*sizep = brtnode_size(*result);
*written_lsn = (*result)->lsn;
*written_lsn = (*result)->disk_lsn;
//(*result)->parent_brtnode = 0; /* Don't know it right now. */
//printf("%s:%d installed %p (offset=%lld)\n", __FILE__, __LINE__, *result, nodename);
return r;
......@@ -297,7 +298,8 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
// n->brt = t;
n->nodesize = t->h->nodesize;
n->thisnodename = nodename;
n->lsn.lsn = 0; // a new one can always be 0.
n->disk_lsn.lsn = 0; // a new one can always be 0.
n->log_lsn = n->disk_lsn;
n->layout_version = 0;
n->height = height;
n->rand4fingerprint = random();
......@@ -1451,7 +1453,11 @@ static int setup_brt_root_node (BRT t, DISKOFF offset, TOKUTXN txn) {
//printf("%s:%d created %lld\n", __FILE__, __LINE__, node->thisnodename);
toku_verify_counts(node);
// verify_local_fingerprint_nonleaf(node);
tokulogger_log_newbrtnode(txn, toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
if (txn) {
node->log_lsn = toku_txn_get_last_lsn(txn);
fprintf(stderr, "%s:%d last lsn=%llu\n", __FILE__, __LINE__, node->log_lsn.lsn);
}
r=toku_cachetable_unpin(t->cf, node->thisnodename, node->dirty, brtnode_size(node));
if (r!=0) {
toku_free(node);
......
......@@ -42,7 +42,15 @@ typedef struct loggedbrtheader {
DISKOFF freelist;
DISKOFF unused_memory;
u_int32_t n_named_roots;
DISKOFF root;
union {
struct {
char **names;
DISKOFF *roots;
} many;
struct {
DISKOFF root;
} one;
} u;
} LOGGEDBRTHEADER;
#endif
......@@ -37,6 +37,7 @@ struct tokutxn {
u_int64_t txnid64;
TOKULOGGER logger;
TOKUTXN parent;
LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */
};
int tokulogger_finish (TOKULOGGER logger, struct wbuf *wbuf);
......
......@@ -57,7 +57,7 @@ int tokulogger_find_logfiles (const char *directory, int *n_resultsp, char ***re
}
*n_resultsp = n_results;
*resultp = result;
return 0;
return closedir(d);
}
int tokulogger_create_and_open_logger (const char *directory, TOKULOGGER *resultp) {
......@@ -350,6 +350,29 @@ int tokulogger_log_unlink (TOKUTXN txn, const char *fname) {
};
int tokulogger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) {
#if 0
LOGGEDBRTHEADER lh;
lh.size = toku_serialize_brt_header_size(h);
lh.flags = h->flags;
lh.nodesize = h->nodesize;
lh.freelist = h->freelist;
lh.unused_memory = h->unused_memory;
lh.n_named_roots = h->n_named_roots;
if (h->n_named_roots==-1) {
lh.u.one.root = h->unnamed_root;
} else {
int i;
MALLOC_N(h->n_named_roots, lh.u.many.names);
MALLOC_N(h->n_named_roots, lh.u.many.roots);
for (i=0; i<h->n_named_roots; i++) {
lh.u.many.names[i]=toku_strdup(h->names[i]);
lh.u.many.roots[i]=h->roots[i];
}
}
r = toku_log_fheader(txn, toku_txn_get_txnid(txn), filenum, lh);
toku_free(all_that_stuff);
return r;
#else
if (txn==0) return 0;
int subsize=toku_serialize_brt_header_size(h);
int buflen = (1+
......@@ -374,35 +397,7 @@ int tokulogger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) {
r=tokulogger_finish(txn->logger, &wbuf);
toku_free(buf);
return r;
}
int tokulogger_log_newbrtnode (TOKUTXN txn, FILENUM filenum, DISKOFF offset, u_int32_t height, u_int32_t nodesize, char is_dup_sort_mode, u_int32_t rand4fingerprint) {
if (txn==0) return 0;
int buflen=(1+
+ 8 // lsn
+ 8 // txnid
+ 4 // filenum
+ 8 // diskoff
+ 4 // height
+ 4 // nodesize
+ 1 // is_dup_sort_mode
+ 4 // rand4fingerprint
+ 8 // crc & len
);
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init (&wbuf, buf, buflen);
wbuf_char(&wbuf, LT_NEWBRTNODE);
wbuf_LSN (&wbuf, txn->logger->lsn);
txn->logger->lsn.lsn++;
wbuf_TXNID(&wbuf, txn->txnid64);
wbuf_FILENUM(&wbuf, filenum);
wbuf_DISKOFF(&wbuf, offset);
wbuf_int(&wbuf, height);
wbuf_int(&wbuf, nodesize);
wbuf_char(&wbuf, is_dup_sort_mode);
wbuf_int(&wbuf, rand4fingerprint);
return tokulogger_finish(txn->logger, &wbuf);
#endif
}
/*
......@@ -506,7 +501,7 @@ int toku_fread_LOGGEDBRTHEADER(FILE *f, LOGGEDBRTHEADER *v, u_int32_t *crc, u_in
r = toku_fread_DISKOFF (f, &v->unused_memory, crc, len); if (r!=0) return r;
r = toku_fread_u_int32_t(f, &v->n_named_roots, crc, len); if (r!=0) return r;
assert((signed)v->n_named_roots==-1);
r = toku_fread_DISKOFF (f, &v->root, crc, len); if (r!=0) return r;
r = toku_fread_DISKOFF (f, &v->u.one.root, crc, len); if (r!=0) return r;
return 0;
}
......@@ -606,3 +601,12 @@ int read_and_print_logmagic (FILE *f, u_int32_t *versionp) {
}
return 0;
}
TXNID toku_txn_get_txnid (TOKUTXN txn) {
if (txn==0) return 0;
else return txn->txnid64;
}
LSN toku_txn_get_last_lsn (TOKUTXN txn) {
return txn->last_lsn;
}
......@@ -51,4 +51,7 @@ int toku_logprint_LOGGEDBRTHEADER (FILE *outf, FILE *inf, const char *fieldname,
int read_and_print_logmagic (FILE *f, u_int32_t *version);
TXNID toku_txn_get_txnid (TOKUTXN);
LSN toku_txn_get_last_lsn (TOKUTXN);
#endif
......@@ -130,6 +130,7 @@ void generate_log_writer (void) {
fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n");
fprintf(cf, ") {\n");
fprintf(cf, " if (txn==0) return 0;\n");
fprintf(cf, " const unsigned int buflen= (1 // log command\n");
fprintf(cf, " +8 // lsn\n");
DO_FIELDS(ft, lt,
......@@ -142,7 +143,8 @@ void generate_log_writer (void) {
fprintf(cf, " wbuf_init(&wbuf, buf, buflen);\n");
fprintf(cf, " wbuf_char(&wbuf, '%c');\n", lt->command);
fprintf(cf, " wbuf_LSN(&wbuf, txn->logger->lsn);\n");
fprintf(cf, " txn->logger->lsn.lsn++;;\n");
fprintf(cf, " txn->last_lsn = txn->logger->lsn;\n");
fprintf(cf, " txn->logger->lsn.lsn++;\n");
DO_FIELDS(ft, lt,
fprintf(cf, " wbuf_%s(&wbuf, %s);\n", ft->type, ft->name));
fprintf(cf, " int r= tokulogger_finish(txn->logger, &wbuf);\n");
......
......@@ -49,6 +49,7 @@ static void toku_recover_fcreate (struct logtype_fcreate *c) {
int fd = creat(fname, c->mode);
assert(fd>=0);
toku_free(fname);
toku_free(c->fname.data);
}
static void toku_recover_fheader (struct logtype_fheader *c) {
CACHEFILE cf = find_cachefile(c->filenum);
......@@ -113,6 +114,8 @@ static void toku_recover_fopen (struct logtype_fopen *c) {
}
cf_pairs[n_cf_pairs-1].filenum = c->filenum;
cf_pairs[n_cf_pairs-1].cf = cf;
toku_free(fname);
toku_free(c->fname.data);
}
int main (int argc, char *argv[]) {
......@@ -152,6 +155,7 @@ int main (int argc, char *argv[]) {
r = toku_cachefile_close(&cf_pairs[i].cf);
assert(r==0);
}
toku_free(cf_pairs);
r = toku_cachetable_close(&ct);
assert(r==0);
for (i=0; i<n_logfiles; i++) {
......
......@@ -124,8 +124,15 @@ static inline void wbuf_LOGGEDBRTHEADER (struct wbuf *w, LOGGEDBRTHEADER h) {
wbuf_DISKOFF(w, h.freelist);
wbuf_DISKOFF(w, h.unused_memory);
wbuf_int(w, h.n_named_roots);
assert(h.n_named_roots==0);
wbuf_DISKOFF(w, h.root);
if ((signed)h.n_named_roots==-1) {
wbuf_DISKOFF(w, h.u.one.root);
} else {
unsigned int i;
for (i=0; i<h.n_named_roots; i++) {
wbuf_DISKOFF(w, h.u.many.roots[i]);
wbuf_bytes (w, h.u.many.names[i], 1+strlen(h.u.many.names[i]));
}
}
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment