Commit e57b5d01 authored by Dave Wells's avatar Dave Wells Committed by Yoni Fogel

merge second round of changes to support logcursors into main, refs[t:1857]

git-svn-id: file:///svn/toku/tokudb@13515 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2ef08187
...@@ -6,15 +6,14 @@ ...@@ -6,15 +6,14 @@
#include "includes.h" #include "includes.h"
struct toku_logcursor { struct toku_logcursor {
struct log_entry entry;
char *logdir; char *logdir;
char **logfiles; char **logfiles;
int n_logfiles; int n_logfiles;
int cur_logfiles_index; int cur_logfiles_index;
FILE *cur_fp; FILE *cur_fp;
int is_open; BOOL is_open;
struct log_entry *cur_le; struct log_entry entry;
struct log_entry cur_log_entry; BOOL entry_valid;
}; };
static int lc_close_cur_logfile(TOKULOGCURSOR lc) { static int lc_close_cur_logfile(TOKULOGCURSOR lc) {
...@@ -22,13 +21,13 @@ static int lc_close_cur_logfile(TOKULOGCURSOR lc) { ...@@ -22,13 +21,13 @@ static int lc_close_cur_logfile(TOKULOGCURSOR lc) {
if ( lc->is_open ) { if ( lc->is_open ) {
r = fclose(lc->cur_fp); r = fclose(lc->cur_fp);
assert(0==r); assert(0==r);
lc->is_open = 0; lc->is_open = FALSE;
} }
return 0; return 0;
} }
static int lc_open_logfile(TOKULOGCURSOR lc, int index) { static int lc_open_logfile(TOKULOGCURSOR lc, int index) {
int r=0; int r=0;
assert( lc->is_open == 0 ); assert( !lc->is_open );
lc->cur_fp = fopen(lc->logfiles[index], "r"); lc->cur_fp = fopen(lc->logfiles[index], "r");
if ( lc->cur_fp == NULL ) if ( lc->cur_fp == NULL )
return DB_NOTFOUND; return DB_NOTFOUND;
...@@ -40,7 +39,7 @@ static int lc_open_logfile(TOKULOGCURSOR lc, int index) { ...@@ -40,7 +39,7 @@ static int lc_open_logfile(TOKULOGCURSOR lc, int index) {
if (version != 1) if (version != 1)
return DB_BADFORMAT; return DB_BADFORMAT;
// mark as open // mark as open
lc->is_open = 1; lc->is_open = TRUE;
return r; return r;
} }
...@@ -55,9 +54,9 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) { ...@@ -55,9 +54,9 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) {
if ( NULL==cursor ) if ( NULL==cursor )
return ENOMEM; return ENOMEM;
// find logfiles in logdir // find logfiles in logdir
cursor->cur_le = NULL; cursor->is_open = FALSE;
cursor->is_open = 0;
cursor->cur_logfiles_index = 0; cursor->cur_logfiles_index = 0;
cursor->entry_valid = FALSE;
cursor->logdir = (char *) toku_malloc(strlen(log_dir)+1); cursor->logdir = (char *) toku_malloc(strlen(log_dir)+1);
if ( NULL==cursor->logdir ) if ( NULL==cursor->logdir )
return ENOMEM; return ENOMEM;
...@@ -77,27 +76,29 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) { ...@@ -77,27 +76,29 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) {
int toku_logcursor_destroy(TOKULOGCURSOR *lc) { int toku_logcursor_destroy(TOKULOGCURSOR *lc) {
int r=0; int r=0;
if ( (*lc)->entry_valid )
toku_log_free_log_entry_resources(&((*lc)->entry));
r = lc_close_cur_logfile(*lc); r = lc_close_cur_logfile(*lc);
int lf;
for(lf=0;lf<(*lc)->n_logfiles;lf++) {
toku_free((*lc)->logfiles[lf]);
}
toku_free((*lc)->logfiles);
toku_free((*lc)->logdir); toku_free((*lc)->logdir);
toku_free(*lc); toku_free(*lc);
*lc = NULL; *lc = NULL;
return r; return r;
} }
int toku_logcursor_current(TOKULOGCURSOR lc, struct log_entry *le) { int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry **le) {
if ( lc->cur_le == NULL )
return DB_NOTFOUND;
*le = lc->cur_log_entry;
return 0;
}
int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry *le) {
int r=0; int r=0;
if ( lc->cur_le == NULL ) { if ( lc->entry_valid )
toku_log_free_log_entry_resources(&(lc->entry));
if ( !lc->entry_valid ) {
r = toku_logcursor_first(lc, le); r = toku_logcursor_first(lc, le);
return r; return r;
} }
r = toku_log_fread(lc->cur_fp, le); r = toku_log_fread(lc->cur_fp, &(lc->entry));
while ( EOF == r ) { while ( EOF == r ) {
// move to next file // move to next file
r = lc_close_cur_logfile(lc); r = lc_close_cur_logfile(lc);
...@@ -109,7 +110,7 @@ int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry *le) { ...@@ -109,7 +110,7 @@ int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry *le) {
r = lc_open_logfile(lc, lc->cur_logfiles_index); r = lc_open_logfile(lc, lc->cur_logfiles_index);
if (r!= 0) if (r!= 0)
return r; return r;
r = toku_log_fread(lc->cur_fp, le); r = toku_log_fread(lc->cur_fp, &(lc->entry));
} }
if (r!=0) { if (r!=0) {
if (r==DB_BADFORMAT) { if (r==DB_BADFORMAT) {
...@@ -120,17 +121,19 @@ int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry *le) { ...@@ -120,17 +121,19 @@ int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry *le) {
return r; return r;
} }
} }
lc->cur_log_entry = *le; *le = &(lc->entry);
return r; return r;
} }
int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry *le) { int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry **le) {
int r=0; int r=0;
if ( lc->cur_le == NULL ) { if ( lc->entry_valid )
toku_log_free_log_entry_resources(&(lc->entry));
if ( !lc->entry_valid ) {
r = toku_logcursor_last(lc, le); r = toku_logcursor_last(lc, le);
return r; return r;
} }
r = toku_log_fread_backward(lc->cur_fp, le); r = toku_log_fread_backward(lc->cur_fp, &(lc->entry));
while ( -1 == r) { // if within header length of top of file while ( -1 == r) { // if within header length of top of file
// move to previous file // move to previous file
r = lc_close_cur_logfile(lc); r = lc_close_cur_logfile(lc);
...@@ -142,7 +145,7 @@ int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry *le) { ...@@ -142,7 +145,7 @@ int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry *le) {
r = lc_open_logfile(lc, lc->cur_logfiles_index); r = lc_open_logfile(lc, lc->cur_logfiles_index);
if (r!=0) if (r!=0)
return r; return r;
r = toku_log_fread_backward(lc->cur_fp, le); r = toku_log_fread_backward(lc->cur_fp, &(lc->entry));
} }
if (r!=0) { if (r!=0) {
if (r==DB_BADFORMAT) { if (r==DB_BADFORMAT) {
...@@ -153,12 +156,14 @@ int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry *le) { ...@@ -153,12 +156,14 @@ int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry *le) {
return r; return r;
} }
} }
lc->cur_log_entry = *le; *le = &(lc->entry);
return 0; return r;
} }
int toku_logcursor_first(TOKULOGCURSOR lc, struct log_entry *le) { int toku_logcursor_first(TOKULOGCURSOR lc, struct log_entry **le) {
int r=0; int r=0;
if ( lc->entry_valid )
toku_log_free_log_entry_resources(&(lc->entry));
// close any but the first log file // close any but the first log file
if ( lc->cur_logfiles_index != 0 ) { if ( lc->cur_logfiles_index != 0 ) {
lc_close_cur_logfile(lc); lc_close_cur_logfile(lc);
...@@ -170,17 +175,18 @@ int toku_logcursor_first(TOKULOGCURSOR lc, struct log_entry *le) { ...@@ -170,17 +175,18 @@ int toku_logcursor_first(TOKULOGCURSOR lc, struct log_entry *le) {
return r; return r;
lc->cur_logfiles_index = 0; lc->cur_logfiles_index = 0;
} }
r = toku_log_fread(lc->cur_fp, le); r = toku_log_fread(lc->cur_fp, &(lc->entry));
if (r!=0) if (r!=0)
return r; return r;
// copy into cur_le lc->entry_valid = TRUE;
lc->cur_log_entry = *le; *le = &(lc->entry);
lc->cur_le = &lc->cur_log_entry;
return r; return r;
} }
int toku_logcursor_last(TOKULOGCURSOR lc, struct log_entry *le) { int toku_logcursor_last(TOKULOGCURSOR lc, struct log_entry **le) {
int r=0; int r=0;
if ( lc->entry_valid )
toku_log_free_log_entry_resources(&(lc->entry));
// close any but last log file // close any but last log file
if ( lc->cur_logfiles_index != lc->n_logfiles-1 ) { if ( lc->cur_logfiles_index != lc->n_logfiles-1 ) {
lc_close_cur_logfile(lc); lc_close_cur_logfile(lc);
...@@ -196,14 +202,10 @@ int toku_logcursor_last(TOKULOGCURSOR lc, struct log_entry *le) { ...@@ -196,14 +202,10 @@ int toku_logcursor_last(TOKULOGCURSOR lc, struct log_entry *le) {
r = fseek(lc->cur_fp, 0, SEEK_END); r = fseek(lc->cur_fp, 0, SEEK_END);
assert(0==r); assert(0==r);
// read backward // read backward
r = toku_log_fread_backward(lc->cur_fp, le); r = toku_log_fread_backward(lc->cur_fp, &(lc->entry));
if (r!=0) if (r!=0)
return r; return r;
// copy into cur_le lc->entry_valid = TRUE;
lc->cur_log_entry = *le; *le = &(lc->entry);
lc->cur_le = &lc->cur_log_entry;
return r; return r;
} }
...@@ -7,22 +7,29 @@ ...@@ -7,22 +7,29 @@
#include "log_header.h" #include "log_header.h"
typedef struct log_entry * TOKULOGENTRY;
struct toku_logcursor; struct toku_logcursor;
typedef struct toku_logcursor *TOKULOGCURSOR; typedef struct toku_logcursor *TOKULOGCURSOR;
// All routines return 0 on success
// toku_logcursor_create() // toku_logcursor_create()
// - returns a pointer to a logcursor // - returns a pointer to a logcursor
// - following toku_logcursor_create()
// if toku_logcursor_next() is called, it returns the first entry in the log
// if toku_logcursor_prev() is called, it returns the last entry in the log
int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir); int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir);
// toku_logcursor_destroy()
// - frees all resources associated with the logcursor, including the log_entry
// associated with the latest cursor action
int toku_logcursor_destroy(TOKULOGCURSOR *lc); int toku_logcursor_destroy(TOKULOGCURSOR *lc);
// returns 0 on success // toku_logcursor_[next,prev,first,last] take care of malloc'ing and free'ing log_entrys.
int toku_logcursor_current(TOKULOGCURSOR lc, struct log_entry *le); // - routines NULL out the **le pointers on entry, then set the **le pointers to
int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry *le); // the malloc'ed entries when successful,
int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry *le); int toku_logcursor_next(TOKULOGCURSOR lc, struct log_entry **le);
int toku_logcursor_prev(TOKULOGCURSOR lc, struct log_entry **le);
int toku_logcursor_first(const TOKULOGCURSOR lc, struct log_entry *le); int toku_logcursor_first(const TOKULOGCURSOR lc, struct log_entry **le);
int toku_logcursor_last(const TOKULOGCURSOR lc, struct log_entry *le); int toku_logcursor_last(const TOKULOGCURSOR lc, struct log_entry **le);
#endif // TOKULOGCURSOR_H #endif // TOKULOGCURSOR_H
...@@ -373,7 +373,33 @@ generate_log_reader (void) { ...@@ -373,7 +373,33 @@ generate_log_reader (void) {
fprintf(cf, " r = fseek(infile, -(int)len, SEEK_CUR); assert(r==0);\n"); fprintf(cf, " r = fseek(infile, -(int)len, SEEK_CUR); assert(r==0);\n");
//fprintf(cf, " if (r!=0) return errno;\n"); //fprintf(cf, " if (r!=0) return errno;\n");
fprintf(cf, " return 0;\n"); fprintf(cf, " return 0;\n");
fprintf(cf, "}\n"); fprintf(cf, "}\n\n");
int free_count=0;
DO_LOGTYPES(lt, {
free_count=0;
fprintf(cf, "static void toku_log_free_log_entry_%s_resources (struct logtype_%s *data)", lt->name, lt->name);
fprintf(cf, " {\n");
DO_FIELDS(ft, lt, {
if ( strcmp(ft->type, "BYTESTRING") == 0 ) {
fprintf(cf, " toku_free_BYTESTRING(data->%s);\n", ft->name);
free_count++;
}
});
if ( free_count == 0 ) fprintf(cf, " struct logtype_%s *dummy __attribute__ ((unused)) = data;\n", lt->name);
fprintf(cf, "}\n\n");
});
fprintf2(cf, hf, "void toku_log_free_log_entry_resources (struct log_entry *le)");
fprintf(hf, ";\n");
fprintf(cf, " {\n");
fprintf(cf, " switch ((enum lt_cmd)le->cmd) {\n");
DO_LOGTYPES(lt, {
fprintf(cf, " case LT_%s:\n", lt->name);
fprintf(cf, " return toku_log_free_log_entry_%s_resources (&(le->u.%s));\n", lt->name, lt->name);
});
fprintf(cf, " };\n");
fprintf(cf, " return;\n");
fprintf(cf, "}\n\n");
} }
static void static void
......
...@@ -7,35 +7,35 @@ ...@@ -7,35 +7,35 @@
int test_main(int argc __attribute__((unused)), const char *argv[] __attribute__((unused))) { int test_main(int argc __attribute__((unused)), const char *argv[] __attribute__((unused))) {
int r=0; int r=0;
char dbdir[100] = "/home/wells/svn/tokudb.1857/src/tests/dir.x1.c.tdb"; char dbdir[100] = "./dir.test_logcursor.tdb";
struct toku_logcursor *cursor; struct toku_logcursor *cursor;
struct log_entry entry; struct log_entry *entry;
r = toku_logcursor_create(&cursor, dbdir); r = toku_logcursor_create(&cursor, dbdir);
if ( r!=0 ) return r; if ( r!=0 ) return r;
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry.cmd); r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry->cmd);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry.cmd); r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry->cmd);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry.cmd); r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry->cmd);
printf("r=%d\n", r); printf("r=%d\n", r);
r = toku_logcursor_destroy(&cursor); r = toku_logcursor_destroy(&cursor);
r = toku_logcursor_create(&cursor, dbdir); r = toku_logcursor_create(&cursor, dbdir);
if ( r!=0 ) return r; if ( r!=0 ) return r;
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd); r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd); r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd); r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd);
printf("r=%d\n", r); printf("r=%d\n", r);
r = toku_logcursor_destroy(&cursor); r = toku_logcursor_destroy(&cursor);
r = toku_logcursor_create(&cursor, dbdir); r = toku_logcursor_create(&cursor, dbdir);
if ( r!=0 ) return r; if ( r!=0 ) return r;
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry.cmd); r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry->cmd);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry.cmd); r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry->cmd);
r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry.cmd); r = toku_logcursor_next(cursor, &entry); printf("Entry = %c\n", entry->cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd); r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd); r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd); r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd);
r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry.cmd); r = toku_logcursor_prev(cursor, &entry); printf("Entry = %c\n", entry->cmd);
if ( r == DB_NOTFOUND ) printf("PASS\n"); else printf("FAIL\n"); if ( r == DB_NOTFOUND ) printf("PASS\n"); else printf("FAIL\n");
r = toku_logcursor_destroy(&cursor); r = toku_logcursor_destroy(&cursor);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment