Commit 48f0ad74 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Up

git-svn-id: file:///svn/tokudb@519 c7de825b-a66e-492c-adef-691d508d4ae1
parent d5153759
...@@ -12,7 +12,7 @@ FPICFLAGS = -fPIC ...@@ -12,7 +12,7 @@ FPICFLAGS = -fPIC
DTOOL = valgrind --quiet --error-exitcode=1 DTOOL = valgrind --quiet --error-exitcode=1
endif endif
CFLAGS = -Wall -W $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS) -Werror $(FPICFLAGS) CFLAGS = -Wall -W $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS) -Werror $(FPICFLAGS) -Wshadow
LDFLAGS = $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS) LDFLAGS = $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS)
CPPFLAGS += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE CPPFLAGS += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE
...@@ -31,10 +31,10 @@ REGRESSION_TESTS = \ ...@@ -31,10 +31,10 @@ REGRESSION_TESTS = \
ybt-test \ ybt-test \
pma-test \ pma-test \
brt-serialize-test \ brt-serialize-test \
brt-test \
cachetable-test \ cachetable-test \
cachetable-test2 \ cachetable-test2 \
hashtest \ hashtest \
brt-test \
# This line intentially kept commented so I can have a \ on the end of the previous line # This line intentially kept commented so I can have a \ on the end of the previous line
BINS = $(REGRESSION_TESTS) \ BINS = $(REGRESSION_TESTS) \
...@@ -46,7 +46,6 @@ BINS = $(REGRESSION_TESTS) \ ...@@ -46,7 +46,6 @@ BINS = $(REGRESSION_TESTS) \
libs: log.o libs: log.o
bins: $(BINS) bins: $(BINS)
check: bins check: bins
./benchmark-test --valsize 256 --verify 1
$(DTOOL) ./ybt-test $(DTOOL) ./ybt-test
$(DTOOL) ./pma-test $(DTOOL) ./pma-test
$(DTOOL) ./cachetable-test $(DTOOL) ./cachetable-test
...@@ -54,6 +53,7 @@ check: bins ...@@ -54,6 +53,7 @@ check: bins
$(DTOOL) ./brt-serialize-test $(DTOOL) ./brt-serialize-test
$(DTOOL) ./brt-test $(DTOOL) ./brt-test
$(DTOOL) ./hashtest $(DTOOL) ./hashtest
./benchmark-test --valsize 256 --verify 1
# ./mdict-test # ./mdict-test
check-fanout: check-fanout:
...@@ -63,33 +63,40 @@ check-fanout: ...@@ -63,33 +63,40 @@ check-fanout:
let BRT_FANOUT=BRT_FANOUT+1; \ let BRT_FANOUT=BRT_FANOUT+1; \
done done
pma-test benchmark-test brt-test brt-serialize-test: LDFLAGS+=-lz
# pma: PROF_FLAGS=-fprofile-arcs -ftest-coverage # pma: PROF_FLAGS=-fprofile-arcs -ftest-coverage
BRT_INTERNAL_H_INCLUDES = brt-internal.h cachetable.h hashtable.h pma.h brt.h brttypes.h yerror.h ybt.h log.h ../include/db.h kv-pair.h memory.h crc.h
key.o: brttypes.h key.h key.o: brttypes.h key.h
pma-test.o: pma-internal.h pma.h yerror.h memory.h ../include/db.h list.h kv-pair.h brttypes.h ybt.h yerror.h pma-test.o: $(BRT_INTERNAL_H_INCLUDES) pma-internal.h pma.h list.h mempool.h
pma-test: pma.o memory.o key.o ybt.o log.o mempool.o pma-test: pma.o memory.o key.o ybt.o log.o mempool.o fingerprint.o
pma.o: pma.h yerror.h pma-internal.h memory.h key.h ybt.h brttypes.h log.h ../include/db.h pma.o: pma.h yerror.h pma-internal.h memory.h key.h ybt.h brttypes.h log.h ../include/db.h
ybt.o: ybt.h brttypes.h ../include/db.h ybt.o: ybt.h brttypes.h ../include/db.h
ybt-test: ybt-test.o ybt.o memory.o ybt-test: ybt-test.o ybt.o memory.o
ybt-test.o: ybt.h ../include/db.h ybt-test.o: ybt.h ../include/db.h
cachetable.o: cachetable.h hashfun.h cachetable.o: cachetable.h hashfun.h
brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o header-io.o ybt.o key.o primes.o log.o mempool.o brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o header-io.o ybt.o key.o primes.o log.o mempool.o brt-verify.o fingerprint.o
log.o: log-internal.h log.h log.o: log-internal.h log.h wbuf.h crc.h
brt-test.o brt.o: brt.h ../include/db.h hashtable.h pma.h brttypes.h cachetable.h brt-test.o brt.o: brt.h ../include/db.h hashtable.h pma.h brttypes.h cachetable.h
brt-serialize-test.o: pma.h yerror.h brt.h ../include/db.h memory.h hashtable.h brttypes.h brt-internal.h brt-serialize-test.o: $(BRT_INTERNAL_H_INCLUDES)
brt.o: brt.h ../include/db.h mdict.h pma.h brttypes.h memory.h brt-internal.h cachetable.h hashtable.h brt.o: $(BRT_INTERNAL_H_INCLUDES)
mdict.o: pma.h mdict.o: pma.h
hashtable.o: hashtable.h brttypes.h memory.h key.h yerror.h ../include/db.h hashfun.h hashtable.o: hashtable.h brttypes.h memory.h key.h yerror.h ../include/db.h hashfun.h
memory.o: memory.h memory.o: memory.h
primes.o: primes.h primes.o: primes.h
hashtest: hashtable.o memory.o primes.o hashtest: hashtable.o memory.o primes.o
brt-serialize.o: brt.h ../include/db.h cachetable.h memory.h mdict.h pma.h brttypes.h brt-internal.h hashtable.h wbuf.h rbuf.h brt-serialize.o: $(BRT_INTERNAL_H_INCLUDES) key.h wbuf.h rbuf.h
header-io.o: brttypes.h brt-internal.h brt.h ../include/db.h memory.h header-io.o: $(BRT_INTERNAL_H_INCLUDES)
mdict-test: hashtable.o pma.o memory.o mdict-test: hashtable.o pma.o memory.o
brt-bigtest: memory.o ybt.o brt.o pma.o cachetable.o key.o hashtable.o brt-serialize.o brt-bigtest: memory.o ybt.o brt.o pma.o cachetable.o key.o hashtable.o brt-serialize.o
brt-bigtest.o: brt.h ../include/db.h brt-bigtest.o: brt.h ../include/db.h
log-test: log.o memory.o log-test: log.o memory.o
brt-verify.o: $(BRT_INTERNAL_H_INCLUDES)
fingerprint.o: $(BRT_INTERNAL_H_INCLUDES)
brt-serialize-test: brt-serialize-test.o brt-serialize.o memory.o hashtable.o pma.o key.o ybt.o brt.o cachetable.o primes.o log.o mempool.o brt-verify.o fingerprint.o
brt-serialize-test: brt-serialize-test.o brt-serialize.o memory.o hashtable.o pma.o key.o ybt.o brt.o cachetable.o primes.o log.o mempool.o
cachetable-test.o: cachetable.h memory.h cachetable-test.o: cachetable.h memory.h
cachetable-test: cachetable.o memory.o cachetable-test.o primes.o cachetable-test: cachetable.o memory.o cachetable-test.o primes.o
...@@ -97,7 +104,7 @@ cachetable-test: cachetable.o memory.o cachetable-test.o primes.o ...@@ -97,7 +104,7 @@ cachetable-test: cachetable.o memory.o cachetable-test.o primes.o
cachetable-test2.o: cachetable.h memory.h cachetable-test2.o: cachetable.h memory.h
cachetable-test2: cachetable.o memory.o cachetable-test2.o primes.o cachetable-test2: cachetable.o memory.o cachetable-test2.o primes.o
benchmark-test: benchmark-test.o ybt.o memory.o brt.o pma.o cachetable.o key.o hashtable.o brt-serialize.o primes.o log.o mempool.o benchmark-test: benchmark-test.o ybt.o memory.o brt.o pma.o cachetable.o key.o hashtable.o brt-serialize.o primes.o log.o mempool.o brt-verify.o fingerprint.o
benchmark-test.o: brt.h ../include/db.h benchmark-test.o: brt.h ../include/db.h
clean: clean:
......
...@@ -30,7 +30,7 @@ BRT t; ...@@ -30,7 +30,7 @@ BRT t;
void setup (void) { void setup (void) {
int r; int r;
unlink(fname); unlink(fname);
r = brt_create_cachetable(&ct, 0); assert(r==0); r = brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
r = open_brt(fname, 0, 1, &t, nodesize, ct, default_compare_fun); assert(r==0); r = open_brt(fname, 0, 1, &t, nodesize, ct, default_compare_fun); assert(r==0);
} }
...@@ -69,6 +69,7 @@ long long llrandom (void) { ...@@ -69,6 +69,7 @@ long long llrandom (void) {
void random_insert_below (long long below) { void random_insert_below (long long below) {
long long i; long long i;
assert(0 < below);
for (i=0; i<ITEMS_TO_INSERT_PER_ITERATION; i++) { for (i=0; i<ITEMS_TO_INSERT_PER_ITERATION; i++) {
insert(llrandom()%below); insert(llrandom()%below);
} }
...@@ -79,7 +80,7 @@ double tdiff (struct timeval *a, struct timeval *b) { ...@@ -79,7 +80,7 @@ double tdiff (struct timeval *a, struct timeval *b) {
} }
void biginsert (long long n_elements, struct timeval *starttime) { void biginsert (long long n_elements, struct timeval *starttime) {
long i; long long i;
struct timeval t1,t2; struct timeval t1,t2;
int iteration; int iteration;
for (i=0, iteration=0; i<n_elements; i+=ITEMS_TO_INSERT_PER_ITERATION, iteration++) { for (i=0, iteration=0; i<n_elements; i+=ITEMS_TO_INSERT_PER_ITERATION, iteration++) {
......
static int brt_root_put_cmd_XY (BRT brt, BRT_CMD *md, TOKUTXN txn) {
int r;
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt);
if ((r=cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize))) {
goto died0;
}
node=node_v;
if (0) {
died1:
cachetable_unpin(brt->cf, node->thisnodename, node->dirty, brtnodesize(node));
goto died0;
}
node->parent_brtnode = 0;
result = brtnode_put_cmd_XY(brt, node, cmd, txn);
// It's still pinned, and it may be too big or the fanout may be too large.
if (node->height>0 && node->u.n.n_children==TREE_FANOUT) {
// Must split it.
r = do_split_node(node, &nodea, &nodeb, &splitk); // On error: node is unmodified
if (r!=0) goto died1;
// node is garbage, and nodea and nodeb are pinned
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp); // On error: root is unmodified and nodea and nodeb are both unpinned
if (r!=0) goto died0;
// nodea and nodeb are unpinned, and the root has been fixed
// up to point at a new node (*rootp) containing two children
// (nodea and nodeb). nodea and nodeb are unpinned. *rootp is still pinned
node = *rootp;
}
// Now the fanout is small enough.
// But the node could still be too large.
if (serialize_brtnode_size(node)>node->nodesize) {
}
}
...@@ -2,14 +2,15 @@ ...@@ -2,14 +2,15 @@
#include "hashtable.h" #include "hashtable.h"
#include "pma.h" #include "pma.h"
#include "brt.h" #include "brt.h"
//#include "pma.h" #include "crc.h"
#ifndef BRT_FANOUT #ifndef BRT_FANOUT
#define BRT_FANOUT 16 #define BRT_FANOUT 16
#endif #endif
enum { TREE_FANOUT = BRT_FANOUT }; //, NODESIZE=1<<20 }; enum { TREE_FANOUT = BRT_FANOUT };
enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */ enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */
enum { BRT_CMD_OVERHEAD = 1 }; enum { BRT_CMD_OVERHEAD = 1 };
enum { BRT_DEFAULT_NODE_SIZE = 1 << 20 };
struct nodeheader_in_file { struct nodeheader_in_file {
int n_in_buffer; int n_in_buffer;
...@@ -22,21 +23,28 @@ typedef struct brtnode *BRTNODE; ...@@ -22,21 +23,28 @@ typedef struct brtnode *BRTNODE;
/* Internal nodes. */ /* Internal nodes. */
struct brtnode { struct brtnode {
enum typ_tag tag; enum typ_tag tag;
BRT brt; // The containing BRT
unsigned int nodesize; unsigned int nodesize;
diskoff thisnodename; DISKOFF thisnodename; // The size of the node allocated on disk. Not all is necessarily in use.
LSN lsn; // Need the LSN as of the most recent modification.
int layout_version; // What version of the data structure?
BRTNODE parent_brtnode; /* Invariant: The parent of an in-memory node must be in main memory. This is so we can find and update the down pointer when we change the diskoff of a node. */ BRTNODE parent_brtnode; /* Invariant: The parent of an in-memory node must be in main memory. This is so we can find and update the down pointer when we change the diskoff of a node. */
int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */ int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */
int dirty; u_int32_t rand4fingerprint;
u_int32_t local_fingerprint; /* For leaves this is everything in the buffer. For nonleaves, this is everything in the hash tables, but does not include child subtree fingerprints. */
int dirty;
union node { union node {
struct nonleaf { struct nonleaf {
// Don't actually store the subree fingerprint in the in-memory data structure.
int n_children; /* if n_children==TREE_FANOUT+1 then the tree needs to be rebalanced. */ int n_children; /* if n_children==TREE_FANOUT+1 then the tree needs to be rebalanced. */
u_int32_t child_subtree_fingerprints[TREE_FANOUT+1];
bytevec childkeys[TREE_FANOUT]; /* Pivot keys. Child 0's keys are <= childkeys[0]. Child 1's keys are <= childkeys[1]. bytevec childkeys[TREE_FANOUT]; /* Pivot keys. Child 0's keys are <= childkeys[0]. Child 1's keys are <= childkeys[1].
Note: It is possible that Child 1's keys are == to child 0's key's, so it is Note: It is possible that Child 1's keys are == to child 0's key's, so it is
not necessarily true that child 1's keys are > childkeys[0]. not necessarily true that child 1's keys are > childkeys[0].
However, in the absense of duplicate keys, child 1's keys *are* > childkeys[0]. */ However, in the absense of duplicate keys, child 1's keys *are* > childkeys[0]. */
unsigned int childkeylens[TREE_FANOUT]; unsigned int childkeylens[TREE_FANOUT];
unsigned int totalchildkeylens; unsigned int totalchildkeylens;
diskoff children[TREE_FANOUT+1]; /* unused if height==0 */ /* Note: The last element of these arrays is used only temporarily while splitting a node. */ DISKOFF children[TREE_FANOUT+1]; /* unused if height==0 */ /* Note: The last element of these arrays is used only temporarily while splitting a node. */
HASHTABLE htables[TREE_FANOUT+1]; HASHTABLE htables[TREE_FANOUT+1];
unsigned int n_bytes_in_hashtable[TREE_FANOUT+1]; /* how many bytes are in each hashtable (including overheads) */ unsigned int n_bytes_in_hashtable[TREE_FANOUT+1]; /* how many bytes are in each hashtable (including overheads) */
unsigned int n_bytes_in_hashtables; unsigned int n_bytes_in_hashtables;
...@@ -52,12 +60,13 @@ struct brtnode { ...@@ -52,12 +60,13 @@ struct brtnode {
struct brt_header { struct brt_header {
int dirty; int dirty;
unsigned int nodesize; unsigned int nodesize;
diskoff freelist; DISKOFF freelist;
diskoff unused_memory; DISKOFF unused_memory;
diskoff unnamed_root; DISKOFF unnamed_root;
int n_named_roots; /* -1 if the only one is unnamed */ int n_named_roots; /* -1 if the only one is unnamed */
char **names; char **names;
diskoff *roots; DISKOFF *roots;
unsigned int flags;
}; };
...@@ -69,21 +78,24 @@ struct brt { ...@@ -69,21 +78,24 @@ struct brt {
BRT_CURSOR cursors_head, cursors_tail; BRT_CURSOR cursors_head, cursors_tail;
unsigned int nodesize;
unsigned int flags;
int (*compare_fun)(DB*,const DBT*,const DBT*); int (*compare_fun)(DB*,const DBT*,const DBT*);
int (*dup_compare)(DB*,const DBT*,const DBT*);
void *skey,*sval; /* Used for DBT return values. */ void *skey,*sval; /* Used for DBT return values. */
}; };
/* serialization code */ /* serialization code */
void serialize_brtnode_to(int fd, diskoff off, diskoff size, BRTNODE node); void serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node);
int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesize); int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesize);
unsigned int serialize_brtnode_size(BRTNODE node); /* How much space will it take? */ unsigned int serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len); int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
void verify_counts(BRTNODE); void verify_counts(BRTNODE);
int serialize_brt_header_to (int fd, struct brt_header *h); int serialize_brt_header_to (int fd, struct brt_header *h);
int deserialize_brtheader_from (int fd, diskoff off, struct brt_header **brth); int deserialize_brtheader_from (int fd, DISKOFF off, struct brt_header **brth);
/* return the size of a tree node */ /* return the size of a tree node */
long brtnode_size (BRTNODE node); long brtnode_size (BRTNODE node);
...@@ -169,3 +181,21 @@ struct brt_cmd { ...@@ -169,3 +181,21 @@ struct brt_cmd {
}; };
typedef struct brt_cmd BRT_CMD; typedef struct brt_cmd BRT_CMD;
struct brtenv {
CACHETABLE ct;
TOKULOGGER logger;
long long checksum_number;
// SPINLOCK checkpointing;
};
extern cachetable_flush_func_t brtnode_flush_callback;
extern cachetable_fetch_func_t brtnode_fetch_callback;
extern int toku_read_and_pin_brt_header (CACHEFILE cf, struct brt_header **header);
extern int toku_unpin_brt_header (BRT brt);
extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt);
static const BRTNODE null_brtnode=0;
extern u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, int vallen);
extern u_int32_t toku_calccrc32_cmd (int type, const void *key, int keylen, const void *val, int vallen);
extern u_int32_t toku_calccrc32_cmdstruct (BRT_CMD *cmd);
#include "brt.h"
#include "memory.h"
#include "brt-internal.h" #include "brt-internal.h"
#include <fcntl.h> #include <fcntl.h>
#include <assert.h> #include <assert.h>
#include <string.h> #include <string.h>
#include <zlib.h>
#include <arpa/inet.h>
#include <stdlib.h>
void test_serialize(void) { void test_serialize(void) {
// struct brt source_brt; // struct brt source_brt;
...@@ -12,41 +13,59 @@ void test_serialize(void) { ...@@ -12,41 +13,59 @@ void test_serialize(void) {
struct brtnode sn, *dn; struct brtnode sn, *dn;
int fd = open("brt-serialize-test.brt", O_RDWR|O_CREAT, 0777); int fd = open("brt-serialize-test.brt", O_RDWR|O_CREAT, 0777);
int r; int r;
const u_int32_t randval = random();
assert(fd>=0); assert(fd>=0);
// source_brt.fd=fd; // source_brt.fd=fd;
char *hello_string; char *hello_string;
sn.nodesize = nodesize; sn.nodesize = nodesize;
sn.thisnodename = sn.nodesize*20; sn.thisnodename = sn.nodesize*20;
sn.lsn.lsn = 123456;
sn.layout_version = 0;
sn.height = 1; sn.height = 1;
sn.rand4fingerprint = randval;
sn.local_fingerprint = 0;
sn.u.n.n_children = 2; sn.u.n.n_children = 2;
sn.u.n.childkeys[0] = hello_string = toku_strdup("hello"); sn.u.n.childkeys[0] = hello_string = toku_strdup("hello");
sn.u.n.childkeylens[0] = 6; sn.u.n.childkeylens[0] = 6;
sn.u.n.totalchildkeylens = 6; sn.u.n.totalchildkeylens = 6;
sn.u.n.children[0] = sn.nodesize*30; sn.u.n.children[0] = sn.nodesize*30;
sn.u.n.children[1] = sn.nodesize*35; sn.u.n.children[1] = sn.nodesize*35;
sn.u.n.child_subtree_fingerprints[0] = random();
sn.u.n.child_subtree_fingerprints[1] = random();
r = toku_hashtable_create(&sn.u.n.htables[0]); assert(r==0); r = toku_hashtable_create(&sn.u.n.htables[0]); assert(r==0);
r = toku_hashtable_create(&sn.u.n.htables[1]); assert(r==0); r = toku_hashtable_create(&sn.u.n.htables[1]); assert(r==0);
r = toku_hash_insert(sn.u.n.htables[0], "a", 2, "aval", 5, BRT_NONE); assert(r==0); r = toku_hash_insert(sn.u.n.htables[0], "a", 2, "aval", 5, BRT_NONE); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, "a", 2, "aval", 5);
r = toku_hash_insert(sn.u.n.htables[0], "b", 2, "bval", 5, BRT_NONE); assert(r==0); r = toku_hash_insert(sn.u.n.htables[0], "b", 2, "bval", 5, BRT_NONE); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, "b", 2, "bval", 5);
r = toku_hash_insert(sn.u.n.htables[1], "x", 2, "xval", 5, BRT_NONE); assert(r==0); r = toku_hash_insert(sn.u.n.htables[1], "x", 2, "xval", 5, BRT_NONE); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, "x", 2, "xval", 5);
sn.u.n.n_bytes_in_hashtables = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5); sn.u.n.n_bytes_in_hashtables = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
serialize_brtnode_to(fd, sn.nodesize*20, sn.nodesize, &sn); assert(r==0); serialize_brtnode_to(fd, sn.nodesize*20, sn.nodesize, &sn); assert(r==0);
r = deserialize_brtnode_from(fd, nodesize*20, &dn, nodesize); r = deserialize_brtnode_from(fd, nodesize*20, &dn, nodesize);
assert(r==0);
assert(dn->thisnodename==nodesize*20); assert(dn->thisnodename==nodesize*20);
assert(dn->lsn.lsn==123456);
assert(dn->layout_version ==0);
assert(dn->height == 1); assert(dn->height == 1);
assert(dn->rand4fingerprint==randval);
assert(dn->u.n.n_children==2); assert(dn->u.n.n_children==2);
assert(strcmp(dn->u.n.childkeys[0], "hello")==0); assert(strcmp(dn->u.n.childkeys[0], "hello")==0);
assert(dn->u.n.childkeylens[0]==6); assert(dn->u.n.childkeylens[0]==6);
assert(dn->u.n.totalchildkeylens==6); assert(dn->u.n.totalchildkeylens==6);
assert(dn->u.n.children[0]==nodesize*30); assert(dn->u.n.children[0]==nodesize*30);
assert(dn->u.n.children[1]==nodesize*35); assert(dn->u.n.children[1]==nodesize*35);
{
int i;
for (i=0; i<2; i++) {
assert(dn->u.n.child_subtree_fingerprints[i]==sn.u.n.child_subtree_fingerprints[i]);
}
assert(dn->local_fingerprint==sn.local_fingerprint);
}
{ {
bytevec data; ITEMLEN datalen; int type; bytevec data; ITEMLEN datalen; int type;
int r = toku_hash_find(dn->u.n.htables[0], "a", 2, &data, &datalen, &type); r = toku_hash_find(dn->u.n.htables[0], "a", 2, &data, &datalen, &type);
assert(r==0); assert(r==0);
assert(strcmp(data,"aval")==0); assert(strcmp(data,"aval")==0);
assert(datalen==5); assert(datalen==5);
...@@ -64,7 +83,7 @@ void test_serialize(void) { ...@@ -64,7 +83,7 @@ void test_serialize(void) {
assert(datalen==5); assert(datalen==5);
assert(type == BRT_NONE); assert(type == BRT_NONE);
} }
// brtnode_free(&dn); brtnode_free(&dn);
toku_free(hello_string); toku_free(hello_string);
toku_hashtable_free(&sn.u.n.htables[0]); toku_hashtable_free(&sn.u.n.htables[0]);
......
This diff is collapsed.
This diff is collapsed.
/* Verify a BRT. */
/* Check:
* the fingerprint of every node (local check)
* the child's fingerprint matches the parent's copy
* the tree is of uniform depth (and the height is correct at every node)
* For non-dup trees: the values to the left are < the values to the right
* and < the pivot
* For dup trees: the values to the left are <= the values to the right
* the pivots are < or <= left values (according to the PresentL bit)
* the pivots are > or >= right values (according to the PresentR bit)
*
* Note: We don't yet have DUP trees, so thee checks on duplicate trees are unimplemented. (Nov 1 2007)
*/
#include "brt-internal.h"
#include <assert.h>
static void verify_local_fingerprint (BRTNODE node) {
u_int32_t fp=0;
int i;
if (node->height>0) {
for (i=0; i<node->u.n.n_children; i++)
HASHTABLE_ITERATE(node->u.n.htables[i], key, keylen, data, datalen, type,
({
fp += node->rand4fingerprint * toku_calccrc32_cmd(type, key, keylen, data, datalen);
}));
assert(fp==node->local_fingerprint);
} else {
pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->local_fingerprint);
}
}
static void verify_parent_fingerprint (BRTNODE node) {
BRTNODE parent=node->parent_brtnode;
u_int32_t subtree_fingerprint=node->local_fingerprint;
if (node->height>0) {
int i;
for (i=0; i<node->u.n.n_children; i++) {
subtree_fingerprint+=node->u.n.child_subtree_fingerprints[i];
}
}
if (parent) {
int i;
assert(parent->height>0);
for (i=0; i<parent->u.n.n_children; i++) {
if (parent->u.n.children[i]==node->thisnodename) {
assert(parent->u.n.child_subtree_fingerprints[i]==subtree_fingerprint);
return;
}
}
assert(0); // no parent matches
}
}
int verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, int recurse, BRTNODE parent_brtnode) {
int result=0;
BRTNODE node;
void *node_v;
int r;
if ((r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize)))
return r;
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
node->parent_brtnode = parent_brtnode;
verify_local_fingerprint(node);
verify_parent_fingerprint(node);
if (node->height>0) {
int i;
for (i=0; i< node->u.n.n_children-1; i++) {
bytevec thislorange,thishirange;
ITEMLEN thislolen, thishilen;
if (node->u.n.n_children==0 || i==0) {
thislorange=lorange;
thislolen =lolen;
} else {
thislorange=node->u.n.childkeys[i-1];
thislolen =node->u.n.childkeylens[i-1];
}
if (node->u.n.n_children==0 || i+1>=node->u.n.n_children) {
thishirange=hirange;
thishilen =hilen;
} else {
thishirange=node->u.n.childkeys[i];
thishilen =node->u.n.childkeylens[i];
}
{
void verify_pair (bytevec key, unsigned int keylen,
bytevec data __attribute__((__unused__)),
unsigned int datalen __attribute__((__unused__)),
int type __attribute__((__unused__)),
void *ignore __attribute__((__unused__))) {
if (thislorange) assert(keycompare(thislorange,thislolen,key,keylen)<0);
if (thishirange && keycompare(key,keylen,thishirange,thishilen)>0) {
printf("%s:%d in buffer %d key %s is bigger than %s\n", __FILE__, __LINE__, i, (char*)key, (char*)thishirange);
result=1;
}
}
toku_hashtable_iterate(node->u.n.htables[i], verify_pair, 0);
}
}
for (i=0; i<node->u.n.n_children; i++) {
if (i>0) {
if (lorange) assert(keycompare(lorange,lolen, node->u.n.childkeys[i-1], node->u.n.childkeylens[i-1])<0);
if (hirange) assert(keycompare(node->u.n.childkeys[i-1], node->u.n.childkeylens[i-1], hirange, hilen)<=0);
}
if (recurse) {
result|=verify_brtnode(brt, node->u.n.children[i],
(i==0) ? lorange : node->u.n.childkeys[i-1],
(i==0) ? lolen : node->u.n.childkeylens[i-1],
(i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i],
(i==node->u.n.n_children-1) ? hilen : node->u.n.childkeylens[i],
recurse,
node);
}
}
}
if ((r = cachetable_unpin(brt->cf, off, 0, 0))) return r;
return result;
}
int verify_brt (BRT brt) {
int r;
CACHEKEY *rootp;
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
rootp = toku_calculate_root_offset_pointer(brt);
if ((r=verify_brtnode(brt, *rootp, 0, 0, 0, 0, 1, null_brtnode))) goto died0;
if ((r = toku_unpin_brt_header(brt))!=0) return r;
return 0;
}
This diff is collapsed.
...@@ -11,8 +11,15 @@ ...@@ -11,8 +11,15 @@
#include "log.h" #include "log.h"
typedef struct brt *BRT; typedef struct brt *BRT;
int open_brt (const char *fname, const char *dbname, int is_create, BRT *, int nodesize, CACHETABLE, int(*)(DB*,const DBT*,const DBT*)); int open_brt (const char *fname, const char *dbname, int is_create, BRT *, int nodesize, CACHETABLE, int(*)(DB*,const DBT*,const DBT*));
//int brt_create (BRT **, int nodesize, int n_nodes_in_cache); /* the nodesize and n_nodes in cache really should be separately configured. */
//int brt_open (BRT *, char *fname, char *dbname); int brt_create(BRT *);
int brt_set_flags(BRT, int flags);
int brt_set_nodesize(BRT, int nodesize);
int brt_set_bt_compare(BRT, int (*bt_compare)(DB *, const DBT*, const DBT*));
int brt_set_dup_compare(BRT, int (*dup_compare)(DB *, const DBT*, const DBT*));
int brt_set_cachetable(BRT, CACHETABLE);
int brt_open(BRT, const char *fname, const char *dbname, int is_create, CACHETABLE ct);
int brt_insert (BRT, DBT *, DBT *, DB*, TOKUTXN); int brt_insert (BRT, DBT *, DBT *, DB*, TOKUTXN);
int brt_lookup (BRT brt, DBT *k, DBT *v, DB*db); int brt_lookup (BRT brt, DBT *k, DBT *v, DB*db);
int brt_delete (BRT brt, DBT *k, DB *db); int brt_delete (BRT brt, DBT *k, DB *db);
...@@ -22,12 +29,11 @@ void brt_fsync (BRT); /* fsync, but don't clear the caches. */ ...@@ -22,12 +29,11 @@ void brt_fsync (BRT); /* fsync, but don't clear the caches. */
void brt_flush (BRT); /* fsync and clear the caches. */ void brt_flush (BRT); /* fsync and clear the caches. */
int brt_create_cachetable (CACHETABLE *t, int n_cachlines /* Pass 0 if you want the default. */);
/* create and initialize a cache table /* create and initialize a cache table
hashsize is the initialize size of the lookup table cachesize is the upper limit on the size of the size of the values in the table
cachesize is the upper limit on the size of the size of the values in the table */ pass 0 if you want the default */
int brt_create_cachetable_size (CACHETABLE *t, int hashsize, long cachesize);
int brt_create_cachetable(CACHETABLE *t, long cachesize, LSN initial_lsn, TOKULOGGER);
extern int brt_debug_mode; extern int brt_debug_mode;
int verify_brt (BRT brt); int verify_brt (BRT brt);
...@@ -40,4 +46,7 @@ int brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int brtc_flags, DB *d ...@@ -40,4 +46,7 @@ int brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int brtc_flags, DB *d
int brt_cursor_delete(BRT_CURSOR cursor, int flags); int brt_cursor_delete(BRT_CURSOR cursor, int flags);
int brt_cursor_close (BRT_CURSOR curs); int brt_cursor_close (BRT_CURSOR curs);
typedef struct brtenv *BRTENV;
int brtenv_checkpoint (BRTENV env);
#endif #endif
#ifndef BRTTYPES_H #ifndef BRTTYPES_H
#define BRTTYPES_H #define BRTTYPES_H
#include <sys/types.h>
#define _XOPEN_SOURCE 500 #define _XOPEN_SOURCE 500
#define _FILE_OFFSET_BITS 64 #define _FILE_OFFSET_BITS 64
typedef unsigned int ITEMLEN; typedef unsigned int ITEMLEN;
typedef const void *bytevec; typedef const void *bytevec;
//typedef const void *bytevec; //typedef const void *bytevec;
typedef long long diskoff; /* Offset in a disk. -1 is the NULL pointer. */ typedef long long DISKOFF; /* Offset in a disk. -1 is the NULL pointer. */
typedef long long TXNID; typedef long long TXNID;
/* Make the LSN be a struct instead of an integer so that we get better type checking. */
typedef struct __toku_lsn { u_int64_t lsn; } LSN;
#define ZERO_LSN ((LSN){0})
/* Make the FILEID a struct for the same reason. */
typedef struct __toku_fileid { u_int32_t fileid; } FILENUM;
typedef enum __toku_bool { FALSE=0, TRUE=1} BOOL;
typedef struct tokulogger *TOKULOGGER;
#define NULL_LOGGER ((TOKULOGGER)0)
typedef struct tokutxn *TOKUTXN;
#endif #endif
This diff is collapsed.
...@@ -58,7 +58,14 @@ static void file_is_not_present(CACHEFILE cf) { ...@@ -58,7 +58,14 @@ static void file_is_not_present(CACHEFILE cf) {
} }
static void flush_forchain (CACHEFILE f __attribute__((__unused__)), CACHEKEY key, void *value, long size __attribute__((__unused__)), int write_me __attribute__((__unused__)), int keep_me __attribute__((__unused__))) { static void flush_forchain (CACHEFILE f __attribute__((__unused__)),
CACHEKEY key,
void *value,
long size __attribute__((__unused__)),
BOOL write_me __attribute__((__unused__)),
BOOL keep_me __attribute__((__unused__)),
LSN modified_lsn __attribute__((__unused__)),
BOOL rename_p __attribute__((__unused__))) {
int *v = value; int *v = value;
//cachetable_print_state(ct); //cachetable_print_state(ct);
//printf("Flush %lld %d\n", key, (int)value); //printf("Flush %lld %d\n", key, (int)value);
...@@ -67,9 +74,10 @@ static void flush_forchain (CACHEFILE f __attribute__((__unused__)), CACHEKEY ke ...@@ -67,9 +74,10 @@ static void flush_forchain (CACHEFILE f __attribute__((__unused__)), CACHEKEY ke
//print_ints(); //print_ints();
} }
static int fetch_forchain (CACHEFILE f __attribute__((__unused__)), CACHEKEY key, void**value, long *sizep __attribute__((__unused__)), void*extraargs) { static int fetch_forchain (CACHEFILE f __attribute__((__unused__)), CACHEKEY key, void**value, long *sizep __attribute__((__unused__)), void*extraargs, LSN *written_lsn) {
assert((long)extraargs==(long)key); assert((long)extraargs==(long)key);
*value = (void*)(long)key; *value = (void*)(long)key;
written_lsn->lsn = 0;
return 0; return 0;
} }
...@@ -93,9 +101,9 @@ void test_chaining (void) { ...@@ -93,9 +101,9 @@ void test_chaining (void) {
char fname[N_FILES][FILENAME_LEN]; char fname[N_FILES][FILENAME_LEN];
int r; int r;
long i, trial; long i, trial;
r = create_cachetable(&ct, N_PRESENT_LIMIT, N_PRESENT_LIMIT); assert(r==0); r = create_cachetable(&ct, N_PRESENT_LIMIT, ZERO_LSN, NULL_LOGGER); assert(r==0);
for (i=0; i<N_FILES; i++) { for (i=0; i<N_FILES; i++) {
int r = snprintf(fname[i], FILENAME_LEN, "cachetabletest2.%ld.dat", i); r = snprintf(fname[i], FILENAME_LEN, "cachetabletest2.%ld.dat", i);
assert(r>0 && r<FILENAME_LEN); assert(r>0 && r<FILENAME_LEN);
unlink(fname[i]); unlink(fname[i]);
r = cachetable_openf(&f[i], ct, fname[i], O_RDWR|O_CREAT, 0777); assert(r==0); r = cachetable_openf(&f[i], ct, fname[i], O_RDWR|O_CREAT, 0777); assert(r==0);
......
...@@ -29,12 +29,15 @@ struct ctpair { ...@@ -29,12 +29,15 @@ struct ctpair {
PAIR next,prev; // In LRU list. PAIR next,prev; // In LRU list.
PAIR hash_chain; PAIR hash_chain;
CACHEFILE cachefile; CACHEFILE cachefile;
cachetable_flush_func_t flush_callback; CACHETABLE_FLUSH_FUNC_T flush_callback;
cachetable_fetch_func_t fetch_callback; CACHETABLE_FETCH_FUNC_T fetch_callback;
void*extraargs; void *extraargs;
int verify_flag; /* Used in verify_cachetable() */ int verify_flag; /* Used in verify_cachetable() */
LSN modified_lsn; // What was the LSN when modified (undefined if not dirty)
LSN written_lsn; // What was the LSN when written (we need to get this information when we fetch)
}; };
// The cachetable is as close to an ENV as we get.
struct cachetable { struct cachetable {
enum typ_tag tag; enum typ_tag tag;
int n_in_table; int n_in_table;
...@@ -44,6 +47,8 @@ struct cachetable { ...@@ -44,6 +47,8 @@ struct cachetable {
CACHEFILE cachefiles; CACHEFILE cachefiles;
long size_current, size_limit; long size_current, size_limit;
int primeidx; int primeidx;
LSN lsn_of_checkpoint; // the most recent checkpoint in the log.
TOKULOGGER logger;
}; };
struct fileid { struct fileid {
...@@ -57,9 +62,10 @@ struct cachefile { ...@@ -57,9 +62,10 @@ struct cachefile {
int fd; /* Bug: If a file is opened read-only, then it is stuck in read-only. If it is opened read-write, then subsequent writers can write to it too. */ int fd; /* Bug: If a file is opened read-only, then it is stuck in read-only. If it is opened read-write, then subsequent writers can write to it too. */
CACHETABLE cachetable; CACHETABLE cachetable;
struct fileid fileid; struct fileid fileid;
FILENUM filenum;
}; };
int create_cachetable(CACHETABLE *result, int table_size __attribute__((unused)), long size_limit) { int create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER logger) {
TAGMALLOC(CACHETABLE, t); TAGMALLOC(CACHETABLE, t);
int i; int i;
t->n_in_table = 0; t->n_in_table = 0;
...@@ -74,6 +80,8 @@ int create_cachetable(CACHETABLE *result, int table_size __attribute__((unused)) ...@@ -74,6 +80,8 @@ int create_cachetable(CACHETABLE *result, int table_size __attribute__((unused))
t->cachefiles = 0; t->cachefiles = 0;
t->size_current = 0; t->size_current = 0;
t->size_limit = size_limit; t->size_limit = size_limit;
t->lsn_of_checkpoint = initial_lsn;
t->logger = logger;
*result = t; *result = t;
return 0; return 0;
} }
...@@ -257,13 +265,25 @@ static PAIR remove_from_hash_chain (PAIR remove_me, PAIR list) { ...@@ -257,13 +265,25 @@ static PAIR remove_from_hash_chain (PAIR remove_me, PAIR list) {
return list; return list;
} }
// Predicate to determine if a node must be renamed. Nodes are renamed on the time they are written
// after a checkpoint.
// Thus we need to rename it if it is dirty,
// if it has been modified within the current checkpoint regime (hence non-strict inequality)
// and the last time it was written was in a previous checkpoint regime (strict inequality)
static BOOL need_to_rename_p (CACHETABLE t, PAIR p) {
return (p->dirty
&& p->modified_lsn.lsn>=t->lsn_of_checkpoint.lsn // nonstrict
&& p->written_lsn.lsn < t->lsn_of_checkpoint.lsn); // strict
}
static void flush_and_remove (CACHETABLE t, PAIR remove_me, int write_me) { static void flush_and_remove (CACHETABLE t, PAIR remove_me, int write_me) {
lru_remove(t, remove_me); lru_remove(t, remove_me);
//printf("flush_callback(%lld,%p)\n", remove_me->key, remove_me->value); //printf("flush_callback(%lld,%p)\n", remove_me->key, remove_me->value);
WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, dirty=%d, 0)\n", __FILE__, __LINE__, remove_me->key, remove_me->value, remove_me->dirty && write_me)); WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, dirty=%d, 0)\n", __FILE__, __LINE__, remove_me->key, remove_me->value, remove_me->dirty && write_me));
//printf("%s:%d TAG=%x p=%p\n", __FILE__, __LINE__, remove_me->tag, remove_me); //printf("%s:%d TAG=%x p=%p\n", __FILE__, __LINE__, remove_me->tag, remove_me);
//printf("%s:%d dirty=%d\n", __FILE__, __LINE__, remove_me->dirty); //printf("%s:%d dirty=%d\n", __FILE__, __LINE__, remove_me->dirty);
remove_me->flush_callback(remove_me->cachefile, remove_me->key, remove_me->value, remove_me->size, remove_me->dirty && write_me, 0); remove_me->flush_callback(remove_me->cachefile, remove_me->key, remove_me->value, remove_me->size, remove_me->dirty && write_me, 0,
t->lsn_of_checkpoint, need_to_rename_p(t, remove_me));
t->n_in_table--; t->n_in_table--;
// Remove it from the hash chain. // Remove it from the hash chain.
{ {
...@@ -274,14 +294,6 @@ static void flush_and_remove (CACHETABLE t, PAIR remove_me, int write_me) { ...@@ -274,14 +294,6 @@ static void flush_and_remove (CACHETABLE t, PAIR remove_me, int write_me) {
toku_free(remove_me); toku_free(remove_me);
} }
static void flush_and_keep (PAIR flush_me) {
if (flush_me->dirty) {
WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, dirty=1, 0)\n", __FILE__, __LINE__, flush_me->key, flush_me->value));
flush_me->flush_callback(flush_me->cachefile, flush_me->key, flush_me->value, flush_me->size, 1, 1);
flush_me->dirty=0;
}
}
static int maybe_flush_some (CACHETABLE t, long size __attribute__((unused))) { static int maybe_flush_some (CACHETABLE t, long size __attribute__((unused))) {
int r = 0; int r = 0;
again: again:
...@@ -309,7 +321,8 @@ static int maybe_flush_some (CACHETABLE t, long size __attribute__((unused))) { ...@@ -309,7 +321,8 @@ static int maybe_flush_some (CACHETABLE t, long size __attribute__((unused))) {
static int cachetable_insert_at(CACHEFILE cachefile, int h, CACHEKEY key, void *value, long size, static int cachetable_insert_at(CACHEFILE cachefile, int h, CACHEKEY key, void *value, long size,
cachetable_flush_func_t flush_callback, cachetable_flush_func_t flush_callback,
cachetable_fetch_func_t fetch_callback, cachetable_fetch_func_t fetch_callback,
void *extraargs, int dirty) { void *extraargs, int dirty,
LSN written_lsn) {
TAGMALLOC(PAIR, p); TAGMALLOC(PAIR, p);
p->pinned = 1; p->pinned = 1;
p->dirty = dirty; p->dirty = dirty;
...@@ -322,6 +335,8 @@ static int cachetable_insert_at(CACHEFILE cachefile, int h, CACHEKEY key, void * ...@@ -322,6 +335,8 @@ static int cachetable_insert_at(CACHEFILE cachefile, int h, CACHEKEY key, void *
p->flush_callback = flush_callback; p->flush_callback = flush_callback;
p->fetch_callback = fetch_callback; p->fetch_callback = fetch_callback;
p->extraargs = extraargs; p->extraargs = extraargs;
p->modified_lsn.lsn = 0;
p->written_lsn = written_lsn;
CACHETABLE ct = cachefile->cachetable; CACHETABLE ct = cachefile->cachetable;
lru_add_to_list(ct, p); lru_add_to_list(ct, p);
p->hash_chain = ct->table[h]; p->hash_chain = ct->table[h];
...@@ -352,7 +367,7 @@ int cachetable_put(CACHEFILE cachefile, CACHEKEY key, void*value, long size, ...@@ -352,7 +367,7 @@ int cachetable_put(CACHEFILE cachefile, CACHEKEY key, void*value, long size,
if (maybe_flush_some(cachefile->cachetable, size)) if (maybe_flush_some(cachefile->cachetable, size))
return -2; return -2;
// flushing could change the result from hashit() // flushing could change the result from hashit()
int r = cachetable_insert_at(cachefile, hashit(cachefile->cachetable, key), key, value, size, flush_callback, fetch_callback, extraargs, 1); int r = cachetable_insert_at(cachefile, hashit(cachefile->cachetable, key), key, value, size, flush_callback, fetch_callback, extraargs, 1, ZERO_LSN);
return r; return r;
} }
...@@ -377,10 +392,11 @@ int cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, void**value, long ...@@ -377,10 +392,11 @@ int cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, void**value, long
void *toku_value; void *toku_value;
long size = 1; // compat long size = 1; // compat
int r; int r;
LSN written_lsn;
WHEN_TRACE_CT(printf("%s:%d CT: fetch_callback(%lld...)\n", __FILE__, __LINE__, key)); WHEN_TRACE_CT(printf("%s:%d CT: fetch_callback(%lld...)\n", __FILE__, __LINE__, key));
if ((r=fetch_callback(cachefile, key, &toku_value, &size, extraargs))) if ((r=fetch_callback(cachefile, key, &toku_value, &size, extraargs, &written_lsn)))
return r; return r;
cachetable_insert_at(cachefile, hashit(t,key), key, toku_value, size, flush_callback, fetch_callback, extraargs, 0); cachetable_insert_at(cachefile, hashit(t,key), key, toku_value, size, flush_callback, fetch_callback, extraargs, 0, written_lsn);
*value = toku_value; *value = toku_value;
if (sizep) if (sizep)
*sizep = size; *sizep = size;
...@@ -428,6 +444,26 @@ int cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, int dirty, long size) { ...@@ -428,6 +444,26 @@ int cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, int dirty, long size) {
return 0; return 0;
} }
// effect: Move an object from one key to another key.
// requires: The object is pinned in the table
int cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newkey) {
CACHETABLE t = cachefile->cachetable;
PAIR *ptr_to_p,p;
for (ptr_to_p = &t->table[hashit(t, oldkey)], p = *ptr_to_p;
p;
ptr_to_p = &p->hash_chain, p = *ptr_to_p) {
if (p->key==oldkey && p->cachefile==cachefile) {
*ptr_to_p = p->hash_chain;
p->key = newkey;
int nh = hashit(t, newkey);
p->hash_chain = t->table[nh];
t->table[nh] = p;
return 0;
}
}
return -1;
}
int cachetable_flush (CACHETABLE t) { int cachetable_flush (CACHETABLE t) {
int i; int i;
for (i=0; i<t->table_size; i++) { for (i=0; i<t->table_size; i++) {
...@@ -559,6 +595,15 @@ int cachetable_remove (CACHEFILE cachefile, CACHEKEY key, int write_me) { ...@@ -559,6 +595,15 @@ int cachetable_remove (CACHEFILE cachefile, CACHEKEY key, int write_me) {
return 0; return 0;
} }
#if 0
static void flush_and_keep (PAIR flush_me) {
if (flush_me->dirty) {
WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, dirty=1, 0)\n", __FILE__, __LINE__, flush_me->key, flush_me->value));
flush_me->flush_callback(flush_me->cachefile, flush_me->key, flush_me->value, flush_me->size, 1, 1);
flush_me->dirty=0;
}
}
static int cachetable_fsync_pairs (CACHETABLE t, PAIR p) { static int cachetable_fsync_pairs (CACHETABLE t, PAIR p) {
if (p) { if (p) {
int r = cachetable_fsync_pairs(t, p->hash_chain); int r = cachetable_fsync_pairs(t, p->hash_chain);
...@@ -577,6 +622,7 @@ int cachetable_fsync (CACHETABLE t) { ...@@ -577,6 +622,7 @@ int cachetable_fsync (CACHETABLE t) {
} }
return 0; return 0;
} }
#endif
#if 0 #if 0
int cachefile_pwrite (CACHEFILE cf, const void *buf, size_t count, off_t offset) { int cachefile_pwrite (CACHEFILE cf, const void *buf, size_t count, off_t offset) {
...@@ -643,3 +689,54 @@ int cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, void **value_ptr, ...@@ -643,3 +689,54 @@ int cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, void **value_ptr,
} }
return 1; return 1;
} }
int cachetable_checkpoint (CACHETABLE ct) {
// Single threaded checkpoint.
// In future: for multithreaded checkpoint we should not proceed if the previous checkpoint has not finished.
// Requires: Everything is unpinned. (In the multithreaded version we have to wait for things to get unpinned and then
// grab them (or else the unpinner has to do something.)
// Algorithm: Write a checkpoint record to the log, noting the LSN of that record.
// Note the LSN of the previous checkpoint (stored in lsn_of_checkpoint)
// For every (unpinnned) dirty node in which the LSN is newer than the prev checkpoint LSN:
// flush the node (giving it a new nodeid, and fixing up the downpointer in the parent)
// Watch out since evicting the node modifies the hash table.
//?? This is a skeleton. It compiles, but doesn't do anything reasonable yet.
//?? log_the_checkpoint();
int n_saved=0;
int n_in_table = ct->n_in_table;
struct save_something {
CACHEFILE cf;
DISKOFF key;
void *value;
long size;
LSN modified_lsn;
CACHETABLE_FLUSH_FUNC_T flush_callback;
} *MALLOC_N(n_in_table, info);
{
PAIR pair;
for (pair=ct->head; pair; pair=pair->next) {
assert(!pair->pinned);
if (pair->dirty && pair->modified_lsn.lsn>ct->lsn_of_checkpoint.lsn) {
//?? /save_something_about_the_pair(); // This read-only so it doesn't modify the table.
n_saved++;
}
}
}
{
int i;
for (i=0; i<n_saved; i++) {
info[i].flush_callback(info[i].cf, info[i].key, info[i].value, info[i].size, 1, 1, info[i].modified_lsn, 0);
}
}
toku_free(info);
return 0;
}
TOKULOGGER cachefile_logger (CACHEFILE cf) {
return cf->cachetable->logger;
}
FILENUM cachefile_filenum (CACHEFILE cf) {
return cf->filenum;
}
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#define CACHETABLE_H #define CACHETABLE_H
#include <fcntl.h> #include <fcntl.h>
#include "brttypes.h"
/* Implement the cache table. */ /* Implement the cache table. */
...@@ -22,14 +23,16 @@ typedef struct cachefile *CACHEFILE; ...@@ -22,14 +23,16 @@ typedef struct cachefile *CACHEFILE;
* table_size is the initial size of the cache table hash table (in number of entries) * table_size is the initial size of the cache table hash table (in number of entries)
* size limit is the upper bound of the sum of size of the entries in the cache table (total number of bytes) * size limit is the upper bound of the sum of size of the entries in the cache table (total number of bytes)
*/ */
int create_cachetable(CACHETABLE */*result*/, int table_size, long size_limit); int create_cachetable(CACHETABLE */*result*/, long size_limit, LSN initial_lsn, TOKULOGGER);
int cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int flags, mode_t mode); int cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int flags, mode_t mode);
typedef void (*cachetable_flush_func_t)(CACHEFILE, CACHEKEY key, void*value, long size, int write_me, int keep_me); typedef void (cachetable_flush_func_t)(CACHEFILE, CACHEKEY key, void*value, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
typedef cachetable_flush_func_t *CACHETABLE_FLUSH_FUNC_T;
/* If we are asked to fetch something, get it by calling this back. */ /* If we are asked to fetch something, get it by calling this back. */
typedef int (*cachetable_fetch_func_t)(CACHEFILE, CACHEKEY key, void **value, long *sizep, void *extraargs); typedef int (cachetable_fetch_func_t)(CACHEFILE, CACHEKEY key, void **value, long *sizep, void *extraargs, LSN *written_lsn);
typedef cachetable_fetch_func_t *CACHETABLE_FETCH_FUNC_T;
/* Error if already present. On success, pin the value. */ /* Error if already present. On success, pin the value. */
int cachetable_put(CACHEFILE cf, CACHEKEY key, void* value, long size, int cachetable_put(CACHEFILE cf, CACHEKEY key, void* value, long size,
...@@ -51,6 +54,9 @@ int cachetable_remove (CACHEFILE, CACHEKEY, int /*write_me*/); /* Removing somet ...@@ -51,6 +54,9 @@ int cachetable_remove (CACHEFILE, CACHEKEY, int /*write_me*/); /* Removing somet
int cachetable_assert_all_unpinned (CACHETABLE); int cachetable_assert_all_unpinned (CACHETABLE);
int cachefile_count_pinned (CACHEFILE, int /*printthem*/ ); int cachefile_count_pinned (CACHEFILE, int /*printthem*/ );
/* Rename whatever is at oldkey to be newkey. Requires that the object be pinned. */
int cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newkey);
//int cachetable_fsync_all (CACHETABLE); /* Flush everything to disk, but keep it in cache. */ //int cachetable_fsync_all (CACHETABLE); /* Flush everything to disk, but keep it in cache. */
int cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */ int cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */
...@@ -63,7 +69,7 @@ int cachefile_close (CACHEFILE*); ...@@ -63,7 +69,7 @@ int cachefile_close (CACHEFILE*);
int cachefile_fd (CACHEFILE); int cachefile_fd (CACHEFILE);
// Useful for debugging // Useful for debugging
void cachetable_print_state (CACHETABLE ct); void cachetable_print_state (CACHETABLE ct);
void cachetable_get_state(CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr); void cachetable_get_state(CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr);
int cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, void **value_ptr, int cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, void **value_ptr,
...@@ -72,4 +78,7 @@ int cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, void **value_ptr, ...@@ -72,4 +78,7 @@ int cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, void **value_ptr,
void cachefile_verify (CACHEFILE cf); // Verify the whole cachetable that the CF is in. Slow. void cachefile_verify (CACHEFILE cf); // Verify the whole cachetable that the CF is in. Slow.
void cachetable_verify (CACHETABLE t); // Slow... void cachetable_verify (CACHETABLE t); // Slow...
TOKULOGGER cachefile_logger (CACHEFILE);
FILENUM cachefile_filenum (CACHEFILE);
#endif #endif
CFLAGS = -O2 -Wall -W -Werror -g
LDFLAGS = -lz -lssl -g
adler32:
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <zlib.h>
#include <openssl/md2.h>
#include <openssl/md4.h>
#include <openssl/md5.h>
const unsigned int prime = 2000000011;
unsigned int karprabin (unsigned char *datac, int N) {
assert(N%4==0);
unsigned int *data=(unsigned int*)datac;
N=N/4;
int i;
unsigned int result=0;
for (i=0; i<N; i++) {
result=(result*prime)+data[i];
}
return result;
}
// According to
// P. L'Ecuyer, "Tables of Linear Congruential Generators of
// Different Sizes and Good Lattice Structure", Mathematics of
// Computation 68:225, 249--260 (1999).
// m=2^{32}-5 a=1588635695 is good.
const unsigned int mkr = 4294967291U;
const unsigned int akr = 1588635695U;
// But this is slower
unsigned int karprabinP (unsigned char *datac, int N) {
assert(N%4==0);
unsigned int *data=(unsigned int*)datac;
N=N/4;
int i;
unsigned long long result=0;
for (i=0; i<N; i++) {
result=((result*akr)+data[i])%mkr;
}
return result;
}
float tdiff (struct timeval *start, struct timeval *end) {
return (end->tv_sec-start->tv_sec) +1e-6*(end->tv_usec - start->tv_usec);
}
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
struct timeval start, end;
const int N=2<<20;
unsigned char *data=malloc(N);
int i;
assert(data);
for (i=0; i<N; i++) data[i]=random();
// adler32
{
uLong a32 = adler32(0L, Z_NULL, 0);
for (i=0; i<3; i++) {
gettimeofday(&start, 0);
a32 = adler32(a32, data, N);
gettimeofday(&end, 0);
float tm = tdiff(&start, &end);
printf("adler32=%lu, time=%9.6fs %9.6fns/b\n", a32, tm, 1e9*tm/N);
}
}
// crc32
{
uLong c32 = crc32(0L, Z_NULL, 0);
for (i=0; i<3; i++) {
gettimeofday(&start, 0);
c32 = crc32(c32, data, N);
gettimeofday(&end, 0);
float tm = tdiff(&start, &end);
printf("crc32=%lu, time=%9.6fs %9.6fns/b\n", c32, tm, 1e9*tm/N);
}
}
// MD2
{
unsigned char buf[MD2_DIGEST_LENGTH];
int j;
for (i=0; i<3; i++) {
gettimeofday(&start, 0);
MD2(data, N, buf);
gettimeofday(&end, 0);
float tm = tdiff(&start, &end);
printf("md2=");
for (j=0; j<MD2_DIGEST_LENGTH; j++) {
printf("%02x", buf[j]);
}
printf(" time=%9.6fs %9.6fns/b\n", tm, 1e9*tm/N);
}
}
// MD4
{
unsigned char buf[MD4_DIGEST_LENGTH];
int j;
for (i=0; i<3; i++) {
gettimeofday(&start, 0);
MD4(data, N, buf);
gettimeofday(&end, 0);
float tm = tdiff(&start, &end);
printf("md4=");
for (j=0; j<MD4_DIGEST_LENGTH; j++) {
printf("%02x", buf[j]);
}
printf(" time=%9.6fs %9.6fns/b\n", tm, 1e9*tm/N);
}
}
// MD5
{
unsigned char buf[MD5_DIGEST_LENGTH];
int j;
for (i=0; i<3; i++) {
gettimeofday(&start, 0);
MD5(data, N, buf);
gettimeofday(&end, 0);
float tm = tdiff(&start, &end);
printf("md5=");
for (j=0; j<MD5_DIGEST_LENGTH; j++) {
printf("%02x", buf[j]);
}
printf(" time=%9.6fs %9.6fns/b\n", tm, 1e9*tm/N);
}
}
// karp rabin
{
for (i=0; i<3; i++) {
gettimeofday(&start, 0);
unsigned int kr = karprabin(data, N);
gettimeofday(&end, 0);
float tm = tdiff(&start, &end);
printf("kr=%ud time=%9.6fs %9.6fns/b\n", kr, tm, 1e9*tm/N);
}
}
free(data);
return 0;
}
#ifndef TOKU_CRC_H
#define TOKU_CRC_H
#include <zlib.h>
// zlib crc32 has a bug: If len==0 then it should return oldcrc32, but crc32 returns 0.
static inline u_int32_t toku_crc32 (u_int32_t oldcrc32, const void *data, u_int32_t len) {
if (len==0) return oldcrc32;
else return crc32(oldcrc32, data, len);
}
static const u_int32_t toku_null_crc = 0;
// Don't use crc32, use toku_crc32 to avoid that bug.
ZEXTERN uLong ZEXPORT crc32 OF((uLong crc, const Bytef *buf, uInt len)) __attribute__((deprecated));
#endif
#include <arpa/inet.h>
#include <assert.h>
#include "brt-internal.h"
// Calculate the fingerprint for a kvpair
static inline u_int32_t toku_calc_more_crc32_kvpair (u_int32_t crc, const void *key, int keylen, const void *val, int vallen) {
int i;
i = htonl(keylen);
crc = toku_crc32(crc, (void*)&i, 4);
crc = toku_crc32(crc, key, keylen);
i = htonl(vallen);
crc = toku_crc32(crc, (void*)&i, 4);
crc = toku_crc32(crc, val, vallen);
return crc;
}
u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, int vallen) {
return toku_calc_more_crc32_kvpair(toku_null_crc, key, keylen, val, vallen);
}
u_int32_t toku_calccrc32_cmd (int type, const void *key, int keylen, const void *val, int vallen) {
unsigned char type_c = type;
return toku_calc_more_crc32_kvpair(toku_crc32(toku_null_crc,
&type_c, 1),
key, keylen, val, vallen);
}
u_int32_t toku_calccrc32_cmdstruct (BRT_CMD *cmd) {
switch (cmd->type) {
case BRT_NONE:
case BRT_INSERT:
case BRT_DELETE:
return toku_calccrc32_cmd (cmd->type, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size);
}
assert(0); /* Should not have come here. */
}
#include "brttypes.h"
#include "brt-internal.h" #include "brt-internal.h"
#include "memory.h"
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#include <assert.h> #include <assert.h>
...@@ -32,7 +30,7 @@ int write_int (int fd, unsigned int v) { ...@@ -32,7 +30,7 @@ int write_int (int fd, unsigned int v) {
return 0; return 0;
} }
int read_diskoff (int fd, diskoff *result) { int read_diskoff (int fd, DISKOFF *result) {
unsigned int i0,i1; unsigned int i0,i1;
int r; int r;
r = read_uint(fd, &i0); if(r!=0) return r; r = read_uint(fd, &i0); if(r!=0) return r;
...@@ -41,7 +39,7 @@ int read_diskoff (int fd, diskoff *result) { ...@@ -41,7 +39,7 @@ int read_diskoff (int fd, diskoff *result) {
return 0; return 0;
} }
int write_diskoff (int fd, diskoff v) { int write_diskoff (int fd, DISKOFF v) {
int r; int r;
r = write_int(fd, (unsigned int)(v>>32)); if (r!=0) return r; r = write_int(fd, (unsigned int)(v>>32)); if (r!=0) return r;
r = write_int(fd, (unsigned int)(v&0xffffffff)); if (r!=0) return r; r = write_int(fd, (unsigned int)(v&0xffffffff)); if (r!=0) return r;
...@@ -97,14 +95,14 @@ int read_brt_header (int fd, struct brt_header *header) { ...@@ -97,14 +95,14 @@ int read_brt_header (int fd, struct brt_header *header) {
return 0; return 0;
} }
int read_brt_h_unused_memory (int fd, diskoff *unused_memory) { int read_brt_h_unused_memory (int fd, DISKOFF *unused_memory) {
off_t r = lseek(fd, 12, SEEK_SET); off_t r = lseek(fd, 12, SEEK_SET);
assert(r==12); assert(r==12);
r = read_diskoff(fd, unused_memory); r = read_diskoff(fd, unused_memory);
return r; return r;
} }
int write_brt_h_unused_memory (int fd, diskoff unused_memory) { int write_brt_h_unused_memory (int fd, DISKOFF unused_memory) {
off_t r = lseek(fd, 12, SEEK_SET); off_t r = lseek(fd, 12, SEEK_SET);
assert(r==12); assert(r==12);
r = write_diskoff(fd, unused_memory); r = write_diskoff(fd, unused_memory);
......
// This list is intended to be embedded in other data structures.
struct list { struct list {
struct list *next, *prev; struct list *next, *prev;
}; };
......
#if defined(__x86_64) || defined(__i386)
static inline void mfence (void) {
__asm__ volatile ("mfence":::"memory");
}
static inline void rfence (void) {
__asm__ volatile ("rfence":::"memory");
}
static inline void sfence (void) {
__asm__ volatile ("sfence":::"memory");
}
/* According to the Intel Architecture Software Developer's
* Manual, Volume 3: System Programming Guide
* (http://www.intel.com/design/pro/manuals/243192.htm), page 7-6,
* "For the P6 family processors, locked operations serialize all
* outstanding load and store operations (that is, wait for them to
* complete)."
*
* Bradley found that fence instructions is faster on an opteron
* mfence takes 8ns on a 1.5GHZ AMD64 (maybe this is an 801)
* sfence takes 5ns
* lfence takes 3ns
* xchgl takes 14ns
*/
static inline lock_xchgl(volatile int *ptr, int x)
{
__asm__("xchgl %0,%1" :"=r" (x) :"m" (*(ptr)), "0" (x) :"memory");
return x;
}
#endif
typedef volatile int SPINLOCK[1];
static inline void spin_init (SPINLOCK v) {
v[0] = 0;
mfence();
}
static inline void spin_lock (SPINLOCK v) {
while (lock_xchgl((int*)v, 1)!=0) {
while (v[0]); /* Spin using only reads. It would be better to use MCS locks, but this reduces bus traffic. */
}
}
static inline void spin_unlock (SPINLOCK v) {
sfence(); // Want all previous stores to take place before we unlock.
v[0]=0;
}
#else
#error Need to define architectur-specific stuff for other machines.
#endif
CFLAGS=-O2 -Wall -W -Werror
LDFLAGS=-lpthread
trylock:
/* Time {m,l,s}fence vs.xchgl for a memory barrier. */
/* Timing numbers:
* Intel T2500 2GHZ
do1 9.0ns/loop
mfence: 29.0ns/loop (marginal cost= 20.0ns)
sfence: 17.3ns/loop (marginal cost= 8.3ns)
lfence: 23.6ns/loop (marginal cost= 14.6ns)
xchgl: 35.8ns/loop (marginal cost= 26.8ns)
* AMD Athlon 64 X2 Dual Core Processor 4200+
Timings are more crazy
do1 20.6ns/loop
mfence: 12.9ns/loop (marginal cost= -7.6ns)
sfence: 8.4ns/loop (marginal cost= -12.1ns)
lfence: 20.2ns/loop (marginal cost= -0.3ns)
xchgl: 16.6ns/loop (marginal cost= -3.9ns)
do1 13.0ns/loop
mfence: 25.6ns/loop (marginal cost= 12.6ns)
sfence: 21.0ns/loop (marginal cost= 8.1ns)
lfence: 12.9ns/loop (marginal cost= -0.1ns)
xchgl: 29.3ns/loop (marginal cost= 16.3ns)
*/
#include <sys/time.h>
#include <stdio.h>
enum { COUNT = 100000000 };
static inline void xchgl (void) {
{
/*
* According to the Intel Architecture Software Developer's
* Manual, Volume 3: System Programming Guide
* (http://www.intel.com/design/pro/manuals/243192.htm), page
* 7-6, "For the P6 family processors, locked operations
* serialize all outstanding load and store operations (that
* is, wait for them to complete)."
* Since xchg is locked by default, it is one way to do membar.
*/
int x=0, y;
asm volatile ("xchgl %0,%1" :"=r" (x) :"m" (y), "0" (x) :"memory");
}
}
static inline void mfence (void) {
asm volatile ("mfence":::"memory");
}
static inline void lfence (void) {
asm volatile ("lfence":::"memory");
}
static inline void sfence (void) {
asm volatile ("sfence":::"memory");
}
double tdiff (struct timeval *start, struct timeval *end) {
return ((end->tv_sec-start->tv_sec + 1e-6*(end->tv_usec + start->tv_usec))/COUNT)*1e9;
}
double nop_cost;
void do1 (volatile int *x) {
int i;
struct timeval start, end;
gettimeofday(&start, 0);
for (i=0; i<COUNT; i++) {
x[0]++;
x[1]++;
x[2]++;
x[3]++;
}
gettimeofday(&end, 0);
printf("do1 %6.1fns/loop\n", nop_cost=tdiff(&start, &end));
}
#define doit(name) void do ##name (volatile int *x) { \
int i; \
struct timeval start, end; \
gettimeofday(&start, 0); \
for (i=0; i<COUNT; i++) { \
x[0]++; \
x[1]++; \
name(); \
x[2]++; \
x[3]++; \
} \
gettimeofday(&end, 0); \
double this_cost = tdiff(&start, &end); \
printf("%6s:%6.1fns/loop (marginal cost=%6.1fns)\n", #name, this_cost, this_cost-nop_cost); \
}
doit(mfence)
doit(lfence)
doit(sfence)
doit(xchgl)
int main (int argc __attribute__((__unused__)),
char *argv[] __attribute__((__unused__))) {
int x[4];
int i;
for (i=0; i<2; i++) {
do1(x);
domfence(x);
dosfence(x);
dolfence(x);
doxchgl(x);
}
return 0;
}
/* How expensive is
* - Obtaining a read-only lock for the first obtainer.
* - Obtaining it for the second one?
* - The third one? */
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <sys/time.h>
float tdiff (struct timeval *start, struct timeval *end) {
return 1e6*(end->tv_sec-start->tv_sec) +(end->tv_usec - start->tv_usec);
}
/* My own rwlock implementation. */
struct brwl {
int mutex;
int state; // 0 for unlocked, -1 for a writer, otherwise many readers
};
static inline int xchg(volatile int *ptr, int x)
{
__asm__("xchgl %0,%1" :"=r" (x) :"m" (*(ptr)), "0" (x) :"memory");
return x;
}
static inline void sfence (void) {
asm volatile ("sfence":::"memory");
}
static inline void brwl_rlock (struct brwl *l) {
while (xchg(&l->mutex, 1)) ;
l->state++;
#if 1
sfence();
l->mutex=0;
#else
xchg(&l->mutex, 0);
#endif
}
enum {K=1000};
pthread_rwlock_t rwlocks[K];
struct brwl blocks[K];
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
int j;
int i;
int r;
struct timeval start, end;
for (j=0; j<3; j++) {
for (i=0; i<K; i++) {
r=pthread_rwlock_init(&rwlocks[i], NULL);
assert(r==0);
}
gettimeofday(&start, 0);
for (i=0; i<K; i++) {
r = pthread_rwlock_tryrdlock(&rwlocks[i]);
assert(r==0);
}
gettimeofday(&end, 0);
printf("pthread_rwlock_tryrdlock took %9.3fus for %d ops: %9.3fus/lock (%9.3fMops/s)\n", tdiff(&start,&end), K, tdiff(&start,&end)/K, K/tdiff(&start,&end));
}
for (j=0; j<3; j++) {
for (i=0; i<K; i++) {
r=pthread_rwlock_init(&rwlocks[i], NULL);
assert(r==0);
}
gettimeofday(&start, 0);
for (i=0; i<K; i++) {
r = pthread_rwlock_rdlock(&rwlocks[i]);
assert(r==0);
}
gettimeofday(&end, 0);
printf("pthread_rwlock_rdlock took %9.3fus for %d ops: %9.3fus/lock (%9.3fMops/s)\n", tdiff(&start,&end), K, tdiff(&start,&end)/K, K/tdiff(&start,&end));
}
for (j=0; j<3; j++) {
for (i=0; i<K; i++) {
blocks[i].state=0;
blocks[i].mutex=0;
}
gettimeofday(&start, 0);
for (i=0; i<K; i++) {
brwl_rlock(&blocks[i]);
}
gettimeofday(&end, 0);
printf("brwl_rlock took %9.3fus for %d ops: %9.3fus/lock (%9.3fMops/s)\n", tdiff(&start,&end), K, tdiff(&start,&end)/K, K/tdiff(&start,&end));
}
return 0;
}
#define _MULTI_THREADED
#include <pthread.h>
#include <stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/time.h>
float tdiff (struct timeval *start, struct timeval *end) {
return 1e6*(end->tv_sec-start->tv_sec) +(end->tv_usec - start->tv_usec);
}
/* Simple function to check the return code and exit the program
if the function call failed
*/
static void compResults(char *string, int rc) {
if (rc) {
printf("Error on : %s, rc=%d",
string, rc);
exit(EXIT_FAILURE);
}
return;
}
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
void *rdlockThread(void *arg)
{
int rc;
int count=0;
struct timeval start, end;
printf("Entered thread, getting read lock with mp wait\n");
Retry:
gettimeofday(&start, 0);
rc = pthread_rwlock_tryrdlock(&rwlock);
gettimeofday(&end, 0);
printf("pthread_rwlock_tryrdlock took %9.3fus\n", tdiff(&start,&end));
if (rc == EBUSY) {
if (count >= 10) {
printf("Retried too many times, failure!\n");
exit(EXIT_FAILURE);
}
++count;
printf("Could not get lock, do other work, then RETRY...\n");
sleep(1);
goto Retry;
}
compResults("pthread_rwlock_tryrdlock() 1\n", rc);
sleep(2);
printf("unlock the read lock\n");
gettimeofday(&start, 0);
rc = pthread_rwlock_unlock(&rwlock);
gettimeofday(&end, 0);
compResults("pthread_rwlock_unlock()\n", rc);
printf("%d.%6d to %d.%6d is %9.2f\n", start.tv_sec, start.tv_usec, end.tv_sec, end.tv_usec, tdiff(&start, &end));
printf("Secondary thread complete\n");
return NULL;
}
int main(int argc, char **argv)
{
int rc=0;
pthread_t thread;
struct timeval start, end;
printf("Enter Testcase - %s\n", argv[0]);
gettimeofday(&start, 0);
gettimeofday(&end, 0);
printf("nop Took %9.2f\n", tdiff(&start, &end));
{
int N=1000;
int i;
printf("Main, get and release the write lock %d times\n", N);
gettimeofday(&start, 0);
for (i=0; i<N; i++) {
rc = pthread_rwlock_wrlock(&rwlock);
rc = pthread_rwlock_unlock(&rwlock);
}
gettimeofday(&end, 0);
compResults("pthread_rwlock_wrlock()\n", rc);
printf("Took %9.2fns/op\n", 1000*tdiff(&start, &end)/N);
}
printf("Main, get the write lock\n");
gettimeofday(&start, 0);
rc = pthread_rwlock_wrlock(&rwlock);
gettimeofday(&end, 0);
compResults("pthread_rwlock_wrlock()\n", rc);
printf("Took %9.2f\n", tdiff(&start, &end));
printf("Main, create the try read lock thread\n");
rc = pthread_create(&thread, NULL, rdlockThread, NULL);
compResults("pthread_create\n", rc);
printf("Main, wait a bit holding the write lock\n");
sleep(5);
printf("Main, Now unlock the write lock\n");
gettimeofday(&start, 0);
rc = pthread_rwlock_unlock(&rwlock);
gettimeofday(&end, 0);
compResults("pthread_rwlock_unlock()\n", rc);
printf("Took %9.2f\n", tdiff(&start, &end));
printf("Main, wait for the thread to end\n");
rc = pthread_join(thread, NULL);
compResults("pthread_join\n", rc);
rc = pthread_rwlock_destroy(&rwlock);
compResults("pthread_rwlock_destroy()\n", rc);
printf("Main completed\n");
return 0;
}
...@@ -11,13 +11,20 @@ struct tokulogger { ...@@ -11,13 +11,20 @@ struct tokulogger {
int fd; int fd;
int n_in_file; int n_in_file;
long long next_log_file_number; long long next_log_file_number;
LSN lsn;
char buf[LOGGER_BUF_SIZE]; char buf[LOGGER_BUF_SIZE];
int n_in_buf; int n_in_buf;
}; };
int tokulogger_find_next_unused_log_file(const char *directory, long long *result); int tokulogger_find_next_unused_log_file(const char *directory, long long *result);
enum { LT_INSERT_WITH_NO_OVERWRITE = 'I', LT_DELETE = 'D', LT_COMMIT = 'C' }; enum {
LT_COMMIT = 'C',
LT_DELETE = 'D',
LT_INSERT_WITH_NO_OVERWRITE = 'I',
LT_CHECKPOINT = 'P',
LT_BLOCK_RENAME = 'R'
};
struct tokutxn { struct tokutxn {
u_int64_t txnid64; u_int64_t txnid64;
......
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
#include "log-internal.h" #include "log-internal.h"
#include "wbuf.h" #include "wbuf.h"
#include "memory.h" #include "memory.h"
#include "../src/ydb-internal.h"
#include <dirent.h> #include <dirent.h>
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
...@@ -11,6 +10,7 @@ ...@@ -11,6 +10,7 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/uio.h> #include <sys/uio.h>
#include "../src/ydb-internal.h"
int tokulogger_find_next_unused_log_file(const char *directory, long long *result) { int tokulogger_find_next_unused_log_file(const char *directory, long long *result) {
DIR *d=opendir(directory); DIR *d=opendir(directory);
...@@ -44,6 +44,9 @@ int tokulogger_create_and_open_logger (const char *directory, TOKULOGGER *result ...@@ -44,6 +44,9 @@ int tokulogger_create_and_open_logger (const char *directory, TOKULOGGER *result
result->fd = -1; result->fd = -1;
result->next_log_file_number = nexti; result->next_log_file_number = nexti;
result->n_in_buf = 0; result->n_in_buf = 0;
result->lsn.lsn = 0; // WRONG!!! This should actually be calculated by looking at the log file.
*resultp=result; *resultp=result;
return tokulogger_log_bytes(result, 0, ""); return tokulogger_log_bytes(result, 0, "");
} }
...@@ -85,26 +88,6 @@ int tokulogger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) { ...@@ -85,26 +88,6 @@ int tokulogger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) {
return 0; return 0;
} }
// Log an insertion of a key-value pair into a particular node of the tree.
int tokulogger_log_brt_insert_with_no_overwrite (TOKULOGGER logger,
TXNID txnid,
diskoff diskoff,
unsigned char *key,
int keylen,
unsigned char *val,
int vallen) {
int buflen=30+keylen+vallen;
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init(&wbuf, buf, buflen) ;
wbuf_char(&wbuf, LT_INSERT_WITH_NO_OVERWRITE);
wbuf_txnid(&wbuf, txnid);
wbuf_diskoff(&wbuf, diskoff);
wbuf_bytes(&wbuf, key, keylen);
wbuf_bytes(&wbuf, val, vallen);
return tokulogger_log_bytes(logger, wbuf.ndone, wbuf.buf);
}
int tokulogger_log_close(TOKULOGGER *loggerp) { int tokulogger_log_close(TOKULOGGER *loggerp) {
TOKULOGGER logger = *loggerp; TOKULOGGER logger = *loggerp;
int r = 0; int r = 0;
...@@ -133,29 +116,6 @@ n ...@@ -133,29 +116,6 @@ n
} }
#endif #endif
int tokulogger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, diskoff diskoff, int is_add, const struct kv_pair *pair) {
if (txn==0) return 0;
int keylen = pair->keylen;
int vallen = pair->vallen;
int buflen=(keylen+vallen+4+4 // the key and value
+1 // log command
+8 // txnid
+8 // fileid
+8 // diskoff
);
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init(&wbuf, buf, buflen) ;
wbuf_char(&wbuf, is_add ? LT_INSERT_WITH_NO_OVERWRITE : LT_DELETE);
wbuf_txnid(&wbuf, txn->txnid64);
wbuf_fileid(&wbuf, db->i->fileid);
wbuf_diskoff(&wbuf, diskoff);
wbuf_bytes(&wbuf, kv_pair_key_const(pair), keylen);
wbuf_bytes(&wbuf, kv_pair_val_const(pair), vallen);
return tokulogger_log_bytes(txn->logger, wbuf.ndone, wbuf.buf);
}
int tokulogger_fsync (TOKULOGGER logger) { int tokulogger_fsync (TOKULOGGER logger) {
//return 0;/// NO TXN //return 0;/// NO TXN
//fprintf(stderr, "%s:%d syncing log\n", __FILE__, __LINE__); //fprintf(stderr, "%s:%d syncing log\n", __FILE__, __LINE__);
...@@ -171,19 +131,101 @@ int tokulogger_fsync (TOKULOGGER logger) { ...@@ -171,19 +131,101 @@ int tokulogger_fsync (TOKULOGGER logger) {
return 0; return 0;
} }
static int tokulogger_finish (TOKULOGGER logger, struct wbuf *wbuf) {
wbuf_int(wbuf, toku_crc32(0, wbuf->buf, wbuf->ndone));
wbuf_int(wbuf, 4+wbuf->ndone);
return tokulogger_log_bytes(logger, wbuf->ndone, wbuf->buf);
}
// Log an insertion of a key-value pair into a particular node of the tree.
int tokulogger_log_brt_insert_with_no_overwrite (TOKULOGGER logger,
TXNID txnid,
FILENUM fileid,
DISKOFF diskoff,
unsigned char *key,
int keylen,
unsigned char *val,
int vallen) {
int buflen=(keylen+vallen+4+4 // key and value
+1 // command
+8 // lsn
+8 // txnid
+4 // fileid
+8 // diskoff
+8 // crc and len
);
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init(&wbuf, buf, buflen) ;
wbuf_char(&wbuf, LT_INSERT_WITH_NO_OVERWRITE);
wbuf_lsn (&wbuf, logger->lsn); logger->lsn.lsn++;
wbuf_txnid(&wbuf, txnid);
wbuf_filenum(&wbuf, fileid);
wbuf_diskoff(&wbuf, diskoff);
wbuf_bytes(&wbuf, key, keylen);
wbuf_bytes(&wbuf, val, vallen);
return tokulogger_finish (logger, &wbuf);
}
int tokulogger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF diskoff, int is_add, const struct kv_pair *pair) {
if (txn==0) return 0;
assert(db);
int keylen = pair->keylen;
int vallen = pair->vallen;
const int buflen=(keylen+vallen+4+4 // the key and value
+1 // log command
+8 // lsn
+8 // txnid
+8 // fileid
+8 // diskoff
+8 // crc & len
);
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init(&wbuf, buf, buflen) ;
wbuf_char(&wbuf, is_add ? LT_INSERT_WITH_NO_OVERWRITE : LT_DELETE);
wbuf_lsn (&wbuf, txn->logger->lsn);
txn->logger->lsn.lsn++;
wbuf_txnid(&wbuf, txn->txnid64);
wbuf_filenum(&wbuf, db->i->fileid);
wbuf_diskoff(&wbuf, diskoff);
wbuf_bytes(&wbuf, kv_pair_key_const(pair), keylen);
wbuf_bytes(&wbuf, kv_pair_val_const(pair), vallen);
return tokulogger_finish(txn->logger, &wbuf);
}
int tokulogger_log_commit (TOKUTXN txn) { int tokulogger_log_commit (TOKUTXN txn) {
struct wbuf wbuf; struct wbuf wbuf;
int buflen =30; const int buflen = (1 // log command
+8 // lsn
+8 // txnid
+8 // crc & len
);
unsigned char buf[buflen]; unsigned char buf[buflen];
wbuf_init(&wbuf, buf, buflen); wbuf_init(&wbuf, buf, buflen);
wbuf_char(&wbuf, LT_COMMIT); wbuf_char(&wbuf, LT_COMMIT);
wbuf_lsn (&wbuf, txn->logger->lsn);
txn->logger->lsn.lsn++;
wbuf_txnid(&wbuf, txn->txnid64); wbuf_txnid(&wbuf, txn->txnid64);
int r = tokulogger_log_bytes(txn->logger, wbuf.ndone, wbuf.buf); int r = tokulogger_finish(txn->logger, &wbuf);
if (r!=0) return r; if (r!=0) return r;
if (txn->parent) return 0; if (txn->parent) return 0;
else return tokulogger_fsync(txn->logger); else return tokulogger_fsync(txn->logger);
} }
int tokulogger_log_checkpoint (TOKULOGGER logger, LSN *lsn) {
struct wbuf wbuf;
const int buflen =10;
unsigned char buf[buflen];
wbuf_init(&wbuf, buf, buflen);
wbuf_char(&wbuf, LT_CHECKPOINT);
wbuf_lsn (&wbuf, logger->lsn);
*lsn = logger->lsn;
logger->lsn.lsn++;
return tokulogger_log_bytes(logger, wbuf.ndone, wbuf.buf);
}
int tokutxn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid64, TOKULOGGER logger) { int tokutxn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid64, TOKULOGGER logger) {
TAGMALLOC(TOKUTXN, result); TAGMALLOC(TOKUTXN, result);
if (result==0) return errno; if (result==0) return errno;
...@@ -194,3 +236,35 @@ int tokutxn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid64, TOKU ...@@ -194,3 +236,35 @@ int tokutxn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid64, TOKU
return 0; return 0;
} }
int tokulogger_log_block_rename (TOKULOGGER logger, FILENUM fileid, DISKOFF olddiskoff, DISKOFF newdiskoff, DISKOFF parentdiskoff, int childnum) {
const int buflen=(+1 // log command
+8 // lsn
+8 // fileid
+8 // olddiskoff
+8 // newdiskoff
+8 // parentdiskoff
+4 // childnum
+8 // crc & len
);
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init (&wbuf, buf, buflen) ;
wbuf_char (&wbuf, LT_BLOCK_RENAME);
wbuf_lsn (&wbuf, logger->lsn);
logger->lsn.lsn++;
wbuf_filenum(&wbuf, fileid);
wbuf_diskoff(&wbuf, olddiskoff);
wbuf_diskoff(&wbuf, newdiskoff);
wbuf_diskoff(&wbuf, parentdiskoff);
wbuf_int (&wbuf, childnum);
return tokulogger_finish(logger, &wbuf);
}
/*
int brtenv_checkpoint (BRTENV env) {
init the checkpointing lock
acquire_spinlock(&env->checkpointing);
release_spinlock(&env->checkpointing);
return -1;
}
*/
...@@ -3,16 +3,17 @@ ...@@ -3,16 +3,17 @@
#include "../include/db.h" #include "../include/db.h"
#include "brttypes.h" #include "brttypes.h"
#include "kv-pair.h" #include "kv-pair.h"
typedef struct tokulogger *TOKULOGGER;
typedef struct tokutxn *TOKUTXN;
int tokulogger_create_and_open_logger (const char *directory, TOKULOGGER *resultp); int tokulogger_create_and_open_logger (const char *directory, TOKULOGGER *resultp);
int tokulogger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes); int tokulogger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes);
int tokulogger_log_close(TOKULOGGER *logger); int tokulogger_log_close(TOKULOGGER *logger);
int tokulogger_log_checkpoint (TOKULOGGER, LSN*);
int tokulogger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, diskoff diskoff, int is_add, const struct kv_pair *pair); int tokulogger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF diskoff, int is_add, const struct kv_pair *pair);
int tokulogger_log_commit (TOKUTXN txn); int tokulogger_log_commit (TOKUTXN txn);
int tokulogger_log_block_rename (TOKULOGGER logger, FILENUM fileid, DISKOFF olddiskoff, DISKOFF newdiskoff, DISKOFF parentdiskoff, int childnum);
int tokutxn_begin (TOKUTXN /*parent*/,TOKUTXN *, TXNID txnid64, TOKULOGGER logger); int tokutxn_begin (TOKUTXN /*parent*/,TOKUTXN *, TXNID txnid64, TOKULOGGER logger);
#endif #endif
...@@ -47,4 +47,8 @@ void *mempool_malloc(struct mempool *mp, int size, int alignment); ...@@ -47,4 +47,8 @@ void *mempool_malloc(struct mempool *mp, int size, int alignment);
pool does not keep track of the locations of the free chunks */ pool does not keep track of the locations of the free chunks */
void mempool_mfree(struct mempool *mp, void *vp, int size); void mempool_mfree(struct mempool *mp, void *vp, int size);
static inline int mempool_inrange(struct mempool *mp, void *vp, int size) {
return mp->base <= vp && vp + size <= mp->base + mp->size;
}
#endif #endif
...@@ -10,6 +10,7 @@ struct pma_cursor { ...@@ -10,6 +10,7 @@ struct pma_cursor {
struct pma { struct pma {
enum typ_tag tag; enum typ_tag tag;
int dup_mode;
int N; /* How long is the array? Always a power of two >= 4. */ int N; /* How long is the array? Always a power of two >= 4. */
int n_pairs_present; /* How many array elements are non-null. */ int n_pairs_present; /* How many array elements are non-null. */
struct kv_pair **pairs; struct kv_pair **pairs;
...@@ -23,7 +24,8 @@ struct pma { ...@@ -23,7 +24,8 @@ struct pma {
* The density step is 0.10. */ * The density step is 0.10. */
double ldt_step; /* lower density threshold step */ double ldt_step; /* lower density threshold step */
struct list cursors; struct list cursors;
int (*compare_fun)(DB*,const DBT*,const DBT*); pma_compare_fun_t compare_fun;
pma_compare_fun_t dup_compare_fun;
void *skey, *sval; /* used in dbts */ void *skey, *sval; /* used in dbts */
struct mempool kvspace; struct mempool kvspace;
}; };
...@@ -36,49 +38,6 @@ int pmainternal_make_space_at (PMA pma, int idx); ...@@ -36,49 +38,6 @@ int pmainternal_make_space_at (PMA pma, int idx);
int pmainternal_find (PMA pma, DBT *, DB*); // The DB is so the comparison fuction can be called. int pmainternal_find (PMA pma, DBT *, DB*); // The DB is so the comparison fuction can be called.
void print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */ void print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */
/*
* resize the pma array to asksize. zero all array entries starting from startx.
*/
int __pma_resize_array(PMA pma, int asksize, int startx);
/*
* extract pairs from the pma in the window delimited by lo and hi.
*/
struct kv_pair_tag *__pma_extract_pairs(PMA pma, int count, int lo, int hi);
/*
* update the cursors in a cursor set given a set of tagged pairs.
*/
void __pma_update_cursors(PMA pma, struct list *cursorset, struct kv_pair_tag *tpairs, int n);
/*
* update this pma's cursors given a set of tagged pairs.
*/
void __pma_update_my_cursors(PMA pma, struct kv_pair_tag *tpairs, int n);
/*
* a deletion occured at index "here" in the pma. rebalance the windows around "here". if
* necessary, shrink the pma.
*/
void __pma_delete_at(PMA pma, int here);
/*
* if the pma entry at here is deleted and there are no more references to it
* then finish the deletion
*/
void __pma_delete_resume(PMA pma, int here);
/*
* finish a deletion from the pma. called when there are no cursor references
* to the kv pair.
*/
void __pma_delete_finish(PMA pma, int here);
/*
* count the number of cursors that reference a pma pair
*/
int __pma_count_cursor_refs(PMA pma, int here);
/* density thresholds */ /* density thresholds */
#define PMA_LDT_HIGH 0.25 #define PMA_LDT_HIGH 0.25
#define PMA_LDT_LOW 0.40 #define PMA_LDT_LOW 0.40
......
This diff is collapsed.
This diff is collapsed.
...@@ -10,11 +10,26 @@ ...@@ -10,11 +10,26 @@
/* An in-memory Packed Memory Array dictionary. */ /* An in-memory Packed Memory Array dictionary. */
/* There is a built-in-cursor. */ /* There is a built-in-cursor. */
/* All functions return 0 on success. */
typedef struct pma *PMA; typedef struct pma *PMA;
typedef struct pma_cursor *PMA_CURSOR; typedef struct pma_cursor *PMA_CURSOR;
/* All functions return 0 on success. */ /* compare 2 DBT's
int pma_create(PMA *, int (*compare_fun)(DB*,const DBT*,const DBT*), int maxsize); return a value < 0, = 0, > 0 if a < b, a == b, a > b respectively */
typedef int (*pma_compare_fun_t)(DB *, const DBT *a, const DBT *b);
int pma_create(PMA *, pma_compare_fun_t compare_fun, int maxsize);
/* set the duplicate mode
0 -> no duplications, DB_DUP, DB_DUPSORT */
int pma_set_dup_mode(PMA pma, int mode);
/* set the duplicate compare function */
int pma_set_dup_compare(PMA pma, pma_compare_fun_t dup_compare_fun);
/* verify the integrity of a pma */
void pma_verify(PMA pma, DB *db);
/* returns 0 if OK. /* returns 0 if OK.
* You must have freed all the cursors, otherwise returns nonzero and does nothing. */ * You must have freed all the cursors, otherwise returns nonzero and does nothing. */
...@@ -28,15 +43,16 @@ int pma_n_entries (PMA); ...@@ -28,15 +43,16 @@ int pma_n_entries (PMA);
/* Duplicates the key and keylen. */ /* Duplicates the key and keylen. */
//enum pma_errors pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen); //enum pma_errors pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
// The DB pointer is there so that the comparison function can be called. // The DB pointer is there so that the comparison function can be called.
enum pma_errors pma_insert (PMA, DBT*, DBT*, DB*, TOKUTXN txn, diskoff); enum pma_errors pma_insert (PMA, DBT*, DBT*, DB*, TOKUTXN txn, DISKOFF, u_int32_t /*random for fingerprint */, u_int32_t */*fingerprint*/);
/* This returns an error if the key is NOT present. */ /* This returns an error if the key is NOT present. */
int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen); int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
/* This returns an error if the key is NOT present. */ /* This returns an error if the key is NOT present. */
int pma_delete (PMA, DBT *, DB*); int pma_delete (PMA, DBT *, DB*, u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/);
int pma_insert_or_replace (PMA pma, DBT *k, DBT *v, int pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
int *replaced_v_size, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */ int *replaced_v_size, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
DB *db, TOKUTXN txn, diskoff); DB *db, TOKUTXN txn, DISKOFF,
u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/);
/* Exposes internals of the PMA by returning a pointer to the guts. /* Exposes internals of the PMA by returning a pointer to the guts.
...@@ -53,13 +69,14 @@ enum pma_errors pma_lookup (PMA, DBT*, DBT*, DB*); ...@@ -53,13 +69,14 @@ enum pma_errors pma_lookup (PMA, DBT*, DBT*, DB*);
* leftpma - the pma assigned keys <= pivot key * leftpma - the pma assigned keys <= pivot key
* rightpma - the pma assigned keys > pivot key * rightpma - the pma assigned keys > pivot key
*/ */
int pma_split(PMA origpma, unsigned int *origpma_size, int pma_split(PMA origpma, unsigned int *origpma_size,
PMA leftpma, unsigned int *leftpma_size, PMA leftpma, unsigned int *leftpma_size, u_int32_t leftrand4sum, u_int32_t *leftfingerprint,
PMA rightpma, unsigned int *rightpma_size); PMA rightpma, unsigned int *rightpma_size, u_int32_t rightrand4sum, u_int32_t *rightfingerprint);
/* /*
* Insert several key value pairs into an empty pma. The keys are * Insert several key value pairs into an empty pma.
* assumed to be sorted. * Doesn't delete any existing keys (even if they are duplicates)
* Requires: The keys are sorted
* *
* pma - the pma that the key value pairs will be inserted into. * pma - the pma that the key value pairs will be inserted into.
* must be empty with no cursors. * must be empty with no cursors.
...@@ -67,7 +84,7 @@ int pma_split(PMA origpma, unsigned int *origpma_size, ...@@ -67,7 +84,7 @@ int pma_split(PMA origpma, unsigned int *origpma_size,
* vals - an array of values * vals - an array of values
* n_newpairs - the number of key value pairs * n_newpairs - the number of key value pairs
*/ */
int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs); int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4sem, u_int32_t *fingerprint);
/* Move the cursor to the beginning or the end or to a key */ /* Move the cursor to the beginning or the end or to a key */
int pma_cursor (PMA, PMA_CURSOR *); int pma_cursor (PMA, PMA_CURSOR *);
...@@ -122,4 +139,6 @@ void pma_iterate (PMA, void(*)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), void*); ...@@ -122,4 +139,6 @@ void pma_iterate (PMA, void(*)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), void*);
int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len); int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
void pma_verify_fingerprint (PMA pma, u_int32_t rand4fingerprint, u_int32_t fingerprint);
#endif #endif
...@@ -31,7 +31,8 @@ void create_directory (void) { ...@@ -31,7 +31,8 @@ void create_directory (void) {
assert(r==0); assert(r==0);
r=env->set_cachesize(env, 0, 512*(1<<20), 0); r=env->set_cachesize(env, 0, 512*(1<<20), 0);
assert(r==0); assert(r==0);
#if DB_VERSION_MAJOR >= 4 && DB_VERSION_MINOR >= 3
IF40((void)0, IF40((void)0,
({ ({
unsigned int gbytes,bytes; unsigned int gbytes,bytes;
...@@ -40,7 +41,7 @@ void create_directory (void) { ...@@ -40,7 +41,7 @@ void create_directory (void) {
assert(r==0); assert(r==0);
printf("Using %.2fMiB Berkeley DB Cache Size\n", gbytes*1024 + ((double)bytes/(1<<20))); printf("Using %.2fMiB Berkeley DB Cache Size\n", gbytes*1024 + ((double)bytes/(1<<20)));
})); }));
#endif
r= env->open(env, dir, DB_CREATE|DB_INIT_MPOOL,0777); // No logging. r= env->open(env, dir, DB_CREATE|DB_INIT_MPOOL,0777); // No logging.
assert(r==0); assert(r==0);
......
...@@ -25,16 +25,26 @@ static unsigned int rbuf_int (struct rbuf *r) { ...@@ -25,16 +25,26 @@ static unsigned int rbuf_int (struct rbuf *r) {
(c3<<0)); (c3<<0));
} }
static inline void rbuf_literal_bytes (struct rbuf *r, bytevec *bytes, unsigned int n_bytes) {
*bytes = &r->buf[r->ndone];
r->ndone+=n_bytes;
assert(r->ndone<=r->size);
}
/* Return a pointer into the middle of the buffer. */ /* Return a pointer into the middle of the buffer. */
static void rbuf_bytes (struct rbuf *r, bytevec *bytes, unsigned int *n_bytes) static void rbuf_bytes (struct rbuf *r, bytevec *bytes, unsigned int *n_bytes)
{ {
*n_bytes = rbuf_int(r); *n_bytes = rbuf_int(r);
*bytes = &r->buf[r->ndone]; rbuf_literal_bytes(r, bytes, *n_bytes);
r->ndone+=*n_bytes; }
assert(r->ndone<=r->size);
static unsigned long long rbuf_ulonglong (struct rbuf *r) {
unsigned i0 = rbuf_int(r);
unsigned i1 = rbuf_int(r);
return ((unsigned long long)(i0)<<32) | ((unsigned long long)(i1));
} }
static diskoff rbuf_diskoff (struct rbuf *r) { static DISKOFF rbuf_diskoff (struct rbuf *r) {
unsigned i0 = rbuf_int(r); unsigned i0 = rbuf_int(r);
unsigned i1 = rbuf_int(r); unsigned i1 = rbuf_int(r);
return ((unsigned long long)(i0)<<32) | ((unsigned long long)(i1)); return ((unsigned long long)(i0)<<32) | ((unsigned long long)(i1));
......
/* Readers/writers locks implementation
*
*****************************************
* Overview
*****************************************
*
* TokuDB employs readers/writers locks for the ephemeral locks (e.g.,
* on BRT nodes) Why not just use the pthread_rwlock API?
*
* 1) we need multiprocess rwlocks (not just multithreaded)
*
* 2) pthread rwlocks are very slow since they entail a system call
* (about 2000ns on a 2GHz T2500.)
*
* Related: We expect the common case to be that the lock is
* granted
*
* 3) We are willing to employ machine-specific instructions (such
* as atomic exchange, and mfence, each of which runs in about
* 10ns.)
*
* 4) We want to guarantee nonstarvation (many rwlock
* implementations can starve the writers because another reader
* comes * along before all the other readers have unlocked.)
*
*****************************************
* How it works
*****************************************
*
* We arrange that the rwlock object is in the address space of both
* threads or processes. For processes we use mmap().
*
* The rwlock struct comprises the following fields
*
* a long mutex field (which is accessed using xchgl() or other
* machine-specific instructions. This is a spin lock.
*
* a read counter (how many readers currently have the lock?)
*
* a write boolean (does a writer have the lock?)
*
* a singly linked list of semaphores for waiting requesters. This
* list is sorted oldest requester first. Each list element
* contains a semaphore (which is provided by the requestor) and a
* boolean indicating whether it is a reader or a writer.
*
* To lock a read rwlock:
*
* 1) Acquire the mutex.
*
* 2) If the linked list is not empty or the writer boolean is true
* then
*
* a) initialize your semaphore (to 0),
* b) add your list element to the end of the list (with rw="read")
* c) release the mutex
* d) wait on the semaphore
* e) when the semaphore release, return success.
*
* 3) Otherwise increment the reader count, release the mutex, and
* return success.
*
* To lock the write rwlock is almost the same.
* 1) Acquire the mutex
* 2) If the list is not empty or the reader count is nonzero
* a) initialize semaphore
* b) add to end of list (with rw="write")
* c) release mutex
* d) wait on the semaphore
* e) return success when the semaphore releases
* 3) Otherwise set writer=TRUE, release mutex and return success.
*
* To unlock a read rwlock:
* 1) Acquire mutex
* 2) Decrement reader count
* 3) If the count is still positive or the list is empty then
* return success
* 4) Otherwise (count==zero and the list is nonempty):
* a) If the first element of the list is a reader:
* i) while the first element is a reader:
* x) pop the list
* y) increment the reader count
* z) increment the semaphore (releasing it for some waiter)
* ii) return success
* b) Else if the first element is a writer
* i) pop the list
* ii) set writer to TRUE
* iii) increment the semaphore
* iv) return success
*/
...@@ -6,6 +6,14 @@ ...@@ -6,6 +6,14 @@
#include <errno.h> #include <errno.h>
#include "memory.h" #include "memory.h"
//#define CRC_NO
#define CRC_INCR
//#define CRC_ATEND
#ifndef CRC_NO
#include "crc.h"
#endif
/* When serializing a value, write it into a buffer. */ /* When serializing a value, write it into a buffer. */
/* This code requires that the buffer be big enough to hold whatever you put into it. */ /* This code requires that the buffer be big enough to hold whatever you put into it. */
/* This abstraction doesn't do a good job of hiding its internals. /* This abstraction doesn't do a good job of hiding its internals.
...@@ -14,18 +22,27 @@ struct wbuf { ...@@ -14,18 +22,27 @@ struct wbuf {
unsigned char *buf; unsigned char *buf;
unsigned int size; unsigned int size;
unsigned int ndone; unsigned int ndone;
#ifdef CRC_INCR
u_int32_t crc32; // A 32-bit CRC of everything written so foar.
#endif
}; };
static void wbuf_init (struct wbuf *w, void *buf, diskoff size) { static void wbuf_init (struct wbuf *w, void *buf, DISKOFF size) {
w->buf=buf; w->buf=buf;
w->size=size; w->size=size;
w->ndone=0; w->ndone=0;
#ifdef CRC_INCR
w->crc32 = toku_crc32(0L, Z_NULL, 0);
#endif
} }
/* Write a character. */ /* Write a character. */
static inline void wbuf_char (struct wbuf *w, int ch) { static inline void wbuf_char (struct wbuf *w, int ch) {
assert(w->ndone<w->size); assert(w->ndone<w->size);
w->buf[w->ndone++]=ch; w->buf[w->ndone++]=ch;
#ifdef CRC_INCR
w->crc32 = toku_crc32(w->crc32, &w->buf[w->ndone-1], 1);
#endif
} }
static void wbuf_int (struct wbuf *w, unsigned int i) { static void wbuf_int (struct wbuf *w, unsigned int i) {
...@@ -40,20 +57,31 @@ static void wbuf_int (struct wbuf *w, unsigned int i) { ...@@ -40,20 +57,31 @@ static void wbuf_int (struct wbuf *w, unsigned int i) {
w->buf[w->ndone+1] = i>>16; w->buf[w->ndone+1] = i>>16;
w->buf[w->ndone+2] = i>>8; w->buf[w->ndone+2] = i>>8;
w->buf[w->ndone+3] = i>>0; w->buf[w->ndone+3] = i>>0;
#ifdef CRC_INCR
w->crc32 = toku_crc32(w->crc32, &w->buf[w->ndone], 4);
#endif
w->ndone += 4; w->ndone += 4;
#endif #endif
} }
static void wbuf_bytes (struct wbuf *w, bytevec bytes_bv, int nbytes) { static inline void wbuf_literal_bytes(struct wbuf *w, bytevec bytes_bv, int nbytes) {
const unsigned char *bytes=bytes_bv; const unsigned char *bytes=bytes_bv;
wbuf_int(w, nbytes);
#if 0 #if 0
{ int i; for (i=0; i<nbytes; i++) wbuf_char(w, bytes[i]); } { int i; for (i=0; i<nbytes; i++) wbuf_char(w, bytes[i]); }
#else #else
assert(w->ndone + nbytes <= w->size); assert(w->ndone + nbytes <= w->size);
memcpy(w->buf + w->ndone, bytes, nbytes); memcpy(w->buf + w->ndone, bytes, nbytes);
#ifdef CRC_INCR
w->crc32 = toku_crc32(w->crc32, &w->buf[w->ndone], nbytes);
#endif
w->ndone += nbytes; w->ndone += nbytes;
#endif #endif
}
static void wbuf_bytes (struct wbuf *w, bytevec bytes_bv, int nbytes) {
wbuf_int(w, nbytes);
wbuf_literal_bytes(w, bytes_bv, nbytes);
} }
static void wbuf_ulonglong (struct wbuf *w, unsigned long long ull) { static void wbuf_ulonglong (struct wbuf *w, unsigned long long ull) {
...@@ -61,7 +89,7 @@ static void wbuf_ulonglong (struct wbuf *w, unsigned long long ull) { ...@@ -61,7 +89,7 @@ static void wbuf_ulonglong (struct wbuf *w, unsigned long long ull) {
wbuf_int(w, ull&0xFFFFFFFF); wbuf_int(w, ull&0xFFFFFFFF);
} }
static void wbuf_diskoff (struct wbuf *w, diskoff off) { static void wbuf_diskoff (struct wbuf *w, DISKOFF off) {
wbuf_ulonglong(w, off); wbuf_ulonglong(w, off);
} }
...@@ -69,8 +97,12 @@ static inline void wbuf_txnid (struct wbuf *w, TXNID tid) { ...@@ -69,8 +97,12 @@ static inline void wbuf_txnid (struct wbuf *w, TXNID tid) {
wbuf_ulonglong(w, tid); wbuf_ulonglong(w, tid);
} }
static inline void wbuf_fileid (struct wbuf *w, unsigned long long fileid) { static inline void wbuf_lsn (struct wbuf *w, LSN lsn) {
wbuf_ulonglong(w, fileid); wbuf_ulonglong(w, lsn.lsn);
}
static inline void wbuf_filenum (struct wbuf *w, FILENUM fileid) {
wbuf_int(w, fileid.fileid);
} }
#endif #endif
...@@ -16,9 +16,11 @@ DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) { ...@@ -16,9 +16,11 @@ DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) {
return dbt; return dbt;
} }
DBT *fill_dbt_ap(DBT *dbt, bytevec k, ITEMLEN len, void *app_private) { DBT *fill_dbt_ap(DBT *dbt, bytevec k, ITEMLEN len, void *app_private __attribute__((unused))) {
fill_dbt(dbt, k, len); fill_dbt(dbt, k, len);
#if USE_DBT_APP_PRIVATE
dbt->app_private=app_private; dbt->app_private=app_private;
#endif
return dbt; return dbt;
} }
......
...@@ -11,4 +11,22 @@ DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len); ...@@ -11,4 +11,22 @@ DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len);
DBT *fill_dbt_ap(DBT *dbt, bytevec k, ITEMLEN len, void *app_private); DBT *fill_dbt_ap(DBT *dbt, bytevec k, ITEMLEN len, void *app_private);
int ybt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp); int ybt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp);
#ifndef USE_DBT_APP_PRIVATE
#define USE_DBT_APP_PRIVATE 0
#endif
static inline void *dbt_get_app_private(DBT *dbt __attribute__((unused))) {
#if USE_DBT_APP_PRIVATE
return dbt->app_private;
#else
return 0;
#endif
}
static inline void dbt_set_app_private(DBT *dbt __attribute__((unused)), void *ap __attribute__((unused))) {
#if USE_DBT_APP_PRIVATE
dbt->app_private = ap;
#endif
}
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment