Commit ad623fbf authored by Bradley C. Kuszmaul

make {{{DB_ENV->log_archive}}} return something (but it's not quite right).  Clean up the serialization of the brt header.  Fix up {{{DB_ENV->txn_checkpoint}}} (but it's not quite right).  Addresses #75, #83, #392.

git-svn-id: file:///svn/tokudb@3000 c7de825b-a66e-492c-adef-691d508d4ae1
parent 178fb823
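
For orientation, here is a minimal sketch of how an application might exercise the two DB_ENV entry points this commit wires up, written against the BDB-style API that TokuDB mimics. The environment path, open flags, and error handling are illustrative assumptions, not taken from this commit.

/* Hedged sketch: drive DB_ENV->txn_checkpoint and DB_ENV->log_archive
 * through the BDB-style interface.  Path and flags are hypothetical. */
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <db.h>

int main (void) {
    DB_ENV *env;
    int r = db_env_create(&env, 0);                                assert(r==0);
    r = env->open(env, "/tmp/test.env",
                  DB_CREATE|DB_PRIVATE|DB_INIT_MPOOL|DB_INIT_LOG|DB_INIT_TXN, 0777);
    assert(r==0);

    // txn_checkpoint now logs a checkpoint record instead of silently returning 0.
    r = env->txn_checkpoint(env, 0, 0, 0);                         assert(r==0);

    // log_archive now returns a NULL-terminated list of log files that can go away.
    char **list;
    r = env->log_archive(env, &list, 0);                           assert(r==0);
    if (list) {
        int i;
        for (i=0; list[i]; i++) printf("archivable: %s\n", list[i]);
        free(list); // one allocation holds the pointers and the names (BDB convention)
    }
    return env->close(env, 0);
}
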
......@@ -1512,7 +1512,22 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
t->h->names=0;
t->h->roots=0;
}
if ((r=toku_logger_log_header(txn, toku_cachefile_filenum(t->cf), t->h))) { goto died6; }
{
LOGGEDBRTHEADER lh = {.size= toku_serialize_brt_header_size(t->h),
.flags = t->h->flags,
.nodesize = t->h->nodesize,
.freelist = t->h->freelist,
.unused_memory = t->h->unused_memory,
.n_named_roots = t->h->n_named_roots };
if (t->h->n_named_roots>0) {
lh.u.many.names = t->h->names;
lh.u.many.roots = t->h->roots;
} else {
lh.u.one.root = t->h->unnamed_root;
}
if ((r=toku_log_fheader(toku_txn_logger(txn), 0, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), lh))) { goto died6; }
}
if ((r=setup_initial_brt_root_node(t, t->nodesize, toku_txn_logger(txn)))!=0) { died6: if (dbname) goto died5; else goto died2; }
if ((r=toku_cachetable_put(t->cf, 0, t->h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0))) { goto died6; }
}
......
......@@ -53,7 +53,7 @@ typedef struct loggedbrtheader {
u_int32_t nodesize;
DISKOFF freelist;
DISKOFF unused_memory;
u_int32_t n_named_roots; // -1 for the union below to be "one".
int32_t n_named_roots; // -1 for the union below to be "one".
union {
struct {
char **names;
......
......@@ -62,6 +62,7 @@ struct tokulogger {
// To access these, you must have the output lock
LSN written_lsn; // the last lsn written
LSN fsynced_lsn; // What is the LSN of the highest fsynced log entry
LSN checkpoint_lsns[2]; // What are the LSNs of the most recent checkpoints. checkpoint_lsns[0] is the most recent one.
long long next_log_file_number;
char buf[LOGGER_BUF_SIZE]; // used to marshall logbytes so we can use only a single write
int n_in_file;
......@@ -69,20 +70,7 @@ struct tokulogger {
};
int toku_logger_find_next_unused_log_file(const char *directory, long long *result);
int toku_logger_find_logfiles (const char *directory, int *n_resultsp, char ***resultp);
enum lt_command {
LT_COMMIT = 'C',
LT_DELETE = 'D',
LT_FCREATE = 'F',
LT_FHEADER = 'H',
LT_INSERT_WITH_NO_OVERWRITE = 'I',
LT_NEWBRTNODE = 'N',
LT_FOPEN = 'O',
LT_CHECKPOINT = 'P',
LT_BLOCK_RENAME = 'R',
LT_UNLINK = 'U'
};
int toku_logger_find_logfiles (const char *directory, char ***resultp);
struct tokutxn {
enum typ_tag tag;
......@@ -121,8 +109,18 @@ static inline int toku_logsizeof_BYTESTRING (BYTESTRING bs) {
}
static inline int toku_logsizeof_LOGGEDBRTHEADER (LOGGEDBRTHEADER bs) {
assert(bs.n_named_roots=0);
return 4+4+4+8+8+4+8;
int in_both = 4+4+4+8+8+4;
if (bs.n_named_roots==-1)
return in_both+8;
else {
int sum_of_pieces=0;
int i;
for (i=0; i<bs.n_named_roots; i++) {
sum_of_pieces += 8+1+strlen(bs.u.many.names[i]);
}
return in_both+sum_of_pieces;
}
}
static inline int toku_logsizeof_INTPAIRARRAY (INTPAIRARRAY pa) {
......
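
As a sanity check on the new size computation in toku_logsizeof_LOGGEDBRTHEADER: the fixed fields cost 4+4+4+8+8+4 = 32 bytes, an unnamed root adds a single 8-byte DISKOFF, and each named root adds 8 + 1 + strlen(name). A small hedged example, with made-up root names:

/* Hedged worked example mirroring toku_logsizeof_LOGGEDBRTHEADER; the
 * helper and root names below are hypothetical, only the arithmetic is
 * taken from the function above. */
#include <assert.h>
#include <string.h>

static int sketch_logged_header_size (int n_named_roots, const char *const *names) {
    int in_both = 4+4+4+8+8+4;               // size, flags, nodesize, freelist, unused_memory, n_named_roots
    if (n_named_roots==-1) return in_both+8; // one unnamed DISKOFF root
    int sum_of_pieces=0, i;
    for (i=0; i<n_named_roots; i++) sum_of_pieces += 8+1+strlen(names[i]);
    return in_both+sum_of_pieces;
}

int main (void) {
    const char *names[] = {"main", "idx"};   // hypothetical root names
    assert(sketch_logged_header_size(-1, 0)     == 40);  // 32 + 8
    assert(sketch_logged_header_size( 2, names) == 57);  // 32 + (8+1+4) + (8+1+3)
    return 0;
}
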
......@@ -7,6 +7,7 @@
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
......@@ -43,8 +44,8 @@ int logfilenamecompare (const void *ap, const void *bp) {
return strcmp(a,b);
}
int toku_logger_find_logfiles (const char *directory, int *n_resultsp, char ***resultp) {
int result_limit=1;
int toku_logger_find_logfiles (const char *directory, char ***resultp) {
int result_limit=2;
int n_results=0;
char **MALLOC_N(result_limit, result);
struct dirent *de;
......@@ -56,7 +57,7 @@ int toku_logger_find_logfiles (const char *directory, int *n_resultsp, char ***r
long long thisl;
int r = sscanf(de->d_name, "log%llu.tokulog", &thisl);
if (r!=1) continue; // Skip over non-log files.
if (n_results>=result_limit) {
if (n_results+1>=result_limit) {
result_limit*=2;
result = toku_realloc(result, result_limit*sizeof(*result));
}
......@@ -67,8 +68,8 @@ int toku_logger_find_logfiles (const char *directory, int *n_resultsp, char ***r
}
// Return them in increasing order.
qsort(result, n_results, sizeof(result[0]), logfilenamecompare);
*n_resultsp = n_results;
*resultp = result;
result[n_results]=0; // make a trailing null
return closedir(d);
}
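
The interface change above drops the count argument: toku_logger_find_logfiles now NULL-terminates the result instead, which is why result_limit reserves one extra slot. A hedged sketch of the new calling convention, mirroring the cleanup loop in tokudb_recover further down; the directory name and the header names are assumptions.

#include <stdio.h>
#include "log-internal.h"   // toku_logger_find_logfiles (assumed header name)
#include "memory.h"         // toku_free                 (assumed header name)

static int list_logfiles (const char *dir) {
    char **logfiles;
    int r = toku_logger_find_logfiles(dir, &logfiles);
    if (r!=0) return r;
    int i;
    for (i=0; logfiles[i]; i++) {               // walk to the trailing NULL
        printf("found log: %s\n", logfiles[i]);
        toku_free(logfiles[i]);                 // each name is individually allocated
    }
    toku_free(logfiles);                        // then free the array itself
    return 0;
}
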
......@@ -85,6 +86,7 @@ int toku_logger_create (TOKULOGGER *resultp) {
result->n_in_buf=0;
result->n_in_file=0;
result->directory=0;
result->checkpoint_lsns[0]=result->checkpoint_lsns[1]=(LSN){0};
*resultp=result;
r = ml_init(&result->input_lock); if (r!=0) goto died0;
r = ml_init(&result->output_lock); if (r!=0) goto died1;
......@@ -370,19 +372,9 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
return r;
}
int toku_logger_log_checkpoint (TOKULOGGER logger, LSN *lsn) {
int toku_logger_log_checkpoint (TOKULOGGER logger) {
if (logger->is_panicked) return EINVAL;
struct wbuf wbuf;
const int buflen =18;
struct logbytes *lbytes = MALLOC_LOGBYTES(buflen);
if (lbytes==0) return errno;
wbuf_init(&wbuf, &lbytes->bytes[0], buflen);
wbuf_char(&wbuf, LT_CHECKPOINT);
wbuf_LSN (&wbuf, logger->lsn);
*lsn = lbytes->lsn = logger->lsn;
logger->lsn.lsn++;
return toku_logger_finish(logger, lbytes, &wbuf, 1);
return toku_log_checkpoint(logger, 1);
}
int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid64, TOKULOGGER logger) {
......@@ -418,38 +410,6 @@ int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum) {
return toku_log_fopen (txn->logger, 0, toku_txn_get_txnid(txn), bs, filenum);
}
int toku_logger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
int subsize=toku_serialize_brt_header_size(h);
int buflen = (4 // firstlen
+ 1 //cmd
+ 8 // lsn
+ 8 // txnid
+ 4 // filenum
+ subsize
+ 8 // crc & len
);
struct logbytes *lbytes=MALLOC_LOGBYTES(buflen); // alloc on heap because it might be big
int r;
if (lbytes==0) return errno;
struct wbuf wbuf;
r = ml_lock(&txn->logger->input_lock); if (r!=0) { txn->logger->is_panicked=1; txn->logger->panic_errno=r; return r; }
LSN lsn = txn->logger->lsn;
wbuf_init(&wbuf, &lbytes->bytes[0], buflen);
wbuf_int (&wbuf, buflen);
wbuf_char(&wbuf, LT_FHEADER);
wbuf_LSN (&wbuf, lsn);
lbytes->lsn = lsn;
txn->logger->lsn.lsn++;
wbuf_TXNID(&wbuf, txn->txnid64);
wbuf_FILENUM(&wbuf, filenum);
r=toku_serialize_brt_header_to_wbuf(&wbuf, h);
if (r!=0) return r;
r=toku_logger_finish(txn->logger, lbytes, &wbuf, 0);
return r;
}
int toku_fread_u_int8_t_nocrclen (FILE *f, u_int8_t *v) {
int vi=fgetc(f);
if (vi==EOF) return -1;
......@@ -494,6 +454,13 @@ int toku_fread_u_int32_t (FILE *f, u_int32_t *v, u_int32_t *crc, u_int32_t *len)
(c3<<0));
return 0;
}
int toku_fread_int32_t (FILE *f, int32_t *v, u_int32_t *crc, u_int32_t *len) {
u_int32_t uv;
int r = toku_fread_u_int32_t(f, &uv, crc, len);
int32_t rv = uv;
if (r==0) *v=rv;
return r;
}
int toku_fread_u_int64_t (FILE *f, u_int64_t *v, u_int32_t *crc, u_int32_t *len) {
u_int32_t v1,v2;
......@@ -540,8 +507,8 @@ int toku_fread_LOGGEDBRTHEADER (FILE *f, LOGGEDBRTHEADER *v, u_int32_t *crc, u_i
r = toku_fread_u_int32_t(f, &v->nodesize, crc, len); if (r!=0) return r;
r = toku_fread_DISKOFF (f, &v->freelist, crc, len); if (r!=0) return r;
r = toku_fread_DISKOFF (f, &v->unused_memory, crc, len); if (r!=0) return r;
r = toku_fread_u_int32_t(f, &v->n_named_roots, crc, len); if (r!=0) return r;
assert((signed)v->n_named_roots==-1);
r = toku_fread_int32_t (f, &v->n_named_roots, crc, len); if (r!=0) return r;
assert(v->n_named_roots==-1);
r = toku_fread_DISKOFF (f, &v->u.one.root, crc, len); if (r!=0) return r;
return 0;
}
......@@ -735,3 +702,75 @@ int toku_set_func_fsync (int (*fsync_function)(int)) {
toku_os_fsync_function = fsync_function;
return 0;
}
// Find the earliest LSN in a log
static int peek_at_log (TOKULOGGER logger, char* filename, LSN *first_lsn) {
logger=logger;
int fd = open(filename, O_RDONLY);
if (fd<0) { printf("couldn't open: %s\n", strerror(errno)); assert(fd>=0); return errno; }
enum { SKIP = 12+1+4 }; // read the 12 byte header, the first cmd, and the first len
unsigned char header[SKIP+8];
int r = read(fd, header, SKIP+8);
if (r!=SKIP+8) return 0; // cannot determine that it's archivable, so we'll assume no. If a later log is archivable then this one will be too.
u_int64_t lsn = 0;
int i;
for (i=0; i<8; i++) lsn=(lsn<<8)+header[SKIP+i];
r=close(fd);
if (r!=0) { return 0; }
first_lsn->lsn=lsn;
return 0;
}
// Return a single malloc'd block containing a NULL-terminated array of the filenames that can be archived (the names are packed into the same allocation).
int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
if (flags!=0) return EINVAL; // don't know what to do.
int all_n_logs;
int i;
char **all_logs;
int r = toku_logger_find_logfiles (logger->directory, &all_logs);
if (r!=0) return r;
for (i=0; all_logs[i]; i++);
all_n_logs=i;
// get them into increasing order
qsort(all_logs, all_n_logs, sizeof(all_logs[0]), logfilenamecompare);
// Now starting at the last one, look for archivable ones.
// Count the total number of bytes, because we have to return a single big array. (That's the BDB interface. Bleah...)
LSN earliest_lsn_seen={(unsigned long long)(-1LL)};
r = peek_at_log(logger, all_logs[all_n_logs-1], &earliest_lsn_seen); // try to find the lsn that's in the most recent log
for (i=all_n_logs-2; i>=0; i--) { // start at all_n_logs-2 because we never archive the most recent log
r = peek_at_log(logger, all_logs[i], &earliest_lsn_seen);
if (r!=0) continue; // In case of error, just keep going
if ((earliest_lsn_seen.lsn <= logger->checkpoint_lsns[0].lsn)&&
(earliest_lsn_seen.lsn <= logger->checkpoint_lsns[1].lsn)) {
break;
}
}
// all log files up to, but not including i can be archived.
int n_to_archive=i;
int count_bytes=0;
for (i=0; i<n_to_archive; i++) {
count_bytes+=1+strlen(all_logs[i]);
}
char **result = toku_malloc((1+n_to_archive)*sizeof(*result) + count_bytes);
char *base = (char*)(result+1+n_to_archive);
for (i=0; i<n_to_archive; i++) {
int len=1+strlen(all_logs[i]);
result[i]=base;
memcpy(base, all_logs[i], len);
base+=len;
free(all_logs[i]);
}
for (; all_logs[i]; i++) {
free(all_logs[i]);
}
free(all_logs);
result[i]=0;
*logs_p = result;
return 0;
}
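
Since the commit message already flags log_archive as not quite right, the following is only a hedged sketch of the intended calling convention. Because the pointer block and the copied names share one toku_malloc'd block above, the caller releases everything with a single free, which is what lets ydb.c hand the array straight back through DB_ENV->log_archive; the header name below is an assumption.

#include <stdio.h>
#include <stdlib.h>
#include "log-internal.h"   // toku_logger_log_archive, TOKULOGGER (assumed header name)

static void print_archivable_logs (TOKULOGGER logger) {
    char **logs;
    int r = toku_logger_log_archive(logger, &logs, 0);
    if (r!=0) return;
    int i;
    for (i=0; logs[i]; i++) printf("can archive: %s\n", logs[i]);
    free(logs);             // pointers and names live in the same allocation
}
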
......@@ -24,7 +24,7 @@ void toku_logger_set_cachetable (TOKULOGGER, CACHETABLE);
int toku_logger_open(const char */*directory*/, TOKULOGGER);
int toku_logger_log_bytes(TOKULOGGER logger, struct logbytes *bytes, int do_fsync);
int toku_logger_close(TOKULOGGER *logger);
int toku_logger_log_checkpoint (TOKULOGGER, LSN*);
int toku_logger_log_checkpoint (TOKULOGGER);
void toku_logger_panic(TOKULOGGER, int/*err*/);
int toku_logger_panicked(TOKULOGGER /*logger*/);
int toku_logger_is_open(TOKULOGGER);
......@@ -124,11 +124,11 @@ static inline int toku_copy_LOGGEDBRTHEADER(LOGGEDBRTHEADER *target, LOGGEDBRTHE
if (target->u.many.names==0) { r=errno; if (0) { died0: toku_free(target->u.many.names); } return r; }
target->u.many.roots = toku_memdup(target->u.many.roots, val.n_named_roots*sizeof(target->u.many.roots[0]));
if (target->u.many.roots==0) { r=errno; if (0) { died1: toku_free(target->u.many.names); } goto died0; }
u_int32_t i;
int32_t i;
for (i=0; i<val.n_named_roots; i++) {
target->u.many.names[i] = toku_strdup(target->u.many.names[i]);
if (target->u.many.names[i]==0) {
u_int32_t j;
int32_t j;
r=errno;
for (j=0; j<i; j++) toku_free(target->u.many.names[j]);
goto died1;
......@@ -139,7 +139,7 @@ static inline int toku_copy_LOGGEDBRTHEADER(LOGGEDBRTHEADER *target, LOGGEDBRTHE
}
static inline void toku_free_LOGGEDBRTHEADER(LOGGEDBRTHEADER val) {
if ((int32_t)val.n_named_roots==-1) return;
u_int32_t i;
int32_t i;
for (i=0; i<val.n_named_roots; i++) {
toku_free(val.u.many.names[i]);
}
......@@ -157,5 +157,6 @@ int toku_logger_abort(TOKUTXN);
int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result);
int tokudb_recover(const char *datadir, const char *logdir);
int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags);
#endif
......@@ -57,6 +57,7 @@ const struct logtype rollbacks[] = {
};
const struct logtype logtypes[] = {
{"checkpoint", 'x', FA{NULLFIELD}},
{"commit", 'C', FA{{"TXNID", "txnid", 0},NULLFIELD}},
#if 0
{"tl_delete", 'D', FA{{"FILENUM", "filenum", 0}, // tl logentries can be used, by themselves, to rebuild the whole DB from scratch.
......
......@@ -523,11 +523,13 @@ void toku_recover_changeunusedmemory (LSN UU(lsn), FILENUM filenum, DISKOFF UU(o
r = toku_unpin_brt_header(pair->brt);
}
static int toku_recover_checkpoint (LSN UU(lsn)) {
return 0;
}
int tokudb_recover(const char *data_dir, const char *log_dir) {
int r;
int entrycount=0;
int n_logfiles;
char **logfiles;
int lockfd;
......@@ -549,7 +551,7 @@ int tokudb_recover(const char *data_dir, const char *log_dir) {
}
}
r = toku_logger_find_logfiles(log_dir, &n_logfiles, &logfiles);
r = toku_logger_find_logfiles(log_dir, &logfiles);
if (r!=0) return r;
int i;
toku_recover_init();
......@@ -566,7 +568,7 @@ int tokudb_recover(const char *data_dir, const char *log_dir) {
assert(wd!=0);
//printf("%s:%d data_wd=\"%s\"\n", __FILE__, __LINE__, data_wd);
}
for (i=0; i<n_logfiles; i++) {
for (i=0; logfiles[i]; i++) {
//fprintf(stderr, "Opening %s\n", logfiles[i]);
r=chdir(org_wd);
assert(r==0);
......@@ -595,7 +597,7 @@ int tokudb_recover(const char *data_dir, const char *log_dir) {
fclose(f);
}
toku_recover_cleanup();
for (i=0; i<n_logfiles; i++) {
for (i=0; logfiles[i]; i++) {
toku_free(logfiles[i]);
}
toku_free(logfiles);
......
......@@ -29,7 +29,7 @@ struct wbuf {
#endif
};
static void wbuf_init (struct wbuf *w, void *buf, DISKOFF size) {
static inline void wbuf_init (struct wbuf *w, void *buf, DISKOFF size) {
w->buf=buf;
w->size=size;
w->ndone=0;
......@@ -129,7 +129,7 @@ static inline void wbuf_LOGGEDBRTHEADER (struct wbuf *w, LOGGEDBRTHEADER h) {
if ((signed)h.n_named_roots==-1) {
wbuf_DISKOFF(w, h.u.one.root);
} else {
unsigned int i;
int i;
for (i=0; i<h.n_named_roots; i++) {
wbuf_DISKOFF(w, h.u.many.roots[i]);
wbuf_bytes (w, h.u.many.names[i], 1+strlen(h.u.many.names[i]));
......
......@@ -394,9 +394,7 @@ static int toku_env_close(DB_ENV * env, u_int32_t flags) {
}
static int toku_env_log_archive(DB_ENV * env, char **list[], u_int32_t flags) {
env=env; flags=flags; // Suppress compiler warnings.
*list = NULL;
return 0;
return toku_logger_log_archive(env->i->logger, list, flags);
}
static int toku_env_log_flush(DB_ENV * env, const DB_LSN * lsn __attribute__((__unused__))) {
......@@ -594,9 +592,8 @@ static int toku_env_set_verbose(DB_ENV * env, u_int32_t which, int onoff) {
return 1;
}
static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags) {
env=env; kbyte=kbyte; min=min; flags=flags;
return 0;
static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte __attribute__((__unused__)), u_int32_t min __attribute__((__unused__)), u_int32_t flags __attribute__((__unused__))) {
return toku_logger_log_checkpoint(env->i->logger);
}
static int toku_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t flags) {
......