Commit 22b6cb16 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge 1967.mtm2 to main refs[t:1967]

git-svn-id: file:///svn/toku/tokudb@15205 c7de825b-a66e-492c-adef-691d508d4ae1
parent b9fd911c
...@@ -61,6 +61,7 @@ double compressibility = -1; // -1 means make it very compressible. 1 means use ...@@ -61,6 +61,7 @@ double compressibility = -1; // -1 means make it very compressible. 1 means use
int do_append = 0; int do_append = 0;
int do_checkpoint_period = 0; int do_checkpoint_period = 0;
u_int32_t checkpoint_period = 0; u_int32_t checkpoint_period = 0;
static const char *log_dir = NULL;
static void do_prelock(DB* db, DB_TXN* txn) { static void do_prelock(DB* db, DB_TXN* txn) {
if (prelock) { if (prelock) {
...@@ -119,11 +120,15 @@ static void benchmark_setup (void) { ...@@ -119,11 +120,15 @@ static void benchmark_setup (void) {
if (r != 0) if (r != 0)
printf("WARNING: set_cachesize %d\n", r); printf("WARNING: set_cachesize %d\n", r);
} }
{
r = dbenv->open(dbenv, dbdir, env_open_flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); if (log_dir) {
r = dbenv->set_lg_dir(dbenv, log_dir);
assert(r == 0); assert(r == 0);
} }
r = dbenv->open(dbenv, dbdir, env_open_flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
assert(r == 0);
#if defined(TOKUDB) #if defined(TOKUDB)
if (do_checkpoint_period) { if (do_checkpoint_period) {
r = dbenv->checkpointing_set_period(dbenv, checkpoint_period); r = dbenv->checkpointing_set_period(dbenv, checkpoint_period);
...@@ -364,12 +369,13 @@ static int print_usage (const char *argv0) { ...@@ -364,12 +369,13 @@ static int print_usage (const char *argv0) {
fprintf(stderr, " --prelockflag Prelock the database and send the DB_PRELOCKED_WRITE flag.\n"); fprintf(stderr, " --prelockflag Prelock the database and send the DB_PRELOCKED_WRITE flag.\n");
fprintf(stderr, " --abort Abort the singlex after the transaction is over. (Requires --singlex.)\n"); fprintf(stderr, " --abort Abort the singlex after the transaction is over. (Requires --singlex.)\n");
fprintf(stderr, " --nolog If transactions are used, then don't write the recovery log\n"); fprintf(stderr, " --nolog If transactions are used, then don't write the recovery log\n");
fprintf(stderr, " --log_dir LOGDIR Put the logs in LOGDIR\n");
fprintf(stderr, " --env DIR\n");
fprintf(stderr, " --periter N how many insertions per iteration (default=%d)\n", DEFAULT_ITEMS_TO_INSERT_PER_ITERATION); fprintf(stderr, " --periter N how many insertions per iteration (default=%d)\n", DEFAULT_ITEMS_TO_INSERT_PER_ITERATION);
fprintf(stderr, " --DB_INIT_TXN (1|0) turn on or off the DB_INIT_TXN env_open_flag\n"); fprintf(stderr, " --DB_INIT_TXN (1|0) turn on or off the DB_INIT_TXN env_open_flag\n");
fprintf(stderr, " --DB_INIT_LOG (1|0) turn on or off the DB_INIT_LOG env_open_flag\n"); fprintf(stderr, " --DB_INIT_LOG (1|0) turn on or off the DB_INIT_LOG env_open_flag\n");
fprintf(stderr, " --DB_INIT_LOCK (1|0) turn on or off the DB_INIT_LOCK env_open_flag\n"); fprintf(stderr, " --DB_INIT_LOCK (1|0) turn on or off the DB_INIT_LOCK env_open_flag\n");
fprintf(stderr, " --1514 do a point query for something not there at end. See #1514. (Requires --norandom)\n"); fprintf(stderr, " --1514 do a point query for something not there at end. See #1514. (Requires --norandom)\n");
fprintf(stderr, " --env DIR\n");
fprintf(stderr, " --append append to an existing file\n"); fprintf(stderr, " --append append to an existing file\n");
fprintf(stderr, " --checkpoint-period %"PRIu32" checkpoint period\n", checkpoint_period); fprintf(stderr, " --checkpoint-period %"PRIu32" checkpoint period\n", checkpoint_period);
fprintf(stderr, " n_iterations how many iterations (default %lld)\n", default_n_items/DEFAULT_ITEMS_TO_INSERT_PER_ITERATION); fprintf(stderr, " n_iterations how many iterations (default %lld)\n", default_n_items/DEFAULT_ITEMS_TO_INSERT_PER_ITERATION);
...@@ -515,6 +521,9 @@ int main (int argc, const char *argv[]) { ...@@ -515,6 +521,9 @@ int main (int argc, const char *argv[]) {
put_flags = DB_NOOVERWRITE; put_flags = DB_NOOVERWRITE;
else else
put_flags = DB_YESOVERWRITE; put_flags = DB_YESOVERWRITE;
} else if (strcmp(arg, "--log_dir") == 0) {
if (i+1 >= argc) return print_usage(argv[0]);
log_dir = argv[++i];
} else { } else {
return print_usage(argv[0]); return print_usage(argv[0]);
} }
......
...@@ -25,6 +25,7 @@ static int do_mysql = 0; ...@@ -25,6 +25,7 @@ static int do_mysql = 0;
static u_int64_t start_range = 0, end_range = 0; static u_int64_t start_range = 0, end_range = 0;
static int n_experiments = 2; static int n_experiments = 2;
static int verbose = 0; static int verbose = 0;
static const char *log_dir = NULL;
static int print_usage (const char *argv0) { static int print_usage (const char *argv0) {
fprintf(stderr, "Usage:\n%s [--verify-lwc | --lwc | --nohwc] [--prelock] [--prelockflag] [--prelockwriteflag] [--env DIR]\n", argv0); fprintf(stderr, "Usage:\n%s [--verify-lwc | --lwc | --nohwc] [--prelock] [--prelockflag] [--prelockwriteflag] [--env DIR]\n", argv0);
...@@ -40,6 +41,7 @@ static int print_usage (const char *argv0) { ...@@ -40,6 +41,7 @@ static int print_usage (const char *argv0) {
fprintf(stderr, " --cachesize <n> set the env cachesize to <n>\n"); fprintf(stderr, " --cachesize <n> set the env cachesize to <n>\n");
fprintf(stderr, " --mysql compare keys that are mysql big int not null types\n"); fprintf(stderr, " --mysql compare keys that are mysql big int not null types\n");
fprintf(stderr, " --env DIR put db files in DIR instead of default\n"); fprintf(stderr, " --env DIR put db files in DIR instead of default\n");
fprintf(stderr, " --log_dir LOGDIR put the logs in LOGDIR\n");
return 1; return 1;
} }
...@@ -103,6 +105,10 @@ static void parse_args (int argc, const char *argv[]) { ...@@ -103,6 +105,10 @@ static void parse_args (int argc, const char *argv[]) {
argc--; argv++; argc--; argv++;
if (argc==0) exit(print_usage(pname)); if (argc==0) exit(print_usage(pname));
dbdir = *argv; dbdir = *argv;
} else if (strcmp(*argv, "--log_dir") == 0) {
argc--; argv++;
if (argc==0) exit(print_usage(pname));
log_dir = *argv;
} else if (strcmp(*argv, "--mysql") == 0) { } else if (strcmp(*argv, "--mysql") == 0) {
do_mysql = 1; do_mysql = 1;
} else if (strcmp(*argv, "--range") == 0 && argc > 2) { } else if (strcmp(*argv, "--range") == 0 && argc > 2) {
...@@ -155,6 +161,9 @@ static void scanscan_setup (void) { ...@@ -155,6 +161,9 @@ static void scanscan_setup (void) {
int r; int r;
r = db_env_create(&env, 0); assert(r==0); r = db_env_create(&env, 0); assert(r==0);
r = env->set_cachesize(env, 0, cachesize, 1); assert(r==0); r = env->set_cachesize(env, 0, cachesize, 1); assert(r==0);
if (log_dir) {
r = env->set_lg_dir(env, log_dir); assert(r==0);
}
double tstart = gettime(); double tstart = gettime();
r = env->open(env, dbdir, do_txns? env_open_flags_yesx : env_open_flags_nox, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); assert(r==0); r = env->open(env, dbdir, do_txns? env_open_flags_yesx : env_open_flags_nox, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); assert(r==0);
double tend = gettime(); double tend = gettime();
......
...@@ -75,5 +75,3 @@ typedef struct { ...@@ -75,5 +75,3 @@ typedef struct {
void toku_checkpoint_get_status(CHECKPOINT_STATUS stat); void toku_checkpoint_get_status(CHECKPOINT_STATUS stat);
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
// If the buf would overflow, then grab the file lock, swap file&buf, release buf lock, write the file, write the entry, release the file lock // If the buf would overflow, then grab the file lock, swap file&buf, release buf lock, write the file, write the entry, release the file lock
// else append to buf & release lock // else append to buf & release lock
#define LOGGER_BUF_SIZE (1<<24) #define LOGGER_MIN_BUF_SIZE (1<<24)
struct mylock { struct mylock {
toku_pthread_mutex_t lock; toku_pthread_mutex_t lock;
...@@ -50,12 +50,18 @@ static inline int ml_destroy(struct mylock *l) { ...@@ -50,12 +50,18 @@ static inline int ml_destroy(struct mylock *l) {
return toku_pthread_mutex_destroy(&l->lock); return toku_pthread_mutex_destroy(&l->lock);
} }
struct logbuf {
int n_in_buf;
int buf_size;
char *buf;
LSN max_lsn_in_buf;
};
struct tokulogger { struct tokulogger {
enum typ_tag tag; // must be first enum typ_tag tag; // must be first
struct mylock input_lock, output_lock; // acquired in that order struct mylock input_lock, output_lock; // acquired in that order
int is_open; BOOL is_open;
int is_panicked; BOOL is_panicked;
BOOL write_log_files; BOOL write_log_files;
BOOL trim_log_files; // for test purposes BOOL trim_log_files; // for test purposes
int panic_errno; int panic_errno;
...@@ -65,18 +71,17 @@ struct tokulogger { ...@@ -65,18 +71,17 @@ struct tokulogger {
int lg_max; // The size of the single file in the log. Default is 100MB in TokuDB int lg_max; // The size of the single file in the log. Default is 100MB in TokuDB
// To access these, you must have the input lock // To access these, you must have the input lock
struct logbytes *head,*tail;
LSN lsn; // the next available lsn LSN lsn; // the next available lsn
OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that? OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that?
int n_in_buf; struct logbuf inbuf; // data being accumulated for the write
// To access these, you must have the output lock // To access these, you must have the output lock
LSN written_lsn; // the last lsn written LSN written_lsn; // the last lsn written
LSN fsynced_lsn; // What is the LSN of the highest fsynced log entry LSN fsynced_lsn; // What is the LSN of the highest fsynced log entry
LSN checkpoint_lsn; // What is the LSN of the most recent completed checkpoint. LSN checkpoint_lsn; // What is the LSN of the most recent completed checkpoint.
long long next_log_file_number; long long next_log_file_number;
char buf[LOGGER_BUF_SIZE]; // used to marshall logbytes so we can use only a single write struct logbuf outbuf; // data being written to the file
int n_in_file; int n_in_file; // The amount of data in the current file
TOKULOGFILEMGR logfilemgr; TOKULOGFILEMGR logfilemgr;
...@@ -112,8 +117,6 @@ struct tokutxn { ...@@ -112,8 +117,6 @@ struct tokutxn {
XIDS xids; //Represents the xid list XIDS xids; //Represents the xid list
}; };
int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync);
static inline int toku_logsizeof_u_int8_t (u_int32_t v __attribute__((__unused__))) { static inline int toku_logsizeof_u_int8_t (u_int32_t v __attribute__((__unused__))) {
return 1; return 1;
} }
......
...@@ -14,15 +14,6 @@ ...@@ -14,15 +14,6 @@
#include "bread.h" #include "bread.h"
#include "x1764.h" #include "x1764.h"
struct logbytes {
struct logbytes *next;
int nbytes;
LSN lsn;
char bytes[1];
};
#define MALLOC_LOGBYTES(n) toku_malloc(sizeof(struct logbytes)+n -1)
typedef void(*voidfp)(void); typedef void(*voidfp)(void);
typedef void(*YIELDF)(voidfp, void*); typedef void(*YIELDF)(voidfp, void*);
struct roll_entry; struct roll_entry;
......
...@@ -295,12 +295,14 @@ generate_log_writer (void) { ...@@ -295,12 +295,14 @@ generate_log_writer (void) {
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name)); DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n"); fprintf(hf, ");\n");
fprintf(cf, ") {\n"); fprintf(cf, ") {\n");
fprintf(cf, " int r = 0;\n");
fprintf(cf, " if (logger==0) return 0;\n"); fprintf(cf, " if (logger==0) return 0;\n");
fprintf(cf, " if (!logger->write_log_files) {\n"); fprintf(cf, " if (!logger->write_log_files) {\n");
fprintf(cf, " ml_lock(&logger->input_lock);\n"); fprintf(cf, " ml_lock(&logger->input_lock);\n");
fprintf(cf, " logger->lsn.lsn += toku_lsn_increment;\n"); fprintf(cf, " logger->lsn.lsn += toku_lsn_increment;\n");
fprintf(cf, " if (lsnp) *lsnp=logger->lsn;\n"); fprintf(cf, " if (lsnp) *lsnp=logger->lsn;\n");
fprintf(cf, " ml_unlock(&logger->input_lock);\n"); fprintf(cf, " r = ml_unlock(&logger->input_lock);\n");
fprintf(cf, " if (r!=0) goto panic;\n");
fprintf(cf, " return 0;\n"); fprintf(cf, " return 0;\n");
fprintf(cf, " }\n"); fprintf(cf, " }\n");
fprintf(cf, " const unsigned int buflen= (+4 // len at the beginning\n"); fprintf(cf, " const unsigned int buflen= (+4 // len at the beginning\n");
...@@ -311,23 +313,31 @@ generate_log_writer (void) { ...@@ -311,23 +313,31 @@ generate_log_writer (void) {
fprintf(cf, " +8 // crc + len\n"); fprintf(cf, " +8 // crc + len\n");
fprintf(cf, " );\n"); fprintf(cf, " );\n");
fprintf(cf, " struct wbuf wbuf;\n"); fprintf(cf, " struct wbuf wbuf;\n");
fprintf(cf, " struct logbytes *lbytes = MALLOC_LOGBYTES(buflen);\n"); fprintf(cf, " r = ml_lock(&logger->input_lock);\n");
fprintf(cf, " if (lbytes==0) return errno;\n"); fprintf(cf, " if (r!=0) goto panic;\n");
fprintf(cf, " wbuf_init(&wbuf, &lbytes->bytes[0], buflen);\n"); fprintf(cf, " r = toku_logger_make_space_in_inbuf(logger, buflen);\n");
fprintf(cf, " wbuf_int(&wbuf, buflen);\n"); fprintf(cf, " if (r!=0) goto panic;\n");
fprintf(cf, " wbuf_char(&wbuf, '%c');\n", (char)(0xff&lt->command_and_flags)); fprintf(cf, " wbuf_nocrc_init(&wbuf, logger->inbuf.buf+logger->inbuf.n_in_buf, buflen);\n");
fprintf(cf, " ml_lock(&logger->input_lock);\n"); fprintf(cf, " wbuf_nocrc_int(&wbuf, buflen);\n");
fprintf(cf, " wbuf_nocrc_char(&wbuf, '%c');\n", (char)(0xff&lt->command_and_flags));
fprintf(cf, " logger->lsn.lsn += toku_lsn_increment;\n"); fprintf(cf, " logger->lsn.lsn += toku_lsn_increment;\n");
fprintf(cf, " LSN lsn = logger->lsn;\n"); fprintf(cf, " LSN lsn =logger->lsn;\n");
fprintf(cf, " wbuf_LSN(&wbuf, lsn);\n"); fprintf(cf, " logger->inbuf.max_lsn_in_buf = lsn;\n");
fprintf(cf, " lbytes->lsn = lsn;\n"); fprintf(cf, " wbuf_nocrc_LSN(&wbuf, lsn);\n");
fprintf(cf, " if (lsnp) *lsnp=logger->lsn;\n"); fprintf(cf, " if (lsnp) *lsnp=lsn;\n");
DO_FIELDS(ft, lt, DO_FIELDS(ft, lt,
if (strcmp(ft->name, "timestamp") == 0) if (strcmp(ft->name, "timestamp") == 0)
fprintf(cf, " if (timestamp == 0) timestamp = toku_get_timestamp();\n"); fprintf(cf, " if (timestamp == 0) timestamp = toku_get_timestamp();\n");
fprintf(cf, " wbuf_%s(&wbuf, %s);\n", ft->type, ft->name)); fprintf(cf, " wbuf_nocrc_%s(&wbuf, %s);\n", ft->type, ft->name));
fprintf(cf, " int r= toku_logger_finish(logger, lbytes, &wbuf, do_fsync);\n"); fprintf(cf, " wbuf_nocrc_int(&wbuf, x1764_memory(wbuf.buf, wbuf.ndone));\n");
fprintf(cf, " wbuf_nocrc_int(&wbuf, buflen);\n");
fprintf(cf, " assert(wbuf.ndone==buflen);\n"); fprintf(cf, " assert(wbuf.ndone==buflen);\n");
fprintf(cf, " logger->inbuf.n_in_buf += buflen;\n");
fprintf(cf, " r = toku_logger_maybe_fsync(logger, lsn, do_fsync);\n");
fprintf(cf, " if (r!=0) goto panic;\n");
fprintf(cf, " return 0;\n");
fprintf(cf, " panic:\n");
fprintf(cf, " toku_logger_panic(logger, r);\n");
fprintf(cf, " return r;\n"); fprintf(cf, " return r;\n");
fprintf(cf, "}\n\n"); fprintf(cf, "}\n\n");
}); });
......
This diff is collapsed.
...@@ -9,7 +9,6 @@ enum { TOKU_LOG_VERSION = 1 }; ...@@ -9,7 +9,6 @@ enum { TOKU_LOG_VERSION = 1 };
int toku_logger_create (TOKULOGGER *resultp); int toku_logger_create (TOKULOGGER *resultp);
int toku_logger_open (const char *directory, TOKULOGGER logger); int toku_logger_open (const char *directory, TOKULOGGER logger);
int toku_logger_log_bytes (TOKULOGGER logger, struct logbytes *bytes, int do_fsync);
int toku_logger_shutdown(TOKULOGGER logger); int toku_logger_shutdown(TOKULOGGER logger);
int toku_logger_close(TOKULOGGER *loggerp); int toku_logger_close(TOKULOGGER *loggerp);
...@@ -90,4 +89,62 @@ void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn); ...@@ -90,4 +89,62 @@ void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn);
TXNID toku_logger_get_oldest_living_xid(TOKULOGGER logger); TXNID toku_logger_get_oldest_living_xid(TOKULOGGER logger);
LSN toku_logger_get_oldest_living_lsn(TOKULOGGER logger); LSN toku_logger_get_oldest_living_lsn(TOKULOGGER logger);
int toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed);
int
toku_logger_write_inbuf (TOKULOGGER logger);
// Effect: Write the buffered data (from the inbuf) to a file. No fsync, however.
// As a side effect, the inbuf will be made empty.
// Return 0 on success, otherwise return an error number.
// Requires: The inbuf lock is currently held, and the outbuf lock is not held.
// Upon return, the inbuf lock will be held, and the outbuf lock is not held.
// However, no side effects should have been made to the logger. The lock was acquired simply to determine that the buffer will overflow if we try to put something into it.
// The inbuf lock will be released, so the operations before and after this function call will not be atomic.
// Rationale: When the buffer becomes nearly full, call this function so that more can be put in.
// Implementation note: Since the output lock is acquired first, we must release the input lock, and then grab both in the right order.
int
toku_logger_maybe_fsync (TOKULOGGER logger, LSN lsn, int do_fsync);
// Effect: If fsync is nonzero, then make sure that the log is flushed and synced at least up to lsn.
// Entry: Holds input lock.
// Exit: Holds no locks.
// Discussion: How does the logger work:
// The logger has two buffers: an inbuf and an outbuf.
// There are two locks, called the inlock, and the outlock. To write, both locks must be held, and the outlock is acquired first.
// Roughly speaking, the inbuf is used to accumulate logged data, and the outbuf is used to write to disk.
// When something is to be logged we do the following:
// acquire the inlock.
// Make sure there is space in the inbuf for the logentry. (We know the size of the logentry in advance):
// if the inbuf doesn't have enough space then
// release the inlock
// acquire the outlock
// acquire the inlock
// it's possible that some other thread made space.
// if there still isn't space
// swap the inbuf and the outbuf
// release the inlock
// write the outbuf
// acquire the inlock
// release the outlock
// if the inbuf is still too small, then increase the size of the inbuf
// Increment the LSN and fill the inbuf.
// If fsync is required then
// release the inlock
// acquire the outlock
// acquire the inlock
// if the LSN has been flushed and fsynced (if so we are done. Some other thread did the flush.)
// release the locks
// if the LSN has been flushed but not fsynced up to the LSN:
// release the inlock
// fsync
// release the outlock
// otherwise:
// swap the outbuf and the inbuf
// release the inlock
// write the outbuf
// fsync
// release the outlock
#endif #endif
...@@ -67,16 +67,6 @@ void toku_rollback_txn_close (TOKUTXN txn) { ...@@ -67,16 +67,6 @@ void toku_rollback_txn_close (TOKUTXN txn) {
return; return;
} }
// wbuf points into logbytes
int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync) {
if (logger->is_panicked) return EINVAL;
u_int32_t checksum = x1764_finish(&wbuf->checksum);
wbuf_int(wbuf, checksum);
wbuf_int(wbuf, 4+wbuf->ndone);
logbytes->nbytes=wbuf->ndone;
return toku_logger_log_bytes(logger, logbytes, do_fsync);
}
void* toku_malloc_in_rollback(TOKUTXN txn, size_t size) { void* toku_malloc_in_rollback(TOKUTXN txn, size_t size) {
return malloc_in_memarena(txn->rollentry_arena, size); return malloc_in_memarena(txn->rollentry_arena, size);
} }
......
...@@ -17,23 +17,20 @@ test_main (int argc __attribute__((__unused__)), ...@@ -17,23 +17,20 @@ test_main (int argc __attribute__((__unused__)),
system(rmrf); system(rmrf);
r = toku_os_mkdir(dname, S_IRWXU); assert(r==0); r = toku_os_mkdir(dname, S_IRWXU); assert(r==0);
TOKULOGGER logger; TOKULOGGER logger;
r = toku_logger_create(&logger); r = toku_logger_create(&logger); assert(r == 0);
assert(r == 0); r = toku_logger_open(dname, logger); assert(r == 0);
r = toku_logger_open(dname, logger);
assert(r == 0);
{ {
struct logbytes *b = MALLOC_LOGBYTES(5); r = ml_lock(&logger->input_lock); assert(r == 0);
b->nbytes=5; r = toku_logger_make_space_in_inbuf(logger, 5); assert(r == 0);
memcpy(b->bytes, "a1234", 5); snprintf(logger->inbuf.buf+logger->inbuf.n_in_buf, 5, "a1234");
b->lsn=(LSN){0}; logger->inbuf.n_in_buf+=5;
r = ml_lock(&logger->input_lock); logger->lsn.lsn++;
assert(r==0); logger->inbuf.max_lsn_in_buf = logger->lsn;
r = toku_logger_log_bytes(logger, b, 0); r = ml_unlock(&logger->input_lock); assert(r == 0);
assert(r==0);
assert(logger->input_lock.is_locked==0);
} }
r = toku_logger_close(&logger);
assert(r == 0); r = toku_logger_close(&logger); assert(r == 0);
{ {
toku_struct_stat statbuf; toku_struct_stat statbuf;
r = toku_stat(dname "/log000000000000.tokulog", &statbuf); r = toku_stat(dname "/log000000000000.tokulog", &statbuf);
......
...@@ -31,16 +31,16 @@ test_main (int argc __attribute__((__unused__)), ...@@ -31,16 +31,16 @@ test_main (int argc __attribute__((__unused__)),
assert(r == 0); assert(r == 0);
int i; int i;
for (i=0; i<1000; i++) { for (i=0; i<1000; i++) {
r = ml_lock(&logger->input_lock); r = ml_lock(&logger->input_lock); assert(r==0);
assert(r==0);
int ilen=3+random()%5; int ilen=3+random()%5;
struct logbytes *b = MALLOC_LOGBYTES(ilen+1); r = toku_logger_make_space_in_inbuf(logger, ilen+1); assert(r==0);
b->nbytes=ilen+1; snprintf(logger->inbuf.buf+logger->inbuf.n_in_buf, ilen+1, "a%0*d ", (int)ilen, i);
snprintf(b->bytes, ilen+1, "a%0*d ", (int)ilen, i); // skip the trailing nul logger->inbuf.n_in_buf+=(ilen+1);
b->lsn=(LSN){23+i}; logger->lsn.lsn++;
r = toku_logger_log_bytes(logger, b, 0); logger->inbuf.max_lsn_in_buf = logger->lsn;
assert(r==0); r = ml_unlock(&logger->input_lock); assert(r == 0);
r = toku_logger_fsync(logger); assert(r == 0);
} }
r = toku_logger_close(&logger); r = toku_logger_close(&logger);
assert(r == 0); assert(r == 0);
...@@ -56,7 +56,7 @@ test_main (int argc __attribute__((__unused__)), ...@@ -56,7 +56,7 @@ test_main (int argc __attribute__((__unused__)),
toku_struct_stat statbuf; toku_struct_stat statbuf;
r = toku_stat(fname, &statbuf); r = toku_stat(fname, &statbuf);
assert(r==0); assert(r==0);
assert(statbuf.st_size<=LSIZE); assert(statbuf.st_size<=LSIZE+10);
} }
r = closedir(dir); r = closedir(dir);
assert(r==0); assert(r==0);
......
...@@ -31,28 +31,24 @@ test_main (int argc __attribute__((__unused__)), ...@@ -31,28 +31,24 @@ test_main (int argc __attribute__((__unused__)),
assert(r == 0); assert(r == 0);
{ {
r = ml_lock(&logger->input_lock); r = ml_lock(&logger->input_lock); assert(r==0);
assert(r==0);
int lsize=LSIZE-12-2; int lsize=LSIZE-12-2;
struct logbytes *b = MALLOC_LOGBYTES(lsize); r = toku_logger_make_space_in_inbuf(logger, lsize); assert(r==0);
b->nbytes=lsize; snprintf(logger->inbuf.buf+logger->inbuf.n_in_buf, lsize, "a%*d", lsize-1, 0);
snprintf(b->bytes, lsize, "a%*d", LSIZE-12-2, 0); logger->inbuf.n_in_buf += lsize;
b->lsn=(LSN){23}; logger->lsn.lsn++;
r = toku_logger_log_bytes(logger, b, 0); logger->inbuf.max_lsn_in_buf = logger->lsn;
assert(r==0); r = ml_unlock(&logger->input_lock); assert(r == 0);
} }
{ {
r = ml_lock(&logger->input_lock); r = ml_lock(&logger->input_lock); assert(r==0);
assert(r==0); r = toku_logger_make_space_in_inbuf(logger, 2); assert(r==0);
memcpy(logger->inbuf.buf+logger->inbuf.n_in_buf, "b1", 2);
struct logbytes *b = MALLOC_LOGBYTES(2); logger->inbuf.n_in_buf += 2;
b->lsn=(LSN){24}; logger->lsn.lsn++;
b->nbytes=2; logger->inbuf.max_lsn_in_buf = logger->lsn;
memcpy(b->bytes, "b1", 2); r = ml_unlock(&logger->input_lock); assert(r == 0);
r = toku_logger_log_bytes(logger, b, 0);
assert(r==0);
} }
r = toku_logger_close(&logger); r = toku_logger_close(&logger);
......
...@@ -17,8 +17,8 @@ TOKULOGGER logger[NUM_LOGGERS]; ...@@ -17,8 +17,8 @@ TOKULOGGER logger[NUM_LOGGERS];
static void setup_logger(int which) { static void setup_logger(int which) {
char dnamewhich[200]; char dnamewhich[200];
int r; int r;
sprintf(dnamewhich, "%s_%d", dname, which); snprintf(dnamewhich, sizeof(dnamewhich), "%s_%d", dname, which);
r = toku_os_mkdir(dnamewhich, S_IRWXU); assert(r==0); r = toku_os_mkdir(dnamewhich, S_IRWXU); if (r!=0) printf("file %s error (%d) %s\n", dnamewhich, errno, strerror(errno)); assert(r==0);
r = toku_logger_create(&logger[which]); r = toku_logger_create(&logger[which]);
assert(r == 0); assert(r == 0);
r = toku_logger_set_lg_max(logger[which], LSIZE); r = toku_logger_set_lg_max(logger[which], LSIZE);
...@@ -34,28 +34,24 @@ static void setup_logger(int which) { ...@@ -34,28 +34,24 @@ static void setup_logger(int which) {
static void play_with_logger(int which) { static void play_with_logger(int which) {
int r; int r;
{ {
r = ml_lock(&logger[which]->input_lock); r = ml_lock(&logger[which]->input_lock); assert(r==0);
assert(r==0);
int lsize=LSIZE-12-2; int lsize=LSIZE-12-2;
struct logbytes *b = MALLOC_LOGBYTES(lsize); r = toku_logger_make_space_in_inbuf(logger[which], lsize); assert(r==0);
b->nbytes=lsize; snprintf(logger[which]->inbuf.buf+logger[which]->inbuf.n_in_buf, lsize, "a%*d", lsize-1, 0);
snprintf(b->bytes, lsize, "a%*d", LSIZE-12-2, 0); logger[which]->inbuf.n_in_buf += lsize;
b->lsn=(LSN){23}; logger[which]->lsn.lsn++;
r = toku_logger_log_bytes(logger[which], b, 0); logger[which]->inbuf.max_lsn_in_buf = logger[which]->lsn;
assert(r==0); r = ml_unlock(&logger[which]->input_lock); assert(r == 0);
} }
{ {
r = ml_lock(&logger[which]->input_lock); r = ml_lock(&logger[which]->input_lock); assert(r==0);
assert(r==0); r = toku_logger_make_space_in_inbuf(logger[which], 2); assert(r==0);
memcpy(logger[which]->inbuf.buf+logger[which]->inbuf.n_in_buf, "b1", 2);
struct logbytes *b = MALLOC_LOGBYTES(2); logger[which]->inbuf.n_in_buf += 2;
b->lsn=(LSN){24}; logger[which]->lsn.lsn++;
b->nbytes=2; logger[which]->inbuf.max_lsn_in_buf = logger[which]->lsn;
memcpy(b->bytes, "b1", 2); r = ml_unlock(&logger[which]->input_lock); assert(r == 0);
r = toku_logger_log_bytes(logger[which], b, 0);
assert(r==0);
} }
} }
......
...@@ -24,17 +24,25 @@ struct wbuf { ...@@ -24,17 +24,25 @@ struct wbuf {
struct x1764 checksum; // The checksumx state struct x1764 checksum; // The checksumx state
}; };
static inline void wbuf_init (struct wbuf *w, void *buf, DISKOFF size) { static inline void wbuf_nocrc_init (struct wbuf *w, void *buf, DISKOFF size) {
w->buf=buf; w->buf=buf;
w->size=size; w->size=size;
w->ndone=0; w->ndone=0;
}
static inline void wbuf_init (struct wbuf *w, void *buf, DISKOFF size) {
wbuf_nocrc_init(w, buf, size);
x1764_init(&w->checksum); x1764_init(&w->checksum);
} }
/* Write a character. */ /* Write a character. */
static inline void wbuf_char (struct wbuf *w, unsigned char ch) { static inline void wbuf_nocrc_char (struct wbuf *w, unsigned char ch) {
assert(w->ndone<w->size); assert(w->ndone<w->size);
w->buf[w->ndone++]=ch; w->buf[w->ndone++]=ch;
}
static inline void wbuf_char (struct wbuf *w, unsigned char ch) {
wbuf_nocrc_char (w, ch);
x1764_add(&w->checksum, &w->buf[w->ndone-1], 1); x1764_add(&w->checksum, &w->buf[w->ndone-1], 1);
} }
...@@ -47,12 +55,12 @@ static void wbuf_network_int (struct wbuf *w, int32_t i) { ...@@ -47,12 +55,12 @@ static void wbuf_network_int (struct wbuf *w, int32_t i) {
w->ndone += 4; w->ndone += 4;
} }
static void wbuf_int (struct wbuf *w, int32_t i) { static inline void wbuf_nocrc_int (struct wbuf *w, int32_t i) {
#if 0 #if 0
wbuf_char(w, i>>24); wbuf_nocrc_char(w, i>>24);
wbuf_char(w, i>>16); wbuf_nocrc_char(w, i>>16);
wbuf_char(w, i>>8); wbuf_nocrc_char(w, i>>8);
wbuf_char(w, i>>0); wbuf_nocrc_char(w, i>>0);
#else #else
assert(w->ndone + 4 <= w->size); assert(w->ndone + 4 <= w->size);
#if 0 #if 0
...@@ -63,42 +71,74 @@ static void wbuf_int (struct wbuf *w, int32_t i) { ...@@ -63,42 +71,74 @@ static void wbuf_int (struct wbuf *w, int32_t i) {
#else #else
*(u_int32_t*)(&w->buf[w->ndone]) = toku_htod32(i); *(u_int32_t*)(&w->buf[w->ndone]) = toku_htod32(i);
#endif #endif
x1764_add(&w->checksum, &w->buf[w->ndone], 4);
w->ndone += 4; w->ndone += 4;
#endif #endif
} }
static void wbuf_uint (struct wbuf *w, u_int32_t i) { static inline void wbuf_int (struct wbuf *w, int32_t i) {
wbuf_nocrc_int(w, i);
x1764_add(&w->checksum, &w->buf[w->ndone-4], 4);
}
static inline void wbuf_nocrc_uint (struct wbuf *w, u_int32_t i) {
wbuf_nocrc_int(w, (int32_t)i);
}
static inline void wbuf_uint (struct wbuf *w, u_int32_t i) {
wbuf_int(w, (int32_t)i); wbuf_int(w, (int32_t)i);
} }
static inline void wbuf_literal_bytes(struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) { static inline void wbuf_nocrc_literal_bytes(struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) {
const unsigned char *bytes=bytes_bv; const unsigned char *bytes=bytes_bv;
#if 0 #if 0
{ int i; for (i=0; i<nbytes; i++) wbuf_char(w, bytes[i]); } { int i; for (i=0; i<nbytes; i++) wbuf_nocrc_char(w, bytes[i]); }
#else #else
assert(w->ndone + nbytes <= w->size); assert(w->ndone + nbytes <= w->size);
memcpy(w->buf + w->ndone, bytes, (size_t)nbytes); memcpy(w->buf + w->ndone, bytes, (size_t)nbytes);
x1764_add(&w->checksum, &w->buf[w->ndone], nbytes);
w->ndone += nbytes; w->ndone += nbytes;
#endif #endif
}
static inline void wbuf_literal_bytes(struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) {
wbuf_nocrc_literal_bytes(w, bytes_bv, nbytes);
x1764_add(&w->checksum, &w->buf[w->ndone-nbytes], nbytes);
}
static void wbuf_nocrc_bytes (struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) {
wbuf_nocrc_uint(w, nbytes);
wbuf_nocrc_literal_bytes(w, bytes_bv, nbytes);
} }
static void wbuf_bytes (struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) { static void wbuf_bytes (struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) {
wbuf_uint(w, nbytes); wbuf_uint(w, nbytes);
wbuf_literal_bytes(w, bytes_bv, nbytes); wbuf_literal_bytes(w, bytes_bv, nbytes);
} }
static void wbuf_nocrc_ulonglong (struct wbuf *w, u_int64_t ull) {
wbuf_nocrc_uint(w, (u_int32_t)(ull>>32));
wbuf_nocrc_uint(w, (u_int32_t)(ull&0xFFFFFFFF));
}
static void wbuf_ulonglong (struct wbuf *w, u_int64_t ull) { static void wbuf_ulonglong (struct wbuf *w, u_int64_t ull) {
wbuf_uint(w, (u_int32_t)(ull>>32)); wbuf_uint(w, (u_int32_t)(ull>>32));
wbuf_uint(w, (u_int32_t)(ull&0xFFFFFFFF)); wbuf_uint(w, (u_int32_t)(ull&0xFFFFFFFF));
} }
static inline void wbuf_nocrc_u_int64_t(struct wbuf *w, u_int64_t ull) {
wbuf_nocrc_ulonglong(w, ull);
}
static inline void wbuf_u_int64_t(struct wbuf *w, u_int64_t ull) { static inline void wbuf_u_int64_t(struct wbuf *w, u_int64_t ull) {
wbuf_ulonglong(w, ull); wbuf_ulonglong(w, ull);
} }
static inline void wbuf_nocrc_BYTESTRING (struct wbuf *w, BYTESTRING v) {
wbuf_nocrc_bytes(w, v.data, v.len);
}
static inline void wbuf_BYTESTRING (struct wbuf *w, BYTESTRING v) { static inline void wbuf_BYTESTRING (struct wbuf *w, BYTESTRING v) {
wbuf_bytes(w, v.data, v.len); wbuf_bytes(w, v.data, v.len);
} }
...@@ -107,6 +147,10 @@ static inline void wbuf_u_int8_t (struct wbuf *w, u_int8_t v) { ...@@ -107,6 +147,10 @@ static inline void wbuf_u_int8_t (struct wbuf *w, u_int8_t v) {
wbuf_char(w, v); wbuf_char(w, v);
} }
static inline void wbuf_nocrc_u_int32_t (struct wbuf *w, u_int32_t v) {
wbuf_nocrc_uint(w, v);
}
static inline void wbuf_u_int32_t (struct wbuf *w, u_int32_t v) { static inline void wbuf_u_int32_t (struct wbuf *w, u_int32_t v) {
wbuf_uint(w, v); wbuf_uint(w, v);
} }
...@@ -119,15 +163,26 @@ static inline void wbuf_BLOCKNUM (struct wbuf *w, BLOCKNUM b) { ...@@ -119,15 +163,26 @@ static inline void wbuf_BLOCKNUM (struct wbuf *w, BLOCKNUM b) {
} }
static inline void wbuf_nocrc_TXNID (struct wbuf *w, TXNID tid) {
wbuf_nocrc_ulonglong(w, tid);
}
static inline void wbuf_TXNID (struct wbuf *w, TXNID tid) { static inline void wbuf_TXNID (struct wbuf *w, TXNID tid) {
wbuf_ulonglong(w, tid); wbuf_ulonglong(w, tid);
} }
static inline void wbuf_nocrc_LSN (struct wbuf *w, LSN lsn) {
wbuf_nocrc_ulonglong(w, lsn.lsn);
}
static inline void wbuf_LSN (struct wbuf *w, LSN lsn) { static inline void wbuf_LSN (struct wbuf *w, LSN lsn) {
wbuf_ulonglong(w, lsn.lsn); wbuf_ulonglong(w, lsn.lsn);
} }
static inline void wbuf_nocrc_FILENUM (struct wbuf *w, FILENUM fileid) {
wbuf_nocrc_uint(w, fileid.fileid);
}
static inline void wbuf_FILENUM (struct wbuf *w, FILENUM fileid) { static inline void wbuf_FILENUM (struct wbuf *w, FILENUM fileid) {
wbuf_uint(w, fileid.fileid); wbuf_uint(w, fileid.fileid);
} }
......
...@@ -178,7 +178,8 @@ DEPEND_COMPILE += $(wildcard $(TOKUROOT)$(OS_CHOICE)/*.h) ...@@ -178,7 +178,8 @@ DEPEND_COMPILE += $(wildcard $(TOKUROOT)$(OS_CHOICE)/*.h)
SUPPRESSIONS=no SUPPRESSIONS=no
#Tools #Tools
VGRIND=valgrind --quiet --error-exitcode=1 --leak-check=full --show-reachable=yes \ VALGRIND=valgrind
VGRIND=$(VALGRIND) --quiet --error-exitcode=1 --leak-check=full --show-reachable=yes \
--suppressions=$(TOKUROOT)newbrt/valgrind.suppressions \ --suppressions=$(TOKUROOT)newbrt/valgrind.suppressions \
--suppressions=$(TOKUROOT)src/tests/bdb.suppressions \ --suppressions=$(TOKUROOT)src/tests/bdb.suppressions \
--gen-suppressions=$(SUPPRESSIONS) --gen-suppressions=$(SUPPRESSIONS)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment