Commit 6157eb7d authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Up

git-svn-id: file:///svn/tokudb@519 c7de825b-a66e-492c-adef-691d508d4ae1
parent 38cf566a
......@@ -12,7 +12,7 @@ FPICFLAGS = -fPIC
DTOOL = valgrind --quiet --error-exitcode=1
endif
CFLAGS = -Wall -W $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS) -Werror $(FPICFLAGS)
CFLAGS = -Wall -W $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS) -Werror $(FPICFLAGS) -Wshadow
LDFLAGS = $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS)
CPPFLAGS += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE
......@@ -31,10 +31,10 @@ REGRESSION_TESTS = \
ybt-test \
pma-test \
brt-serialize-test \
brt-test \
cachetable-test \
cachetable-test2 \
hashtest \
brt-test \
# This line intentially kept commented so I can have a \ on the end of the previous line
BINS = $(REGRESSION_TESTS) \
......@@ -46,7 +46,6 @@ BINS = $(REGRESSION_TESTS) \
libs: log.o
bins: $(BINS)
check: bins
./benchmark-test --valsize 256 --verify 1
$(DTOOL) ./ybt-test
$(DTOOL) ./pma-test
$(DTOOL) ./cachetable-test
......@@ -54,6 +53,7 @@ check: bins
$(DTOOL) ./brt-serialize-test
$(DTOOL) ./brt-test
$(DTOOL) ./hashtest
./benchmark-test --valsize 256 --verify 1
# ./mdict-test
check-fanout:
......@@ -63,33 +63,40 @@ check-fanout:
let BRT_FANOUT=BRT_FANOUT+1; \
done
pma-test benchmark-test brt-test brt-serialize-test: LDFLAGS+=-lz
# pma: PROF_FLAGS=-fprofile-arcs -ftest-coverage
BRT_INTERNAL_H_INCLUDES = brt-internal.h cachetable.h hashtable.h pma.h brt.h brttypes.h yerror.h ybt.h log.h ../include/db.h kv-pair.h memory.h crc.h
key.o: brttypes.h key.h
pma-test.o: pma-internal.h pma.h yerror.h memory.h ../include/db.h list.h kv-pair.h brttypes.h ybt.h yerror.h
pma-test: pma.o memory.o key.o ybt.o log.o mempool.o
pma-test.o: $(BRT_INTERNAL_H_INCLUDES) pma-internal.h pma.h list.h mempool.h
pma-test: pma.o memory.o key.o ybt.o log.o mempool.o fingerprint.o
pma.o: pma.h yerror.h pma-internal.h memory.h key.h ybt.h brttypes.h log.h ../include/db.h
ybt.o: ybt.h brttypes.h ../include/db.h
ybt-test: ybt-test.o ybt.o memory.o
ybt-test.o: ybt.h ../include/db.h
cachetable.o: cachetable.h hashfun.h
brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o header-io.o ybt.o key.o primes.o log.o mempool.o
log.o: log-internal.h log.h
brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o header-io.o ybt.o key.o primes.o log.o mempool.o brt-verify.o fingerprint.o
log.o: log-internal.h log.h wbuf.h crc.h
brt-test.o brt.o: brt.h ../include/db.h hashtable.h pma.h brttypes.h cachetable.h
brt-serialize-test.o: pma.h yerror.h brt.h ../include/db.h memory.h hashtable.h brttypes.h brt-internal.h
brt.o: brt.h ../include/db.h mdict.h pma.h brttypes.h memory.h brt-internal.h cachetable.h hashtable.h
brt-serialize-test.o: $(BRT_INTERNAL_H_INCLUDES)
brt.o: $(BRT_INTERNAL_H_INCLUDES)
mdict.o: pma.h
hashtable.o: hashtable.h brttypes.h memory.h key.h yerror.h ../include/db.h hashfun.h
memory.o: memory.h
primes.o: primes.h
hashtest: hashtable.o memory.o primes.o
brt-serialize.o: brt.h ../include/db.h cachetable.h memory.h mdict.h pma.h brttypes.h brt-internal.h hashtable.h wbuf.h rbuf.h
header-io.o: brttypes.h brt-internal.h brt.h ../include/db.h memory.h
brt-serialize.o: $(BRT_INTERNAL_H_INCLUDES) key.h wbuf.h rbuf.h
header-io.o: $(BRT_INTERNAL_H_INCLUDES)
mdict-test: hashtable.o pma.o memory.o
brt-bigtest: memory.o ybt.o brt.o pma.o cachetable.o key.o hashtable.o brt-serialize.o
brt-bigtest.o: brt.h ../include/db.h
log-test: log.o memory.o
brt-verify.o: $(BRT_INTERNAL_H_INCLUDES)
fingerprint.o: $(BRT_INTERNAL_H_INCLUDES)
brt-serialize-test: brt-serialize-test.o brt-serialize.o memory.o hashtable.o pma.o key.o ybt.o brt.o cachetable.o primes.o log.o mempool.o brt-verify.o fingerprint.o
brt-serialize-test: brt-serialize-test.o brt-serialize.o memory.o hashtable.o pma.o key.o ybt.o brt.o cachetable.o primes.o log.o mempool.o
cachetable-test.o: cachetable.h memory.h
cachetable-test: cachetable.o memory.o cachetable-test.o primes.o
......@@ -97,7 +104,7 @@ cachetable-test: cachetable.o memory.o cachetable-test.o primes.o
cachetable-test2.o: cachetable.h memory.h
cachetable-test2: cachetable.o memory.o cachetable-test2.o primes.o
benchmark-test: benchmark-test.o ybt.o memory.o brt.o pma.o cachetable.o key.o hashtable.o brt-serialize.o primes.o log.o mempool.o
benchmark-test: benchmark-test.o ybt.o memory.o brt.o pma.o cachetable.o key.o hashtable.o brt-serialize.o primes.o log.o mempool.o brt-verify.o fingerprint.o
benchmark-test.o: brt.h ../include/db.h
clean:
......
......@@ -30,7 +30,7 @@ BRT t;
void setup (void) {
int r;
unlink(fname);
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
r = open_brt(fname, 0, 1, &t, nodesize, ct, default_compare_fun); assert(r==0);
}
......@@ -69,6 +69,7 @@ long long llrandom (void) {
void random_insert_below (long long below) {
long long i;
assert(0 < below);
for (i=0; i<ITEMS_TO_INSERT_PER_ITERATION; i++) {
insert(llrandom()%below);
}
......@@ -79,7 +80,7 @@ double tdiff (struct timeval *a, struct timeval *b) {
}
void biginsert (long long n_elements, struct timeval *starttime) {
long i;
long long i;
struct timeval t1,t2;
int iteration;
for (i=0, iteration=0; i<n_elements; i+=ITEMS_TO_INSERT_PER_ITERATION, iteration++) {
......
static int brt_root_put_cmd_XY (BRT brt, BRT_CMD *md, TOKUTXN txn) {
int r;
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt);
if ((r=cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize))) {
goto died0;
}
node=node_v;
if (0) {
died1:
cachetable_unpin(brt->cf, node->thisnodename, node->dirty, brtnodesize(node));
goto died0;
}
node->parent_brtnode = 0;
result = brtnode_put_cmd_XY(brt, node, cmd, txn);
// It's still pinned, and it may be too big or the fanout may be too large.
if (node->height>0 && node->u.n.n_children==TREE_FANOUT) {
// Must split it.
r = do_split_node(node, &nodea, &nodeb, &splitk); // On error: node is unmodified
if (r!=0) goto died1;
// node is garbage, and nodea and nodeb are pinned
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp); // On error: root is unmodified and nodea and nodeb are both unpinned
if (r!=0) goto died0;
// nodea and nodeb are unpinned, and the root has been fixed
// up to point at a new node (*rootp) containing two children
// (nodea and nodeb). nodea and nodeb are unpinned. *rootp is still pinned
node = *rootp;
}
// Now the fanout is small enough.
// But the node could still be too large.
if (serialize_brtnode_size(node)>node->nodesize) {
}
}
......@@ -2,14 +2,15 @@
#include "hashtable.h"
#include "pma.h"
#include "brt.h"
//#include "pma.h"
#include "crc.h"
#ifndef BRT_FANOUT
#define BRT_FANOUT 16
#endif
enum { TREE_FANOUT = BRT_FANOUT }; //, NODESIZE=1<<20 };
enum { TREE_FANOUT = BRT_FANOUT };
enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */
enum { BRT_CMD_OVERHEAD = 1 };
enum { BRT_DEFAULT_NODE_SIZE = 1 << 20 };
struct nodeheader_in_file {
int n_in_buffer;
......@@ -22,21 +23,28 @@ typedef struct brtnode *BRTNODE;
/* Internal nodes. */
struct brtnode {
enum typ_tag tag;
BRT brt; // The containing BRT
unsigned int nodesize;
diskoff thisnodename;
DISKOFF thisnodename; // The size of the node allocated on disk. Not all is necessarily in use.
LSN lsn; // Need the LSN as of the most recent modification.
int layout_version; // What version of the data structure?
BRTNODE parent_brtnode; /* Invariant: The parent of an in-memory node must be in main memory. This is so we can find and update the down pointer when we change the diskoff of a node. */
int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */
int dirty;
u_int32_t rand4fingerprint;
u_int32_t local_fingerprint; /* For leaves this is everything in the buffer. For nonleaves, this is everything in the hash tables, but does not include child subtree fingerprints. */
int dirty;
union node {
struct nonleaf {
// Don't actually store the subree fingerprint in the in-memory data structure.
int n_children; /* if n_children==TREE_FANOUT+1 then the tree needs to be rebalanced. */
u_int32_t child_subtree_fingerprints[TREE_FANOUT+1];
bytevec childkeys[TREE_FANOUT]; /* Pivot keys. Child 0's keys are <= childkeys[0]. Child 1's keys are <= childkeys[1].
Note: It is possible that Child 1's keys are == to child 0's key's, so it is
not necessarily true that child 1's keys are > childkeys[0].
However, in the absense of duplicate keys, child 1's keys *are* > childkeys[0]. */
unsigned int childkeylens[TREE_FANOUT];
unsigned int totalchildkeylens;
diskoff children[TREE_FANOUT+1]; /* unused if height==0 */ /* Note: The last element of these arrays is used only temporarily while splitting a node. */
DISKOFF children[TREE_FANOUT+1]; /* unused if height==0 */ /* Note: The last element of these arrays is used only temporarily while splitting a node. */
HASHTABLE htables[TREE_FANOUT+1];
unsigned int n_bytes_in_hashtable[TREE_FANOUT+1]; /* how many bytes are in each hashtable (including overheads) */
unsigned int n_bytes_in_hashtables;
......@@ -52,12 +60,13 @@ struct brtnode {
struct brt_header {
int dirty;
unsigned int nodesize;
diskoff freelist;
diskoff unused_memory;
diskoff unnamed_root;
DISKOFF freelist;
DISKOFF unused_memory;
DISKOFF unnamed_root;
int n_named_roots; /* -1 if the only one is unnamed */
char **names;
diskoff *roots;
DISKOFF *roots;
unsigned int flags;
};
......@@ -69,21 +78,24 @@ struct brt {
BRT_CURSOR cursors_head, cursors_tail;
unsigned int nodesize;
unsigned int flags;
int (*compare_fun)(DB*,const DBT*,const DBT*);
int (*dup_compare)(DB*,const DBT*,const DBT*);
void *skey,*sval; /* Used for DBT return values. */
};
/* serialization code */
void serialize_brtnode_to(int fd, diskoff off, diskoff size, BRTNODE node);
int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesize);
void serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node);
int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesize);
unsigned int serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
void verify_counts(BRTNODE);
int serialize_brt_header_to (int fd, struct brt_header *h);
int deserialize_brtheader_from (int fd, diskoff off, struct brt_header **brth);
int deserialize_brtheader_from (int fd, DISKOFF off, struct brt_header **brth);
/* return the size of a tree node */
long brtnode_size (BRTNODE node);
......@@ -169,3 +181,21 @@ struct brt_cmd {
};
typedef struct brt_cmd BRT_CMD;
struct brtenv {
CACHETABLE ct;
TOKULOGGER logger;
long long checksum_number;
// SPINLOCK checkpointing;
};
extern cachetable_flush_func_t brtnode_flush_callback;
extern cachetable_fetch_func_t brtnode_fetch_callback;
extern int toku_read_and_pin_brt_header (CACHEFILE cf, struct brt_header **header);
extern int toku_unpin_brt_header (BRT brt);
extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt);
static const BRTNODE null_brtnode=0;
extern u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, int vallen);
extern u_int32_t toku_calccrc32_cmd (int type, const void *key, int keylen, const void *val, int vallen);
extern u_int32_t toku_calccrc32_cmdstruct (BRT_CMD *cmd);
#include "brt.h"
#include "memory.h"
#include "brt-internal.h"
#include <fcntl.h>
#include <assert.h>
#include <string.h>
#include <zlib.h>
#include <arpa/inet.h>
#include <stdlib.h>
void test_serialize(void) {
// struct brt source_brt;
......@@ -12,41 +13,59 @@ void test_serialize(void) {
struct brtnode sn, *dn;
int fd = open("brt-serialize-test.brt", O_RDWR|O_CREAT, 0777);
int r;
const u_int32_t randval = random();
assert(fd>=0);
// source_brt.fd=fd;
char *hello_string;
sn.nodesize = nodesize;
sn.thisnodename = sn.nodesize*20;
sn.lsn.lsn = 123456;
sn.layout_version = 0;
sn.height = 1;
sn.rand4fingerprint = randval;
sn.local_fingerprint = 0;
sn.u.n.n_children = 2;
sn.u.n.childkeys[0] = hello_string = toku_strdup("hello");
sn.u.n.childkeylens[0] = 6;
sn.u.n.totalchildkeylens = 6;
sn.u.n.children[0] = sn.nodesize*30;
sn.u.n.children[1] = sn.nodesize*35;
sn.u.n.child_subtree_fingerprints[0] = random();
sn.u.n.child_subtree_fingerprints[1] = random();
r = toku_hashtable_create(&sn.u.n.htables[0]); assert(r==0);
r = toku_hashtable_create(&sn.u.n.htables[1]); assert(r==0);
r = toku_hash_insert(sn.u.n.htables[0], "a", 2, "aval", 5, BRT_NONE); assert(r==0);
r = toku_hash_insert(sn.u.n.htables[0], "b", 2, "bval", 5, BRT_NONE); assert(r==0);
r = toku_hash_insert(sn.u.n.htables[1], "x", 2, "xval", 5, BRT_NONE); assert(r==0);
r = toku_hash_insert(sn.u.n.htables[0], "a", 2, "aval", 5, BRT_NONE); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, "a", 2, "aval", 5);
r = toku_hash_insert(sn.u.n.htables[0], "b", 2, "bval", 5, BRT_NONE); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, "b", 2, "bval", 5);
r = toku_hash_insert(sn.u.n.htables[1], "x", 2, "xval", 5, BRT_NONE); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, "x", 2, "xval", 5);
sn.u.n.n_bytes_in_hashtables = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
serialize_brtnode_to(fd, sn.nodesize*20, sn.nodesize, &sn); assert(r==0);
r = deserialize_brtnode_from(fd, nodesize*20, &dn, nodesize);
assert(r==0);
assert(dn->thisnodename==nodesize*20);
assert(dn->lsn.lsn==123456);
assert(dn->layout_version ==0);
assert(dn->height == 1);
assert(dn->rand4fingerprint==randval);
assert(dn->u.n.n_children==2);
assert(strcmp(dn->u.n.childkeys[0], "hello")==0);
assert(dn->u.n.childkeylens[0]==6);
assert(dn->u.n.totalchildkeylens==6);
assert(dn->u.n.children[0]==nodesize*30);
assert(dn->u.n.children[1]==nodesize*35);
{
int i;
for (i=0; i<2; i++) {
assert(dn->u.n.child_subtree_fingerprints[i]==sn.u.n.child_subtree_fingerprints[i]);
}
assert(dn->local_fingerprint==sn.local_fingerprint);
}
{
bytevec data; ITEMLEN datalen; int type;
int r = toku_hash_find(dn->u.n.htables[0], "a", 2, &data, &datalen, &type);
r = toku_hash_find(dn->u.n.htables[0], "a", 2, &data, &datalen, &type);
assert(r==0);
assert(strcmp(data,"aval")==0);
assert(datalen==5);
......@@ -64,7 +83,7 @@ void test_serialize(void) {
assert(datalen==5);
assert(type == BRT_NONE);
}
// brtnode_free(&dn);
brtnode_free(&dn);
toku_free(hello_string);
toku_hashtable_free(&sn.u.n.htables[0]);
......
This diff is collapsed.
This diff is collapsed.
/* Verify a BRT. */
/* Check:
* the fingerprint of every node (local check)
* the child's fingerprint matches the parent's copy
* the tree is of uniform depth (and the height is correct at every node)
* For non-dup trees: the values to the left are < the values to the right
* and < the pivot
* For dup trees: the values to the left are <= the values to the right
* the pivots are < or <= left values (according to the PresentL bit)
* the pivots are > or >= right values (according to the PresentR bit)
*
* Note: We don't yet have DUP trees, so thee checks on duplicate trees are unimplemented. (Nov 1 2007)
*/
#include "brt-internal.h"
#include <assert.h>
static void verify_local_fingerprint (BRTNODE node) {
u_int32_t fp=0;
int i;
if (node->height>0) {
for (i=0; i<node->u.n.n_children; i++)
HASHTABLE_ITERATE(node->u.n.htables[i], key, keylen, data, datalen, type,
({
fp += node->rand4fingerprint * toku_calccrc32_cmd(type, key, keylen, data, datalen);
}));
assert(fp==node->local_fingerprint);
} else {
pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->local_fingerprint);
}
}
static void verify_parent_fingerprint (BRTNODE node) {
BRTNODE parent=node->parent_brtnode;
u_int32_t subtree_fingerprint=node->local_fingerprint;
if (node->height>0) {
int i;
for (i=0; i<node->u.n.n_children; i++) {
subtree_fingerprint+=node->u.n.child_subtree_fingerprints[i];
}
}
if (parent) {
int i;
assert(parent->height>0);
for (i=0; i<parent->u.n.n_children; i++) {
if (parent->u.n.children[i]==node->thisnodename) {
assert(parent->u.n.child_subtree_fingerprints[i]==subtree_fingerprint);
return;
}
}
assert(0); // no parent matches
}
}
int verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, int recurse, BRTNODE parent_brtnode) {
int result=0;
BRTNODE node;
void *node_v;
int r;
if ((r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize)))
return r;
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
node->parent_brtnode = parent_brtnode;
verify_local_fingerprint(node);
verify_parent_fingerprint(node);
if (node->height>0) {
int i;
for (i=0; i< node->u.n.n_children-1; i++) {
bytevec thislorange,thishirange;
ITEMLEN thislolen, thishilen;
if (node->u.n.n_children==0 || i==0) {
thislorange=lorange;
thislolen =lolen;
} else {
thislorange=node->u.n.childkeys[i-1];
thislolen =node->u.n.childkeylens[i-1];
}
if (node->u.n.n_children==0 || i+1>=node->u.n.n_children) {
thishirange=hirange;
thishilen =hilen;
} else {
thishirange=node->u.n.childkeys[i];
thishilen =node->u.n.childkeylens[i];
}
{
void verify_pair (bytevec key, unsigned int keylen,
bytevec data __attribute__((__unused__)),
unsigned int datalen __attribute__((__unused__)),
int type __attribute__((__unused__)),
void *ignore __attribute__((__unused__))) {
if (thislorange) assert(keycompare(thislorange,thislolen,key,keylen)<0);
if (thishirange && keycompare(key,keylen,thishirange,thishilen)>0) {
printf("%s:%d in buffer %d key %s is bigger than %s\n", __FILE__, __LINE__, i, (char*)key, (char*)thishirange);
result=1;
}
}
toku_hashtable_iterate(node->u.n.htables[i], verify_pair, 0);
}
}
for (i=0; i<node->u.n.n_children; i++) {
if (i>0) {
if (lorange) assert(keycompare(lorange,lolen, node->u.n.childkeys[i-1], node->u.n.childkeylens[i-1])<0);
if (hirange) assert(keycompare(node->u.n.childkeys[i-1], node->u.n.childkeylens[i-1], hirange, hilen)<=0);
}
if (recurse) {
result|=verify_brtnode(brt, node->u.n.children[i],
(i==0) ? lorange : node->u.n.childkeys[i-1],
(i==0) ? lolen : node->u.n.childkeylens[i-1],
(i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i],
(i==node->u.n.n_children-1) ? hilen : node->u.n.childkeylens[i],
recurse,
node);
}
}
}
if ((r = cachetable_unpin(brt->cf, off, 0, 0))) return r;
return result;
}
int verify_brt (BRT brt) {
int r;
CACHEKEY *rootp;
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
rootp = toku_calculate_root_offset_pointer(brt);
if ((r=verify_brtnode(brt, *rootp, 0, 0, 0, 0, 1, null_brtnode))) goto died0;
if ((r = toku_unpin_brt_header(brt))!=0) return r;
return 0;
}
This diff is collapsed.
......@@ -11,8 +11,15 @@
#include "log.h"
typedef struct brt *BRT;
int open_brt (const char *fname, const char *dbname, int is_create, BRT *, int nodesize, CACHETABLE, int(*)(DB*,const DBT*,const DBT*));
//int brt_create (BRT **, int nodesize, int n_nodes_in_cache); /* the nodesize and n_nodes in cache really should be separately configured. */
//int brt_open (BRT *, char *fname, char *dbname);
int brt_create(BRT *);
int brt_set_flags(BRT, int flags);
int brt_set_nodesize(BRT, int nodesize);
int brt_set_bt_compare(BRT, int (*bt_compare)(DB *, const DBT*, const DBT*));
int brt_set_dup_compare(BRT, int (*dup_compare)(DB *, const DBT*, const DBT*));
int brt_set_cachetable(BRT, CACHETABLE);
int brt_open(BRT, const char *fname, const char *dbname, int is_create, CACHETABLE ct);
int brt_insert (BRT, DBT *, DBT *, DB*, TOKUTXN);
int brt_lookup (BRT brt, DBT *k, DBT *v, DB*db);
int brt_delete (BRT brt, DBT *k, DB *db);
......@@ -22,12 +29,11 @@ void brt_fsync (BRT); /* fsync, but don't clear the caches. */
void brt_flush (BRT); /* fsync and clear the caches. */
int brt_create_cachetable (CACHETABLE *t, int n_cachlines /* Pass 0 if you want the default. */);
/* create and initialize a cache table
hashsize is the initialize size of the lookup table
cachesize is the upper limit on the size of the size of the values in the table */
int brt_create_cachetable_size (CACHETABLE *t, int hashsize, long cachesize);
cachesize is the upper limit on the size of the size of the values in the table
pass 0 if you want the default */
int brt_create_cachetable(CACHETABLE *t, long cachesize, LSN initial_lsn, TOKULOGGER);
extern int brt_debug_mode;
int verify_brt (BRT brt);
......@@ -40,4 +46,7 @@ int brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int brtc_flags, DB *d
int brt_cursor_delete(BRT_CURSOR cursor, int flags);
int brt_cursor_close (BRT_CURSOR curs);
typedef struct brtenv *BRTENV;
int brtenv_checkpoint (BRTENV env);
#endif
#ifndef BRTTYPES_H
#define BRTTYPES_H
#include <sys/types.h>
#define _XOPEN_SOURCE 500
#define _FILE_OFFSET_BITS 64
typedef unsigned int ITEMLEN;
typedef const void *bytevec;
//typedef const void *bytevec;
typedef long long diskoff; /* Offset in a disk. -1 is the NULL pointer. */
typedef long long DISKOFF; /* Offset in a disk. -1 is the NULL pointer. */
typedef long long TXNID;
/* Make the LSN be a struct instead of an integer so that we get better type checking. */
typedef struct __toku_lsn { u_int64_t lsn; } LSN;
#define ZERO_LSN ((LSN){0})
/* Make the FILEID a struct for the same reason. */
typedef struct __toku_fileid { u_int32_t fileid; } FILENUM;
typedef enum __toku_bool { FALSE=0, TRUE=1} BOOL;
typedef struct tokulogger *TOKULOGGER;
#define NULL_LOGGER ((TOKULOGGER)0)
typedef struct tokutxn *TOKUTXN;
#endif
This diff is collapsed.
......@@ -58,7 +58,14 @@ static void file_is_not_present(CACHEFILE cf) {
}
static void flush_forchain (CACHEFILE f __attribute__((__unused__)), CACHEKEY key, void *value, long size __attribute__((__unused__)), int write_me __attribute__((__unused__)), int keep_me __attribute__((__unused__))) {
static void flush_forchain (CACHEFILE f __attribute__((__unused__)),
CACHEKEY key,
void *value,
long size __attribute__((__unused__)),
BOOL write_me __attribute__((__unused__)),
BOOL keep_me __attribute__((__unused__)),
LSN modified_lsn __attribute__((__unused__)),
BOOL rename_p __attribute__((__unused__))) {
int *v = value;
//cachetable_print_state(ct);
//printf("Flush %lld %d\n", key, (int)value);
......@@ -67,9 +74,10 @@ static void flush_forchain (CACHEFILE f __attribute__((__unused__)), CACHEKEY ke
//print_ints();
}
static int fetch_forchain (CACHEFILE f __attribute__((__unused__)), CACHEKEY key, void**value, long *sizep __attribute__((__unused__)), void*extraargs) {
static int fetch_forchain (CACHEFILE f __attribute__((__unused__)), CACHEKEY key, void**value, long *sizep __attribute__((__unused__)), void*extraargs, LSN *written_lsn) {
assert((long)extraargs==(long)key);
*value = (void*)(long)key;
written_lsn->lsn = 0;
return 0;
}
......@@ -93,9 +101,9 @@ void test_chaining (void) {
char fname[N_FILES][FILENAME_LEN];
int r;
long i, trial;
r = create_cachetable(&ct, N_PRESENT_LIMIT, N_PRESENT_LIMIT); assert(r==0);
r = create_cachetable(&ct, N_PRESENT_LIMIT, ZERO_LSN, NULL_LOGGER); assert(r==0);
for (i=0; i<N_FILES; i++) {
int r = snprintf(fname[i], FILENAME_LEN, "cachetabletest2.%ld.dat", i);
r = snprintf(fname[i], FILENAME_LEN, "cachetabletest2.%ld.dat", i);
assert(r>0 && r<FILENAME_LEN);
unlink(fname[i]);
r = cachetable_openf(&f[i], ct, fname[i], O_RDWR|O_CREAT, 0777); assert(r==0);
......
......@@ -29,12 +29,15 @@ struct ctpair {
PAIR next,prev; // In LRU list.
PAIR hash_chain;
CACHEFILE cachefile;
cachetable_flush_func_t flush_callback;
cachetable_fetch_func_t fetch_callback;
void*extraargs;
CACHETABLE_FLUSH_FUNC_T flush_callback;
CACHETABLE_FETCH_FUNC_T fetch_callback;
void *extraargs;
int verify_flag; /* Used in verify_cachetable() */
LSN modified_lsn; // What was the LSN when modified (undefined if not dirty)
LSN written_lsn; // What was the LSN when written (we need to get this information when we fetch)
};
// The cachetable is as close to an ENV as we get.
struct cachetable {
enum typ_tag tag;
int n_in_table;
......@@ -44,6 +47,8 @@ struct cachetable {
CACHEFILE cachefiles;
long size_current, size_limit;
int primeidx;
LSN lsn_of_checkpoint; // the most recent checkpoint in the log.
TOKULOGGER logger;
};
struct fileid {
......@@ -57,9 +62,10 @@ struct cachefile {
int fd; /* Bug: If a file is opened read-only, then it is stuck in read-only. If it is opened read-write, then subsequent writers can write to it too. */
CACHETABLE cachetable;
struct fileid fileid;
FILENUM filenum;
};
int create_cachetable(CACHETABLE *result, int table_size __attribute__((unused)), long size_limit) {
int create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER logger) {
TAGMALLOC(CACHETABLE, t);
int i;
t->n_in_table = 0;
......@@ -74,6 +80,8 @@ int create_cachetable(CACHETABLE *result, int table_size __attribute__((unused))
t->cachefiles = 0;
t->size_current = 0;
t->size_limit = size_limit;
t->lsn_of_checkpoint = initial_lsn;
t->logger = logger;
*result = t;
return 0;
}
......@@ -257,13 +265,25 @@ static PAIR remove_from_hash_chain (PAIR remove_me, PAIR list) {
return list;
}
// Predicate to determine if a node must be renamed. Nodes are renamed on the time they are written
// after a checkpoint.
// Thus we need to rename it if it is dirty,
// if it has been modified within the current checkpoint regime (hence non-strict inequality)
// and the last time it was written was in a previous checkpoint regime (strict inequality)
static BOOL need_to_rename_p (CACHETABLE t, PAIR p) {
return (p->dirty
&& p->modified_lsn.lsn>=t->lsn_of_checkpoint.lsn // nonstrict
&& p->written_lsn.lsn < t->lsn_of_checkpoint.lsn); // strict
}
static void flush_and_remove (CACHETABLE t, PAIR remove_me, int write_me) {
lru_remove(t, remove_me);
//printf("flush_callback(%lld,%p)\n", remove_me->key, remove_me->value);
WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, dirty=%d, 0)\n", __FILE__, __LINE__, remove_me->key, remove_me->value, remove_me->dirty && write_me));
//printf("%s:%d TAG=%x p=%p\n", __FILE__, __LINE__, remove_me->tag, remove_me);
//printf("%s:%d dirty=%d\n", __FILE__, __LINE__, remove_me->dirty);
remove_me->flush_callback(remove_me->cachefile, remove_me->key, remove_me->value, remove_me->size, remove_me->dirty && write_me, 0);
remove_me->flush_callback(remove_me->cachefile, remove_me->key, remove_me->value, remove_me->size, remove_me->dirty && write_me, 0,
t->lsn_of_checkpoint, need_to_rename_p(t, remove_me));
t->n_in_table--;
// Remove it from the hash chain.
{
......@@ -274,14 +294,6 @@ static void flush_and_remove (CACHETABLE t, PAIR remove_me, int write_me) {
toku_free(remove_me);
}
static void flush_and_keep (PAIR flush_me) {
if (flush_me->dirty) {
WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, dirty=1, 0)\n", __FILE__, __LINE__, flush_me->key, flush_me->value));
flush_me->flush_callback(flush_me->cachefile, flush_me->key, flush_me->value, flush_me->size, 1, 1);
flush_me->dirty=0;
}
}
static int maybe_flush_some (CACHETABLE t, long size __attribute__((unused))) {
int r = 0;
again:
......@@ -309,7 +321,8 @@ static int maybe_flush_some (CACHETABLE t, long size __attribute__((unused))) {
static int cachetable_insert_at(CACHEFILE cachefile, int h, CACHEKEY key, void *value, long size,
cachetable_flush_func_t flush_callback,
cachetable_fetch_func_t fetch_callback,
void *extraargs, int dirty) {
void *extraargs, int dirty,
LSN written_lsn) {
TAGMALLOC(PAIR, p);
p->pinned = 1;
p->dirty = dirty;
......@@ -322,6 +335,8 @@ static int cachetable_insert_at(CACHEFILE cachefile, int h, CACHEKEY key, void *
p->flush_callback = flush_callback;
p->fetch_callback = fetch_callback;
p->extraargs = extraargs;
p->modified_lsn.lsn = 0;
p->written_lsn = written_lsn;
CACHETABLE ct = cachefile->cachetable;
lru_add_to_list(ct, p);
p->hash_chain = ct->table[h];
......@@ -352,7 +367,7 @@ int cachetable_put(CACHEFILE cachefile, CACHEKEY key, void*value, long size,
if (maybe_flush_some(cachefile->cachetable, size))
return -2;
// flushing could change the result from hashit()
int r = cachetable_insert_at(cachefile, hashit(cachefile->cachetable, key), key, value, size, flush_callback, fetch_callback, extraargs, 1);
int r = cachetable_insert_at(cachefile, hashit(cachefile->cachetable, key), key, value, size, flush_callback, fetch_callback, extraargs, 1, ZERO_LSN);
return r;
}
......@@ -377,10 +392,11 @@ int cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, void**value, long
void *toku_value;
long size = 1; // compat
int r;
LSN written_lsn;
WHEN_TRACE_CT(printf("%s:%d CT: fetch_callback(%lld...)\n", __FILE__, __LINE__, key));
if ((r=fetch_callback(cachefile, key, &toku_value, &size, extraargs)))
if ((r=fetch_callback(cachefile, key, &toku_value, &size, extraargs, &written_lsn)))
return r;
cachetable_insert_at(cachefile, hashit(t,key), key, toku_value, size, flush_callback, fetch_callback, extraargs, 0);
cachetable_insert_at(cachefile, hashit(t,key), key, toku_value, size, flush_callback, fetch_callback, extraargs, 0, written_lsn);
*value = toku_value;
if (sizep)
*sizep = size;
......@@ -428,6 +444,26 @@ int cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, int dirty, long size) {
return 0;
}
// effect: Move an object from one key to another key.
// requires: The object is pinned in the table
int cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newkey) {
CACHETABLE t = cachefile->cachetable;
PAIR *ptr_to_p,p;
for (ptr_to_p = &t->table[hashit(t, oldkey)], p = *ptr_to_p;
p;
ptr_to_p = &p->hash_chain, p = *ptr_to_p) {
if (p->key==oldkey && p->cachefile==cachefile) {
*ptr_to_p = p->hash_chain;
p->key = newkey;
int nh = hashit(t, newkey);
p->hash_chain = t->table[nh];
t->table[nh] = p;
return 0;
}
}
return -1;
}
int cachetable_flush (CACHETABLE t) {
int i;
for (i=0; i<t->table_size; i++) {
......@@ -559,6 +595,15 @@ int cachetable_remove (CACHEFILE cachefile, CACHEKEY key, int write_me) {
return 0;
}
#if 0
static void flush_and_keep (PAIR flush_me) {
if (flush_me->dirty) {
WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, dirty=1, 0)\n", __FILE__, __LINE__, flush_me->key, flush_me->value));
flush_me->flush_callback(flush_me->cachefile, flush_me->key, flush_me->value, flush_me->size, 1, 1);
flush_me->dirty=0;
}
}
static int cachetable_fsync_pairs (CACHETABLE t, PAIR p) {
if (p) {
int r = cachetable_fsync_pairs(t, p->hash_chain);
......@@ -577,6 +622,7 @@ int cachetable_fsync (CACHETABLE t) {
}
return 0;
}
#endif
#if 0
int cachefile_pwrite (CACHEFILE cf, const void *buf, size_t count, off_t offset) {
......@@ -643,3 +689,54 @@ int cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, void **value_ptr,
}
return 1;
}
int cachetable_checkpoint (CACHETABLE ct) {
// Single threaded checkpoint.
// In future: for multithreaded checkpoint we should not proceed if the previous checkpoint has not finished.
// Requires: Everything is unpinned. (In the multithreaded version we have to wait for things to get unpinned and then
// grab them (or else the unpinner has to do something.)
// Algorithm: Write a checkpoint record to the log, noting the LSN of that record.
// Note the LSN of the previous checkpoint (stored in lsn_of_checkpoint)
// For every (unpinnned) dirty node in which the LSN is newer than the prev checkpoint LSN:
// flush the node (giving it a new nodeid, and fixing up the downpointer in the parent)
// Watch out since evicting the node modifies the hash table.
//?? This is a skeleton. It compiles, but doesn't do anything reasonable yet.
//?? log_the_checkpoint();
int n_saved=0;
int n_in_table = ct->n_in_table;
struct save_something {
CACHEFILE cf;
DISKOFF key;
void *value;
long size;
LSN modified_lsn;
CACHETABLE_FLUSH_FUNC_T flush_callback;
} *MALLOC_N(n_in_table, info);
{
PAIR pair;
for (pair=ct->head; pair; pair=pair->next) {
assert(!pair->pinned);
if (pair->dirty && pair->modified_lsn.lsn>ct->lsn_of_checkpoint.lsn) {
//?? /save_something_about_the_pair(); // This read-only so it doesn't modify the table.
n_saved++;
}
}
}
{
int i;
for (i=0; i<n_saved; i++) {
info[i].flush_callback(info[i].cf, info[i].key, info[i].value, info[i].size, 1, 1, info[i].modified_lsn, 0);
}
}
toku_free(info);
return 0;
}
TOKULOGGER cachefile_logger (CACHEFILE cf) {
return cf->cachetable->logger;
}
FILENUM cachefile_filenum (CACHEFILE cf) {
return cf->filenum;
}
......@@ -2,6 +2,7 @@
#define CACHETABLE_H
#include <fcntl.h>
#include "brttypes.h"
/* Implement the cache table. */
......@@ -22,14 +23,16 @@ typedef struct cachefile *CACHEFILE;
* table_size is the initial size of the cache table hash table (in number of entries)
* size limit is the upper bound of the sum of size of the entries in the cache table (total number of bytes)
*/
int create_cachetable(CACHETABLE */*result*/, int table_size, long size_limit);
int create_cachetable(CACHETABLE */*result*/, long size_limit, LSN initial_lsn, TOKULOGGER);
int cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int flags, mode_t mode);
typedef void (*cachetable_flush_func_t)(CACHEFILE, CACHEKEY key, void*value, long size, int write_me, int keep_me);
typedef void (cachetable_flush_func_t)(CACHEFILE, CACHEKEY key, void*value, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
typedef cachetable_flush_func_t *CACHETABLE_FLUSH_FUNC_T;
/* If we are asked to fetch something, get it by calling this back. */
typedef int (*cachetable_fetch_func_t)(CACHEFILE, CACHEKEY key, void **value, long *sizep, void *extraargs);
typedef int (cachetable_fetch_func_t)(CACHEFILE, CACHEKEY key, void **value, long *sizep, void *extraargs, LSN *written_lsn);
typedef cachetable_fetch_func_t *CACHETABLE_FETCH_FUNC_T;
/* Error if already present. On success, pin the value. */
int cachetable_put(CACHEFILE cf, CACHEKEY key, void* value, long size,
......@@ -51,6 +54,9 @@ int cachetable_remove (CACHEFILE, CACHEKEY, int /*write_me*/); /* Removing somet
int cachetable_assert_all_unpinned (CACHETABLE);
int cachefile_count_pinned (CACHEFILE, int /*printthem*/ );
/* Rename whatever is at oldkey to be newkey. Requires that the object be pinned. */
int cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newkey);
//int cachetable_fsync_all (CACHETABLE); /* Flush everything to disk, but keep it in cache. */
int cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */
......@@ -63,7 +69,7 @@ int cachefile_close (CACHEFILE*);
int cachefile_fd (CACHEFILE);
// Useful for debugging
// Useful for debugging
void cachetable_print_state (CACHETABLE ct);
void cachetable_get_state(CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr);
int cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, void **value_ptr,
......@@ -72,4 +78,7 @@ int cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, void **value_ptr,
void cachefile_verify (CACHEFILE cf); // Verify the whole cachetable that the CF is in. Slow.
void cachetable_verify (CACHETABLE t); // Slow...
TOKULOGGER cachefile_logger (CACHEFILE);
FILENUM cachefile_filenum (CACHEFILE);
#endif
CFLAGS = -O2 -Wall -W -Werror -g
LDFLAGS = -lz -lssl -g
adler32:
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <zlib.h>
#include <openssl/md2.h>
#include <openssl/md4.h>
#include <openssl/md5.h>
const unsigned int prime = 2000000011;
unsigned int karprabin (unsigned char *datac, int N) {
assert(N%4==0);
unsigned int *data=(unsigned int*)datac;
N=N/4;
int i;
unsigned int result=0;
for (i=0; i<N; i++) {
result=(result*prime)+data[i];
}
return result;
}
// According to
// P. L'Ecuyer, "Tables of Linear Congruential Generators of
// Different Sizes and Good Lattice Structure", Mathematics of
// Computation 68:225, 249--260 (1999).
// m=2^{32}-5 a=1588635695 is good.
const unsigned int mkr = 4294967291U;
const unsigned int akr = 1588635695U;
// But this is slower
unsigned int karprabinP (unsigned char *datac, int N) {
assert(N%4==0);
unsigned int *data=(unsigned int*)datac;
N=N/4;
int i;
unsigned long long result=0;
for (i=0; i<N; i++) {
result=((result*akr)+data[i])%mkr;
}
return result;
}
float tdiff (struct timeval *start, struct timeval *end) {
return (end->tv_sec-start->tv_sec) +1e-6*(end->tv_usec - start->tv_usec);
}
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
struct timeval start, end;
const int N=2<<20;
unsigned char *data=malloc(N);
int i;
assert(data);
for (i=0; i<N; i++) data[i]=random();
// adler32
{
uLong a32 = adler32(0L, Z_NULL, 0);
for (i=0; i<3; i++) {
gettimeofday(&start, 0);
a32 = adler32(a32, data, N);
gettimeofday(&end, 0);
float tm = tdiff(&start, &end);
printf("adler32=%lu, time=%9.6fs %9.6fns/b\n", a32, tm, 1e9*tm/N);
}
}
// crc32
{
uLong c32 = crc32(0L, Z_NULL, 0);
for (i=0; i<3; i++) {
gettimeofday(&start, 0);
c32 = crc32(c32, data, N);
gettimeofday(&end, 0);
float tm = tdiff(&start, &end);
printf("crc32=%lu, time=%9.6fs %9.6fns/b\n", c32, tm, 1e9*tm/N);
}
}
// MD2
{
unsigned char buf[MD2_DIGEST_LENGTH];
int j;
for (i=0; i<3; i++) {
gettimeofday(&start, 0);
MD2(data, N, buf);
gettimeofday(&end, 0);
float tm = tdiff(&start, &end);
printf("md2=");
for (j=0; j<MD2_DIGEST_LENGTH; j++) {
printf("%02x", buf[j]);
}
printf(" time=%9.6fs %9.6fns/b\n", tm, 1e9*tm/N);
}
}
// MD4
{
unsigned char buf[MD4_DIGEST_LENGTH];
int j;
for (i=0; i<3; i++) {
gettimeofday(&start, 0);
MD4(data, N, buf);
gettimeofday(&end, 0);
float tm = tdiff(&start, &end);
printf("md4=");
for (j=0; j<MD4_DIGEST_LENGTH; j++) {
printf("%02x", buf[j]);
}
printf(" time=%9.6fs %9.6fns/b\n", tm, 1e9*tm/N);
}
}
// MD5
{
unsigned char buf[MD5_DIGEST_LENGTH];
int j;
for (i=0; i<3; i++) {
gettimeofday(&start, 0);
MD5(data, N, buf);
gettimeofday(&end, 0);
float tm = tdiff(&start, &end);
printf("md5=");
for (j=0; j<MD5_DIGEST_LENGTH; j++) {
printf("%02x", buf[j]);
}
printf(" time=%9.6fs %9.6fns/b\n", tm, 1e9*tm/N);
}
}
// karp rabin
{
for (i=0; i<3; i++) {
gettimeofday(&start, 0);
unsigned int kr = karprabin(data, N);
gettimeofday(&end, 0);
float tm = tdiff(&start, &end);
printf("kr=%ud time=%9.6fs %9.6fns/b\n", kr, tm, 1e9*tm/N);
}
}
free(data);
return 0;
}
#ifndef TOKU_CRC_H
#define TOKU_CRC_H
#include <zlib.h>
// zlib crc32 has a bug: If len==0 then it should return oldcrc32, but crc32 returns 0.
static inline u_int32_t toku_crc32 (u_int32_t oldcrc32, const void *data, u_int32_t len) {
if (len==0) return oldcrc32;
else return crc32(oldcrc32, data, len);
}
static const u_int32_t toku_null_crc = 0;
// Don't use crc32, use toku_crc32 to avoid that bug.
ZEXTERN uLong ZEXPORT crc32 OF((uLong crc, const Bytef *buf, uInt len)) __attribute__((deprecated));
#endif
#include <arpa/inet.h>
#include <assert.h>
#include "brt-internal.h"
// Calculate the fingerprint for a kvpair
static inline u_int32_t toku_calc_more_crc32_kvpair (u_int32_t crc, const void *key, int keylen, const void *val, int vallen) {
int i;
i = htonl(keylen);
crc = toku_crc32(crc, (void*)&i, 4);
crc = toku_crc32(crc, key, keylen);
i = htonl(vallen);
crc = toku_crc32(crc, (void*)&i, 4);
crc = toku_crc32(crc, val, vallen);
return crc;
}
u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, int vallen) {
return toku_calc_more_crc32_kvpair(toku_null_crc, key, keylen, val, vallen);
}
u_int32_t toku_calccrc32_cmd (int type, const void *key, int keylen, const void *val, int vallen) {
unsigned char type_c = type;
return toku_calc_more_crc32_kvpair(toku_crc32(toku_null_crc,
&type_c, 1),
key, keylen, val, vallen);
}
u_int32_t toku_calccrc32_cmdstruct (BRT_CMD *cmd) {
switch (cmd->type) {
case BRT_NONE:
case BRT_INSERT:
case BRT_DELETE:
return toku_calccrc32_cmd (cmd->type, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size);
}
assert(0); /* Should not have come here. */
}
#include "brttypes.h"
#include "brt-internal.h"
#include "memory.h"
#include <sys/types.h>
#include <unistd.h>
#include <assert.h>
......@@ -32,7 +30,7 @@ int write_int (int fd, unsigned int v) {
return 0;
}
int read_diskoff (int fd, diskoff *result) {
int read_diskoff (int fd, DISKOFF *result) {
unsigned int i0,i1;
int r;
r = read_uint(fd, &i0); if(r!=0) return r;
......@@ -41,7 +39,7 @@ int read_diskoff (int fd, diskoff *result) {
return 0;
}
int write_diskoff (int fd, diskoff v) {
int write_diskoff (int fd, DISKOFF v) {
int r;
r = write_int(fd, (unsigned int)(v>>32)); if (r!=0) return r;
r = write_int(fd, (unsigned int)(v&0xffffffff)); if (r!=0) return r;
......@@ -97,14 +95,14 @@ int read_brt_header (int fd, struct brt_header *header) {
return 0;
}
int read_brt_h_unused_memory (int fd, diskoff *unused_memory) {
int read_brt_h_unused_memory (int fd, DISKOFF *unused_memory) {
off_t r = lseek(fd, 12, SEEK_SET);
assert(r==12);
r = read_diskoff(fd, unused_memory);
return r;
}
int write_brt_h_unused_memory (int fd, diskoff unused_memory) {
int write_brt_h_unused_memory (int fd, DISKOFF unused_memory) {
off_t r = lseek(fd, 12, SEEK_SET);
assert(r==12);
r = write_diskoff(fd, unused_memory);
......
// This list is intended to be embedded in other data structures.
struct list {
struct list *next, *prev;
};
......
#if defined(__x86_64) || defined(__i386)
static inline void mfence (void) {
__asm__ volatile ("mfence":::"memory");
}
static inline void rfence (void) {
__asm__ volatile ("rfence":::"memory");
}
static inline void sfence (void) {
__asm__ volatile ("sfence":::"memory");
}
/* According to the Intel Architecture Software Developer's
* Manual, Volume 3: System Programming Guide
* (http://www.intel.com/design/pro/manuals/243192.htm), page 7-6,
* "For the P6 family processors, locked operations serialize all
* outstanding load and store operations (that is, wait for them to
* complete)."
*
* Bradley found that fence instructions is faster on an opteron
* mfence takes 8ns on a 1.5GHZ AMD64 (maybe this is an 801)
* sfence takes 5ns
* lfence takes 3ns
* xchgl takes 14ns
*/
static inline lock_xchgl(volatile int *ptr, int x)
{
__asm__("xchgl %0,%1" :"=r" (x) :"m" (*(ptr)), "0" (x) :"memory");
return x;
}
#endif
typedef volatile int SPINLOCK[1];
static inline void spin_init (SPINLOCK v) {
v[0] = 0;
mfence();
}
static inline void spin_lock (SPINLOCK v) {
while (lock_xchgl((int*)v, 1)!=0) {
while (v[0]); /* Spin using only reads. It would be better to use MCS locks, but this reduces bus traffic. */
}
}
static inline void spin_unlock (SPINLOCK v) {
sfence(); // Want all previous stores to take place before we unlock.
v[0]=0;
}
#else
#error Need to define architectur-specific stuff for other machines.
#endif
CFLAGS=-O2 -Wall -W -Werror
LDFLAGS=-lpthread
trylock:
/* Time {m,l,s}fence vs.xchgl for a memory barrier. */
/* Timing numbers:
* Intel T2500 2GHZ
do1 9.0ns/loop
mfence: 29.0ns/loop (marginal cost= 20.0ns)
sfence: 17.3ns/loop (marginal cost= 8.3ns)
lfence: 23.6ns/loop (marginal cost= 14.6ns)
xchgl: 35.8ns/loop (marginal cost= 26.8ns)
* AMD Athlon 64 X2 Dual Core Processor 4200+
Timings are more crazy
do1 20.6ns/loop
mfence: 12.9ns/loop (marginal cost= -7.6ns)
sfence: 8.4ns/loop (marginal cost= -12.1ns)
lfence: 20.2ns/loop (marginal cost= -0.3ns)
xchgl: 16.6ns/loop (marginal cost= -3.9ns)
do1 13.0ns/loop
mfence: 25.6ns/loop (marginal cost= 12.6ns)
sfence: 21.0ns/loop (marginal cost= 8.1ns)
lfence: 12.9ns/loop (marginal cost= -0.1ns)
xchgl: 29.3ns/loop (marginal cost= 16.3ns)
*/
#include <sys/time.h>
#include <stdio.h>
enum { COUNT = 100000000 };
static inline void xchgl (void) {
{
/*
* According to the Intel Architecture Software Developer's
* Manual, Volume 3: System Programming Guide
* (http://www.intel.com/design/pro/manuals/243192.htm), page
* 7-6, "For the P6 family processors, locked operations
* serialize all outstanding load and store operations (that
* is, wait for them to complete)."
* Since xchg is locked by default, it is one way to do membar.
*/
int x=0, y;
asm volatile ("xchgl %0,%1" :"=r" (x) :"m" (y), "0" (x) :"memory");
}
}
static inline void mfence (void) {
asm volatile ("mfence":::"memory");
}
static inline void lfence (void) {
asm volatile ("lfence":::"memory");
}
static inline void sfence (void) {
asm volatile ("sfence":::"memory");
}
double tdiff (struct timeval *start, struct timeval *end) {
return ((end->tv_sec-start->tv_sec + 1e-6*(end->tv_usec + start->tv_usec))/COUNT)*1e9;
}
double nop_cost;
void do1 (volatile int *x) {
int i;
struct timeval start, end;
gettimeofday(&start, 0);
for (i=0; i<COUNT; i++) {
x[0]++;
x[1]++;
x[2]++;
x[3]++;
}
gettimeofday(&end, 0);
printf("do1 %6.1fns/loop\n", nop_cost=tdiff(&start, &end));
}
#define doit(name) void do ##name (volatile int *x) { \
int i; \
struct timeval start, end; \
gettimeofday(&start, 0); \
for (i=0; i<COUNT; i++) { \
x[0]++; \
x[1]++; \
name(); \
x[2]++; \
x[3]++; \
} \
gettimeofday(&end, 0); \
double this_cost = tdiff(&start, &end); \
printf("%6s:%6.1fns/loop (marginal cost=%6.1fns)\n", #name, this_cost, this_cost-nop_cost); \
}
doit(mfence)
doit(lfence)
doit(sfence)
doit(xchgl)
int main (int argc __attribute__((__unused__)),
char *argv[] __attribute__((__unused__))) {
int x[4];
int i;
for (i=0; i<2; i++) {
do1(x);
domfence(x);
dosfence(x);
dolfence(x);
doxchgl(x);
}
return 0;
}
/* How expensive is
* - Obtaining a read-only lock for the first obtainer.
* - Obtaining it for the second one?
* - The third one? */
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <sys/time.h>
float tdiff (struct timeval *start, struct timeval *end) {
return 1e6*(end->tv_sec-start->tv_sec) +(end->tv_usec - start->tv_usec);
}
/* My own rwlock implementation. */
struct brwl {
int mutex;
int state; // 0 for unlocked, -1 for a writer, otherwise many readers
};
static inline int xchg(volatile int *ptr, int x)
{
__asm__("xchgl %0,%1" :"=r" (x) :"m" (*(ptr)), "0" (x) :"memory");
return x;
}
static inline void sfence (void) {
asm volatile ("sfence":::"memory");
}
static inline void brwl_rlock (struct brwl *l) {
while (xchg(&l->mutex, 1)) ;
l->state++;
#if 1
sfence();
l->mutex=0;
#else
xchg(&l->mutex, 0);
#endif
}
enum {K=1000};
pthread_rwlock_t rwlocks[K];
struct brwl blocks[K];
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
int j;
int i;
int r;
struct timeval start, end;
for (j=0; j<3; j++) {
for (i=0; i<K; i++) {
r=pthread_rwlock_init(&rwlocks[i], NULL);
assert(r==0);
}
gettimeofday(&start, 0);
for (i=0; i<K; i++) {
r = pthread_rwlock_tryrdlock(&rwlocks[i]);
assert(r==0);
}
gettimeofday(&end, 0);
printf("pthread_rwlock_tryrdlock took %9.3fus for %d ops: %9.3fus/lock (%9.3fMops/s)\n", tdiff(&start,&end), K, tdiff(&start,&end)/K, K/tdiff(&start,&end));
}
for (j=0; j<3; j++) {
for (i=0; i<K; i++) {
r=pthread_rwlock_init(&rwlocks[i], NULL);
assert(r==0);
}
gettimeofday(&start, 0);
for (i=0; i<K; i++) {
r = pthread_rwlock_rdlock(&rwlocks[i]);
assert(r==0);
}
gettimeofday(&end, 0);
printf("pthread_rwlock_rdlock took %9.3fus for %d ops: %9.3fus/lock (%9.3fMops/s)\n", tdiff(&start,&end), K, tdiff(&start,&end)/K, K/tdiff(&start,&end));
}
for (j=0; j<3; j++) {
for (i=0; i<K; i++) {
blocks[i].state=0;
blocks[i].mutex=0;
}
gettimeofday(&start, 0);
for (i=0; i<K; i++) {
brwl_rlock(&blocks[i]);
}
gettimeofday(&end, 0);
printf("brwl_rlock took %9.3fus for %d ops: %9.3fus/lock (%9.3fMops/s)\n", tdiff(&start,&end), K, tdiff(&start,&end)/K, K/tdiff(&start,&end));
}
return 0;
}
#define _MULTI_THREADED
#include <pthread.h>
#include <stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/time.h>
float tdiff (struct timeval *start, struct timeval *end) {
return 1e6*(end->tv_sec-start->tv_sec) +(end->tv_usec - start->tv_usec);
}
/* Simple function to check the return code and exit the program
if the function call failed
*/
static void compResults(char *string, int rc) {
if (rc) {
printf("Error on : %s, rc=%d",
string, rc);
exit(EXIT_FAILURE);
}
return;
}
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
void *rdlockThread(void *arg)
{
int rc;
int count=0;
struct timeval start, end;
printf("Entered thread, getting read lock with mp wait\n");
Retry:
gettimeofday(&start, 0);
rc = pthread_rwlock_tryrdlock(&rwlock);
gettimeofday(&end, 0);
printf("pthread_rwlock_tryrdlock took %9.3fus\n", tdiff(&start,&end));
if (rc == EBUSY) {
if (count >= 10) {
printf("Retried too many times, failure!\n");
exit(EXIT_FAILURE);
}
++count;
printf("Could not get lock, do other work, then RETRY...\n");
sleep(1);
goto Retry;
}
compResults("pthread_rwlock_tryrdlock() 1\n", rc);
sleep(2);
printf("unlock the read lock\n");
gettimeofday(&start, 0);
rc = pthread_rwlock_unlock(&rwlock);
gettimeofday(&end, 0);
compResults("pthread_rwlock_unlock()\n", rc);
printf("%d.%6d to %d.%6d is %9.2f\n", start.tv_sec, start.tv_usec, end.tv_sec, end.tv_usec, tdiff(&start, &end));
printf("Secondary thread complete\n");
return NULL;
}
int main(int argc, char **argv)
{
int rc=0;
pthread_t thread;
struct timeval start, end;
printf("Enter Testcase - %s\n", argv[0]);
gettimeofday(&start, 0);
gettimeofday(&end, 0);
printf("nop Took %9.2f\n", tdiff(&start, &end));
{
int N=1000;
int i;
printf("Main, get and release the write lock %d times\n", N);
gettimeofday(&start, 0);
for (i=0; i<N; i++) {
rc = pthread_rwlock_wrlock(&rwlock);
rc = pthread_rwlock_unlock(&rwlock);
}
gettimeofday(&end, 0);
compResults("pthread_rwlock_wrlock()\n", rc);
printf("Took %9.2fns/op\n", 1000*tdiff(&start, &end)/N);
}
printf("Main, get the write lock\n");
gettimeofday(&start, 0);
rc = pthread_rwlock_wrlock(&rwlock);
gettimeofday(&end, 0);
compResults("pthread_rwlock_wrlock()\n", rc);
printf("Took %9.2f\n", tdiff(&start, &end));
printf("Main, create the try read lock thread\n");
rc = pthread_create(&thread, NULL, rdlockThread, NULL);
compResults("pthread_create\n", rc);
printf("Main, wait a bit holding the write lock\n");
sleep(5);
printf("Main, Now unlock the write lock\n");
gettimeofday(&start, 0);
rc = pthread_rwlock_unlock(&rwlock);
gettimeofday(&end, 0);
compResults("pthread_rwlock_unlock()\n", rc);
printf("Took %9.2f\n", tdiff(&start, &end));
printf("Main, wait for the thread to end\n");
rc = pthread_join(thread, NULL);
compResults("pthread_join\n", rc);
rc = pthread_rwlock_destroy(&rwlock);
compResults("pthread_rwlock_destroy()\n", rc);
printf("Main completed\n");
return 0;
}
......@@ -11,13 +11,20 @@ struct tokulogger {
int fd;
int n_in_file;
long long next_log_file_number;
LSN lsn;
char buf[LOGGER_BUF_SIZE];
int n_in_buf;
};
int tokulogger_find_next_unused_log_file(const char *directory, long long *result);
enum { LT_INSERT_WITH_NO_OVERWRITE = 'I', LT_DELETE = 'D', LT_COMMIT = 'C' };
enum {
LT_COMMIT = 'C',
LT_DELETE = 'D',
LT_INSERT_WITH_NO_OVERWRITE = 'I',
LT_CHECKPOINT = 'P',
LT_BLOCK_RENAME = 'R'
};
struct tokutxn {
u_int64_t txnid64;
......
......@@ -2,7 +2,6 @@
#include "log-internal.h"
#include "wbuf.h"
#include "memory.h"
#include "../src/ydb-internal.h"
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
......@@ -11,6 +10,7 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
#include "../src/ydb-internal.h"
int tokulogger_find_next_unused_log_file(const char *directory, long long *result) {
DIR *d=opendir(directory);
......@@ -44,6 +44,9 @@ int tokulogger_create_and_open_logger (const char *directory, TOKULOGGER *result
result->fd = -1;
result->next_log_file_number = nexti;
result->n_in_buf = 0;
result->lsn.lsn = 0; // WRONG!!! This should actually be calculated by looking at the log file.
*resultp=result;
return tokulogger_log_bytes(result, 0, "");
}
......@@ -85,26 +88,6 @@ int tokulogger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) {
return 0;
}
// Log an insertion of a key-value pair into a particular node of the tree.
int tokulogger_log_brt_insert_with_no_overwrite (TOKULOGGER logger,
TXNID txnid,
diskoff diskoff,
unsigned char *key,
int keylen,
unsigned char *val,
int vallen) {
int buflen=30+keylen+vallen;
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init(&wbuf, buf, buflen) ;
wbuf_char(&wbuf, LT_INSERT_WITH_NO_OVERWRITE);
wbuf_txnid(&wbuf, txnid);
wbuf_diskoff(&wbuf, diskoff);
wbuf_bytes(&wbuf, key, keylen);
wbuf_bytes(&wbuf, val, vallen);
return tokulogger_log_bytes(logger, wbuf.ndone, wbuf.buf);
}
int tokulogger_log_close(TOKULOGGER *loggerp) {
TOKULOGGER logger = *loggerp;
int r = 0;
......@@ -133,29 +116,6 @@ n
}
#endif
int tokulogger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, diskoff diskoff, int is_add, const struct kv_pair *pair) {
if (txn==0) return 0;
int keylen = pair->keylen;
int vallen = pair->vallen;
int buflen=(keylen+vallen+4+4 // the key and value
+1 // log command
+8 // txnid
+8 // fileid
+8 // diskoff
);
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init(&wbuf, buf, buflen) ;
wbuf_char(&wbuf, is_add ? LT_INSERT_WITH_NO_OVERWRITE : LT_DELETE);
wbuf_txnid(&wbuf, txn->txnid64);
wbuf_fileid(&wbuf, db->i->fileid);
wbuf_diskoff(&wbuf, diskoff);
wbuf_bytes(&wbuf, kv_pair_key_const(pair), keylen);
wbuf_bytes(&wbuf, kv_pair_val_const(pair), vallen);
return tokulogger_log_bytes(txn->logger, wbuf.ndone, wbuf.buf);
}
int tokulogger_fsync (TOKULOGGER logger) {
//return 0;/// NO TXN
//fprintf(stderr, "%s:%d syncing log\n", __FILE__, __LINE__);
......@@ -171,19 +131,101 @@ int tokulogger_fsync (TOKULOGGER logger) {
return 0;
}
static int tokulogger_finish (TOKULOGGER logger, struct wbuf *wbuf) {
wbuf_int(wbuf, toku_crc32(0, wbuf->buf, wbuf->ndone));
wbuf_int(wbuf, 4+wbuf->ndone);
return tokulogger_log_bytes(logger, wbuf->ndone, wbuf->buf);
}
// Log an insertion of a key-value pair into a particular node of the tree.
int tokulogger_log_brt_insert_with_no_overwrite (TOKULOGGER logger,
TXNID txnid,
FILENUM fileid,
DISKOFF diskoff,
unsigned char *key,
int keylen,
unsigned char *val,
int vallen) {
int buflen=(keylen+vallen+4+4 // key and value
+1 // command
+8 // lsn
+8 // txnid
+4 // fileid
+8 // diskoff
+8 // crc and len
);
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init(&wbuf, buf, buflen) ;
wbuf_char(&wbuf, LT_INSERT_WITH_NO_OVERWRITE);
wbuf_lsn (&wbuf, logger->lsn); logger->lsn.lsn++;
wbuf_txnid(&wbuf, txnid);
wbuf_filenum(&wbuf, fileid);
wbuf_diskoff(&wbuf, diskoff);
wbuf_bytes(&wbuf, key, keylen);
wbuf_bytes(&wbuf, val, vallen);
return tokulogger_finish (logger, &wbuf);
}
int tokulogger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF diskoff, int is_add, const struct kv_pair *pair) {
if (txn==0) return 0;
assert(db);
int keylen = pair->keylen;
int vallen = pair->vallen;
const int buflen=(keylen+vallen+4+4 // the key and value
+1 // log command
+8 // lsn
+8 // txnid
+8 // fileid
+8 // diskoff
+8 // crc & len
);
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init(&wbuf, buf, buflen) ;
wbuf_char(&wbuf, is_add ? LT_INSERT_WITH_NO_OVERWRITE : LT_DELETE);
wbuf_lsn (&wbuf, txn->logger->lsn);
txn->logger->lsn.lsn++;
wbuf_txnid(&wbuf, txn->txnid64);
wbuf_filenum(&wbuf, db->i->fileid);
wbuf_diskoff(&wbuf, diskoff);
wbuf_bytes(&wbuf, kv_pair_key_const(pair), keylen);
wbuf_bytes(&wbuf, kv_pair_val_const(pair), vallen);
return tokulogger_finish(txn->logger, &wbuf);
}
int tokulogger_log_commit (TOKUTXN txn) {
struct wbuf wbuf;
int buflen =30;
const int buflen = (1 // log command
+8 // lsn
+8 // txnid
+8 // crc & len
);
unsigned char buf[buflen];
wbuf_init(&wbuf, buf, buflen);
wbuf_char(&wbuf, LT_COMMIT);
wbuf_lsn (&wbuf, txn->logger->lsn);
txn->logger->lsn.lsn++;
wbuf_txnid(&wbuf, txn->txnid64);
int r = tokulogger_log_bytes(txn->logger, wbuf.ndone, wbuf.buf);
int r = tokulogger_finish(txn->logger, &wbuf);
if (r!=0) return r;
if (txn->parent) return 0;
else return tokulogger_fsync(txn->logger);
}
int tokulogger_log_checkpoint (TOKULOGGER logger, LSN *lsn) {
struct wbuf wbuf;
const int buflen =10;
unsigned char buf[buflen];
wbuf_init(&wbuf, buf, buflen);
wbuf_char(&wbuf, LT_CHECKPOINT);
wbuf_lsn (&wbuf, logger->lsn);
*lsn = logger->lsn;
logger->lsn.lsn++;
return tokulogger_log_bytes(logger, wbuf.ndone, wbuf.buf);
}
int tokutxn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid64, TOKULOGGER logger) {
TAGMALLOC(TOKUTXN, result);
if (result==0) return errno;
......@@ -194,3 +236,35 @@ int tokutxn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid64, TOKU
return 0;
}
int tokulogger_log_block_rename (TOKULOGGER logger, FILENUM fileid, DISKOFF olddiskoff, DISKOFF newdiskoff, DISKOFF parentdiskoff, int childnum) {
const int buflen=(+1 // log command
+8 // lsn
+8 // fileid
+8 // olddiskoff
+8 // newdiskoff
+8 // parentdiskoff
+4 // childnum
+8 // crc & len
);
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init (&wbuf, buf, buflen) ;
wbuf_char (&wbuf, LT_BLOCK_RENAME);
wbuf_lsn (&wbuf, logger->lsn);
logger->lsn.lsn++;
wbuf_filenum(&wbuf, fileid);
wbuf_diskoff(&wbuf, olddiskoff);
wbuf_diskoff(&wbuf, newdiskoff);
wbuf_diskoff(&wbuf, parentdiskoff);
wbuf_int (&wbuf, childnum);
return tokulogger_finish(logger, &wbuf);
}
/*
int brtenv_checkpoint (BRTENV env) {
init the checkpointing lock
acquire_spinlock(&env->checkpointing);
release_spinlock(&env->checkpointing);
return -1;
}
*/
......@@ -3,16 +3,17 @@
#include "../include/db.h"
#include "brttypes.h"
#include "kv-pair.h"
typedef struct tokulogger *TOKULOGGER;
typedef struct tokutxn *TOKUTXN;
int tokulogger_create_and_open_logger (const char *directory, TOKULOGGER *resultp);
int tokulogger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes);
int tokulogger_log_close(TOKULOGGER *logger);
int tokulogger_log_checkpoint (TOKULOGGER, LSN*);
int tokulogger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, diskoff diskoff, int is_add, const struct kv_pair *pair);
int tokulogger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF diskoff, int is_add, const struct kv_pair *pair);
int tokulogger_log_commit (TOKUTXN txn);
int tokulogger_log_block_rename (TOKULOGGER logger, FILENUM fileid, DISKOFF olddiskoff, DISKOFF newdiskoff, DISKOFF parentdiskoff, int childnum);
int tokutxn_begin (TOKUTXN /*parent*/,TOKUTXN *, TXNID txnid64, TOKULOGGER logger);
#endif
......@@ -47,4 +47,8 @@ void *mempool_malloc(struct mempool *mp, int size, int alignment);
pool does not keep track of the locations of the free chunks */
void mempool_mfree(struct mempool *mp, void *vp, int size);
static inline int mempool_inrange(struct mempool *mp, void *vp, int size) {
return mp->base <= vp && vp + size <= mp->base + mp->size;
}
#endif
......@@ -10,6 +10,7 @@ struct pma_cursor {
struct pma {
enum typ_tag tag;
int dup_mode;
int N; /* How long is the array? Always a power of two >= 4. */
int n_pairs_present; /* How many array elements are non-null. */
struct kv_pair **pairs;
......@@ -23,7 +24,8 @@ struct pma {
* The density step is 0.10. */
double ldt_step; /* lower density threshold step */
struct list cursors;
int (*compare_fun)(DB*,const DBT*,const DBT*);
pma_compare_fun_t compare_fun;
pma_compare_fun_t dup_compare_fun;
void *skey, *sval; /* used in dbts */
struct mempool kvspace;
};
......@@ -36,49 +38,6 @@ int pmainternal_make_space_at (PMA pma, int idx);
int pmainternal_find (PMA pma, DBT *, DB*); // The DB is so the comparison fuction can be called.
void print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */
/*
* resize the pma array to asksize. zero all array entries starting from startx.
*/
int __pma_resize_array(PMA pma, int asksize, int startx);
/*
* extract pairs from the pma in the window delimited by lo and hi.
*/
struct kv_pair_tag *__pma_extract_pairs(PMA pma, int count, int lo, int hi);
/*
* update the cursors in a cursor set given a set of tagged pairs.
*/
void __pma_update_cursors(PMA pma, struct list *cursorset, struct kv_pair_tag *tpairs, int n);
/*
* update this pma's cursors given a set of tagged pairs.
*/
void __pma_update_my_cursors(PMA pma, struct kv_pair_tag *tpairs, int n);
/*
* a deletion occured at index "here" in the pma. rebalance the windows around "here". if
* necessary, shrink the pma.
*/
void __pma_delete_at(PMA pma, int here);
/*
* if the pma entry at here is deleted and there are no more references to it
* then finish the deletion
*/
void __pma_delete_resume(PMA pma, int here);
/*
* finish a deletion from the pma. called when there are no cursor references
* to the kv pair.
*/
void __pma_delete_finish(PMA pma, int here);
/*
* count the number of cursors that reference a pma pair
*/
int __pma_count_cursor_refs(PMA pma, int here);
/* density thresholds */
#define PMA_LDT_HIGH 0.25
#define PMA_LDT_LOW 0.40
......
This diff is collapsed.
This diff is collapsed.
......@@ -10,11 +10,26 @@
/* An in-memory Packed Memory Array dictionary. */
/* There is a built-in-cursor. */
/* All functions return 0 on success. */
typedef struct pma *PMA;
typedef struct pma_cursor *PMA_CURSOR;
/* All functions return 0 on success. */
int pma_create(PMA *, int (*compare_fun)(DB*,const DBT*,const DBT*), int maxsize);
/* compare 2 DBT's
return a value < 0, = 0, > 0 if a < b, a == b, a > b respectively */
typedef int (*pma_compare_fun_t)(DB *, const DBT *a, const DBT *b);
int pma_create(PMA *, pma_compare_fun_t compare_fun, int maxsize);
/* set the duplicate mode
0 -> no duplications, DB_DUP, DB_DUPSORT */
int pma_set_dup_mode(PMA pma, int mode);
/* set the duplicate compare function */
int pma_set_dup_compare(PMA pma, pma_compare_fun_t dup_compare_fun);
/* verify the integrity of a pma */
void pma_verify(PMA pma, DB *db);
/* returns 0 if OK.
* You must have freed all the cursors, otherwise returns nonzero and does nothing. */
......@@ -28,15 +43,16 @@ int pma_n_entries (PMA);
/* Duplicates the key and keylen. */
//enum pma_errors pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
// The DB pointer is there so that the comparison function can be called.
enum pma_errors pma_insert (PMA, DBT*, DBT*, DB*, TOKUTXN txn, diskoff);
enum pma_errors pma_insert (PMA, DBT*, DBT*, DB*, TOKUTXN txn, DISKOFF, u_int32_t /*random for fingerprint */, u_int32_t */*fingerprint*/);
/* This returns an error if the key is NOT present. */
int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
/* This returns an error if the key is NOT present. */
int pma_delete (PMA, DBT *, DB*);
int pma_delete (PMA, DBT *, DB*, u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/);
int pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
int *replaced_v_size, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
DB *db, TOKUTXN txn, diskoff);
DB *db, TOKUTXN txn, DISKOFF,
u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/);
/* Exposes internals of the PMA by returning a pointer to the guts.
......@@ -53,13 +69,14 @@ enum pma_errors pma_lookup (PMA, DBT*, DBT*, DB*);
* leftpma - the pma assigned keys <= pivot key
* rightpma - the pma assigned keys > pivot key
*/
int pma_split(PMA origpma, unsigned int *origpma_size,
PMA leftpma, unsigned int *leftpma_size,
PMA rightpma, unsigned int *rightpma_size);
int pma_split(PMA origpma, unsigned int *origpma_size,
PMA leftpma, unsigned int *leftpma_size, u_int32_t leftrand4sum, u_int32_t *leftfingerprint,
PMA rightpma, unsigned int *rightpma_size, u_int32_t rightrand4sum, u_int32_t *rightfingerprint);
/*
* Insert several key value pairs into an empty pma. The keys are
* assumed to be sorted.
* Insert several key value pairs into an empty pma.
* Doesn't delete any existing keys (even if they are duplicates)
* Requires: The keys are sorted
*
* pma - the pma that the key value pairs will be inserted into.
* must be empty with no cursors.
......@@ -67,7 +84,7 @@ int pma_split(PMA origpma, unsigned int *origpma_size,
* vals - an array of values
* n_newpairs - the number of key value pairs
*/
int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs);
int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4sem, u_int32_t *fingerprint);
/* Move the cursor to the beginning or the end or to a key */
int pma_cursor (PMA, PMA_CURSOR *);
......@@ -122,4 +139,6 @@ void pma_iterate (PMA, void(*)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), void*);
int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
void pma_verify_fingerprint (PMA pma, u_int32_t rand4fingerprint, u_int32_t fingerprint);
#endif
......@@ -31,7 +31,8 @@ void create_directory (void) {
assert(r==0);
r=env->set_cachesize(env, 0, 512*(1<<20), 0);
assert(r==0);
#if DB_VERSION_MAJOR >= 4 && DB_VERSION_MINOR >= 3
IF40((void)0,
({
unsigned int gbytes,bytes;
......@@ -40,7 +41,7 @@ void create_directory (void) {
assert(r==0);
printf("Using %.2fMiB Berkeley DB Cache Size\n", gbytes*1024 + ((double)bytes/(1<<20)));
}));
#endif
r= env->open(env, dir, DB_CREATE|DB_INIT_MPOOL,0777); // No logging.
assert(r==0);
......
......@@ -25,16 +25,26 @@ static unsigned int rbuf_int (struct rbuf *r) {
(c3<<0));
}
static inline void rbuf_literal_bytes (struct rbuf *r, bytevec *bytes, unsigned int n_bytes) {
*bytes = &r->buf[r->ndone];
r->ndone+=n_bytes;
assert(r->ndone<=r->size);
}
/* Return a pointer into the middle of the buffer. */
static void rbuf_bytes (struct rbuf *r, bytevec *bytes, unsigned int *n_bytes)
{
*n_bytes = rbuf_int(r);
*bytes = &r->buf[r->ndone];
r->ndone+=*n_bytes;
assert(r->ndone<=r->size);
rbuf_literal_bytes(r, bytes, *n_bytes);
}
static unsigned long long rbuf_ulonglong (struct rbuf *r) {
unsigned i0 = rbuf_int(r);
unsigned i1 = rbuf_int(r);
return ((unsigned long long)(i0)<<32) | ((unsigned long long)(i1));
}
static diskoff rbuf_diskoff (struct rbuf *r) {
static DISKOFF rbuf_diskoff (struct rbuf *r) {
unsigned i0 = rbuf_int(r);
unsigned i1 = rbuf_int(r);
return ((unsigned long long)(i0)<<32) | ((unsigned long long)(i1));
......
/* Readers/writers locks implementation
*
*****************************************
* Overview
*****************************************
*
* TokuDB employs readers/writers locks for the ephemeral locks (e.g.,
* on BRT nodes) Why not just use the pthread_rwlock API?
*
* 1) we need multiprocess rwlocks (not just multithreaded)
*
* 2) pthread rwlocks are very slow since they entail a system call
* (about 2000ns on a 2GHz T2500.)
*
* Related: We expect the common case to be that the lock is
* granted
*
* 3) We are willing to employ machine-specific instructions (such
* as atomic exchange, and mfence, each of which runs in about
* 10ns.)
*
* 4) We want to guarantee nonstarvation (many rwlock
* implementations can starve the writers because another reader
* comes * along before all the other readers have unlocked.)
*
*****************************************
* How it works
*****************************************
*
* We arrange that the rwlock object is in the address space of both
* threads or processes. For processes we use mmap().
*
* The rwlock struct comprises the following fields
*
* a long mutex field (which is accessed using xchgl() or other
* machine-specific instructions. This is a spin lock.
*
* a read counter (how many readers currently have the lock?)
*
* a write boolean (does a writer have the lock?)
*
* a singly linked list of semaphores for waiting requesters. This
* list is sorted oldest requester first. Each list element
* contains a semaphore (which is provided by the requestor) and a
* boolean indicating whether it is a reader or a writer.
*
* To lock a read rwlock:
*
* 1) Acquire the mutex.
*
* 2) If the linked list is not empty or the writer boolean is true
* then
*
* a) initialize your semaphore (to 0),
* b) add your list element to the end of the list (with rw="read")
* c) release the mutex
* d) wait on the semaphore
* e) when the semaphore release, return success.
*
* 3) Otherwise increment the reader count, release the mutex, and
* return success.
*
* To lock the write rwlock is almost the same.
* 1) Acquire the mutex
* 2) If the list is not empty or the reader count is nonzero
* a) initialize semaphore
* b) add to end of list (with rw="write")
* c) release mutex
* d) wait on the semaphore
* e) return success when the semaphore releases
* 3) Otherwise set writer=TRUE, release mutex and return success.
*
* To unlock a read rwlock:
* 1) Acquire mutex
* 2) Decrement reader count
* 3) If the count is still positive or the list is empty then
* return success
* 4) Otherwise (count==zero and the list is nonempty):
* a) If the first element of the list is a reader:
* i) while the first element is a reader:
* x) pop the list
* y) increment the reader count
* z) increment the semaphore (releasing it for some waiter)
* ii) return success
* b) Else if the first element is a writer
* i) pop the list
* ii) set writer to TRUE
* iii) increment the semaphore
* iv) return success
*/
......@@ -6,6 +6,14 @@
#include <errno.h>
#include "memory.h"
//#define CRC_NO
#define CRC_INCR
//#define CRC_ATEND
#ifndef CRC_NO
#include "crc.h"
#endif
/* When serializing a value, write it into a buffer. */
/* This code requires that the buffer be big enough to hold whatever you put into it. */
/* This abstraction doesn't do a good job of hiding its internals.
......@@ -14,18 +22,27 @@ struct wbuf {
unsigned char *buf;
unsigned int size;
unsigned int ndone;
#ifdef CRC_INCR
u_int32_t crc32; // A 32-bit CRC of everything written so foar.
#endif
};
static void wbuf_init (struct wbuf *w, void *buf, diskoff size) {
static void wbuf_init (struct wbuf *w, void *buf, DISKOFF size) {
w->buf=buf;
w->size=size;
w->ndone=0;
#ifdef CRC_INCR
w->crc32 = toku_crc32(0L, Z_NULL, 0);
#endif
}
/* Write a character. */
static inline void wbuf_char (struct wbuf *w, int ch) {
assert(w->ndone<w->size);
w->buf[w->ndone++]=ch;
#ifdef CRC_INCR
w->crc32 = toku_crc32(w->crc32, &w->buf[w->ndone-1], 1);
#endif
}
static void wbuf_int (struct wbuf *w, unsigned int i) {
......@@ -40,20 +57,31 @@ static void wbuf_int (struct wbuf *w, unsigned int i) {
w->buf[w->ndone+1] = i>>16;
w->buf[w->ndone+2] = i>>8;
w->buf[w->ndone+3] = i>>0;
#ifdef CRC_INCR
w->crc32 = toku_crc32(w->crc32, &w->buf[w->ndone], 4);
#endif
w->ndone += 4;
#endif
}
static void wbuf_bytes (struct wbuf *w, bytevec bytes_bv, int nbytes) {
static inline void wbuf_literal_bytes(struct wbuf *w, bytevec bytes_bv, int nbytes) {
const unsigned char *bytes=bytes_bv;
wbuf_int(w, nbytes);
#if 0
{ int i; for (i=0; i<nbytes; i++) wbuf_char(w, bytes[i]); }
#else
assert(w->ndone + nbytes <= w->size);
memcpy(w->buf + w->ndone, bytes, nbytes);
#ifdef CRC_INCR
w->crc32 = toku_crc32(w->crc32, &w->buf[w->ndone], nbytes);
#endif
w->ndone += nbytes;
#endif
}
static void wbuf_bytes (struct wbuf *w, bytevec bytes_bv, int nbytes) {
wbuf_int(w, nbytes);
wbuf_literal_bytes(w, bytes_bv, nbytes);
}
static void wbuf_ulonglong (struct wbuf *w, unsigned long long ull) {
......@@ -61,7 +89,7 @@ static void wbuf_ulonglong (struct wbuf *w, unsigned long long ull) {
wbuf_int(w, ull&0xFFFFFFFF);
}
static void wbuf_diskoff (struct wbuf *w, diskoff off) {
static void wbuf_diskoff (struct wbuf *w, DISKOFF off) {
wbuf_ulonglong(w, off);
}
......@@ -69,8 +97,12 @@ static inline void wbuf_txnid (struct wbuf *w, TXNID tid) {
wbuf_ulonglong(w, tid);
}
static inline void wbuf_fileid (struct wbuf *w, unsigned long long fileid) {
wbuf_ulonglong(w, fileid);
static inline void wbuf_lsn (struct wbuf *w, LSN lsn) {
wbuf_ulonglong(w, lsn.lsn);
}
static inline void wbuf_filenum (struct wbuf *w, FILENUM fileid) {
wbuf_int(w, fileid.fileid);
}
#endif
......@@ -16,9 +16,11 @@ DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) {
return dbt;
}
DBT *fill_dbt_ap(DBT *dbt, bytevec k, ITEMLEN len, void *app_private) {
DBT *fill_dbt_ap(DBT *dbt, bytevec k, ITEMLEN len, void *app_private __attribute__((unused))) {
fill_dbt(dbt, k, len);
#if USE_DBT_APP_PRIVATE
dbt->app_private=app_private;
#endif
return dbt;
}
......
......@@ -11,4 +11,22 @@ DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len);
DBT *fill_dbt_ap(DBT *dbt, bytevec k, ITEMLEN len, void *app_private);
int ybt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp);
#ifndef USE_DBT_APP_PRIVATE
#define USE_DBT_APP_PRIVATE 0
#endif
static inline void *dbt_get_app_private(DBT *dbt __attribute__((unused))) {
#if USE_DBT_APP_PRIVATE
return dbt->app_private;
#else
return 0;
#endif
}
static inline void dbt_set_app_private(DBT *dbt __attribute__((unused)), void *ap __attribute__((unused))) {
#if USE_DBT_APP_PRIVATE
dbt->app_private = ap;
#endif
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment