Commit a7b11d26 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge 1967.mtm2 to main refs[t:1967]

git-svn-id: file:///svn/toku/tokudb@15205 c7de825b-a66e-492c-adef-691d508d4ae1
parent 51c361c5
...@@ -61,6 +61,7 @@ double compressibility = -1; // -1 means make it very compressible. 1 means use ...@@ -61,6 +61,7 @@ double compressibility = -1; // -1 means make it very compressible. 1 means use
int do_append = 0; int do_append = 0;
int do_checkpoint_period = 0; int do_checkpoint_period = 0;
u_int32_t checkpoint_period = 0; u_int32_t checkpoint_period = 0;
static const char *log_dir = NULL;
static void do_prelock(DB* db, DB_TXN* txn) { static void do_prelock(DB* db, DB_TXN* txn) {
if (prelock) { if (prelock) {
...@@ -119,11 +120,15 @@ static void benchmark_setup (void) { ...@@ -119,11 +120,15 @@ static void benchmark_setup (void) {
if (r != 0) if (r != 0)
printf("WARNING: set_cachesize %d\n", r); printf("WARNING: set_cachesize %d\n", r);
} }
{
r = dbenv->open(dbenv, dbdir, env_open_flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); if (log_dir) {
r = dbenv->set_lg_dir(dbenv, log_dir);
assert(r == 0); assert(r == 0);
} }
r = dbenv->open(dbenv, dbdir, env_open_flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
assert(r == 0);
#if defined(TOKUDB) #if defined(TOKUDB)
if (do_checkpoint_period) { if (do_checkpoint_period) {
r = dbenv->checkpointing_set_period(dbenv, checkpoint_period); r = dbenv->checkpointing_set_period(dbenv, checkpoint_period);
...@@ -364,12 +369,13 @@ static int print_usage (const char *argv0) { ...@@ -364,12 +369,13 @@ static int print_usage (const char *argv0) {
fprintf(stderr, " --prelockflag Prelock the database and send the DB_PRELOCKED_WRITE flag.\n"); fprintf(stderr, " --prelockflag Prelock the database and send the DB_PRELOCKED_WRITE flag.\n");
fprintf(stderr, " --abort Abort the singlex after the transaction is over. (Requires --singlex.)\n"); fprintf(stderr, " --abort Abort the singlex after the transaction is over. (Requires --singlex.)\n");
fprintf(stderr, " --nolog If transactions are used, then don't write the recovery log\n"); fprintf(stderr, " --nolog If transactions are used, then don't write the recovery log\n");
fprintf(stderr, " --log_dir LOGDIR Put the logs in LOGDIR\n");
fprintf(stderr, " --env DIR\n");
fprintf(stderr, " --periter N how many insertions per iteration (default=%d)\n", DEFAULT_ITEMS_TO_INSERT_PER_ITERATION); fprintf(stderr, " --periter N how many insertions per iteration (default=%d)\n", DEFAULT_ITEMS_TO_INSERT_PER_ITERATION);
fprintf(stderr, " --DB_INIT_TXN (1|0) turn on or off the DB_INIT_TXN env_open_flag\n"); fprintf(stderr, " --DB_INIT_TXN (1|0) turn on or off the DB_INIT_TXN env_open_flag\n");
fprintf(stderr, " --DB_INIT_LOG (1|0) turn on or off the DB_INIT_LOG env_open_flag\n"); fprintf(stderr, " --DB_INIT_LOG (1|0) turn on or off the DB_INIT_LOG env_open_flag\n");
fprintf(stderr, " --DB_INIT_LOCK (1|0) turn on or off the DB_INIT_LOCK env_open_flag\n"); fprintf(stderr, " --DB_INIT_LOCK (1|0) turn on or off the DB_INIT_LOCK env_open_flag\n");
fprintf(stderr, " --1514 do a point query for something not there at end. See #1514. (Requires --norandom)\n"); fprintf(stderr, " --1514 do a point query for something not there at end. See #1514. (Requires --norandom)\n");
fprintf(stderr, " --env DIR\n");
fprintf(stderr, " --append append to an existing file\n"); fprintf(stderr, " --append append to an existing file\n");
fprintf(stderr, " --checkpoint-period %"PRIu32" checkpoint period\n", checkpoint_period); fprintf(stderr, " --checkpoint-period %"PRIu32" checkpoint period\n", checkpoint_period);
fprintf(stderr, " n_iterations how many iterations (default %lld)\n", default_n_items/DEFAULT_ITEMS_TO_INSERT_PER_ITERATION); fprintf(stderr, " n_iterations how many iterations (default %lld)\n", default_n_items/DEFAULT_ITEMS_TO_INSERT_PER_ITERATION);
...@@ -515,6 +521,9 @@ int main (int argc, const char *argv[]) { ...@@ -515,6 +521,9 @@ int main (int argc, const char *argv[]) {
put_flags = DB_NOOVERWRITE; put_flags = DB_NOOVERWRITE;
else else
put_flags = DB_YESOVERWRITE; put_flags = DB_YESOVERWRITE;
} else if (strcmp(arg, "--log_dir") == 0) {
if (i+1 >= argc) return print_usage(argv[0]);
log_dir = argv[++i];
} else { } else {
return print_usage(argv[0]); return print_usage(argv[0]);
} }
......
...@@ -25,6 +25,7 @@ static int do_mysql = 0; ...@@ -25,6 +25,7 @@ static int do_mysql = 0;
static u_int64_t start_range = 0, end_range = 0; static u_int64_t start_range = 0, end_range = 0;
static int n_experiments = 2; static int n_experiments = 2;
static int verbose = 0; static int verbose = 0;
static const char *log_dir = NULL;
static int print_usage (const char *argv0) { static int print_usage (const char *argv0) {
fprintf(stderr, "Usage:\n%s [--verify-lwc | --lwc | --nohwc] [--prelock] [--prelockflag] [--prelockwriteflag] [--env DIR]\n", argv0); fprintf(stderr, "Usage:\n%s [--verify-lwc | --lwc | --nohwc] [--prelock] [--prelockflag] [--prelockwriteflag] [--env DIR]\n", argv0);
...@@ -40,6 +41,7 @@ static int print_usage (const char *argv0) { ...@@ -40,6 +41,7 @@ static int print_usage (const char *argv0) {
fprintf(stderr, " --cachesize <n> set the env cachesize to <n>\n"); fprintf(stderr, " --cachesize <n> set the env cachesize to <n>\n");
fprintf(stderr, " --mysql compare keys that are mysql big int not null types\n"); fprintf(stderr, " --mysql compare keys that are mysql big int not null types\n");
fprintf(stderr, " --env DIR put db files in DIR instead of default\n"); fprintf(stderr, " --env DIR put db files in DIR instead of default\n");
fprintf(stderr, " --log_dir LOGDIR put the logs in LOGDIR\n");
return 1; return 1;
} }
...@@ -103,6 +105,10 @@ static void parse_args (int argc, const char *argv[]) { ...@@ -103,6 +105,10 @@ static void parse_args (int argc, const char *argv[]) {
argc--; argv++; argc--; argv++;
if (argc==0) exit(print_usage(pname)); if (argc==0) exit(print_usage(pname));
dbdir = *argv; dbdir = *argv;
} else if (strcmp(*argv, "--log_dir") == 0) {
argc--; argv++;
if (argc==0) exit(print_usage(pname));
log_dir = *argv;
} else if (strcmp(*argv, "--mysql") == 0) { } else if (strcmp(*argv, "--mysql") == 0) {
do_mysql = 1; do_mysql = 1;
} else if (strcmp(*argv, "--range") == 0 && argc > 2) { } else if (strcmp(*argv, "--range") == 0 && argc > 2) {
...@@ -155,6 +161,9 @@ static void scanscan_setup (void) { ...@@ -155,6 +161,9 @@ static void scanscan_setup (void) {
int r; int r;
r = db_env_create(&env, 0); assert(r==0); r = db_env_create(&env, 0); assert(r==0);
r = env->set_cachesize(env, 0, cachesize, 1); assert(r==0); r = env->set_cachesize(env, 0, cachesize, 1); assert(r==0);
if (log_dir) {
r = env->set_lg_dir(env, log_dir); assert(r==0);
}
double tstart = gettime(); double tstart = gettime();
r = env->open(env, dbdir, do_txns? env_open_flags_yesx : env_open_flags_nox, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); assert(r==0); r = env->open(env, dbdir, do_txns? env_open_flags_yesx : env_open_flags_nox, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); assert(r==0);
double tend = gettime(); double tend = gettime();
......
...@@ -75,5 +75,3 @@ typedef struct { ...@@ -75,5 +75,3 @@ typedef struct {
void toku_checkpoint_get_status(CHECKPOINT_STATUS stat); void toku_checkpoint_get_status(CHECKPOINT_STATUS stat);
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
// If the buf would overflow, then grab the file lock, swap file&buf, release buf lock, write the file, write the entry, release the file lock // If the buf would overflow, then grab the file lock, swap file&buf, release buf lock, write the file, write the entry, release the file lock
// else append to buf & release lock // else append to buf & release lock
#define LOGGER_BUF_SIZE (1<<24) #define LOGGER_MIN_BUF_SIZE (1<<24)
struct mylock { struct mylock {
toku_pthread_mutex_t lock; toku_pthread_mutex_t lock;
...@@ -50,12 +50,18 @@ static inline int ml_destroy(struct mylock *l) { ...@@ -50,12 +50,18 @@ static inline int ml_destroy(struct mylock *l) {
return toku_pthread_mutex_destroy(&l->lock); return toku_pthread_mutex_destroy(&l->lock);
} }
struct logbuf {
int n_in_buf;
int buf_size;
char *buf;
LSN max_lsn_in_buf;
};
struct tokulogger { struct tokulogger {
enum typ_tag tag; // must be first enum typ_tag tag; // must be first
struct mylock input_lock, output_lock; // acquired in that order struct mylock input_lock, output_lock; // acquired in that order
int is_open; BOOL is_open;
int is_panicked; BOOL is_panicked;
BOOL write_log_files; BOOL write_log_files;
BOOL trim_log_files; // for test purposes BOOL trim_log_files; // for test purposes
int panic_errno; int panic_errno;
...@@ -65,18 +71,17 @@ struct tokulogger { ...@@ -65,18 +71,17 @@ struct tokulogger {
int lg_max; // The size of the single file in the log. Default is 100MB in TokuDB int lg_max; // The size of the single file in the log. Default is 100MB in TokuDB
// To access these, you must have the input lock // To access these, you must have the input lock
struct logbytes *head,*tail;
LSN lsn; // the next available lsn LSN lsn; // the next available lsn
OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that? OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that?
int n_in_buf; struct logbuf inbuf; // data being accumulated for the write
// To access these, you must have the output lock // To access these, you must have the output lock
LSN written_lsn; // the last lsn written LSN written_lsn; // the last lsn written
LSN fsynced_lsn; // What is the LSN of the highest fsynced log entry LSN fsynced_lsn; // What is the LSN of the highest fsynced log entry
LSN checkpoint_lsn; // What is the LSN of the most recent completed checkpoint. LSN checkpoint_lsn; // What is the LSN of the most recent completed checkpoint.
long long next_log_file_number; long long next_log_file_number;
char buf[LOGGER_BUF_SIZE]; // used to marshall logbytes so we can use only a single write struct logbuf outbuf; // data being written to the file
int n_in_file; int n_in_file; // The amount of data in the current file
TOKULOGFILEMGR logfilemgr; TOKULOGFILEMGR logfilemgr;
...@@ -112,8 +117,6 @@ struct tokutxn { ...@@ -112,8 +117,6 @@ struct tokutxn {
XIDS xids; //Represents the xid list XIDS xids; //Represents the xid list
}; };
int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync);
static inline int toku_logsizeof_u_int8_t (u_int32_t v __attribute__((__unused__))) { static inline int toku_logsizeof_u_int8_t (u_int32_t v __attribute__((__unused__))) {
return 1; return 1;
} }
......
...@@ -14,15 +14,6 @@ ...@@ -14,15 +14,6 @@
#include "bread.h" #include "bread.h"
#include "x1764.h" #include "x1764.h"
struct logbytes {
struct logbytes *next;
int nbytes;
LSN lsn;
char bytes[1];
};
#define MALLOC_LOGBYTES(n) toku_malloc(sizeof(struct logbytes)+n -1)
typedef void(*voidfp)(void); typedef void(*voidfp)(void);
typedef void(*YIELDF)(voidfp, void*); typedef void(*YIELDF)(voidfp, void*);
struct roll_entry; struct roll_entry;
......
...@@ -295,12 +295,14 @@ generate_log_writer (void) { ...@@ -295,12 +295,14 @@ generate_log_writer (void) {
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n"); fprintf(hf, ");\n");
fprintf(cf, ") {\n"); fprintf(cf, ") {\n");
fprintf(cf, " int r = 0;\n");
fprintf(cf, " if (logger==0) return 0;\n"); fprintf(cf, " if (logger==0) return 0;\n");
fprintf(cf, " if (!logger->write_log_files) {\n"); fprintf(cf, " if (!logger->write_log_files) {\n");
fprintf(cf, " ml_lock(&logger->input_lock);\n"); fprintf(cf, " ml_lock(&logger->input_lock);\n");
fprintf(cf, " logger->lsn.lsn += toku_lsn_increment;\n"); fprintf(cf, " logger->lsn.lsn += toku_lsn_increment;\n");
fprintf(cf, " if (lsnp) *lsnp=logger->lsn;\n"); fprintf(cf, " if (lsnp) *lsnp=logger->lsn;\n");
fprintf(cf, " ml_unlock(&logger->input_lock);\n"); fprintf(cf, " r = ml_unlock(&logger->input_lock);\n");
fprintf(cf, " if (r!=0) goto panic;\n");
fprintf(cf, " return 0;\n"); fprintf(cf, " return 0;\n");
fprintf(cf, " }\n"); fprintf(cf, " }\n");
fprintf(cf, " const unsigned int buflen= (+4 // len at the beginning\n"); fprintf(cf, " const unsigned int buflen= (+4 // len at the beginning\n");
...@@ -311,23 +313,31 @@ generate_log_writer (void) { ...@@ -311,23 +313,31 @@ generate_log_writer (void) {
fprintf(cf, " +8 // crc + len\n"); fprintf(cf, " +8 // crc + len\n");
fprintf(cf, " );\n"); fprintf(cf, " );\n");
fprintf(cf, " struct wbuf wbuf;\n"); fprintf(cf, " struct wbuf wbuf;\n");
fprintf(cf, " struct logbytes *lbytes = MALLOC_LOGBYTES(buflen);\n"); fprintf(cf, " r = ml_lock(&logger->input_lock);\n");
fprintf(cf, " if (lbytes==0) return errno;\n"); fprintf(cf, " if (r!=0) goto panic;\n");
fprintf(cf, " wbuf_init(&wbuf, &lbytes->bytes[0], buflen);\n"); fprintf(cf, " r = toku_logger_make_space_in_inbuf(logger, buflen);\n");
fprintf(cf, " wbuf_int(&wbuf, buflen);\n"); fprintf(cf, " if (r!=0) goto panic;\n");
fprintf(cf, " wbuf_char(&wbuf, '%c');\n", (char)(0xff&lt->command_and_flags)); fprintf(cf, " wbuf_nocrc_init(&wbuf, logger->inbuf.buf+logger->inbuf.n_in_buf, buflen);\n");
fprintf(cf, " ml_lock(&logger->input_lock);\n"); fprintf(cf, " wbuf_nocrc_int(&wbuf, buflen);\n");
fprintf(cf, " wbuf_nocrc_char(&wbuf, '%c');\n", (char)(0xff&lt->command_and_flags));
fprintf(cf, " logger->lsn.lsn += toku_lsn_increment;\n"); fprintf(cf, " logger->lsn.lsn += toku_lsn_increment;\n");
fprintf(cf, " LSN lsn = logger->lsn;\n"); fprintf(cf, " LSN lsn =logger->lsn;\n");
fprintf(cf, " wbuf_LSN(&wbuf, lsn);\n"); fprintf(cf, " logger->inbuf.max_lsn_in_buf = lsn;\n");
fprintf(cf, " lbytes->lsn = lsn;\n"); fprintf(cf, " wbuf_nocrc_LSN(&wbuf, lsn);\n");
fprintf(cf, " if (lsnp) *lsnp=logger->lsn;\n"); fprintf(cf, " if (lsnp) *lsnp=lsn;\n");
DO_FIELDS(ft, lt, DO_FIELDS(ft, lt,
if (strcmp(ft->name, "timestamp") == 0) if (strcmp(ft->name, "timestamp") == 0)
fprintf(cf, " if (timestamp == 0) timestamp = toku_get_timestamp();\n"); fprintf(cf, " if (timestamp == 0) timestamp = toku_get_timestamp();\n");
fprintf(cf, " wbuf_%s(&wbuf, %s);\n", ft->type, ft->name)); fprintf(cf, " wbuf_nocrc_%s(&wbuf, %s);\n", ft->type, ft->name));
fprintf(cf, " int r= toku_logger_finish(logger, lbytes, &wbuf, do_fsync);\n"); fprintf(cf, " wbuf_nocrc_int(&wbuf, x1764_memory(wbuf.buf, wbuf.ndone));\n");
fprintf(cf, " wbuf_nocrc_int(&wbuf, buflen);\n");
fprintf(cf, " assert(wbuf.ndone==buflen);\n"); fprintf(cf, " assert(wbuf.ndone==buflen);\n");
fprintf(cf, " logger->inbuf.n_in_buf += buflen;\n");
fprintf(cf, " r = toku_logger_maybe_fsync(logger, lsn, do_fsync);\n");
fprintf(cf, " if (r!=0) goto panic;\n");
fprintf(cf, " return 0;\n");
fprintf(cf, " panic:\n");
fprintf(cf, " toku_logger_panic(logger, r);\n");
fprintf(cf, " return r;\n"); fprintf(cf, " return r;\n");
fprintf(cf, "}\n\n"); fprintf(cf, "}\n\n");
}); });
......
...@@ -12,8 +12,8 @@ static u_int32_t logger_lock_ctr = 0; // useful for debug at a live installation ...@@ -12,8 +12,8 @@ static u_int32_t logger_lock_ctr = 0; // useful for debug at a live installation
static int (*toku_os_fsync_function)(int)=fsync; static int (*toku_os_fsync_function)(int)=fsync;
static int open_logfile (TOKULOGGER logger); static int open_logfile (TOKULOGGER logger);
static int toku_logger_write_buffer (TOKULOGGER logger, int do_fsync);
static int delete_logfile(TOKULOGGER logger, long long index); static int delete_logfile(TOKULOGGER logger, long long index);
static int do_write (TOKULOGGER logger, int do_fsync);
u_int32_t toku_logger_get_lock_ctr(void) { u_int32_t toku_logger_get_lock_ctr(void) {
return logger_lock_ctr; return logger_lock_ctr;
...@@ -23,32 +23,34 @@ int toku_logger_create (TOKULOGGER *resultp) { ...@@ -23,32 +23,34 @@ int toku_logger_create (TOKULOGGER *resultp) {
int r; int r;
TAGMALLOC(TOKULOGGER, result); TAGMALLOC(TOKULOGGER, result);
if (result==0) return errno; if (result==0) return errno;
result->is_open=0; result->is_open=FALSE;
result->is_panicked=0; result->is_panicked=FALSE;
result->panic_errno = 0;
result->write_log_files = TRUE; result->write_log_files = TRUE;
result->trim_log_files = TRUE; result->trim_log_files = TRUE;
result->lg_max = 100<<20; // 100MB default
result->head = result->tail = 0;
result->lsn = result->written_lsn = result->fsynced_lsn = (LSN){0};
r = toku_omt_create(&result->live_txns); if (r!=0) goto died0;
result->n_in_buf=0;
result->n_in_file=0;
result->directory=0; result->directory=0;
result->checkpoint_lsn=(LSN){0}; // fd is uninitialized on purpose
result->oldest_living_xid = TXNID_NONE_LIVING; // ct is uninitialized on purpose
result->lg_max = 100<<20; // 100MB default
// lsn is uninitialized
r = toku_omt_create(&result->live_txns); if (r!=0) goto panic;
result->inbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
result->outbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
// written_lsn is uninitialized
// fsynced_lsn is uninitialized
result->checkpoint_lsn = ZERO_LSN;
// next_log_file_number is uninitialized
// n_in_file is uninitialized
result->write_block_size = BRT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size result->write_block_size = BRT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size
result->oldest_living_xid = TXNID_NONE_LIVING;
toku_logfilemgr_create(&result->logfilemgr); toku_logfilemgr_create(&result->logfilemgr);
*resultp=result; *resultp=result;
r = ml_init(&result->input_lock); if (r!=0) goto died1; r = ml_init(&result->input_lock); if (r!=0) goto panic;
r = ml_init(&result->output_lock); if (r!=0) goto died2; r = ml_init(&result->output_lock); if (r!=0) goto panic;
return 0; return 0;
died2: panic:
ml_destroy(&result->input_lock); toku_logger_panic(result, r);
died1:
toku_omt_destroy(&result->live_txns);
died0:
toku_free(result);
return r; return r;
} }
...@@ -61,9 +63,10 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) { ...@@ -61,9 +63,10 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) {
if ( r!=0 ) if ( r!=0 )
return r; return r;
logger->lsn = toku_logfilemgr_get_last_lsn(logger->logfilemgr); logger->lsn = toku_logfilemgr_get_last_lsn(logger->logfilemgr);
//printf("starting after LSN=%lu\n", logger->lsn.lsn);
logger->written_lsn = logger->lsn; logger->written_lsn = logger->lsn;
logger->fsynced_lsn = logger->lsn; logger->fsynced_lsn = logger->lsn;
logger->inbuf.max_lsn_in_buf = logger->lsn;
logger->outbuf.max_lsn_in_buf = logger->lsn;
long long nexti; long long nexti;
r = toku_logger_find_next_unused_log_file(directory, &nexti); r = toku_logger_find_next_unused_log_file(directory, &nexti);
...@@ -89,50 +92,9 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) { ...@@ -89,50 +92,9 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) {
return 0; return 0;
} }
// enter holding input_lock
// exit holding no locks
int toku_logger_log_bytes (TOKULOGGER logger, struct logbytes *bytes, int do_fsync) {
int r;
if (logger->is_panicked) return EINVAL;
logger->n_in_buf += bytes->nbytes;
if (logger->tail) {
logger->tail->next=bytes;
} else {
logger->head = bytes;
}
logger->tail = bytes;
bytes->next = 0;
if (logger->n_in_buf >= LOGGER_BUF_SIZE || do_fsync) {
// We must flush it
r=ml_unlock(&logger->input_lock); if (r!=0) goto panic;
r=ml_lock(&logger->output_lock); if (r!=0) goto panic;
if (logger->written_lsn.lsn < bytes->lsn.lsn) {
// We found that our record has not yet been written, so we must write it, and everything else
r=ml_lock(&logger->input_lock); if (r!=0) goto panic;
r=do_write(logger, do_fsync); if (r!=0) goto panic;
} else {
/* Our LSN has been written. We have the output lock */
if (do_fsync && logger->fsynced_lsn.lsn > bytes->lsn.lsn) {
/* But we need to fsync it. */
if (logger->write_log_files) {
r = toku_os_fsync_function(logger->fd); assert(r == 0);
}
logger->fsynced_lsn = logger->written_lsn;
}
}
r=ml_unlock(&logger->output_lock); if (r!=0) goto panic;
} else {
r=ml_unlock(&logger->input_lock); if (r!=0) goto panic;
}
return 0;
panic:
toku_logger_panic(logger, r);
return r;
}
// No locks held on entry // No locks held on entry
// No locks held on exit. // No locks held on exit.
// No locks are needed, since you cannot legally close the log concurrently with doing anything else. // Perhaps no locks are needed, since you cannot legally close the log concurrently with doing anything else.
// But grab the locks just to be careful, including one to prevent access // But grab the locks just to be careful, including one to prevent access
// between unlocking and destroying. // between unlocking and destroying.
int toku_logger_close(TOKULOGGER *loggerp) { int toku_logger_close(TOKULOGGER *loggerp) {
...@@ -146,7 +108,7 @@ int toku_logger_close(TOKULOGGER *loggerp) { ...@@ -146,7 +108,7 @@ int toku_logger_close(TOKULOGGER *loggerp) {
r = toku_pthread_mutex_lock(&logger_mutex); if (r!=0) goto panic; r = toku_pthread_mutex_lock(&logger_mutex); if (r!=0) goto panic;
logger_lock_ctr++; logger_lock_ctr++;
locked_logger = 1; locked_logger = 1;
r = do_write(logger, 1); if (r!=0) goto panic; //Releases the input lock r = toku_logger_write_buffer(logger, 1); if (r!=0) goto panic; //Releases the input lock
if (logger->fd!=-1) { if (logger->fd!=-1) {
r = close(logger->fd); if (r!=0) { r=errno; goto panic; } r = close(logger->fd); if (r!=0) { r=errno; goto panic; }
} }
...@@ -154,6 +116,8 @@ int toku_logger_close(TOKULOGGER *loggerp) { ...@@ -154,6 +116,8 @@ int toku_logger_close(TOKULOGGER *loggerp) {
r = ml_unlock(&logger->output_lock); if (r!=0) goto panic; r = ml_unlock(&logger->output_lock); if (r!=0) goto panic;
is_closed: is_closed:
toku_free(logger->inbuf.buf);
toku_free(logger->outbuf.buf);
r = ml_destroy(&logger->output_lock); if (r!=0) goto panic; r = ml_destroy(&logger->output_lock); if (r!=0) goto panic;
r = ml_destroy(&logger->input_lock); if (r!=0) goto panic; r = ml_destroy(&logger->input_lock); if (r!=0) goto panic;
logger->is_panicked=1; // Just in case this might help. logger->is_panicked=1; // Just in case this might help.
...@@ -181,16 +145,91 @@ int toku_logger_shutdown(TOKULOGGER logger) { ...@@ -181,16 +145,91 @@ int toku_logger_shutdown(TOKULOGGER logger) {
return r; return r;
} }
#if 0 // Write data to a file. Keep trying even if partial writes occur.
int toku_logger_log_checkpoint (TOKULOGGER logger) { // On error: Return negative with errno set.
if (logger->is_panicked) return EINVAL; // On success return nbytes.
int r = toku_cachetable_checkpoint(logger->ct); static int write_it (int fd, const void *bufv, int nbytes) {
if (r!=0) return r; int org_nbytes=nbytes;
logger->checkpoint_lsns[1]=logger->checkpoint_lsns[0]; const char *buf=bufv;
logger->checkpoint_lsns[0]=logger->lsn; while (nbytes>0) {
return toku_log_checkpoint(logger, (LSN*)0, 1); int r = write(fd, buf, nbytes);
if (r<0 || errno!=EAGAIN) return r;
buf+=r;
nbytes-=r;
}
return org_nbytes;
}
// close the current file, and open the next one.
// Entry: The output lock is held.
// Exit: The output lock is stlil held.
static int close_and_open_logfile (TOKULOGGER logger) {
int r;
if (logger->write_log_files) {
r = toku_os_fsync_function(logger->fd); if (r!=0) return errno;
assert(logger->fsynced_lsn.lsn <= logger->written_lsn.lsn);
logger->fsynced_lsn = logger->written_lsn;
}
r = close(logger->fd); if (r!=0) return errno;
return open_logfile(logger);
}
static int
max (int a, int b)
{
if (a>b) return a;
return b;
}
int
toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed)
// Entry: Holds the inlock
// Exit: Holds the inlock
// Effect: Upon exit, the inlock is held and there are at least n_bytes_needed in the buffer.
// May release the inlock, so this is not atomic.
// Implementation: Makes space in the inbuf, possibly by writing the inbuf to disk or increasing the size of the inbuf. There is no fsync.
// Arguments: logger: the logger (side effects)
// n_bytes_needed: how many bytes to make space for.
{
int r;
if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) return 0;
r = ml_unlock(&logger->input_lock); if (r!=0) goto panic;
r = ml_lock(&logger->output_lock); if (r!=0) goto panic;
r = ml_lock(&logger->input_lock); if (r!=0) goto panic;
// Some other thread may have written the log out while we didn't have the lock. If we can squeeze it in now, then be happy
if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) return 0;
if (logger->inbuf.n_in_buf > 0) {
// There isn't enough space, and there is something in the buffer, so write the inbuf.
{
struct logbuf tmp = logger->inbuf;
logger->inbuf = logger->outbuf;
logger->outbuf = tmp;
assert(logger->inbuf.n_in_buf == 0);
}
r = write_it(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf);
if (r!=logger->outbuf.n_in_buf) { r = errno; goto panic; }
assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written.
logger->written_lsn = logger->outbuf.max_lsn_in_buf;
logger->n_in_file += logger->outbuf.n_in_buf;
logger->outbuf.n_in_buf = 0;
if (logger->n_in_file > logger->lg_max) {
r = close_and_open_logfile(logger); if (r!=0) goto panic; // set
}
}
// the inbuf is empty. Is it big enough?
if (n_bytes_needed > logger->inbuf.buf_size) {
assert(n_bytes_needed < (1<<30)); // it seems unlikely to work if a logentry gets that big.
int new_size = max(logger->inbuf.buf_size * 2, n_bytes_needed); // make it at least twice as big, and big enough for n_bytes
assert(new_size < (1<<30));
XREALLOC_N(new_size, logger->inbuf.buf);
logger->inbuf.buf_size = new_size;
}
r = ml_unlock(&logger->output_lock); if (r!=0) goto panic;
return 0;
panic:
toku_logger_panic(logger, r);
return r;
} }
#endif
// Entry: Holds no locks // Entry: Holds no locks
// Exit: Holds no locks // Exit: Holds no locks
...@@ -200,7 +239,7 @@ int toku_logger_fsync (TOKULOGGER logger) { ...@@ -200,7 +239,7 @@ int toku_logger_fsync (TOKULOGGER logger) {
if (logger->is_panicked) return EINVAL; if (logger->is_panicked) return EINVAL;
r = ml_lock(&logger->output_lock); if (r!=0) goto panic; r = ml_lock(&logger->output_lock); if (r!=0) goto panic;
r = ml_lock(&logger->input_lock); if (r!=0) goto panic; r = ml_lock(&logger->input_lock); if (r!=0) goto panic;
r = do_write(logger, 1); r = toku_logger_write_buffer(logger, 1); if (r!=0) goto panic;
r = ml_unlock(&logger->output_lock); if (r!=0) goto panic; r = ml_unlock(&logger->output_lock); if (r!=0) goto panic;
return 0; return 0;
panic: panic:
...@@ -208,8 +247,6 @@ int toku_logger_fsync (TOKULOGGER logger) { ...@@ -208,8 +247,6 @@ int toku_logger_fsync (TOKULOGGER logger) {
return r; return r;
} }
void toku_logger_panic (TOKULOGGER logger, int err) { void toku_logger_panic (TOKULOGGER logger, int err) {
logger->panic_errno=err; logger->panic_errno=err;
logger->is_panicked=1; logger->is_panicked=1;
...@@ -240,7 +277,6 @@ int toku_logger_get_lg_max(TOKULOGGER logger, u_int32_t *lg_maxp) { ...@@ -240,7 +277,6 @@ int toku_logger_get_lg_max(TOKULOGGER logger, u_int32_t *lg_maxp) {
if (logger->is_panicked) return EINVAL; if (logger->is_panicked) return EINVAL;
*lg_maxp = logger->lg_max; *lg_maxp = logger->lg_max;
return 0; return 0;
} }
int toku_logger_set_lg_bsize(TOKULOGGER logger, u_int32_t bsize) { int toku_logger_set_lg_bsize(TOKULOGGER logger, u_int32_t bsize) {
...@@ -267,16 +303,16 @@ int toku_logger_lock_destroy(void) { ...@@ -267,16 +303,16 @@ int toku_logger_lock_destroy(void) {
int toku_logger_find_next_unused_log_file(const char *directory, long long *result) { int toku_logger_find_next_unused_log_file(const char *directory, long long *result) {
DIR *d=opendir(directory); DIR *d=opendir(directory);
long long max=-1; *result = max; long long maxf=-1; *result = maxf;
struct dirent *de; struct dirent *de;
if (d==0) return errno; if (d==0) return errno;
while ((de=readdir(d))) { while ((de=readdir(d))) {
if (de==0) return errno; if (de==0) return errno;
long long thisl; long long thisl;
int r = sscanf(de->d_name, "log%lld.tokulog", &thisl); int r = sscanf(de->d_name, "log%lld.tokulog", &thisl);
if (r==1 && thisl>max) max=thisl; if (r==1 && thisl>maxf) maxf=thisl;
} }
*result=max+1; *result=maxf+1;
int r = closedir(d); int r = closedir(d);
return r; return r;
} }
...@@ -326,21 +362,6 @@ int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_lo ...@@ -326,21 +362,6 @@ int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_lo
return d ? closedir(d) : 0; return d ? closedir(d) : 0;
} }
// Write something out. Keep trying even if partial writes occur.
// On error: Return negative with errno set.
// On success return nbytes.
static int write_it (int fd, const void *bufv, int nbytes) {
int org_nbytes=nbytes;
const char *buf=bufv;
while (nbytes>0) {
int r = write(fd, buf, nbytes);
if (r<0 || errno!=EAGAIN) return r;
buf+=r;
nbytes-=r;
}
return org_nbytes;
}
static int open_logfile (TOKULOGGER logger) { static int open_logfile (TOKULOGGER logger) {
int r; int r;
int fnamelen = strlen(logger->directory)+50; int fnamelen = strlen(logger->directory)+50;
...@@ -407,15 +428,6 @@ int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN trim_lsn) { ...@@ -407,15 +428,6 @@ int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN trim_lsn) {
return r; return r;
} }
static int close_and_open_logfile (TOKULOGGER logger) {
int r;
if (logger->write_log_files) {
r = toku_os_fsync_function(logger->fd); if (r!=0) return errno;
}
r = close(logger->fd); if (r!=0) return errno;
return open_logfile(logger);
}
void toku_logger_write_log_files (TOKULOGGER logger, BOOL write_log_files) { void toku_logger_write_log_files (TOKULOGGER logger, BOOL write_log_files) {
assert(!logger->is_open); assert(!logger->is_open);
logger->write_log_files = write_log_files; logger->write_log_files = write_log_files;
...@@ -426,55 +438,55 @@ void toku_logger_trim_log_files (TOKULOGGER logger, BOOL trim_log_files) { ...@@ -426,55 +438,55 @@ void toku_logger_trim_log_files (TOKULOGGER logger, BOOL trim_log_files) {
logger->trim_log_files = trim_log_files; logger->trim_log_files = trim_log_files;
} }
// Enter holding both locks int toku_logger_maybe_fsync (TOKULOGGER logger, LSN lsn, int do_fsync)
// Exit holding only the output_lock // Effect: If fsync is nonzero, then make sure that the log is flushed and synced at least up to lsn.
static int do_write (TOKULOGGER logger, int do_fsync) { // Entry: Holds input lock.
// Exit: Holds no locks.
// The input lock may be released and then reacquired. Thus this function does not run atomically.
{
int r; int r;
struct logbytes *list = logger->head; BOOL have_input_lock = TRUE;
logger->head=logger->tail=0; if (do_fsync && logger->fsynced_lsn.lsn < lsn.lsn) {
logger->n_in_buf=0; // need to fsync and not enough is done
while (list) { // reacquire the locks (acquire output lock first)
if (logger->n_in_file + list->nbytes <= logger->lg_max) { r = ml_unlock(&logger->input_lock); if (r!=0) goto panic;
if (logger->n_in_buf + list->nbytes <= LOGGER_BUF_SIZE) { r = ml_lock(&logger->output_lock); if (r!=0) goto panic;
memcpy(logger->buf+logger->n_in_buf, list->bytes, list->nbytes); r = ml_lock(&logger->input_lock); if (r!=0) goto panic;
logger->n_in_buf+=list->nbytes;
logger->n_in_file+=list->nbytes;
logger->written_lsn = list->lsn; // it's possible that the written lsn is now written enough that we are happy. If not then do the I/O
struct logbytes *next=list->next; if (logger->fsynced_lsn.lsn < lsn.lsn) {
toku_free(list); // now we actually do the I/O
list=next; struct logbuf tmp = logger->inbuf;
} else { logger->inbuf = logger->outbuf;
// it doesn't fit in the buffer, but it does fit in the file. So flush the buffer logger->outbuf = tmp;
r=write_it(logger->fd, logger->buf, logger->n_in_buf); r = ml_unlock(&logger->input_lock); // release the input lock now, so group commit can operate
if (r!=logger->n_in_buf) { r=errno; goto panic; } if (r!=0) goto panic;
logger->n_in_buf=0; have_input_lock = FALSE;
// Special case for a log entry that's too big to fit in the buffer. if (logger->outbuf.n_in_buf>0) {
if (list->nbytes > LOGGER_BUF_SIZE) { r = write_it(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf);
r=write_it(logger->fd, list->bytes, list->nbytes); if (r!=logger->outbuf.n_in_buf) { r = errno; goto panic; }
if (r!=list->nbytes) { r=errno; goto panic; } assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written.
logger->n_in_file+=list->nbytes; logger->written_lsn = logger->outbuf.max_lsn_in_buf;
logger->written_lsn = list->lsn; logger->n_in_file += logger->outbuf.n_in_buf;
struct logbytes *next=list->next; logger->outbuf.n_in_buf = 0;
toku_free(list);
list=next;
}
} }
if (logger->n_in_file > logger->lg_max) {
r = close_and_open_logfile(logger); if (r!=0) goto panic;
logger->fsynced_lsn = logger->outbuf.max_lsn_in_buf;
} else { } else {
// The new item doesn't fit in the file, so write the buffer, reopen the file, and try again assert(logger->fsynced_lsn.lsn < logger->written_lsn.lsn); // the fsynced_lsn was less than lsn, but not less than the written lsn?
r=write_it(logger->fd, logger->buf, logger->n_in_buf); r = toku_os_fsync_function(logger->fd);
logger->n_in_buf=0; if (r!=0) { r = errno; goto panic; }
r=close_and_open_logfile(logger); if (r!=0) goto panic; logger->fsynced_lsn = logger->written_lsn;
} }
} }
r=write_it(logger->fd, logger->buf, logger->n_in_buf); r = ml_unlock(&logger->output_lock);
if (r!=logger->n_in_buf) { r=errno; goto panic; } if (r!=0) goto panic;
logger->n_in_buf=0;
r=ml_unlock(&logger->input_lock); if (r!=0) goto panic2;
if (do_fsync) {
if (logger->write_log_files) {
r = toku_os_fsync_function(logger->fd); assert(r == 0);
} }
logger->fsynced_lsn = logger->written_lsn; if (have_input_lock) {
r = ml_unlock(&logger->input_lock);
if (r!=0) goto panic2;
} }
if ( logger->write_log_files ) if ( logger->write_log_files )
toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn); toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn);
...@@ -486,13 +498,55 @@ static int do_write (TOKULOGGER logger, int do_fsync) { ...@@ -486,13 +498,55 @@ static int do_write (TOKULOGGER logger, int do_fsync) {
return r; return r;
} }
static int
toku_logger_write_buffer (TOKULOGGER logger, int do_fsync)
// Entry: Holds both locks.
// Exit: Holds only the output lock.
// Effect: Write the buffers to the output. If DO_FSYNC is true, then fsync.
{
int r;
{
struct logbuf tmp = logger->inbuf;
logger->inbuf = logger->outbuf;
logger->outbuf = tmp;
assert(logger->inbuf.n_in_buf == 0);
}
r = ml_unlock(&logger->input_lock);
if (r!=0)
goto panic;
if (logger->outbuf.n_in_buf > 0) {
r = write_it(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf);
if (r != logger->outbuf.n_in_buf) {
r = errno;
goto panic;
}
assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written.
logger->written_lsn = logger->outbuf.max_lsn_in_buf;
logger->n_in_file += logger->outbuf.n_in_buf;
logger->outbuf.n_in_buf = 0;
if (logger->n_in_file > logger->lg_max) {
r = close_and_open_logfile(logger); if (r!=0) goto panic;
logger->fsynced_lsn = logger->outbuf.max_lsn_in_buf;
} else if (do_fsync) {
r = toku_os_fsync_function(logger->fd);
logger->fsynced_lsn = logger->outbuf.max_lsn_in_buf;
} else {
/* nothing */
}
}
return 0;
panic:
toku_logger_panic(logger, r);
return r;
}
int toku_logger_restart(TOKULOGGER logger, LSN lastlsn) { int toku_logger_restart(TOKULOGGER logger, LSN lastlsn) {
int r; int r;
// flush out the log buffer // flush out the log buffer
r = ml_lock(&logger->output_lock); assert(r == 0); r = ml_lock(&logger->output_lock); assert(r == 0);
r = ml_lock(&logger->input_lock); assert(r == 0); r = ml_lock(&logger->input_lock); assert(r == 0);
r = do_write(logger, TRUE); assert(r == 0); r = toku_logger_write_buffer(logger, TRUE); assert(r == 0);
r = ml_unlock(&logger->output_lock); assert(r == 0); r = ml_unlock(&logger->output_lock); assert(r == 0);
// close the log file // close the log file
......
...@@ -9,7 +9,6 @@ enum { TOKU_LOG_VERSION = 1 }; ...@@ -9,7 +9,6 @@ enum { TOKU_LOG_VERSION = 1 };
int toku_logger_create (TOKULOGGER *resultp); int toku_logger_create (TOKULOGGER *resultp);
int toku_logger_open (const char *directory, TOKULOGGER logger); int toku_logger_open (const char *directory, TOKULOGGER logger);
int toku_logger_log_bytes (TOKULOGGER logger, struct logbytes *bytes, int do_fsync);
int toku_logger_shutdown(TOKULOGGER logger); int toku_logger_shutdown(TOKULOGGER logger);
int toku_logger_close(TOKULOGGER *loggerp); int toku_logger_close(TOKULOGGER *loggerp);
...@@ -90,4 +89,62 @@ void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn); ...@@ -90,4 +89,62 @@ void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn);
TXNID toku_logger_get_oldest_living_xid(TOKULOGGER logger); TXNID toku_logger_get_oldest_living_xid(TOKULOGGER logger);
LSN toku_logger_get_oldest_living_lsn(TOKULOGGER logger); LSN toku_logger_get_oldest_living_lsn(TOKULOGGER logger);
int toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed);
int
toku_logger_write_inbuf (TOKULOGGER logger);
// Effect: Write the buffered data (from the inbuf) to a file. No fsync, however.
// As a side effect, the inbuf will be made empty.
// Return 0 on success, otherwise return an error number.
// Requires: The inbuf lock is currently held, and the outbuf lock is not held.
// Upon return, the inbuf lock will be held, and the outbuf lock is not held.
// However, no side effects should have been made to the logger. The lock was acquired simply to determine that the buffer will overflow if we try to put something into it.
// The inbuf lock will be released, so the operations before and after this function call will not be atomic.
// Rationale: When the buffer becomes nearly full, call this function so that more can be put in.
// Implementation note: Since the output lock is acquired first, we must release the input lock, and then grab both in the right order.
int
toku_logger_maybe_fsync (TOKULOGGER logger, LSN lsn, int do_fsync);
// Effect: If fsync is nonzero, then make sure that the log is flushed and synced at least up to lsn.
// Entry: Holds input lock.
// Exit: Holds no locks.
// Discussion: How does the logger work:
// The logger has two buffers: an inbuf and an outbuf.
// There are two locks, called the inlock, and the outlock. To write, both locks must be held, and the outlock is acquired first.
// Roughly speaking, the inbuf is used to accumulate logged data, and the outbuf is used to write to disk.
// When something is to be logged we do the following:
// acquire the inlock.
// Make sure there is space in the inbuf for the logentry. (We know the size of the logentry in advance):
// if the inbuf doesn't have enough space then
// release the inlock
// acquire the outlock
// acquire the inlock
// it's possible that some other thread made space.
// if there still isn't space
// swap the inbuf and the outbuf
// release the inlock
// write the outbuf
// acquire the inlock
// release the outlock
// if the inbuf is still too small, then increase the size of the inbuf
// Increment the LSN and fill the inbuf.
// If fsync is required then
// release the inlock
// acquire the outlock
// acquire the inlock
// if the LSN has been flushed and fsynced (if so we are done. Some other thread did the flush.)
// release the locks
// if the LSN has been flushed but not fsynced up to the LSN:
// release the inlock
// fsync
// release the outlock
// otherwise:
// swap the outbuf and the inbuf
// release the inlock
// write the outbuf
// fsync
// release the outlock
#endif #endif
...@@ -67,16 +67,6 @@ void toku_rollback_txn_close (TOKUTXN txn) { ...@@ -67,16 +67,6 @@ void toku_rollback_txn_close (TOKUTXN txn) {
return; return;
} }
// wbuf points into logbytes
int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync) {
if (logger->is_panicked) return EINVAL;
u_int32_t checksum = x1764_finish(&wbuf->checksum);
wbuf_int(wbuf, checksum);
wbuf_int(wbuf, 4+wbuf->ndone);
logbytes->nbytes=wbuf->ndone;
return toku_logger_log_bytes(logger, logbytes, do_fsync);
}
void* toku_malloc_in_rollback(TOKUTXN txn, size_t size) { void* toku_malloc_in_rollback(TOKUTXN txn, size_t size) {
return malloc_in_memarena(txn->rollentry_arena, size); return malloc_in_memarena(txn->rollentry_arena, size);
} }
......
...@@ -17,23 +17,20 @@ test_main (int argc __attribute__((__unused__)), ...@@ -17,23 +17,20 @@ test_main (int argc __attribute__((__unused__)),
system(rmrf); system(rmrf);
r = toku_os_mkdir(dname, S_IRWXU); assert(r==0); r = toku_os_mkdir(dname, S_IRWXU); assert(r==0);
TOKULOGGER logger; TOKULOGGER logger;
r = toku_logger_create(&logger); r = toku_logger_create(&logger); assert(r == 0);
assert(r == 0); r = toku_logger_open(dname, logger); assert(r == 0);
r = toku_logger_open(dname, logger);
assert(r == 0);
{ {
struct logbytes *b = MALLOC_LOGBYTES(5); r = ml_lock(&logger->input_lock); assert(r == 0);
b->nbytes=5; r = toku_logger_make_space_in_inbuf(logger, 5); assert(r == 0);
memcpy(b->bytes, "a1234", 5); snprintf(logger->inbuf.buf+logger->inbuf.n_in_buf, 5, "a1234");
b->lsn=(LSN){0}; logger->inbuf.n_in_buf+=5;
r = ml_lock(&logger->input_lock); logger->lsn.lsn++;
assert(r==0); logger->inbuf.max_lsn_in_buf = logger->lsn;
r = toku_logger_log_bytes(logger, b, 0); r = ml_unlock(&logger->input_lock); assert(r == 0);
assert(r==0);
assert(logger->input_lock.is_locked==0);
} }
r = toku_logger_close(&logger);
assert(r == 0); r = toku_logger_close(&logger); assert(r == 0);
{ {
toku_struct_stat statbuf; toku_struct_stat statbuf;
r = toku_stat(dname "/log000000000000.tokulog", &statbuf); r = toku_stat(dname "/log000000000000.tokulog", &statbuf);
......
...@@ -31,16 +31,16 @@ test_main (int argc __attribute__((__unused__)), ...@@ -31,16 +31,16 @@ test_main (int argc __attribute__((__unused__)),
assert(r == 0); assert(r == 0);
int i; int i;
for (i=0; i<1000; i++) { for (i=0; i<1000; i++) {
r = ml_lock(&logger->input_lock); r = ml_lock(&logger->input_lock); assert(r==0);
assert(r==0);
int ilen=3+random()%5; int ilen=3+random()%5;
struct logbytes *b = MALLOC_LOGBYTES(ilen+1); r = toku_logger_make_space_in_inbuf(logger, ilen+1); assert(r==0);
b->nbytes=ilen+1; snprintf(logger->inbuf.buf+logger->inbuf.n_in_buf, ilen+1, "a%0*d ", (int)ilen, i);
snprintf(b->bytes, ilen+1, "a%0*d ", (int)ilen, i); // skip the trailing nul logger->inbuf.n_in_buf+=(ilen+1);
b->lsn=(LSN){23+i}; logger->lsn.lsn++;
r = toku_logger_log_bytes(logger, b, 0); logger->inbuf.max_lsn_in_buf = logger->lsn;
assert(r==0); r = ml_unlock(&logger->input_lock); assert(r == 0);
r = toku_logger_fsync(logger); assert(r == 0);
} }
r = toku_logger_close(&logger); r = toku_logger_close(&logger);
assert(r == 0); assert(r == 0);
...@@ -56,7 +56,7 @@ test_main (int argc __attribute__((__unused__)), ...@@ -56,7 +56,7 @@ test_main (int argc __attribute__((__unused__)),
toku_struct_stat statbuf; toku_struct_stat statbuf;
r = toku_stat(fname, &statbuf); r = toku_stat(fname, &statbuf);
assert(r==0); assert(r==0);
assert(statbuf.st_size<=LSIZE); assert(statbuf.st_size<=LSIZE+10);
} }
r = closedir(dir); r = closedir(dir);
assert(r==0); assert(r==0);
......
...@@ -31,28 +31,24 @@ test_main (int argc __attribute__((__unused__)), ...@@ -31,28 +31,24 @@ test_main (int argc __attribute__((__unused__)),
assert(r == 0); assert(r == 0);
{ {
r = ml_lock(&logger->input_lock); r = ml_lock(&logger->input_lock); assert(r==0);
assert(r==0);
int lsize=LSIZE-12-2; int lsize=LSIZE-12-2;
struct logbytes *b = MALLOC_LOGBYTES(lsize); r = toku_logger_make_space_in_inbuf(logger, lsize); assert(r==0);
b->nbytes=lsize; snprintf(logger->inbuf.buf+logger->inbuf.n_in_buf, lsize, "a%*d", lsize-1, 0);
snprintf(b->bytes, lsize, "a%*d", LSIZE-12-2, 0); logger->inbuf.n_in_buf += lsize;
b->lsn=(LSN){23}; logger->lsn.lsn++;
r = toku_logger_log_bytes(logger, b, 0); logger->inbuf.max_lsn_in_buf = logger->lsn;
assert(r==0); r = ml_unlock(&logger->input_lock); assert(r == 0);
} }
{ {
r = ml_lock(&logger->input_lock); r = ml_lock(&logger->input_lock); assert(r==0);
assert(r==0); r = toku_logger_make_space_in_inbuf(logger, 2); assert(r==0);
memcpy(logger->inbuf.buf+logger->inbuf.n_in_buf, "b1", 2);
struct logbytes *b = MALLOC_LOGBYTES(2); logger->inbuf.n_in_buf += 2;
b->lsn=(LSN){24}; logger->lsn.lsn++;
b->nbytes=2; logger->inbuf.max_lsn_in_buf = logger->lsn;
memcpy(b->bytes, "b1", 2); r = ml_unlock(&logger->input_lock); assert(r == 0);
r = toku_logger_log_bytes(logger, b, 0);
assert(r==0);
} }
r = toku_logger_close(&logger); r = toku_logger_close(&logger);
......
...@@ -17,8 +17,8 @@ TOKULOGGER logger[NUM_LOGGERS]; ...@@ -17,8 +17,8 @@ TOKULOGGER logger[NUM_LOGGERS];
static void setup_logger(int which) { static void setup_logger(int which) {
char dnamewhich[200]; char dnamewhich[200];
int r; int r;
sprintf(dnamewhich, "%s_%d", dname, which); snprintf(dnamewhich, sizeof(dnamewhich), "%s_%d", dname, which);
r = toku_os_mkdir(dnamewhich, S_IRWXU); assert(r==0); r = toku_os_mkdir(dnamewhich, S_IRWXU); if (r!=0) printf("file %s error (%d) %s\n", dnamewhich, errno, strerror(errno)); assert(r==0);
r = toku_logger_create(&logger[which]); r = toku_logger_create(&logger[which]);
assert(r == 0); assert(r == 0);
r = toku_logger_set_lg_max(logger[which], LSIZE); r = toku_logger_set_lg_max(logger[which], LSIZE);
...@@ -34,28 +34,24 @@ static void setup_logger(int which) { ...@@ -34,28 +34,24 @@ static void setup_logger(int which) {
static void play_with_logger(int which) { static void play_with_logger(int which) {
int r; int r;
{ {
r = ml_lock(&logger[which]->input_lock); r = ml_lock(&logger[which]->input_lock); assert(r==0);
assert(r==0);
int lsize=LSIZE-12-2; int lsize=LSIZE-12-2;
struct logbytes *b = MALLOC_LOGBYTES(lsize); r = toku_logger_make_space_in_inbuf(logger[which], lsize); assert(r==0);
b->nbytes=lsize; snprintf(logger[which]->inbuf.buf+logger[which]->inbuf.n_in_buf, lsize, "a%*d", lsize-1, 0);
snprintf(b->bytes, lsize, "a%*d", LSIZE-12-2, 0); logger[which]->inbuf.n_in_buf += lsize;
b->lsn=(LSN){23}; logger[which]->lsn.lsn++;
r = toku_logger_log_bytes(logger[which], b, 0); logger[which]->inbuf.max_lsn_in_buf = logger[which]->lsn;
assert(r==0); r = ml_unlock(&logger[which]->input_lock); assert(r == 0);
} }
{ {
r = ml_lock(&logger[which]->input_lock); r = ml_lock(&logger[which]->input_lock); assert(r==0);
assert(r==0); r = toku_logger_make_space_in_inbuf(logger[which], 2); assert(r==0);
memcpy(logger[which]->inbuf.buf+logger[which]->inbuf.n_in_buf, "b1", 2);
struct logbytes *b = MALLOC_LOGBYTES(2); logger[which]->inbuf.n_in_buf += 2;
b->lsn=(LSN){24}; logger[which]->lsn.lsn++;
b->nbytes=2; logger[which]->inbuf.max_lsn_in_buf = logger[which]->lsn;
memcpy(b->bytes, "b1", 2); r = ml_unlock(&logger[which]->input_lock); assert(r == 0);
r = toku_logger_log_bytes(logger[which], b, 0);
assert(r==0);
} }
} }
......
...@@ -24,17 +24,25 @@ struct wbuf { ...@@ -24,17 +24,25 @@ struct wbuf {
struct x1764 checksum; // The checksumx state struct x1764 checksum; // The checksumx state
}; };
static inline void wbuf_init (struct wbuf *w, void *buf, DISKOFF size) { static inline void wbuf_nocrc_init (struct wbuf *w, void *buf, DISKOFF size) {
w->buf=buf; w->buf=buf;
w->size=size; w->size=size;
w->ndone=0; w->ndone=0;
}
static inline void wbuf_init (struct wbuf *w, void *buf, DISKOFF size) {
wbuf_nocrc_init(w, buf, size);
x1764_init(&w->checksum); x1764_init(&w->checksum);
} }
/* Write a character. */ /* Write a character. */
static inline void wbuf_char (struct wbuf *w, unsigned char ch) { static inline void wbuf_nocrc_char (struct wbuf *w, unsigned char ch) {
assert(w->ndone<w->size); assert(w->ndone<w->size);
w->buf[w->ndone++]=ch; w->buf[w->ndone++]=ch;
}
static inline void wbuf_char (struct wbuf *w, unsigned char ch) {
wbuf_nocrc_char (w, ch);
x1764_add(&w->checksum, &w->buf[w->ndone-1], 1); x1764_add(&w->checksum, &w->buf[w->ndone-1], 1);
} }
...@@ -47,12 +55,12 @@ static void wbuf_network_int (struct wbuf *w, int32_t i) { ...@@ -47,12 +55,12 @@ static void wbuf_network_int (struct wbuf *w, int32_t i) {
w->ndone += 4; w->ndone += 4;
} }
static void wbuf_int (struct wbuf *w, int32_t i) { static inline void wbuf_nocrc_int (struct wbuf *w, int32_t i) {
#if 0 #if 0
wbuf_char(w, i>>24); wbuf_nocrc_char(w, i>>24);
wbuf_char(w, i>>16); wbuf_nocrc_char(w, i>>16);
wbuf_char(w, i>>8); wbuf_nocrc_char(w, i>>8);
wbuf_char(w, i>>0); wbuf_nocrc_char(w, i>>0);
#else #else
assert(w->ndone + 4 <= w->size); assert(w->ndone + 4 <= w->size);
#if 0 #if 0
...@@ -63,42 +71,74 @@ static void wbuf_int (struct wbuf *w, int32_t i) { ...@@ -63,42 +71,74 @@ static void wbuf_int (struct wbuf *w, int32_t i) {
#else #else
*(u_int32_t*)(&w->buf[w->ndone]) = toku_htod32(i); *(u_int32_t*)(&w->buf[w->ndone]) = toku_htod32(i);
#endif #endif
x1764_add(&w->checksum, &w->buf[w->ndone], 4);
w->ndone += 4; w->ndone += 4;
#endif #endif
} }
static void wbuf_uint (struct wbuf *w, u_int32_t i) { static inline void wbuf_int (struct wbuf *w, int32_t i) {
wbuf_nocrc_int(w, i);
x1764_add(&w->checksum, &w->buf[w->ndone-4], 4);
}
static inline void wbuf_nocrc_uint (struct wbuf *w, u_int32_t i) {
wbuf_nocrc_int(w, (int32_t)i);
}
static inline void wbuf_uint (struct wbuf *w, u_int32_t i) {
wbuf_int(w, (int32_t)i); wbuf_int(w, (int32_t)i);
} }
static inline void wbuf_literal_bytes(struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) { static inline void wbuf_nocrc_literal_bytes(struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) {
const unsigned char *bytes=bytes_bv; const unsigned char *bytes=bytes_bv;
#if 0 #if 0
{ int i; for (i=0; i<nbytes; i++) wbuf_char(w, bytes[i]); } { int i; for (i=0; i<nbytes; i++) wbuf_nocrc_char(w, bytes[i]); }
#else #else
assert(w->ndone + nbytes <= w->size); assert(w->ndone + nbytes <= w->size);
memcpy(w->buf + w->ndone, bytes, (size_t)nbytes); memcpy(w->buf + w->ndone, bytes, (size_t)nbytes);
x1764_add(&w->checksum, &w->buf[w->ndone], nbytes);
w->ndone += nbytes; w->ndone += nbytes;
#endif #endif
}
static inline void wbuf_literal_bytes(struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) {
wbuf_nocrc_literal_bytes(w, bytes_bv, nbytes);
x1764_add(&w->checksum, &w->buf[w->ndone-nbytes], nbytes);
}
static void wbuf_nocrc_bytes (struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) {
wbuf_nocrc_uint(w, nbytes);
wbuf_nocrc_literal_bytes(w, bytes_bv, nbytes);
} }
static void wbuf_bytes (struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) { static void wbuf_bytes (struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) {
wbuf_uint(w, nbytes); wbuf_uint(w, nbytes);
wbuf_literal_bytes(w, bytes_bv, nbytes); wbuf_literal_bytes(w, bytes_bv, nbytes);
} }
static void wbuf_nocrc_ulonglong (struct wbuf *w, u_int64_t ull) {
wbuf_nocrc_uint(w, (u_int32_t)(ull>>32));
wbuf_nocrc_uint(w, (u_int32_t)(ull&0xFFFFFFFF));
}
static void wbuf_ulonglong (struct wbuf *w, u_int64_t ull) { static void wbuf_ulonglong (struct wbuf *w, u_int64_t ull) {
wbuf_uint(w, (u_int32_t)(ull>>32)); wbuf_uint(w, (u_int32_t)(ull>>32));
wbuf_uint(w, (u_int32_t)(ull&0xFFFFFFFF)); wbuf_uint(w, (u_int32_t)(ull&0xFFFFFFFF));
} }
static inline void wbuf_nocrc_u_int64_t(struct wbuf *w, u_int64_t ull) {
wbuf_nocrc_ulonglong(w, ull);
}
static inline void wbuf_u_int64_t(struct wbuf *w, u_int64_t ull) { static inline void wbuf_u_int64_t(struct wbuf *w, u_int64_t ull) {
wbuf_ulonglong(w, ull); wbuf_ulonglong(w, ull);
} }
static inline void wbuf_nocrc_BYTESTRING (struct wbuf *w, BYTESTRING v) {
wbuf_nocrc_bytes(w, v.data, v.len);
}
static inline void wbuf_BYTESTRING (struct wbuf *w, BYTESTRING v) { static inline void wbuf_BYTESTRING (struct wbuf *w, BYTESTRING v) {
wbuf_bytes(w, v.data, v.len); wbuf_bytes(w, v.data, v.len);
} }
...@@ -107,6 +147,10 @@ static inline void wbuf_u_int8_t (struct wbuf *w, u_int8_t v) { ...@@ -107,6 +147,10 @@ static inline void wbuf_u_int8_t (struct wbuf *w, u_int8_t v) {
wbuf_char(w, v); wbuf_char(w, v);
} }
static inline void wbuf_nocrc_u_int32_t (struct wbuf *w, u_int32_t v) {
wbuf_nocrc_uint(w, v);
}
static inline void wbuf_u_int32_t (struct wbuf *w, u_int32_t v) { static inline void wbuf_u_int32_t (struct wbuf *w, u_int32_t v) {
wbuf_uint(w, v); wbuf_uint(w, v);
} }
...@@ -119,15 +163,26 @@ static inline void wbuf_BLOCKNUM (struct wbuf *w, BLOCKNUM b) { ...@@ -119,15 +163,26 @@ static inline void wbuf_BLOCKNUM (struct wbuf *w, BLOCKNUM b) {
} }
static inline void wbuf_nocrc_TXNID (struct wbuf *w, TXNID tid) {
wbuf_nocrc_ulonglong(w, tid);
}
static inline void wbuf_TXNID (struct wbuf *w, TXNID tid) { static inline void wbuf_TXNID (struct wbuf *w, TXNID tid) {
wbuf_ulonglong(w, tid); wbuf_ulonglong(w, tid);
} }
static inline void wbuf_nocrc_LSN (struct wbuf *w, LSN lsn) {
wbuf_nocrc_ulonglong(w, lsn.lsn);
}
static inline void wbuf_LSN (struct wbuf *w, LSN lsn) { static inline void wbuf_LSN (struct wbuf *w, LSN lsn) {
wbuf_ulonglong(w, lsn.lsn); wbuf_ulonglong(w, lsn.lsn);
} }
static inline void wbuf_nocrc_FILENUM (struct wbuf *w, FILENUM fileid) {
wbuf_nocrc_uint(w, fileid.fileid);
}
static inline void wbuf_FILENUM (struct wbuf *w, FILENUM fileid) { static inline void wbuf_FILENUM (struct wbuf *w, FILENUM fileid) {
wbuf_uint(w, fileid.fileid); wbuf_uint(w, fileid.fileid);
} }
......
...@@ -178,7 +178,8 @@ DEPEND_COMPILE += $(wildcard $(TOKUROOT)$(OS_CHOICE)/*.h) ...@@ -178,7 +178,8 @@ DEPEND_COMPILE += $(wildcard $(TOKUROOT)$(OS_CHOICE)/*.h)
SUPPRESSIONS=no SUPPRESSIONS=no
#Tools #Tools
VGRIND=valgrind --quiet --error-exitcode=1 --leak-check=full --show-reachable=yes \ VALGRIND=valgrind
VGRIND=$(VALGRIND) --quiet --error-exitcode=1 --leak-check=full --show-reachable=yes \
--suppressions=$(TOKUROOT)newbrt/valgrind.suppressions \ --suppressions=$(TOKUROOT)newbrt/valgrind.suppressions \
--suppressions=$(TOKUROOT)src/tests/bdb.suppressions \ --suppressions=$(TOKUROOT)src/tests/bdb.suppressions \
--gen-suppressions=$(SUPPRESSIONS) --gen-suppressions=$(SUPPRESSIONS)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment