Commit 39498daf authored by Dave Wells's avatar Dave Wells Committed by Yoni Fogel

MERGE tokudb/toku/tokudb/1857 into main, refs [t:1857]

git-svn-id: file:///svn/toku/tokudb@13431 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4120994e
......@@ -52,6 +52,7 @@ BRT_SOURCES = \
leaflock \
logger \
log_code \
logcursor \
memarena \
mempool \
minicron \
......
......@@ -44,6 +44,7 @@
#include "leafentry.h"
#include "log-internal.h"
#include "log_header.h"
#include "logcursor.h"
#include "mempool.h"
#include "rbuf.h"
#include "threadpool.h"
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: logcursor.c 13196 2009-07-10 14:41:51Z zardosht $"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
struct toku_logcursor {
struct log_entry entry;
char *logdir;
char **logfiles;
int n_logfiles;
int cur_logfiles_index;
FILE *cur_fp;
int is_open;
struct log_entry *cur_le;
struct log_entry cur_log_entry;
};
static int lc_close_cur_logfile(TOKULOGCURSOR lc) {
int r=0;
if ( lc->is_open ) {
r = fclose(lc->cur_fp);
assert(0==r);
lc->is_open = 0;
}
return 0;
}
static int lc_open_logfile(TOKULOGCURSOR lc, int index) {
int r=0;
assert( lc->is_open == 0 );
lc->cur_fp = fopen(lc->logfiles[index], "r");
if ( lc->cur_fp == NULL )
return DB_NOTFOUND;
// position fp past header
unsigned int version=0;
r = toku_read_logmagic(lc->cur_fp, &version);
if (r!=0)
return DB_BADFORMAT;
if (version != 1)
return DB_BADFORMAT;
// mark as open
lc->is_open = 1;
return r;
}
// toku_logcursor_create()
// - returns a pointer to a logcursor
int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) {
int failresult=0;
int r=0;
// malloc a cursor
TOKULOGCURSOR cursor = (TOKULOGCURSOR) toku_malloc(sizeof(struct toku_logcursor));
if ( NULL==cursor )
return ENOMEM;
// find logfiles in logdir
cursor->cur_le = NULL;
cursor->is_open = 0;
cursor->cur_logfiles_index = 0;
// cursor->logdir = (char *) toku_malloc(strlen(log_dir)+1);
cursor->logdir = (char *) toku_malloc(strlen(log_dir));
if ( NULL==cursor->logdir )
return ENOMEM;
strcpy(cursor->logdir, log_dir);
r = toku_logger_find_logfiles(cursor->logdir, &(cursor->logfiles), &(cursor->n_logfiles));
if (r!=0) {
failresult=r;
goto fail;
}
*lc = cursor;
return r;
fail:
toku_logcursor_destroy(&cursor);
*lc = NULL;
return failresult;
}
int toku_logcursor_destroy(TOKULOGCURSOR *lc) {
int r=0;
r = lc_close_cur_logfile(*lc);
toku_free((*lc)->logdir);
toku_free(*lc);
*lc = NULL;
return r;
}
int toku_logcursor_current(TOKULOGCURSOR lc, struct log_entry *le) {
if ( lc->cur_le == NULL )
return DB_NOTFOUND;
*le = lc->cur_log_entry;
return 0;
}
int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry *le) {
int r=0;
if ( lc->cur_le == NULL ) {
r = toku_logcursor_first(lc, le);
return r;
}
r = toku_log_fread(lc->cur_fp, le);
while ( EOF == r ) {
// move to next file
r = lc_close_cur_logfile(lc);
if (r!=0)
return r;
if ( lc->cur_logfiles_index == lc->n_logfiles-1)
return DB_NOTFOUND;
lc->cur_logfiles_index++;
r = lc_open_logfile(lc, lc->cur_logfiles_index);
if (r!= 0)
return r;
r = toku_log_fread(lc->cur_fp, le);
}
if (r!=0) {
if (r==DB_BADFORMAT) {
fprintf(stderr, "Bad log format in %s\n", lc->logfiles[lc->cur_logfiles_index]);
return r;
} else {
fprintf(stderr, "Unexpected log format error '%s' in %s\n", strerror(r), lc->logfiles[lc->cur_logfiles_index]);
return r;
}
}
lc->cur_log_entry = *le;
return r;
}
int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry *le) {
int r=0;
if ( lc->cur_le == NULL ) {
r = toku_logcursor_last(lc, le);
return r;
}
r = toku_log_fread_backward(lc->cur_fp, le);
while ( -1 == r) { // if within header length of top of file
// move to previous file
r = lc_close_cur_logfile(lc);
if (r!=0)
return r;
if ( lc->cur_logfiles_index == 0 )
return DB_NOTFOUND;
lc->cur_logfiles_index--;
r = lc_open_logfile(lc, lc->cur_logfiles_index);
if (r!=0)
return r;
r = toku_log_fread_backward(lc->cur_fp, le);
}
if (r!=0) {
if (r==DB_BADFORMAT) {
fprintf(stderr, "Bad log format in %s\n", lc->logfiles[lc->cur_logfiles_index]);
return r;
} else {
fprintf(stderr, "Unexpected log format error '%s' in %s\n", strerror(r), lc->logfiles[lc->cur_logfiles_index]);
return r;
}
}
lc->cur_log_entry = *le;
return 0;
}
int toku_logcursor_first(TOKULOGCURSOR lc, struct log_entry *le) {
int r=0;
// close any but the first log file
if ( lc->cur_logfiles_index != 0 ) {
lc_close_cur_logfile(lc);
}
// open first log file if needed
if ( !lc->is_open ) {
r = lc_open_logfile(lc, 0);
if (r!=0)
return r;
lc->cur_logfiles_index = 0;
}
r = toku_log_fread(lc->cur_fp, le);
if (r!=0)
return r;
// copy into cur_le
lc->cur_log_entry = *le;
lc->cur_le = &lc->cur_log_entry;
return r;
}
int toku_logcursor_last(TOKULOGCURSOR lc, struct log_entry *le) {
int r=0;
// close any but last log file
if ( lc->cur_logfiles_index != lc->n_logfiles-1 ) {
lc_close_cur_logfile(lc);
}
// open last log file if needed
if ( !lc->is_open ) {
r = lc_open_logfile(lc, lc->n_logfiles-1);
if (r!=0)
return r;
lc->cur_logfiles_index = lc->n_logfiles-1;
}
// seek to end
r = fseek(lc->cur_fp, 0, SEEK_END);
assert(0==r);
// read backward
r = toku_log_fread_backward(lc->cur_fp, le);
if (r!=0)
return r;
// copy into cur_le
lc->cur_log_entry = *le;
lc->cur_le = &lc->cur_log_entry;
return r;
}
#ifndef TOKULOGCURSOR_H
#define TOKULOGCURSOR_H
#ident "$Id: logcursor.h 12375 2009-05-28 14:14:47Z yfogel $"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "log_header.h"
typedef struct log_entry * TOKULOGENTRY;
struct toku_logcursor;
typedef struct toku_logcursor *TOKULOGCURSOR;
// toku_logcursor_create()
// - returns a pointer to a logcursor
int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir);
int toku_logcursor_destroy(TOKULOGCURSOR *lc);
// returns 0 on success
int toku_logcursor_current(TOKULOGCURSOR lc, struct log_entry *le);
int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry *le);
int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry *le);
int toku_logcursor_first(const TOKULOGCURSOR lc, struct log_entry *le);
int toku_logcursor_last(const TOKULOGCURSOR lc, struct log_entry *le);
#endif // TOKULOGCURSOR_H
......@@ -5,7 +5,7 @@
#include "includes.h"
static const int log_format_version=0;
static const int log_format_version=1;
static toku_pthread_mutex_t logger_mutex = TOKU_PTHREAD_MUTEX_INITIALIZER;
......@@ -259,6 +259,7 @@ int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_lo
int result_limit=2;
int n_results=0;
char **MALLOC_N(result_limit, result);
assert(result!= NULL);
struct dirent *de;
DIR *d=opendir(directory);
if (d==0) {
......@@ -273,9 +274,12 @@ int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_lo
if (n_results+1>=result_limit) {
result_limit*=2;
result = toku_realloc(result, result_limit*sizeof(*result));
// should we try to recover here?
assert(result!=NULL);
}
int fnamelen = dirnamelen + strlen(de->d_name) + 2; // One for the slash and one for the trailing NUL.
char *fname = toku_malloc(fnamelen);
assert(fname!=NULL);
snprintf(fname, fnamelen, "%s/%s", directory, de->d_name);
result[n_results++] = fname;
}
......@@ -676,13 +680,34 @@ int toku_read_and_print_logmagic (FILE *f, u_int32_t *versionp) {
if (r!=4) {
return DB_BADFORMAT;
}
//printf("tokulog v.%d\n", toku_dtoh32(version));
printf("tokulog v.%d\n", toku_dtoh32(version));
//version MUST be in network order regardless of disk order
*versionp=toku_ntohl(version);
}
return 0;
}
int toku_read_logmagic (FILE *f, u_int32_t *versionp) {
{
char magic[8];
int r=fread(magic, 1, 8, f);
if (r!=8) {
return DB_BADFORMAT;
}
if (memcmp(magic, "tokulogg", 8)!=0) {
return DB_BADFORMAT;
}
}
{
int version;
int r=fread(&version, 1, 4, f);
if (r!=4) {
return DB_BADFORMAT;
}
*versionp=toku_ntohl(version);
}
return 0;
}
TXNID toku_txn_get_txnid (TOKUTXN txn) {
if (txn==0) return 0;
......
......@@ -53,6 +53,7 @@ int toku_logprint_BLOCKNUM (FILE *outf, FILE *inf, const char *fieldname, struct
int toku_logprint_LOGGEDBRTHEADER (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__)));
int toku_logprint_INTPAIRARRAY (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__)));
int toku_read_and_print_logmagic (FILE *f, u_int32_t *versionp);
int toku_read_logmagic (FILE *f, u_int32_t *versionp);
TXNID toku_txn_get_txnid (TOKUTXN txn);
LSN toku_txn_get_last_lsn (TOKUTXN txn);
......
......@@ -475,7 +475,7 @@ int tokudb_recover(const char *data_dir, const char *log_dir) {
struct log_entry le;
u_int32_t version;
//printf("Reading file %d: %s\n", j, logfiles[j]);
r=toku_read_and_print_logmagic(f, &version);
r=toku_read_logmagic(f, &version);
assert(r==0 && version==0);
go_forward: // we have an open file, so go forward.
//printf("Going forward\n");
......
......@@ -95,6 +95,7 @@ REGRESSION_TESTS_RAW = \
test-del-inorder \
test-inc-split \
test-leafentry \
test_logcursor \
test_oexcl \
test_toku_malloc_plain_free \
threadpool-test \
......
#include <toku_portability.h>
#include <string.h>
#include "test.h"
#include "brttypes.h"
#include "includes.h"
int test_main(int argc __attribute__((unused)), const char *argv[] __attribute__((unused))) {
int r=0;
char dbdir[100] = "/home/wells/svn/tokudb.1857/src/tests/dir.x1.c.tdb";
struct toku_logcursor *cursor;
struct log_entry entry;
r = toku_logcursor_create(&cursor, dbdir);
if ( r!=0 ) return r;
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry.cmd);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry.cmd);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry.cmd);
printf("r=%d\n", r);
r = toku_logcursor_destroy(&cursor);
r = toku_logcursor_create(&cursor, dbdir);
if ( r!=0 ) return r;
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd);
printf("r=%d\n", r);
r = toku_logcursor_destroy(&cursor);
r = toku_logcursor_create(&cursor, dbdir);
if ( r!=0 ) return r;
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry.cmd);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry.cmd);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry.cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd);
if ( r == DB_NOTFOUND ) printf("PASS\n"); else printf("FAIL\n");
r = toku_logcursor_destroy(&cursor);
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment