Commit 9bdd6eaf authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

All the old recovery tests work (#1364), and are merged onto the main line.

Refs #1510, Closes #1364.
This work was done on the 1510 branch, which we will create again.


git-svn-id: file:///svn/toku/tokudb@10313 c7de825b-a66e-492c-adef-691d508d4ae1
parent e4ee82ef
......@@ -343,13 +343,6 @@ toku_block_recovery_set_unused_blocks(BLOCK_TABLE bt, BLOCKNUM newunused) {
unlock_for_blocktable(bt);
}
void
toku_block_recovery_set_free_blocks(BLOCK_TABLE bt, BLOCKNUM newfree) {
lock_for_blocktable(bt);
bt->free_blocks = newfree;
unlock_for_blocktable(bt);
}
void
toku_block_memcpy_translation_table(BLOCK_TABLE bt, size_t n, void *p) {
lock_for_blocktable(bt);
......@@ -405,40 +398,73 @@ toku_blocktable_debug_set_translation(BLOCK_TABLE bt,
unlock_for_blocktable(bt);
}
void
toku_blocktable_create(BLOCK_TABLE *btp,
BLOCKNUM free_blocks,
BLOCKNUM unused_blocks,
u_int64_t translated_blocknum_limit,
u_int64_t block_translation_address_on_disk,
u_int64_t block_translation_size_on_disk,
unsigned char *buffer) {
BLOCK_TABLE bt;
XMALLOC(bt);
static void
toku_blocktable_create_internal (BLOCK_TABLE *btp,
BLOCKNUM free_blocks,
BLOCKNUM unused_blocks,
u_int64_t translated_blocknum_limit,
u_int64_t block_translation_address_on_disk)
// Effect: Fill it in, including the translation table, which is uninitialized
{
BLOCK_TABLE XMALLOC(bt);
blocktable_lock_init(bt);
bt->free_blocks = free_blocks;
bt->unused_blocks = unused_blocks;
bt->translated_blocknum_limit = translated_blocknum_limit;
bt->block_translation_address_on_disk = block_translation_address_on_disk;
update_size_on_disk(bt);
if (block_translation_address_on_disk==0 && block_translation_size_on_disk == 0) {
if (block_translation_address_on_disk==0) {
bt->block_translation_size_on_disk = 0;
}
assert(block_translation_size_on_disk==bt->block_translation_size_on_disk);
// Set up the the block translation buffer.
create_block_allocator(&bt->block_allocator, BLOCK_ALLOCATOR_HEADER_RESERVE, BLOCK_ALLOCATOR_ALIGNMENT);
if (block_translation_address_on_disk==0) {
bt->block_translation = NULL;
assert(buffer==NULL);
}
else {
} else {
XMALLOC_N(translated_blocknum_limit, bt->block_translation);
//Mark where the translation table is stored on disk.
block_allocator_alloc_block_at(bt->block_allocator, bt->block_translation_size_on_disk, bt->block_translation_address_on_disk);
}
*btp = bt;
}
void toku_blocktable_create_from_loggedheader(BLOCK_TABLE *btp, LOGGEDBRTHEADER h) {
// We don't need the lock for the block table for this operation.
BLOCK_TABLE bt;
toku_blocktable_create_internal (&bt,
h.free_blocks,
h.unused_blocks,
h.btt_size.b,
h.btt_diskoff);
int64_t i;
for (i=0; i<h.btt_size.b; i++) {
bt->block_translation[i].diskoff = h.btt_pairs[i].off;
bt->block_translation[i].size = h.btt_pairs[i].size;
if (h.btt_pairs[i].size > 0) {
block_allocator_alloc_block_at(bt->block_allocator, h.btt_pairs[i].size, h.btt_pairs[i].off);
}
}
*btp = bt;
}
void
toku_blocktable_create(BLOCK_TABLE *btp,
BLOCKNUM free_blocks,
BLOCKNUM unused_blocks,
u_int64_t translated_blocknum_limit,
u_int64_t block_translation_address_on_disk,
unsigned char *buffer) {
BLOCK_TABLE bt;
toku_blocktable_create_internal(&bt,
free_blocks, unused_blocks,
translated_blocknum_limit,
block_translation_address_on_disk);
// Set up the the block translation buffer.
if (block_translation_address_on_disk != 0) {
assert(buffer);
//Load translations from the buffer.
u_int64_t i;
struct rbuf rt;
......@@ -453,7 +479,8 @@ toku_blocktable_create(BLOCK_TABLE *btp,
block_allocator_alloc_block_at(bt->block_allocator, bt->block_translation[i].size, bt->block_translation[i].diskoff);
//printf("%s:%d %ld %ld\n", __FILE__, __LINE__, bt->block_translation[i].diskoff, bt->block_translation[i].size);
}
} else {
assert(buffer==0);
}
// printf("%s:%d translated_blocknum_limit=%ld, block_translation_address_on_disk=%ld\n", __FILE__, __LINE__, bt->translated_blocknum_limit, bt->block_translation_address_on_disk);
......@@ -466,6 +493,5 @@ toku_blocktable_create_new(BLOCK_TABLE *btp) {
toku_blocktable_create(btp,
make_blocknum(-1),
make_blocknum(2),
0, 0, 0, NULL);
0, 0, NULL);
}
......@@ -34,12 +34,10 @@ void toku_blocktable_create(BLOCK_TABLE *btp,
BLOCKNUM unused_blocks,
u_int64_t translated_blocknum_limit,
u_int64_t block_translation_address_on_disk,
u_int64_t block_translation_size_on_disk,
unsigned char *buffer);
void toku_blocktable_create_from_loggedheader(BLOCK_TABLE *btp, LOGGEDBRTHEADER);
void toku_blocktable_create_new(BLOCK_TABLE *bt);
void toku_block_recovery_set_unused_blocks(BLOCK_TABLE bt, BLOCKNUM newunused);
void toku_block_recovery_set_free_blocks(BLOCK_TABLE bt, BLOCKNUM newfree);
BLOCKNUM toku_block_get_unused_blocks(BLOCK_TABLE bt);
BLOCKNUM toku_block_get_free_blocks(BLOCK_TABLE bt);
u_int64_t toku_block_get_translated_blocknum_limit(BLOCK_TABLE bt);
......
......@@ -194,6 +194,7 @@ struct brtenv {
extern void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
extern int toku_brtnode_fetch_callback (CACHEFILE cachefile, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, void*extraargs, LSN *written_lsn);
extern int toku_brt_alloc_init_header(BRT t, const char *dbname);
extern int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header **header);
extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt, u_int32_t *root_hash);
......
......@@ -825,7 +825,7 @@ deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_header **
free_blocks, unused_blocks,
translated_blocknum_limit,
block_translation_address_on_disk,
block_translation_size_on_disk, NULL);
NULL);
}
else {
//Load translation table if it exists on disk.
......@@ -850,7 +850,6 @@ deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_header **
free_blocks, unused_blocks,
translated_blocknum_limit,
block_translation_address_on_disk,
block_translation_size_on_disk,
tbuf);
unlock_for_pwrite();
toku_free(tbuf);
......
......@@ -2670,7 +2670,7 @@ static int brt_open_file(BRT brt, const char *fname, int is_create, int *fdp, BO
return 0;
}
static int brt_init_header(BRT t, TOKUTXN txn) {
static int brt_init_header(BRT t) {
int r;
BLOCKNUM root = make_blocknum(1);
......@@ -2687,19 +2687,24 @@ static int brt_init_header(BRT t, TOKUTXN txn) {
{
BLOCKNUM free_blocks = toku_block_get_free_blocks(t->h->blocktable);
BLOCKNUM unused_blocks = toku_block_get_unused_blocks(t->h->blocktable);
u_int64_t n_blocks_translated = toku_block_get_translated_blocknum_limit(t->h->blocktable);
assert(n_blocks_translated==0);
LOGGEDBRTHEADER lh = {.size= toku_serialize_brt_header_size(t->h),
.flags = t->flags,
.nodesize = t->h->nodesize,
.free_blocks = free_blocks,
.unused_blocks = unused_blocks,
.n_named_roots = t->h->n_named_roots };
.n_named_roots = t->h->n_named_roots,
.btt_size = make_blocknum(n_blocks_translated),
.btt_diskoff = 0, // No diskoffset yet allocated, since it's a new blocktable.
.btt_pairs = 0};
if (t->h->n_named_roots>=0) {
lh.u.many.names = t->h->names;
lh.u.many.roots = t->h->roots;
} else {
lh.u.one.root = t->h->roots[0];
}
if ((r=toku_log_fheader(toku_txn_logger(txn), (LSN*)0, 0, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), lh))) { return r; }
//if ((r=toku_log_fheader(toku_txn_logger(txn), (LSN*)0, 0, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), lh))) { return r; }
}
if ((r=setup_initial_brt_root_node(t, root))!=0) { return r; }
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
......@@ -2711,7 +2716,7 @@ static int brt_init_header(BRT t, TOKUTXN txn) {
// allocate and initialize a brt header.
// t->cf is not set to anything.
static int brt_alloc_init_header(BRT t, const char *dbname, TOKUTXN txn) {
int toku_brt_alloc_init_header(BRT t, const char *dbname) {
int r;
r = brtheader_alloc(&t->h);
......@@ -2736,7 +2741,7 @@ static int brt_alloc_init_header(BRT t, const char *dbname, TOKUTXN txn) {
t->h->names=0;
}
r = brt_init_header(t, txn);
r = brt_init_header(t);
if (r != 0) goto died7;
return r;
}
......@@ -2805,8 +2810,9 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
r = toku_logger_log_fcreate(txn, fname_in_env, toku_cachefile_filenum(t->cf), mode);
if (r != 0) goto died_after_open;
t->txnid_that_created_or_locked_when_empty = toku_txn_get_txnid(txn);
}
r = toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf));
} else {
r = toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf));
}
}
if (r!=0) {
died_after_open:
......@@ -2823,7 +2829,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
if (is_create) {
r = toku_read_brt_header_and_store_in_cachefile(t->cf, &t->h);
if (r==-1) {
r = brt_alloc_init_header(t, dbname, txn);
r = toku_brt_alloc_init_header(t, dbname);
if (r != 0) goto died_after_read_and_pin;
}
else if (r!=0) {
......@@ -4421,7 +4427,7 @@ int toku_brt_truncate (BRT brt) {
// reinit the header
brtheader_partial_destroy(brt->h);
r = brt_init_header(brt, NULL_TXN);
r = brt_init_header(brt);
return r;
}
......
......@@ -48,6 +48,11 @@ typedef struct tokulogger *TOKULOGGER;
typedef struct tokutxn *TOKUTXN;
#define NULL_TXN ((TOKUTXN)0)
struct logged_btt_pair {
DISKOFF off;
int32_t size;
};
// The data that appears in the log to encode a brtheader. */
typedef struct loggedbrtheader {
u_int32_t size;
......@@ -65,6 +70,9 @@ typedef struct loggedbrtheader {
BLOCKNUM root;
} one;
} u;
BLOCKNUM btt_size; // block translation table size
DISKOFF btt_diskoff;
struct logged_btt_pair *btt_pairs;
} LOGGEDBRTHEADER;
typedef struct intpairarray {
......
......@@ -1488,8 +1488,8 @@ unlock_for_graceful (void) {
assert(r==0);
}
static void
graceful_fill_names(const char *db_fname, char *cleanbuf, size_t cleansize, char *dirtybuf, size_t dirtysize) {
void
toku_graceful_fill_names(const char *db_fname, char *cleanbuf, size_t cleansize, char *dirtybuf, size_t dirtysize) {
int written;
written = snprintf(cleanbuf, cleansize, "%s.clean", db_fname);
assert(written>=0);
......@@ -1506,7 +1506,7 @@ graceful_open_get_append_fd(const char *db_fname, BOOL *was_dirtyp, BOOL *create
char cleanbuf[strlen(db_fname) + sizeof(".clean")];
char dirtybuf[strlen(db_fname) + sizeof(".dirty")];
graceful_fill_names(db_fname, cleanbuf, sizeof(cleanbuf), dirtybuf, sizeof(dirtybuf));
toku_graceful_fill_names(db_fname, cleanbuf, sizeof(cleanbuf), dirtybuf, sizeof(dirtybuf));
struct stat tmpbuf;
clean_exists = (BOOL)(stat(cleanbuf, &tmpbuf) == 0);
......@@ -1536,7 +1536,7 @@ graceful_close_get_append_fd(const char *db_fname, BOOL *db_missing) {
char cleanbuf[strlen(db_fname) + sizeof(".clean")];
char dirtybuf[strlen(db_fname) + sizeof(".dirty")];
graceful_fill_names(db_fname, cleanbuf, sizeof(cleanbuf), dirtybuf, sizeof(dirtybuf));
toku_graceful_fill_names(db_fname, cleanbuf, sizeof(cleanbuf), dirtybuf, sizeof(dirtybuf));
struct stat tmpbuf;
clean_exists = (BOOL)(stat(cleanbuf, &tmpbuf) == 0);
......@@ -1562,7 +1562,7 @@ graceful_dirty_get_append_fd(const char *db_fname) {
char cleanbuf[strlen(db_fname) + sizeof(".clean")];
char dirtybuf[strlen(db_fname) + sizeof(".dirty")];
graceful_fill_names(db_fname, cleanbuf, sizeof(cleanbuf), dirtybuf, sizeof(dirtybuf));
toku_graceful_fill_names(db_fname, cleanbuf, sizeof(cleanbuf), dirtybuf, sizeof(dirtybuf));
struct stat tmpbuf;
clean_exists = (BOOL)(stat(cleanbuf, &tmpbuf) == 0);
......
......@@ -216,6 +216,8 @@ int toku_graceful_delete(const char *db_fname);
void toku_graceful_lock_init(void);
void toku_graceful_lock_destroy(void);
void toku_graceful_fill_names(const char *db_fname, char *cleanbuf, size_t cleansize, char *dirtybuf, size_t dirtysize);
#define TOKU_CACHETABLE_DO_EVICT_FROM_WRITER 0
void toku_cachetable_maybe_flush_some(CACHETABLE ct);
......
......@@ -135,7 +135,10 @@ static inline int toku_logsizeof_BYTESTRING (BYTESTRING bs) {
}
static inline int toku_logsizeof_LOGGEDBRTHEADER (LOGGEDBRTHEADER bs) {
int in_both = 4+4+4+8+8+4;
int in_both = 4+4+4+8+8+4+8;
in_both += 8; // for the number of block headers
assert(bs.btt_size.b< 30000000); // otherwise we in trouble on the next line
in_both += 12*bs.btt_size.b;
if (bs.n_named_roots==-1)
return in_both+8;
else {
......
......@@ -675,6 +675,16 @@ int toku_fread_LOGGEDBRTHEADER (FILE *f, LOGGEDBRTHEADER *v, struct x1764 *check
r = toku_fread_int32_t (f, &v->n_named_roots, checksum, len); if (r!=0) return r;
assert(v->n_named_roots==-1);
r = toku_fread_BLOCKNUM (f, &v->u.one.root, checksum, len); if (r!=0) return r;
r = toku_fread_BLOCKNUM (f, &v->btt_size, checksum, len); if (r!=0) return r;
r = toku_fread_DISKOFF (f, &v->btt_diskoff, checksum, len); if (r!=0) return r;
XMALLOC_N(v->btt_size.b, v->btt_pairs);
int64_t i;
for (i=0; i<v->btt_size.b; i++) {
r = toku_fread_DISKOFF(f, &v->btt_pairs[i].off, checksum, len);
if (r!=0) { toku_free(v->btt_pairs); return r; }
r = toku_fread_int32_t (f, &v->btt_pairs[i].size, checksum, len);
if (r!=0) { toku_free(v->btt_pairs); return r; }
}
return 0;
}
......@@ -779,6 +789,13 @@ int toku_logprint_LOGGEDBRTHEADER (FILE *outf, FILE *inf, const char *fieldname,
int r = toku_fread_LOGGEDBRTHEADER(inf, &v, checksum, len);
if (r!=0) return r;
fprintf(outf, " %s={size=%u flags=%u nodesize=%u free_blocks=%" PRId64 " unused_memory=%" PRId64 " n_named_roots=%d", fieldname, v.size, v.flags, v.nodesize, v.free_blocks.b, v.unused_blocks.b, v.n_named_roots);
fprintf(outf, " btt_size=%" PRId64 " btt_diskoff=%" PRId64 " btt_pairs={", v.btt_size.b, v.btt_diskoff) ;
int64_t i;
for (i=0; i<v.btt_size.b; i++) {
if (i>0) printf(" ");
fprintf(outf, "%" PRId64 ",%d", v.btt_pairs[i].off, v.btt_pairs[i].size);
}
fprintf(outf, "}");
return 0;
}
......
......@@ -115,10 +115,6 @@ const struct logtype logtypes[] = {
{"BLOCKNUM", "oldroot", 0},
{"BLOCKNUM", "newroot", 0},
NULLFIELD}},
{"changeunusedmemory", 'm', FA{{"FILENUM", "filenum", 0},
{"BLOCKNUM", "oldunused", 0},
{"BLOCKNUM", "newunused", 0},
NULLFIELD}},
{"fopen", 'O', FA{{"TXNID", "txnid", 0},
{"BYTESTRING", "fname", 0},
{"FILENUM", "filenum", 0},
......
......@@ -17,6 +17,7 @@
#endif
static DB * const null_db=0;
static TOKULOGGER const null_tokulogger = 0;
// These data structures really should be part of a recovery data structure. Recovery could be multithreaded (on different environments...) But this is OK since recovery can only happen in one
static CACHETABLE ct;
......@@ -82,18 +83,6 @@ create_dir_from_file (const char *fname) {
toku_free(tmp);
}
static void
toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid), FILENUM UU(filenum), BYTESTRING fname,u_int32_t mode) {
char *fixed_fname = fixup_fname(&fname);
create_dir_from_file(fixed_fname);
int fd = open(fixed_fname, O_CREAT+O_TRUNC+O_WRONLY+O_BINARY, mode);
assert(fd>=0);
toku_free(fixed_fname);
toku_free_BYTESTRING(fname);
int r = close(fd);
assert(r==0);
}
static int
toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf, BRT brt) {
if (max_cf_pairs==0) {
......@@ -114,6 +103,61 @@ toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf, BRT brt) {
return 0;
}
#define CLEANSUFFIX ".clean"
#define DIRTYSUFFIX ".dirty"
static void
internal_toku_recover_fopen_or_fcreate (int flags, int mode, char *fixedfname, FILENUM filenum) {
// If .dirty file exists rename it to .clean
int slen = strlen(fixedfname);
char cleanname[slen + sizeof(CLEANSUFFIX)];
char dirtyname[slen + sizeof(DIRTYSUFFIX)];
toku_graceful_fill_names(fixedfname, cleanname, sizeof(cleanname), dirtyname, sizeof(dirtyname));
struct stat tmpbuf;
BOOL clean_exists = stat(cleanname, &tmpbuf)==0;
BOOL dirty_exists = stat(dirtyname, &tmpbuf)==0;
if (dirty_exists) {
if (clean_exists) { int r = unlink(dirtyname); assert(r==0); }
else { int r = rename(dirtyname, cleanname); assert(r==0); }
}
CACHEFILE cf;
int fd = open(fixedfname, O_RDWR|O_BINARY|flags, mode);
assert(fd>=0);
BRT brt=0;
int r = toku_brt_create(&brt);
assert(r==0);
brt->fname = fixedfname;
brt->database_name = 0;
brt->h=0;
brt->compare_fun = toku_default_compare_fun; // we'll need to set these to the right comparison function, or do without them.
brt->dup_compare = toku_default_compare_fun;
brt->db = 0;
r = toku_cachetable_openfd(&cf, ct, fd, fixedfname);
assert(r==0);
brt->cf=cf;
r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h);
if (r==-1) {
r = toku_brt_alloc_init_header(brt, 0);
}
toku_recover_note_cachefile(filenum, cf, brt);
}
static void
toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM filenum) {
char *fixedfname = fixup_fname(&fname);
toku_free_BYTESTRING(fname);
internal_toku_recover_fopen_or_fcreate(0, 0, fixedfname, filenum);
}
// fcreate is like fopen except that the file must be created. Also creates the dir if needed.
static void
toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid), FILENUM filenum, BYTESTRING fname,u_int32_t mode) {
char *fixedfname = fixup_fname(&fname);
toku_free_BYTESTRING(fname);
create_dir_from_file(fixedfname);
internal_toku_recover_fopen_or_fcreate(O_CREAT|O_TRUNC, mode, fixedfname, filenum);
}
static int find_cachefile (FILENUM fnum, struct cf_pair **cf_pair) {
int i;
for (i=0; i<n_cf_pairs; i++) {
......@@ -125,7 +169,7 @@ static int find_cachefile (FILENUM fnum, struct cf_pair **cf_pair) {
return 1;
}
static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,LOGGEDBRTHEADER header) {
static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum, LOGGEDBRTHEADER header) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
......@@ -137,9 +181,9 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L
XMALLOC(h->flags_array);
h->flags_array[0] = header.flags;
h->nodesize = header.nodesize;
assert(h->blocktable /* Not initialized. Is this used? */);
toku_block_recovery_set_free_blocks(h->blocktable, header.free_blocks);
toku_block_recovery_set_unused_blocks(h->blocktable, header.unused_blocks);
toku_blocktable_create_from_loggedheader(&h->blocktable,
header);
assert(h->blocktable);
h->n_named_roots = header.n_named_roots;
r=toku_fifo_create(&h->fifo);
assert(r==0);
......@@ -195,53 +239,24 @@ toku_recover_enqrootentry (LSN lsn __attribute__((__unused__)), FILENUM filenum,
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
void *h_v;
u_int32_t fullhash = toku_cachetable_hash(pair->cf, header_blocknum);
if (0) {
//r = toku_cachetable_get_and_pin(pair->cf, header_blocknum, fullhash, &h_v, NULL, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
} else {
h_v=0;
assert(0);
}
assert(r==0);
struct brt_header *h=h_v;
r = toku_fifo_enq(h->fifo, key.data, key.len, val.data, val.len, typ, xid);
assert(r==0);
r = toku_cachetable_unpin(pair->cf, header_blocknum, fullhash, CACHETABLE_DIRTY, 0);
struct brt_cmd cmd;
DBT keydbt, valdbt;
cmd.type=typ;
cmd.xid =xid;
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0);
toku_free(key.data);
toku_free(val.data);
}
static void
toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM filenum) {
char *fixedfname = fixup_fname(&fname);
CACHEFILE cf;
int fd = open(fixedfname, O_RDWR+O_BINARY, 0);
assert(fd>=0);
BRT brt=0;
int r = toku_brt_create(&brt);
assert(r==0);
brt->fname = fixedfname;
brt->database_name = 0;
brt->h=0;
brt->compare_fun = 0;
brt->dup_compare = 0;
brt->db = 0;
r = toku_cachetable_openfd(&cf, ct, fd, fixedfname);
assert(r==0);
brt->cf=cf;
toku_recover_note_cachefile(filenum, cf, brt);
toku_free_BYTESTRING(fname);
}
static void
toku_recover_brtclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
// Bump up the reference count
toku_cachefile_refup(pair->cf);
r = toku_close_brt(pair->brt, 0, 0);
assert(r==0);
pair->brt=0;
......@@ -276,16 +291,6 @@ toku_recover_changeunnamedroot (LSN UU(lsn), FILENUM filenum, BLOCKNUM UU(oldroo
static void
toku_recover_changenamedroot (LSN UU(lsn), FILENUM UU(filenum), BYTESTRING UU(name), BLOCKNUM UU(oldroot), BLOCKNUM UU(newroot)) { assert(0); }
static void
toku_recover_changeunusedmemory (LSN UU(lsn), FILENUM filenum, BLOCKNUM UU(oldunused), BLOCKNUM newunused) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
assert(pair->brt);
assert(pair->brt->h);
toku_block_recovery_set_unused_blocks(pair->brt->h->blocktable, newunused);
}
static int toku_recover_checkpoint (LSN UU(lsn)) {
return 0;
}
......
......@@ -9,10 +9,19 @@
#include "includes.h"
int main (int argc, char *argv[]) {
assert(argc==2);
int main (int argc, const char *argv[]) {
const char *data_dir, *log_dir;
if (argc==3) {
data_dir = argv[1];
log_dir = argv[2];
} else if (argc==2) {
data_dir = log_dir = argv[1];
} else {
printf("Usage: %s <datadir> [ <logdir> ]\n", argv[0]);
exit(1);
}
int r=tokudb_recover(".", argv[1]);
int r=tokudb_recover(data_dir, log_dir);
if (r!=0) {
fprintf(stderr, "Recovery failed\n");
exit(1);
......
......@@ -133,6 +133,15 @@ static inline void wbuf_LOGGEDBRTHEADER (struct wbuf *w, LOGGEDBRTHEADER h) {
wbuf_bytes (w, h.u.many.names[i], (u_int32_t)(1+strlen(h.u.many.names[i])));
}
}
wbuf_BLOCKNUM(w, h.btt_size);
wbuf_DISKOFF(w, h.btt_diskoff);
{
BLOCKNUM i;
for (i.b=0; i.b<h.btt_size.b; i.b++) {
wbuf_DISKOFF(w, h.btt_pairs[i.b].off);
wbuf_int(w, h.btt_pairs[i.b].size);
}
}
}
static inline void wbuf_INTPAIRARRAY (struct wbuf *w, INTPAIRARRAY h) {
......
......@@ -73,10 +73,6 @@ BDB_TESTS = $(patsubst %.c,%.bdb$(BINSUF),$(filter-out $(patsubst %,%.c,$(BDB_DO
TDB_TESTS_THAT_SHOULD_FAIL= \
test_groupcommit_count \
test-recover1 \
test-recover2 \
test-recover3 \
test_txn_recover3 \
test944 \
test_truncate_txn_abort \
test_truncate_subdb \
......@@ -87,15 +83,6 @@ TDB_TESTS_THAT_SHOULD_FAIL= \
#\ ends prev line
TDB_TESTS_THAT_SHOULD_FAIL_LIT= \
test_log2.recover \
test_log3.recover \
test_log4.recover \
test_log5.recover \
test_log6.recover \
test_log7.recover \
test_log8.recover \
test_log9.recover \
test_log10.recover \
#\ ends prev line
ALL_TESTS = $(TDB_TESTS)
......@@ -159,11 +146,16 @@ TDBVGRIND=$(VGRIND)
$(TDBVGRIND) ./$< $(VERBVERBOSE) $(MAYBEINVERTER) $(SUMMARIZE_CMD)
%.recover: %.tdb$(BINSUF)
$(VGRIND) ./$< && \
rm -rf dir.$*.c.tdb.recover && \
mkdir dir.$*.c.tdb.recover && \
(cd dir.$*.c.tdb.recover && $(VGRIND) ../../../newbrt/tdb-recover ../dir.$*.c.tdb ) && \
diff dir.$*.c.tdb dir.$*.c.tdb.recover/foo.db \
echo doing ./$< &&\
$(VGRIND) ./$< && \
rm -rf dir.$*.c.tdb.recover && \
mkdir dir.$*.c.tdb.recover && \
echo doing recovery &&\
$(VGRIND) ../../newbrt/tdb-recover dir.$*.c.tdb.recover dir.$*.c.tdb && \
echo dump and compare &&\
../../utils/tokudb_dump -h dir.$*.c.tdb foo.db > dir.$*.c.tdb/foo.dump && \
../../utils/tokudb_dump -h dir.$*.c.tdb.recover foo.db > dir.$*.c.tdb.recover/foo.dump && \
diff dir.$*.c.tdb/foo.dump dir.$*.c.tdb.recover/foo.dump \
$(MAYBEINVERTER) $(SUMMARIZE_CMD)
%.recoverwc: %.tdb$(BINSUF)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment