Commit abe26573 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Group commit working in tokulogger. Need to release some locks in ydb. Addresses #484.

git-svn-id: file:///svn/tokudb@2763 c7de825b-a66e-492c-adef-691d508d4ae1
parent 70232af2
......@@ -8,7 +8,7 @@
# GCOV_FLAGS = -fprofile-arcs -ftest-coverage
# PROF_FLAGS = -pg
OPTFLAGS = -O2
# OPTFLAGS = -O2
ifeq ($(VERBOSE),2)
VERBVERBOSE=-v
......@@ -59,6 +59,10 @@ REGRESSION_TESTS = \
brt-test-cursor \
log-test \
log-test2 \
log-test3 \
log-test4 \
log-test5 \
log-test6 \
test_oexcl \
test-assert \
test-primes \
......@@ -87,7 +91,9 @@ recover: recover.o log_code.o memory.o log.o brt-serialize.o fifo.o pma.o ybt.o
roll.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h cachetable.h pma.h
log_code.o: log_header.h wbuf.h log-internal.h
log_code.c log_header.h: logformat
log_header.h: log_code.c
@echo generated log_code.c so log_header.c was also generated
log_code.c: logformat
./logformat
libs: log.o
......@@ -135,7 +141,7 @@ check-fanout:
let BRT_FANOUT=BRT_FANOUT+1; \
done
log-test log-test2 pma-test benchmark-test brt-test brt-test3 brt-test4 brt-test-cursor test-brt-delete-both brt-serialize-test brtdump test-inc-split test-del-inorder: LDFLAGS+=-lz
log-test log-test2 log-test3 log-test4 log-test5 log-test6 pma-test benchmark-test brt-test brt-test3 brt-test4 brt-test-cursor test-brt-delete-both brt-serialize-test brtdump test-inc-split test-del-inorder: LDFLAGS+=-lz
# pma: PROF_FLAGS=-fprofile-arcs -ftest-coverage
BRT_INTERNAL_H_INCLUDES = brt-internal.h cachetable.h fifo.h pma.h brt.h brttypes.h yerror.h ybt.h log.h ../include/db.h kv-pair.h memory.h crc.h
......@@ -164,7 +170,7 @@ fifo-test: fifo.o memory.o toku_assert.o ybt.o
brt-serialize.o: $(BRT_INTERNAL_H_INCLUDES) key.h wbuf.h rbuf.h
brt-bigtest: memory.o ybt.o brt.o pma.o cachetable.o key.o fifo.o brt-serialize.o
brt-bigtest.o: brt.h ../include/db.h
log-test2 log-test: log.o memory.o toku_assert.o roll.o log_code.o brt-serialize.o brt.o cachetable.o pma.o ybt.o fifo.o key.o fingerprint.o brt-verify.o mempool.o primes.o
log-test6 log-test5 log-test4 log-test3 log-test2 log-test: log.o memory.o toku_assert.o roll.o log_code.o brt-serialize.o brt.o cachetable.o pma.o ybt.o fifo.o key.o fingerprint.o brt-verify.o mempool.o primes.o
brt-verify.o: $(BRT_INTERNAL_H_INCLUDES)
fingerprint.o: $(BRT_INTERNAL_H_INCLUDES)
toku_assert.o: toku_assert.h
......
......@@ -95,7 +95,7 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
// We only call this function if we have reason to believe that the child's fingerprint did change.
BNC_SUBTREE_FINGERPRINT(node,childnum_of_node)=sum;
node->dirty=1;
toku_log_changechildfingerprint(logger, toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
toku_log_changechildfingerprint(logger, 0, toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
toku_update_brtnode_loggerlsn(node, logger);
}
......@@ -227,7 +227,7 @@ static int malloc_diskblock_header_is_in_memory (DISKOFF *res, BRT brt, int size
DISKOFF result = brt->h->unused_memory;
brt->h->unused_memory+=size;
brt->h->dirty = 1;
int r = toku_log_changeunusedmemory(logger, toku_cachefile_filenum(brt->cf), result, brt->h->unused_memory);
int r = toku_log_changeunusedmemory(logger, 0, toku_cachefile_filenum(brt->cf), result, brt->h->unused_memory);
*res = result;
return r;
}
......@@ -295,7 +295,7 @@ int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logg
r=toku_cachetable_put(t->cf, n->thisnodename, n, brtnode_size(n),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t);
assert(r==0);
r=toku_log_newbrtnode(logger, toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint);
r=toku_log_newbrtnode(logger, 0, toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint);
assert(r==0);
toku_update_brtnode_loggerlsn(n, logger);
return 0;
......@@ -381,7 +381,7 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
BNC_DISKOFF(B, targchild) = thischilddiskoff;
int r = toku_log_addchild(logger, fnum, B->thisnodename, targchild, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i));
int r = toku_log_addchild(logger, 0, fnum, B->thisnodename, targchild, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i));
if (r!=0) return r;
while (1) {
......@@ -400,9 +400,9 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
u_int32_t new_to_fingerprint = old_to_fingerprint + B->rand4fingerprint *delta;
if (r!=0) return r;
r = toku_log_brtdeq(logger, fnum, node->thisnodename, n_children_in_a, xid, type, keybs, databs, old_from_fingerprint, new_from_fingerprint);
r = toku_log_brtdeq(logger, 0, fnum, node->thisnodename, n_children_in_a, xid, type, keybs, databs, old_from_fingerprint, new_from_fingerprint);
if (r!=0) return r;
r = toku_log_brtenq(logger, fnum, B->thisnodename, targchild, xid, type, keybs, databs, old_to_fingerprint, new_to_fingerprint);
r = toku_log_brtenq(logger, 0, fnum, B->thisnodename, targchild, xid, type, keybs, databs, old_to_fingerprint, new_to_fingerprint);
r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid);
if (r!=0) return r;
toku_fifo_deq(from_htab);
......@@ -423,10 +423,10 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
BYTESTRING bs = { .len = kv_pair_keylen(node->u.n.childkeys[i-1]),
.data = kv_pair_key(node->u.n.childkeys[i-1]) };
assert(i>0);
r = toku_log_delchild(logger, fnum, node->thisnodename, n_children_in_a, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i), bs);
r = toku_log_delchild(logger, 0, fnum, node->thisnodename, n_children_in_a, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i), bs);
if (r!=0) return r;
if (i>n_children_in_a) {
r = toku_log_setpivot(logger, fnum, B->thisnodename, targchild-1, bs);
r = toku_log_setpivot(logger, 0, fnum, B->thisnodename, targchild-1, bs);
if (r!=0) return r;
B->u.n.childkeys[targchild-1] = node->u.n.childkeys[i-1];
B->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
......@@ -624,7 +624,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
for (cnum=node->u.n.n_children; cnum>childnum+1; cnum--) {
node->u.n.childinfos[cnum] = node->u.n.childinfos[cnum-1];
}
r = toku_log_addchild(logger, toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
r = toku_log_addchild(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
assert(BNC_DISKOFF(node, childnum)==childa->thisnodename);
BNC_DISKOFF(node, childnum+1) = childb->thisnodename;
BNC_SUBTREE_FINGERPRINT(node, childnum)=0;
......@@ -645,7 +645,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
struct kv_pair *pivot = childsplitk->data;
BYTESTRING bs = { .len = childsplitk->size,
.data = kv_pair_key(pivot) };
r = toku_log_setpivot(logger, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
r = toku_log_setpivot(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
if (r!=0) return r;
for (cnum=node->u.n.n_children-1; cnum>childnum; cnum--) {
......@@ -1287,7 +1287,7 @@ static int setup_initial_brt_root_node (BRT t, DISKOFF offset, TOKULOGGER logger
}
toku_verify_counts(node);
// verify_local_fingerprint_nonleaf(node);
toku_log_newbrtnode(logger, toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
toku_log_newbrtnode(logger, 0, toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
toku_update_brtnode_loggerlsn(node, logger);
r = toku_unpin_brtnode(t, node);
if (r!=0) {
......@@ -1617,12 +1617,12 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
assert(r==0);
assert(newroot);
if (brt->database_name==0) {
toku_log_changeunnamedroot(logger, toku_cachefile_filenum(brt->cf), *rootp, newroot_diskoff);
toku_log_changeunnamedroot(logger, 0, toku_cachefile_filenum(brt->cf), *rootp, newroot_diskoff);
} else {
BYTESTRING bs;
bs.len = 1+strlen(brt->database_name);
bs.data = brt->database_name;
toku_log_changenamedroot(logger, toku_cachefile_filenum(brt->cf), bs, *rootp, newroot_diskoff);
toku_log_changenamedroot(logger, 0, toku_cachefile_filenum(brt->cf), bs, *rootp, newroot_diskoff);
}
*rootp=newroot_diskoff;
brt->h->dirty=1;
......@@ -1645,18 +1645,18 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
toku_verify_counts(newroot);
//verify_local_fingerprint_nonleaf(nodea);
//verify_local_fingerprint_nonleaf(nodeb);
r=toku_log_newbrtnode(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (brt->flags&TOKU_DB_DUPSORT)!=0, newroot->rand4fingerprint);
r=toku_log_newbrtnode(logger, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (brt->flags&TOKU_DB_DUPSORT)!=0, newroot->rand4fingerprint);
if (r!=0) return r;
r=toku_log_addchild(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename, 0);
r=toku_log_addchild(logger, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename, 0);
if (r!=0) return r;
r=toku_log_addchild(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename, 0);
r=toku_log_addchild(logger, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename, 0);
if (r!=0) return r;
fixup_child_fingerprint(newroot, 0, nodea, brt, logger);
fixup_child_fingerprint(newroot, 1, nodeb, brt, logger);
{
BYTESTRING bs = { .len = kv_pair_keylen(newroot->u.n.childkeys[0]),
.data = kv_pair_key(newroot->u.n.childkeys[0]) };
r=toku_log_setpivot(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
r=toku_log_setpivot(logger, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
if (r!=0) return r;
toku_update_brtnode_loggerlsn(newroot, logger);
}
......
......@@ -5,24 +5,67 @@
#include "list.h"
#include "yerror.h"
#include <stdio.h>
#include <pthread.h>
#include <sys/types.h>
// Locking for the logger
// For most purposes we use the big ydb lock.
// To log: grab the buf lock
// If the buf would overflow, then grab the file lock, swap file&buf, release buf lock, write the file, write the entry, release the file lock
// else append to buf & release lock
#define LOGGER_BUF_SIZE (1<<24)
struct mylock {
pthread_mutex_t lock;
int is_locked;
};
static inline int ml_init(struct mylock *l) {
l->is_locked=0;
return pthread_mutex_init(&l->lock, 0);
}
static inline int ml_lock(struct mylock *l) {
int r = pthread_mutex_lock(&l->lock);
assert(l->is_locked==0);
l->is_locked=1;
return r;
}
static inline int ml_unlock(struct mylock *l) {
assert(l->is_locked==1);
l->is_locked=0;
return pthread_mutex_unlock(&l->lock);
}
static inline int ml_destroy(struct mylock *l) {
assert(l->is_locked==0);
return pthread_mutex_destroy(&l->lock);
}
struct tokulogger {
enum typ_tag tag; // must be first
struct mylock input_lock, output_lock; // acquired in that order
int is_open;
int is_panicked;
int panic_errno;
enum typ_tag tag;
char *directory;
int fd;
int n_in_file;
long long next_log_file_number;
LSN lsn;
char buf[LOGGER_BUF_SIZE];
int n_in_buf;
CACHETABLE ct;
struct list live_txns; // just a linked list. Should be a hashtable.
int lg_max; // The size of the single file in the log. Default is 100MB in TokuDB
// To access these, you must have the input lock
struct logbytes *head,*tail;
LSN lsn; // the next available lsn
struct list live_txns; // just a linked list. Should be a hashtable.
int n_in_buf;
// To access these, you must have the output lock
LSN written_lsn; // the last lsn written
LSN fsynced_lsn; // What is the LSN of the highest fsynced log entry
long long next_log_file_number;
char buf[LOGGER_BUF_SIZE]; // used to marshall logbytes so we can use only a single write
int n_in_file;
};
int toku_logger_find_next_unused_log_file(const char *directory, long long *result);
......@@ -51,7 +94,7 @@ struct tokutxn {
struct list live_txns_link;
};
int toku_logger_finish (TOKULOGGER logger, struct wbuf *wbuf);
int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync);
static inline int toku_logsizeof_u_int8_t (u_int32_t v __attribute__((__unused__))) {
return 1;
......
......@@ -25,12 +25,23 @@ int main (int argc __attribute__((__unused__)),
r = toku_logger_open(dname, logger);
assert(r == 0);
{
LSN lsn={0};
char data[]="a1234";
r = toku_logger_log_bytes(logger, strlen(data), data, lsn);
struct logbytes *b = MALLOC_LOGBYTES(5);
b->nbytes=5;
memcpy(b->bytes, "a1234", 5);
b->lsn=(LSN){0};
r = ml_lock(&logger->input_lock);
assert(r==0);
r = toku_logger_log_bytes(logger, b, 0);
assert(r==0);
assert(logger->input_lock.is_locked==0);
}
r = toku_logger_close(&logger);
assert(r == 0);
{
struct stat statbuf;
r = stat(dname "/log000000000000.tokulog", &statbuf);
assert(r==0);
assert(statbuf.st_size==12+5);
}
return 0;
}
......@@ -3,6 +3,7 @@
#include "log-internal.h"
#include "toku_assert.h"
#include <dirent.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/stat.h>
......@@ -33,25 +34,36 @@ int main (int argc __attribute__((__unused__)),
r = toku_logger_open(dname, logger);
assert(r == 0);
int i;
for (i=0; i<20; i++) {
r = ml_lock(&logger->LSN_lock);
for (i=0; i<1000; i++) {
r = ml_lock(&logger->input_lock);
assert(r==0);
LSN lsn={23};
char data[100];
snprintf(data, sizeof(data), "a%04d", i);
r = toku_logger_log_bytes(logger, strlen(data), data, lsn);
int ilen=3+random()%5;
struct logbytes *b = MALLOC_LOGBYTES(ilen+1);
b->nbytes=ilen+1;
snprintf(b->bytes, ilen+1, "a%0*d ", (int)ilen, i); // skip the trailing nul
b->lsn=(LSN){23+i};
r = toku_logger_log_bytes(logger, b, 0);
assert(r==0);
}
r = toku_logger_close(&logger);
assert(r == 0);
{
struct stat statbuf;
r = stat(dname "/log000000000000.tokulog", &statbuf);
DIR *dir=opendir(dname);
assert(dir);
struct dirent *dirent;
while ((dirent=readdir(dir))) {
if (strncmp(dirent->d_name, "log", 3)!=0) continue;
char fname[sizeof(dname)+256+1];
snprintf(fname, sizeof(fname), "%s/%s", dname, dirent->d_name);
struct stat statbuf;
r = stat(fname, &statbuf);
assert(r==0);
assert(statbuf.st_size<=LSIZE);
}
r = closedir(dir);
assert(r==0);
assert(statbuf.st_size<=LSIZE);
}
return 0;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#include "log-internal.h"
#include "toku_assert.h"
#include <fcntl.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#define dname __FILE__ ".dir"
#define rmrf "rm -rf " dname "/"
// create and close, making sure that everything is deallocated properly.
#define LSIZE 100
int main (int argc __attribute__((__unused__)),
char *argv[] __attribute__((__unused__))) {
int r;
system(rmrf);
r = mkdir(dname, 0700); assert(r==0);
TOKULOGGER logger;
r = toku_logger_create(&logger);
assert(r == 0);
r = toku_logger_set_lg_max(logger, LSIZE);
{
u_int32_t n;
r = toku_logger_get_lg_max(logger, &n);
assert(n==LSIZE);
}
r = toku_logger_open(dname, logger);
assert(r == 0);
{
r = ml_lock(&logger->input_lock);
assert(r==0);
int lsize=LSIZE-12-2;
struct logbytes *b = MALLOC_LOGBYTES(lsize);
b->nbytes=lsize;
snprintf(b->bytes, lsize, "a%*d", LSIZE-12-2, 0);
b->lsn=(LSN){23};
r = toku_logger_log_bytes(logger, b, 0);
assert(r==0);
}
{
r = ml_lock(&logger->input_lock);
assert(r==0);
struct logbytes *b = MALLOC_LOGBYTES(2);
b->lsn=(LSN){24};
b->nbytes=2;
memcpy(b->bytes, "b1", 2);
r = toku_logger_log_bytes(logger, b, 0);
assert(r==0);
}
r = toku_logger_close(&logger);
assert(r == 0);
{
struct stat statbuf;
r = stat(dname "/log000000000000.tokulog", &statbuf);
assert(r==0);
assert(statbuf.st_size<=LSIZE);
}
return 0;
}
This diff is collapsed.
......@@ -9,10 +9,20 @@
#include "brttypes.h"
#include "kv-pair.h"
struct logbytes;
struct logbytes {
struct logbytes *next;
int nbytes;
LSN lsn;
char bytes[0];
};
#define MALLOC_LOGBYTES(n) toku_malloc(sizeof(struct logbytes)+n)
int toku_logger_create(TOKULOGGER */*resultp*/);
void toku_logger_set_cachetable (TOKULOGGER, CACHETABLE);
int toku_logger_open(const char */*directory*/, TOKULOGGER);
int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes);
int toku_logger_log_bytes(TOKULOGGER logger, struct logbytes *bytes, int do_fsync);
int toku_logger_close(TOKULOGGER *logger);
int toku_logger_log_checkpoint (TOKULOGGER, LSN*);
void toku_logger_panic(TOKULOGGER, int/*err*/);
......
......@@ -311,7 +311,7 @@ void generate_log_free(void) {
void generate_log_writer (void) {
DO_LOGTYPES(lt, ({
fprintf2(cf, hf, "int toku_log_%s (TOKULOGGER logger", lt->name);
fprintf2(cf, hf, "int toku_log_%s (TOKULOGGER logger, int do_fsync", lt->name);
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n");
fprintf(cf, ") {\n");
......@@ -324,19 +324,20 @@ void generate_log_writer (void) {
fprintf(cf, " +8 // crc + len\n");
fprintf(cf, " );\n");
fprintf(cf, " struct wbuf wbuf;\n");
fprintf(cf, " char *buf = toku_malloc(buflen);\n");
fprintf(cf, " if (buf==0) return errno;\n");
fprintf(cf, " wbuf_init(&wbuf, buf, buflen);\n");
fprintf(cf, " struct logbytes *lbytes = MALLOC_LOGBYTES(buflen);\n");
fprintf(cf, " if (lbytes==0) return errno;\n");
fprintf(cf, " wbuf_init(&wbuf, &lbytes->bytes[0], buflen);\n");
fprintf(cf, " wbuf_int(&wbuf, buflen);\n");
fprintf(cf, " wbuf_char(&wbuf, '%c');\n", 0xff&lt->command_and_flags);
fprintf(cf, " wbuf_LSN(&wbuf, logger->lsn);\n");
fprintf(cf, " ml_lock(&logger->input_lock);\n");
fprintf(cf, " LSN lsn = logger->lsn;\n");
fprintf(cf, " wbuf_LSN(&wbuf, lsn);\n");
fprintf(cf, " lbytes->lsn = lsn;\n");
fprintf(cf, " logger->lsn.lsn++;\n");
DO_FIELDS(ft, lt,
fprintf(cf, " wbuf_%s(&wbuf, %s);\n", ft->type, ft->name));
fprintf(cf, " int r= toku_logger_finish(logger, &wbuf);\n");
fprintf(cf, " int r= toku_logger_finish(logger, lbytes, &wbuf, do_fsync);\n");
fprintf(cf, " assert(wbuf.ndone==buflen);\n");
fprintf(cf, " toku_free(buf);\n");
fprintf(cf, " return r;\n");
fprintf(cf, "}\n\n");
}));
......
......@@ -369,7 +369,7 @@ static int pma_log_distribute (TOKULOGGER logger, FILENUM filenum, DISKOFF old_d
}
}
ipa.size=j;
int r=toku_log_pmadistribute(logger, filenum, old_diskoff, new_diskoff, ipa);
int r=toku_log_pmadistribute(logger, 0, filenum, old_diskoff, new_diskoff, ipa);
if (logger && oldnode_lsn) *oldnode_lsn = toku_logger_last_lsn(logger);
if (logger && newnode_lsn) *newnode_lsn = toku_logger_last_lsn(logger);
// if (0 && pma) {
......@@ -546,7 +546,7 @@ static int pma_resize_array(TOKULOGGER logger, FILENUM filenum, DISKOFF offset,
unsigned int oldN, n;
int r = pma_resize_array_nolog(pma, asksize, startz, &oldN, &n);
if (r!=0) return r;
toku_log_resizepma (logger, filenum, offset, oldN, n);
toku_log_resizepma (logger, 0, filenum, offset, oldN, n);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
return 0;
}
......@@ -734,7 +734,7 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKULOGGER logger, TXNID xid, FILE
{
const BYTESTRING key = { pair->keylen, kv_pair_key(pair) };
const BYTESTRING data = { pair->vallen, kv_pair_val(pair) };
int r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
int r = toku_log_insertinleaf (logger, 0, xid, pma->filenum, diskoff, idx, key, data);
if (r!=0) return r;
if (node_lsn) *node_lsn = toku_logger_last_lsn(logger);
}
......@@ -772,7 +772,7 @@ static int pma_log_delete (PMA pma, const char *key, int keylen, const char *val
{
const BYTESTRING deletedkey = { keylen, (char*)key };
const BYTESTRING deleteddata = { vallen, (char*)val };
int r=toku_log_deleteinleaf(logger, xid, pma->filenum, diskoff, idx, deletedkey, deleteddata);
int r=toku_log_deleteinleaf(logger, 0, xid, pma->filenum, diskoff, idx, deletedkey, deleteddata);
if (r!=0) return r;
}
if (logger) {
......@@ -945,7 +945,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
{
const BYTESTRING key = { k->size, k->data };
const BYTESTRING data = { v->size, v->data };
r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
r = toku_log_insertinleaf (logger, 0, xid, pma->filenum, diskoff, idx, key, data);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
if (r!=0) return r;
/* We don't record the insert here for rollback. The insert should have been logged at the top-level. */
......@@ -1105,7 +1105,7 @@ int toku_pma_split(TOKULOGGER logger, FILENUM filenum,
{
int r = pma_log_distribute(logger, filenum, diskoff, diskoff, spliti, &pairs[0], lsn, lsn);
if (r!=0) { toku_free(pairs); return r; }
r = toku_log_resizepma(logger, filenum, diskoff, oldn_for_logging, newn_for_logging);
r = toku_log_resizepma(logger, 0, filenum, diskoff, oldn_for_logging, newn_for_logging);
if (r!=0) { toku_free(pairs); return r; }
if (logger && lsn) *lsn = toku_logger_last_lsn(logger);
......
......@@ -27,14 +27,11 @@ SHARED=-shared $(EXPORTMAP)
RPATHNAME=
endif
.PHONY: install logformat
install: logformat locktree $(LIBRARY) $(LIBNAME).a
.PHONY: install
install: locktree $(LIBRARY) $(LIBNAME).a
cp $(LIBRARY) ../lib/
cp $(LIBNAME).a ../lib
logformat:
(cd ../newbrt && make)
locktree:
cd lock_tree && make
......
......@@ -29,7 +29,7 @@ else
BDB_LDFLAGS += -Wl,-rpath,$(BDBDIR)/lib
endif
BDB_LDFLAGS += -lpthread
TDB_LOADLIBES = -L../ -ltokudb -Wl,-rpath,.. -lpthread
TDB_LOADLIBES = -L.. -ltokudb -Wl,-rpath,.. -lpthread
VGRIND=valgrind --quiet --error-exitcode=1 --leak-check=yes
HGRIND=valgrind --quiet --tool=helgrind
endif
......
......@@ -711,25 +711,27 @@ static int toku_txn_release_locks(DB_TXN* txn) {
}
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
if (!txn) return EINVAL;
HANDLE_PANICKED_ENV(txn->mgrp);
//toku_ydb_notef("flags=%d\n", flags);
int r;
int nosync = (flags & DB_TXN_NOSYNC)!=0;
flags &= ~DB_TXN_NOSYNC;
if (!txn) return EINVAL;
if (flags!=0) goto return_invalid;
r = toku_logger_commit(txn->i->tokutxn, nosync);
if (0) {
return_invalid:
r = EINVAL;
toku_free(txn->i->tokutxn);
if (flags!=0) {
if (txn->i) {
if (txn->i->tokutxn)
toku_free(txn->i->tokutxn);
toku_free(txn->i);
}
toku_free(txn);
return EINVAL;
}
// Cleanup */
int r2 = toku_txn_release_locks(txn);
int r = toku_logger_commit(txn->i->tokutxn, nosync); // frees the tokutxn
int r2 = toku_txn_release_locks(txn); // release the locks after the commit (otherwise, what if we abort)
// the toxutxn is freed, and we must free the rest. */
if (txn->i)
toku_free(txn->i);
toku_free(txn);
return r ? r : r2; // The txn is no good after the commit.
return r2 ? r2 : r; // The txn is no good after the commit even if the commit fails.
}
static u_int32_t toku_txn_id(DB_TXN * txn) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment