Commit 62cfb06c authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Things compile again after fiddling with the compare function. There's a malloc bug, however

git-svn-id: file:///svn/tokudb@32 c7de825b-a66e-492c-adef-691d508d4ae1
parent a52b758e
# GCOV_FLAGS = -fprofile-arcs -ftest-coverage
#PROF_FLAGS = -pg
#OPTFLAGS = -O2
OPTFLAGS = -O2
CFLAGS = -Wall -W $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS) -Werror -fPIC
LDFLAGS = $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS)
......@@ -24,7 +24,7 @@ pma.o: pma.h yerror.h pma-internal.h memory.h key.h ybt.h brttypes.h ../include
ybt.o: ybt.h brttypes.h
ybt-test: ybt-test.o ybt.o memory.o
cachetable.o: cachetable.h
brt-test: brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o header-io.o ybt.o key.o
brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o header-io.o ybt.o key.o
brt-test.o brt.o: brt.h cachetable.h brttypes.h
brt-serialize-test.o: pma.h yerror.h brt.h memory.h hashtable.h brttypes.h brt-internal.h
brt.o: brt.h mdict.h pma.h brttypes.h memory.h brt-internal.h cachetable.h
......
......@@ -63,13 +63,16 @@ struct brt {
struct brt_header *h;
BRT_CURSOR cursors_head, cursors_tail;
int (*compare_fun)(DB*,DBT*,DBT*);
void *skey,*sval; /* Used for DBT return values. */
};
/* serialization code */
void serialize_brtnode_to(int fd, diskoff off, diskoff size, BRTNODE node);
int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesize);
unsigned int serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
unsigned int brtnode_which_child (BRTNODE node, bytevec key, ITEMLEN keylen);
int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
void verify_counts(BRTNODE);
......@@ -77,7 +80,7 @@ void verify_counts(BRTNODE);
int serialize_brt_header_to (int fd, struct brt_header *h);
int deserialize_brtheader_from (int fd, diskoff off, struct brt_header **brth);
static inline int brtnode_n_hashtables(BRTNODE node) { if (node->height==0) return 1; else return node->u.n.n_children; }
//static inline int brtnode_n_hashtables(BRTNODE node) { if (node->height==0) return 1; else return node->u.n.n_children; }
//int write_brt_header (int fd, struct brt_header *header);
......
......@@ -4,6 +4,7 @@
#include "memory.h"
//#include "pma.h"
#include "brt-internal.h"
#include "key.h"
#include <assert.h>
#include <unistd.h>
......@@ -87,7 +88,7 @@ static unsigned int serialize_brtnode_size_slow(BRTNODE node) {
for (i=0; i<node->u.n.n_children; i++) {
size+=8;
}
int n_hashtables = brtnode_n_hashtables(node);
int n_hashtables = node->u.n.n_bytes_in_hashtables;
size+=4; /* n_entries */
for (i=0; i< n_hashtables; i++) {
HASHTABLE_ITERATE(node->u.n.htables[i],
......@@ -118,8 +119,7 @@ unsigned int serialize_brtnode_size (BRTNODE node) {
result+=4; /* n_children */
result+=4*(node->u.n.n_children-1); /* key lengths */
result+=node->u.n.totalchildkeylens; /* the lengths of the pivot keys, without their key lengths. */
result+=8*(node->u.n.n_children); /* child offsets. */
result+=4; /* n_entries in hash table. */
result+=(8+4)*(node->u.n.n_children); /* For each child, a child offset and a count for the number of hash table entries. */
result+=node->u.n.n_bytes_in_hashtables;
} else {
result+=4; /* n_entries in buffer table. */
......@@ -158,15 +158,10 @@ void serialize_brtnode_to(int fd, diskoff off, diskoff size, BRTNODE node) {
}
{
int n_entries=0;
int n_hash_tables = brtnode_n_hashtables(node);
int n_hash_tables = node->u.n.n_children;
for (i=0; i< n_hash_tables; i++) {
//printf("%s:%d p%d=%p n_entries=%d\n", __FILE__, __LINE__, i, node->mdicts[i], mdict_n_entries(node->mdicts[i]));
n_entries += hashtable_n_entries(node->u.n.htables[i]);
}
//printf("%s:%d n_entries=%d\n", __FILE__, __LINE__, n_entries);
wbuf_int(&w, n_entries);
for (i=0; i< n_hash_tables; i++) {
wbuf_int(&w, hashtable_n_entries(node->u.n.htables[i]));
HASHTABLE_ITERATE(node->u.n.htables[i], key, keylen, data, datalen,
(wbuf_bytes(&w, key, keylen),
wbuf_bytes(&w, data, datalen)));
......@@ -206,7 +201,7 @@ int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesiz
}
{
uint32_t datasize_n;
int r = pread(fd, &datasize_n, sizeof(datasize_n), off);
r = pread(fd, &datasize_n, sizeof(datasize_n), off);
//printf("%s:%d r=%d the datasize=%d\n", __FILE__, __LINE__, r, ntohl(datasize_n));
if (r!=sizeof(datasize_n)) {
if (r==-1) r=errno;
......@@ -261,42 +256,43 @@ int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesiz
result->u.n.n_bytes_in_hashtable[i] = 0;
}
result->u.n.n_bytes_in_hashtables = 0;
for (i=0; i<brtnode_n_hashtables(result); i++) {
for (i=0; i<result->u.n.n_children; i++) {
int r=hashtable_create(&result->u.n.htables[i]);
if (r!=0) {
int j;
if (0) { died_12: j=brtnode_n_hashtables(result); }
if (0) { died_12: j=result->u.n.n_bytes_in_hashtables; }
for (j=0; j<i; j++) hashtable_free(&result->u.n.htables[j]);
goto died1;
}
}
{
int n_in_hash = rbuf_int(&rc);
int cnum;
for (cnum=0; cnum<result->u.n.n_children; cnum++) {
int n_in_this_hash = rbuf_int(&rc);
//printf("%d in hash\n", n_in_hash);
for (i=0; i<n_in_hash; i++) {
int childnum, diff;
for (i=0; i<n_in_this_hash; i++) {
int diff;
bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen;
verify_counts(result);
rbuf_bytes(&rc, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(&rc, &val, &vallen);
//printf("Found %s,%s\n", key, val);
childnum = brtnode_which_child(result, key, keylen);
{
int r=hash_insert(result->u.n.htables[childnum], key, keylen, val, vallen); /* Copies the data into the hash table. */
int r=hash_insert(result->u.n.htables[cnum], key, keylen, val, vallen); /* Copies the data into the hash table. */
if (r!=0) { goto died_12; }
}
diff = keylen + vallen + KEY_VALUE_OVERHEAD;
result->u.n.n_bytes_in_hashtables += diff;
result->u.n.n_bytes_in_hashtable[childnum] += diff;
result->u.n.n_bytes_in_hashtable[cnum] += diff;
//printf("Inserted\n");
}
}
}
} else {
int n_in_buf = rbuf_int(&rc);
result->u.l.n_bytes_in_buffer = 0;
int r=pma_create(&result->u.l.buffer);
int r=pma_create(&result->u.l.buffer, default_compare_fun);
if (r!=0) {
if (0) { died_21: pma_free(&result->u.l.buffer); }
goto died1;
......@@ -309,7 +305,8 @@ int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesiz
rbuf_bytes(&rc, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(&rc, &val, &vallen);
{
int r = pma_insert(result->u.l.buffer, key, keylen, val, vallen);
DBT k,v;
int r = pma_insert(result->u.l.buffer, fill_dbt(&k, key, keylen), fill_dbt(&v, val, vallen), 0);
if (r!=0) goto died_21;
}
result->u.l.n_bytes_in_buffer += keylen + vallen + KEY_VALUE_OVERHEAD;
......@@ -322,17 +319,6 @@ int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesiz
return 0;
}
unsigned int brtnode_which_child (BRTNODE node, bytevec key, ITEMLEN keylen) {
int i;
assert(node->height>0);
for (i=0; i<node->u.n.n_children-1; i++) {
if (keycompare(key, keylen, node->u.n.childkeys[i], node->u.n.childkeylens[i])<=0) {
return i;
}
}
return node->u.n.n_children-1;
}
void verify_counts (BRTNODE node) {
if (node->height==0) {
assert(node->u.l.buffer);
......
#include "brt.h"
#include "key.h"
#include "memory.h"
#include <assert.h>
......@@ -23,7 +24,7 @@ static void test0 (void) {
assert(r==0);
printf("%s:%d test0\n", __FILE__, __LINE__);
unlink(fname);
r = open_brt(fname, 0, 1, &t, 1024, ct);
r = open_brt(fname, 0, 1, &t, 1024, ct, default_compare_fun);
assert(r==0);
printf("%s:%d test0\n", __FILE__, __LINE__);
printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced);
......@@ -39,20 +40,20 @@ static void test1 (void) {
int r;
CACHETABLE ct;
char fname[]="testbrt.brt";
DBT k,v;
memory_check=1;
memory_check_all_free();
r = brt_create_cachetable(&ct, 0);
assert(r==0);
unlink(fname);
r = open_brt(fname, 0, 1, &t, 1024, ct);
r = open_brt(fname, 0, 1, &t, 1024, ct, default_compare_fun);
assert(r==0);
brt_insert(t, "hello", 6, "there", 6);
brt_insert(t, fill_dbt(&k, "hello", 6), fill_dbt(&v, "there", 6), 0);
{
bytevec val; ITEMLEN vallen;
r = brt_lookup(t, "hello", 6, &val, &vallen);
r = brt_lookup(t, fill_dbt(&k, "hello", 6), init_dbt(&v), 0);
assert(r==0);
assert(strcmp(val, "there")==0);
assert(vallen==6);
assert(strcmp(v.data, "there")==0);
assert(v.size==6);
}
r = close_brt(t); assert(r==0);
r = cachetable_close(ct); assert(r==0);
......@@ -71,14 +72,15 @@ static void test2 (int memcheck) {
memory_check_all_free();
r = brt_create_cachetable(&ct, 0); assert(r==0);
unlink(fname);
r = open_brt(fname, 0, 1, &t, 1024, ct);
r = open_brt(fname, 0, 1, &t, 1024, ct, default_compare_fun);
printf("%s:%d did setup\n", __FILE__, __LINE__);
assert(r==0);
for (i=0; i<2048; i++) {
DBT k,v;
char key[100],val[100];
snprintf(key,100,"hello%d",i);
snprintf(val,100,"there%d",i);
brt_insert(t, key, 1+strlen(key), val, 1+strlen(val));
brt_insert(t, fill_dbt(&k, key, 1+strlen(key)), fill_dbt(&v, val, 1+strlen(val)), 0);
//printf("%s:%d did insert %d\n", __FILE__, __LINE__, i);
if (0) {
brt_flush(t);
......@@ -109,13 +111,14 @@ static void test3 (int nodesize, int count, int memcheck) {
r = brt_create_cachetable(&ct, 0); assert(r==0);
gettimeofday(&t0, 0);
unlink(fname);
r = open_brt(fname, 0, 1, &t, nodesize, ct);
r = open_brt(fname, 0, 1, &t, nodesize, ct, default_compare_fun);
assert(r==0);
for (i=0; i<count; i++) {
char key[100],val[100];
DBT k,v;
snprintf(key,100,"hello%d",i);
snprintf(val,100,"there%d",i);
brt_insert(t, key, 1+strlen(key), val, 1+strlen(val));
brt_insert(t, fill_dbt(&k, key, 1+strlen(key)), fill_dbt(&v, val, 1+strlen(val)), 0);
}
r = close_brt(t); assert(r==0);
r = cachetable_close(ct); assert(r==0);
......@@ -139,13 +142,14 @@ static void test4 (int nodesize, int count, int memcheck) {
memory_check=memcheck;
memory_check_all_free();
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = open_brt(fname, 0, 1, &t, nodesize,ct); assert(r==0);
r = open_brt(fname, 0, 1, &t, nodesize, ct, default_compare_fun); assert(r==0);
for (i=0; i<count; i++) {
char key[100],val[100];
int rv = random();
DBT k,v;
snprintf(key,100,"hello%d",rv);
snprintf(val,100,"there%d",i);
brt_insert(t, key, 1+strlen(key), val, 1+strlen(val));
brt_insert(t, fill_dbt(&k, key, 1+strlen(key)), fill_dbt(&v, val, 1+strlen(val)), 0);
}
r = close_brt(t); assert(r==0);
r = cachetable_close(ct); assert(r==0);
......@@ -170,7 +174,7 @@ static void test5 (void) {
for (i=0; i<limit; i++) values[i]=-1;
unlink(fname);
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = open_brt(fname, 0, 1, &t, 1<<12, ct); assert(r==0);
r = open_brt(fname, 0, 1, &t, 1<<12, ct, default_compare_fun); assert(r==0);
for (i=0; i<limit/2; i++) {
char key[100],val[100];
int rk = random()%limit;
......@@ -179,22 +183,22 @@ static void test5 (void) {
values[rk] = rv;
snprintf(key, 100, "key%d", rk);
snprintf(val, 100, "val%d", rv);
brt_insert(t, key, 1+strlen(key), val, 1+strlen(val));
DBT k,v;
brt_insert(t, fill_dbt(&k, key, 1+strlen(key)), fill_dbt(&v, val, 1+strlen(val)), 0);
}
printf("\n");
for (i=0; i<limit/2; i++) {
int rk = random()%limit;
if (values[rk]>=0) {
char key[100], valexpected[100];
bytevec val;
ITEMLEN vallen;
DBT k,v;
if (i%1000==0) printf("r"); fflush(stdout);
snprintf(key, 100, "key%d", rk);
snprintf(valexpected, 100, "val%d", values[rk]);
r = brt_lookup(t, key, 1+strlen(key), &val, &vallen);
r = brt_lookup(t, fill_dbt(&k, key, 1+strlen(key)), init_dbt(&v), 0);
assert(r==0);
assert(vallen==(1+strlen(valexpected)));
assert(memcmp(val,valexpected,vallen)==0);
assert(v.size==(1+strlen(valexpected)));
assert(memcmp(v.data,valexpected,v.size)==0);
}
}
printf("\n");
......@@ -213,7 +217,7 @@ static void test_dump_empty_db (void) {
r = brt_create_cachetable(&ct, 0);
assert(r==0);
unlink(fname);
r = open_brt(fname, 0, 1, &t, 1024, ct);
r = open_brt(fname, 0, 1, &t, 1024, ct, default_compare_fun);
assert(r==0);
dump_brt(t);
r = close_brt(t); assert(r==0);
......@@ -233,15 +237,16 @@ static void test_multiple_files_of_size (int size) {
unlink(n1);
memory_check_all_free();
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = open_brt(n0, 0, 1, &t0, size, ct); assert(r==0);
r = open_brt(n1, 0, 1, &t1, size, ct); assert(r==0);
r = open_brt(n0, 0, 1, &t0, size, ct, default_compare_fun); assert(r==0);
r = open_brt(n1, 0, 1, &t1, size, ct, default_compare_fun); assert(r==0);
for (i=0; i<10000; i++) {
char key[100],val[100];
DBT k,v;
snprintf(key, 100, "key%d", i);
snprintf(val, 100, "val%d", i);
brt_insert(t0, key, 1+strlen(key), val, 1+strlen(val));
brt_insert(t0, fill_dbt(&k, key, 1+strlen(key)), fill_dbt(&v, val, 1+strlen(val)), 0);
snprintf(val, 100, "Val%d", i);
brt_insert(t1, key, 1+strlen(key), val, 1+strlen(val));
brt_insert(t1, fill_dbt(&k, key, 1+strlen(key)), fill_dbt(&v, val, 1+strlen(val)), 0);
}
//verify_brt(t0);
//dump_brt(t0);
......@@ -256,26 +261,25 @@ static void test_multiple_files_of_size (int size) {
/* Now see if the data is all there. */
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = open_brt(n0, 0, 0, &t0, 1<<12, ct);
r = open_brt(n0, 0, 0, &t0, 1<<12, ct, default_compare_fun);
printf("%s:%d r=%d\n", __FILE__, __LINE__,r);
assert(r==0);
r = open_brt(n1, 0, 0, &t1, 1<<12, ct); assert(r==0);
r = open_brt(n1, 0, 0, &t1, 1<<12, ct, default_compare_fun); assert(r==0);
for (i=0; i<10000; i++) {
char key[100],val[100];
bytevec actualval;
ITEMLEN actuallen;
DBT k,actual;
snprintf(key, 100, "key%d", i);
snprintf(val, 100, "val%d", i);
r=brt_lookup(t0, key, 1+strlen(key), &actualval, &actuallen);
r=brt_lookup(t0, fill_dbt(&k, key, 1+strlen(key)), init_dbt(&actual), 0);
assert(r==0);
assert(strcmp(val,actualval)==0);
assert(actuallen==1+strlen(val));
assert(strcmp(val,actual.data)==0);
assert(actual.size==1+strlen(val));
snprintf(val, 100, "Val%d", i);
r=brt_lookup(t1, key, 1+strlen(key), &actualval, &actuallen);
r=brt_lookup(t1, fill_dbt(&k, key, 1+strlen(key)), init_dbt(&actual), 0);
assert(r==0);
assert(strcmp(val,actualval)==0);
assert(actuallen==1+strlen(val));
assert(strcmp(val,actual.data)==0);
assert(actual.size==1+strlen(val));
}
r = close_brt(t0); assert(r==0);
......@@ -295,14 +299,17 @@ static void test_named_db (void) {
CACHETABLE ct;
BRT t0;
int r;
DBT k,v;
printf("test_named_db\n");
unlink(n0);
unlink(n1);
memory_check_all_free();
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = open_brt(n0, "db1", 1, &t0, 1<<12, ct); assert(r==0);
r = open_brt(n0, "db1", 1, &t0, 1<<12, ct, default_compare_fun); assert(r==0);
brt_insert(t0, "good", 5, "day", 4); assert(r==0);
brt_insert(t0, fill_dbt(&k, "good", 5), fill_dbt(&v, "day", 4), 0); assert(r==0);
r = close_brt(t0); assert(r==0);
r = cachetable_close(ct); assert(r==0);
......@@ -310,15 +317,13 @@ static void test_named_db (void) {
memory_check_all_free();
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = open_brt(n0, "db1", 0, &t0, 1<<12, ct); assert(r==0);
r = open_brt(n0, "db1", 0, &t0, 1<<12, ct, default_compare_fun); assert(r==0);
{
bytevec val;
ITEMLEN vallen;
r = brt_lookup(t0, "good", 5, &val, &vallen);
r = brt_lookup(t0, fill_dbt(&k, "good", 5), init_dbt(&v), 0);
assert(r==0);
assert(vallen==4);
assert(strcmp(val,"day")==0);
assert(v.size==4);
assert(strcmp(v.data,"day")==0);
}
r = close_brt(t0); assert(r==0);
......@@ -332,16 +337,17 @@ static void test_multiple_dbs (void) {
CACHETABLE ct;
BRT t0,t1;
int r;
DBT k,v;
printf("test_multiple_dbs: ");
unlink(n0);
unlink(n1);
memory_check_all_free();
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = open_brt(n0, "db1", 1, &t0, 1<<12, ct); assert(r==0);
r = open_brt(n1, "db2", 1, &t1, 1<<12, ct); assert(r==0);
r = open_brt(n0, "db1", 1, &t0, 1<<12, ct, default_compare_fun); assert(r==0);
r = open_brt(n1, "db2", 1, &t1, 1<<12, ct, default_compare_fun); assert(r==0);
brt_insert(t0, "good", 5, "grief", 6); assert(r==0);
brt_insert(t1, "bad", 4, "night", 6); assert(r==0);
brt_insert(t0, fill_dbt(&k, "good", 5), fill_dbt(&v, "grief", 6), 0); assert(r==0);
brt_insert(t1, fill_dbt(&k, "bad", 4), fill_dbt(&v, "night", 6), 0); assert(r==0);
r = close_brt(t0); assert(r==0);
r = close_brt(t1); assert(r==0);
......@@ -350,27 +356,25 @@ static void test_multiple_dbs (void) {
memory_check_all_free();
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = open_brt(n0, "db1", 0, &t0, 1<<12, ct); assert(r==0);
r = open_brt(n1, "db2", 0, &t1, 1<<12, ct); assert(r==0);
r = open_brt(n0, "db1", 0, &t0, 1<<12, ct, default_compare_fun); assert(r==0);
r = open_brt(n1, "db2", 0, &t1, 1<<12, ct, default_compare_fun); assert(r==0);
{
bytevec val;
ITEMLEN vallen;
r = brt_lookup(t0, "good", 5, &val, &vallen);
r = brt_lookup(t0, fill_dbt(&k, "good", 5), init_dbt(&v), 0);
assert(r==0);
assert(vallen==6);
assert(strcmp(val,"grief")==0);
assert(v.size==6);
assert(strcmp(v.data,"grief")==0);
r = brt_lookup(t1, "good", 5, &val, &vallen);
r = brt_lookup(t1, fill_dbt(&k, "good", 5), init_dbt(&v), 0);
assert(r!=0);
r = brt_lookup(t0, "bad", 4, &val, &vallen);
r = brt_lookup(t0, fill_dbt(&k, "bad", 4), init_dbt(&v), 0);
assert(r!=0);
r = brt_lookup(t1, "bad", 4, &val, &vallen);
r = brt_lookup(t1, fill_dbt(&k, "bad", 4), init_dbt(&v), 0);
assert(r==0);
assert(vallen==6);
assert(strcmp(val,"night")==0);
assert(v.size==6);
assert(strcmp(v.data,"night")==0);
}
r = close_brt(t0); assert(r==0);
......@@ -395,14 +399,15 @@ static void test_multiple_dbs_many (void) {
for (i=0; i<MANYN; i++) {
char dbname[20];
snprintf(dbname, 20, "db%d", i);
r = open_brt(name, dbname, 1, &trees[i], 1<<12, ct);
r = open_brt(name, dbname, 1, &trees[i], 1<<12, ct, default_compare_fun);
assert(r==0);
}
for (i=0; i<MANYN; i++) {
char k[20], v[20];
DBT kdbt,vdbt;
snprintf(k, 20, "key%d", i);
snprintf(v, 20, "val%d", i);
brt_insert(trees[i], k, strlen(k)+1, v, strlen(v)+1);
brt_insert(trees[i], fill_dbt(&kdbt, k, strlen(k)+1), fill_dbt(&vdbt, v, strlen(v)+1), 0);
}
for (i=0; i<MANYN; i++) {
r = close_brt(trees[i]); assert(r==0);
......@@ -423,25 +428,25 @@ static void test_multiple_brts_one_db_one_file (void) {
unlink(name);
r = brt_create_cachetable(&ct, 32); assert(r==0);
for (i=0; i<MANYN; i++) {
r = open_brt(name, 0, (i==0), &trees[i], 1<<12, ct);
r = open_brt(name, 0, (i==0), &trees[i], 1<<12, ct, default_compare_fun);
assert(r==0);
}
for (i=0; i<MANYN; i++) {
char k[20], v[20];
DBT kb, vb;
snprintf(k, 20, "key%d", i);
snprintf(v, 20, "val%d", i);
brt_insert(trees[i], k, strlen(k)+1, v, strlen(v)+1);
brt_insert(trees[i], fill_dbt(&kb, k, strlen(k)+1), fill_dbt(&vb, v, strlen(v)+1), 0);
}
for (i=0; i<MANYN; i++) {
char k[20],vexpect[20];
bytevec v;
ITEMLEN vlen;
DBT kb, vb;
snprintf(k, 20, "key%d", i);
snprintf(vexpect, 20, "val%d", i);
r=brt_lookup(trees[0], k, strlen(k)+1, &v, &vlen);
r=brt_lookup(trees[0], fill_dbt(&kb, k, strlen(k)+1), init_dbt(&vb), 0);
assert(r==0);
assert(vlen==1+strlen(vexpect));
assert(strcmp(v, vexpect)==0);
assert(vb.size==1+strlen(vexpect));
assert(strcmp(vb.data, vexpect)==0);
}
for (i=0; i<MANYN; i++) {
r=close_brt(trees[i]); assert(r==0);
......@@ -459,6 +464,7 @@ static void test_read_what_was_written (void) {
BRT brt;
int r;
const int NVALS=10000;
DBT k,v;
printf("test_read_what_was_written(): "); fflush(stdout);
......@@ -466,7 +472,7 @@ static void test_read_what_was_written (void) {
memory_check_all_free();
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = open_brt(n, 0, 1, &brt, 1<<12, ct); assert(r==0);
r = open_brt(n, 0, 1, &brt, 1<<12, ct, default_compare_fun); assert(r==0);
r = close_brt(brt); assert(r==0);
r = cachetable_close(ct); assert(r==0);
......@@ -474,10 +480,10 @@ static void test_read_what_was_written (void) {
/* Now see if we can read an empty tree in. */
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = open_brt(n, 0, 0, &brt, 1<<12, ct); assert(r==0);
r = open_brt(n, 0, 0, &brt, 1<<12, ct, default_compare_fun); assert(r==0);
/* See if we can put something in it. */
brt_insert(brt, "hello", 6, "there", 6);
brt_insert(brt, fill_dbt(&k, "hello", 6), fill_dbt(&v, "there", 6), 0);
r = close_brt(brt); assert(r==0);
r = cachetable_close(ct); assert(r==0);
......@@ -486,15 +492,13 @@ static void test_read_what_was_written (void) {
/* Now see if we can read it in and get the value. */
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = open_brt(n, 0, 0, &brt, 1<<12, ct); assert(r==0);
r = open_brt(n, 0, 0, &brt, 1<<12, ct, default_compare_fun); assert(r==0);
{
bytevec val;
ITEMLEN vallen;
r = brt_lookup(brt, "hello", 6, &val, &vallen);
r = brt_lookup(brt, fill_dbt(&k, "hello", 6), init_dbt(&v), 0);
assert(r==0);
assert(vallen==6);
assert(strcmp(val,"there")==0);
assert(v.size==6);
assert(strcmp(v.data,"there")==0);
}
assert(verify_brt(brt)==0);
......@@ -504,13 +508,14 @@ static void test_read_what_was_written (void) {
int i;
for (i=0; i<NVALS; i++) {
char key[100],val[100];
DBT k,v;
snprintf(key, 100, "key%d", i);
snprintf(val, 100, "val%d", i);
if (i<600) {
int verify_result=verify_brt(brt);;
assert(verify_result==0);
}
brt_insert(brt, key, strlen(key)+1, val, strlen(val)+1);
brt_insert(brt, fill_dbt(&k, key, strlen(key)+1), fill_dbt(&v, val, strlen(val)+1), 0);
if (i<600) {
int verify_result=verify_brt(brt);
if (verify_result) {
......@@ -521,11 +526,9 @@ static void test_read_what_was_written (void) {
int j;
for (j=0; j<=i; j++) {
char expectedval[100];
bytevec val;
ITEMLEN vallen;
snprintf(key, 100, "key%d", j);
snprintf(expectedval, 100, "val%d", j);
r=brt_lookup(brt, key, strlen(key)+1, &val, &vallen);
r=brt_lookup(brt, fill_dbt(&k, key, strlen(key)+1), init_dbt(&v), 0);
if (r!=0) {
printf("%s:%d r=%d on lookup(key=%s) after i=%d\n", __FILE__, __LINE__, r, key, i);
dump_brt(brt);
......@@ -547,11 +550,9 @@ static void test_read_what_was_written (void) {
int i;
for (i=0; i<NVALS; i++) {
char key[100],expectedval[100];
bytevec val;
ITEMLEN vallen;
snprintf(key, 100, "key%d", i);
snprintf(expectedval, 100, "val%d", i);
r=brt_lookup(brt, key, strlen(key)+1, &val, &vallen);
r=brt_lookup(brt, fill_dbt(&k, key, strlen(key)+1), init_dbt(&v), 0);
if (r!=0) printf("%s:%d r=%d on key=%s\n", __FILE__, __LINE__, r, key);
assert(r==0);
......@@ -564,25 +565,21 @@ static void test_read_what_was_written (void) {
memory_check_all_free();
r = brt_create_cachetable(&ct, 0); assert(r==0);
r = open_brt(n, 0, 0, &brt, 1<<12, ct); assert(r==0);
r = open_brt(n, 0, 0, &brt, 1<<12, ct, default_compare_fun); assert(r==0);
{
bytevec val;
ITEMLEN vallen;
r = brt_lookup(brt, "hello", 6, &val, &vallen);
r = brt_lookup(brt, fill_dbt(&k, "hello", 6), init_dbt(&v), 0);
assert(r==0);
assert(vallen==6);
assert(strcmp(val,"there")==0);
assert(v.size==6);
assert(strcmp(v.data,"there")==0);
}
{
int i;
for (i=0; i<NVALS; i++) {
char key[100],expectedval[100];
bytevec val;
ITEMLEN vallen;
snprintf(key, 100, "key%d", i);
snprintf(expectedval, 100, "val%d", i);
r=brt_lookup(brt, key, strlen(key)+1, &val, &vallen);
r=brt_lookup(brt, fill_dbt(&k, key, strlen(key)+1), init_dbt(&v), 0);
if (r!=0) printf("%s:%d r=%d on key=%s\n", __FILE__, __LINE__, r, key);
assert(r==0);
......@@ -614,12 +611,12 @@ void test_cursor_last_empty(void) {
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
r = brt_create_cachetable(&ct, 0); assert(r==0);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
r = open_brt(n, 0, 1, &brt, 1<<12, ct); assert(r==0);
r = open_brt(n, 0, 1, &brt, 1<<12, ct, default_compare_fun); assert(r==0);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
r = brt_cursor(brt, &cursor); assert(r==0);
r = ybt_init(&kbt); assert(r==0);
init_dbt(&kbt);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
r = ybt_init(&vbt); assert(r==0);
init_dbt(&vbt);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
r = brt_c_get(cursor, &kbt, &vbt, DB_LAST);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
......@@ -646,15 +643,15 @@ void test_cursor_next (void) {
memory_check_all_free();
r = brt_create_cachetable(&ct, 0); assert(r==0);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
r = open_brt(n, 0, 1, &brt, 1<<12, ct); assert(r==0);
r = open_brt(n, 0, 1, &brt, 1<<12, ct, default_compare_fun); assert(r==0);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
r = brt_insert(brt, "hello", 6, "there", 6);
r = brt_insert(brt, "byebye", 7, "byenow", 7);
r = brt_insert(brt, fill_dbt(&kbt, "hello", 6), fill_dbt(&vbt, "there", 6), 0);
r = brt_insert(brt, fill_dbt(&kbt, "byebye", 7), fill_dbt(&vbt, "byenow", 7), 0);
printf("%s:%d calling brt_cursor(...)\n", __FILE__, __LINE__);
r = brt_cursor(brt, &cursor); assert(r==0);
r = ybt_init(&kbt); assert(r==0);
init_dbt(&kbt);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
r = ybt_init(&vbt); assert(r==0);
init_dbt(&vbt);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); print_malloced_items();
printf("%s:%d calling brt_c_get(...)\n", __FILE__, __LINE__);
......
......@@ -213,7 +213,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, diskoff nodename, int height)
}
n->u.n.n_bytes_in_hashtables = 0;
} else {
int r = pma_create(&n->u.l.buffer);
int r = pma_create(&n->u.l.buffer, t->compare_fun);
static int rcount=0;
assert(r==0);
//printf("%s:%d n PMA= %p (rcount=%d)\n", __FILE__, __LINE__, n->u.l.buffer, rcount);
......@@ -262,16 +262,16 @@ void delete_node (BRT t, BRTNODE node) {
}
static void insert_to_buffer_in_leaf (BRTNODE node, bytevec key, unsigned int keylen, bytevec val, unsigned int vallen) {
unsigned int n_bytes_added = KEY_VALUE_OVERHEAD + keylen + vallen;
int r = pma_insert(node->u.l.buffer, key, keylen, val, vallen);
static void insert_to_buffer_in_leaf (BRTNODE node, DBT *k, DBT *v, DB *db) {
unsigned int n_bytes_added = KEY_VALUE_OVERHEAD + k->size + v->size;
int r = pma_insert(node->u.l.buffer, k, v, db);
assert(r==0);
node->u.l.n_bytes_in_buffer += n_bytes_added;
}
static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, bytevec key, unsigned int keylen, bytevec val, unsigned int vallen) {
unsigned int n_bytes_added = KEY_VALUE_OVERHEAD + keylen + vallen;
int r = hash_insert(node->u.n.htables[childnum], key, keylen, val, vallen);
static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v) {
unsigned int n_bytes_added = KEY_VALUE_OVERHEAD + k->size + v->size;
int r = hash_insert(node->u.n.htables[childnum], k->data, k->size, v->data, v->size);
if (r!=0) return r;
node->u.n.n_bytes_in_hashtable[childnum] += n_bytes_added;
node->u.n.n_bytes_in_hashtables += n_bytes_added;
......@@ -279,7 +279,7 @@ static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, bytevec key, u
}
int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen) {
int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, DB *db) {
int did_split=0;
BRTNODE A,B;
assert(node->height==0);
......@@ -296,15 +296,15 @@ int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, bytevec*
assert(node->height>0 || node->u.l.buffer!=0);
PMA_ITERATE(node->u.l.buffer, key, keylen, val, vallen,
({
DBT k,v;
if (!did_split) {
insert_to_buffer_in_leaf(A, key, keylen, val, vallen);
insert_to_buffer_in_leaf(A, fill_dbt(&k, key, keylen), fill_dbt(&v, val, vallen), db);
if (A->u.l.n_bytes_in_buffer *2 >= node->u.l.n_bytes_in_buffer) {
*splitkey = memdup(key, keylen);
*splitkeylen = keylen;
fill_dbt(splitk, memdup(key, keylen), keylen);
did_split=1;
}
} else {
insert_to_buffer_in_leaf(B, key, keylen, val, vallen);
insert_to_buffer_in_leaf(B, fill_dbt(&k, key, keylen), fill_dbt(&v, val, vallen), db);
}
}));
assert(node->height>0 || node->u.l.buffer!=0);
......@@ -320,7 +320,8 @@ int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, bytevec*
return 0;
}
void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen) {
/* Side effect: sets splitk->data pointer to a malloc'd value */
void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk) {
int n_children_in_a = node->u.n.n_children/2;
BRTNODE A,B;
assert(node->height>0);
......@@ -363,8 +364,8 @@ void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, byt
node->u.n.childkeys[i] = 0;
node->u.n.childkeylens[i] = 0;
}
*splitkey = node->u.n.childkeys[n_children_in_a-1];
*splitkeylen = node->u.n.childkeylens[n_children_in_a-1];
splitk->data = (void*)(node->u.n.childkeys[n_children_in_a-1]);
splitk->size = node->u.n.childkeylens[n_children_in_a-1];
node->u.n.totalchildkeylens -= node->u.n.childkeylens[n_children_in_a-1];
node->u.n.childkeys[n_children_in_a-1]=0;
node->u.n.childkeylens[n_children_in_a-1]=0;
......@@ -470,60 +471,70 @@ void find_heaviest_data (BRTNODE node, int *childnum_ret, KVPAIR *pairs_ret, int
}
#endif
static int brtnode_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen,
int debug);
static int brtnode_insert (BRT t, BRTNODE node, DBT *k, DBT *v,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *split,
int debug,
DB *db);
/* key is not in the hashtable in node. Either put the key-value pair in the child, or put it in the node. */
static int push_kvpair_down_only_if_it_wont_push_more_else_put_here (BRT t, BRTNODE node, BRTNODE child,
bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen,
int childnum_of_node) {
DBT *k, DBT *v,
int childnum_of_node,
DB *db) {
assert(node->height>0); /* Not a leaf. */
int to_child=serialize_brtnode_size(child)+keylen+vallen+KEY_VALUE_OVERHEAD <= child->nodesize;
int to_child=serialize_brtnode_size(child)+k->size+v->size+KEY_VALUE_OVERHEAD <= child->nodesize;
if (brt_debug_mode) {
printf("%s:%d pushing %s to %s %d", __FILE__, __LINE__, (char*)key, to_child? "child" : "hash", childnum_of_node);
printf("%s:%d pushing %s to %s %d", __FILE__, __LINE__, (char*)k->data, to_child? "child" : "hash", childnum_of_node);
if (childnum_of_node+1<node->u.n.n_children) {
DBT k2;
printf(" nextsplitkey=%s\n", (char*)node->u.n.childkeys[childnum_of_node]);
assert(keycompare(key, keylen, node->u.n.childkeys[childnum_of_node], node->u.n.childkeylens[childnum_of_node])<=0);
assert(t->compare_fun(db, k, fill_dbt(&k2, node->u.n.childkeys[childnum_of_node], node->u.n.childkeylens[childnum_of_node]))<=0);
} else {
printf("\n");
}
}
if (to_child) {
int again_split=-1; BRTNODE againa,againb; bytevec againkey; ITEMLEN againlen;
int again_split=-1; BRTNODE againa,againb;
DBT againk;
init_dbt(&againk);
//printf("%s:%d hello!\n", __FILE__, __LINE__);
int r = brtnode_insert(t, child, key, keylen, val, vallen,
&again_split, &againa, &againb, &againkey, &againlen,
0);
int r = brtnode_insert(t, child, k, v,
&again_split, &againa, &againb, &againk,
0,
db);
if (r!=0) return r;
assert(again_split==0); /* I only did the insert if I knew it wouldn't push down, and hence wouldn't split. */
return r;
} else {
int r=insert_to_hash_in_nonleaf(node, childnum_of_node, key, keylen, val, vallen);
int r=insert_to_hash_in_nonleaf(node, childnum_of_node, k, v);
return r;
}
}
static int push_a_kvpair_down (BRT t, BRTNODE node, BRTNODE child, int childnum,
bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen,
int *child_did_split, BRTNODE *childa, BRTNODE *childb, bytevec*childsplitkey, ITEMLEN *childsplitkeylen) {
DBT *k, DBT *v,
int *child_did_split, BRTNODE *childa, BRTNODE *childb,
DBT *childsplitk,
DB *db) {
//if (debug) printf("%s:%d %*sinserting down\n", __FILE__, __LINE__, debug, "");
//printf("%s:%d hello!\n", __FILE__, __LINE__);
assert(node->height>0);
{
int r = brtnode_insert(t, child, key, keylen, val, vallen,
child_did_split, childa, childb, childsplitkey, childsplitkeylen,
0);
int r = brtnode_insert(t, child, k, v,
child_did_split, childa, childb, childsplitk,
0,
db);
if (r!=0) return r;
}
//if (debug) printf("%s:%d %*sinserted down child_did_split=%d\n", __FILE__, __LINE__, debug, "", child_did_split);
{
int r = hash_delete(node->u.n.htables[childnum], key, keylen); // Must delete after doing the insert, to avoid operating on freed' key
int r = hash_delete(node->u.n.htables[childnum], k->data, k->size); // Must delete after doing the insert, to avoid operating on freed' key
if (r!=0) return r;
}
{
int n_bytes_removed = (keylen + vallen + KEY_VALUE_OVERHEAD);
int n_bytes_removed = (k->size + k->size + KEY_VALUE_OVERHEAD);
node->u.n.n_bytes_in_hashtables -= n_bytes_removed;
node->u.n.n_bytes_in_hashtable[childnum] -= n_bytes_removed;
}
......@@ -540,8 +551,11 @@ int split_count=0;
* We also unpin the new children.
*/
static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
BRTNODE childa, BRTNODE childb, bytevec childsplitkey, ITEMLEN childsplitkeylen,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen) {
BRTNODE childa, BRTNODE childb,
DBT *childsplitk, /* the data in the childsplitk is alloc'd and is consumed by this call. */
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk,
DB *db) {
assert(node->height>0);
HASHTABLE old_h = node->u.n.htables[childnum];
int old_count = node->u.n.n_bytes_in_hashtable[childnum];
......@@ -551,7 +565,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
if (brt_debug_mode) {
int i;
printf("%s:%d Child %d did split on %s\n", __FILE__, __LINE__, childnum, (char*)childsplitkey);
printf("%s:%d Child %d did split on %s\n", __FILE__, __LINE__, childnum, (char*)childsplitk->data);
printf("%s:%d oldsplitkeys:", __FILE__, __LINE__);
for(i=0; i<node->u.n.n_children-1; i++) printf(" %s", (char*)node->u.n.childkeys[i]);
printf("\n");
......@@ -574,9 +588,9 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->u.n.childkeys[cnum] = node->u.n.childkeys[cnum-1];
node->u.n.childkeylens[cnum] = node->u.n.childkeylens[cnum-1];
}
node->u.n.childkeys[childnum]=childsplitkey;
node->u.n.childkeylens[childnum]= childsplitkeylen;
node->u.n.totalchildkeylens += childsplitkeylen;
node->u.n.childkeys[childnum]= (char*)childsplitk->data;
node->u.n.childkeylens[childnum]= childsplitk->size;
node->u.n.totalchildkeylens += childsplitk->size;
node->u.n.n_children++;
if (brt_debug_mode) {
......@@ -589,10 +603,13 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->u.n.n_bytes_in_hashtables -= old_count; /* By default, they are all removed. We might add them back in. */
/* Keep pushing to the children, but not if the children would require a pushdown */
HASHTABLE_ITERATE(old_h, skey, skeylen, sval, svallen, ({
if (keycompare(skey, skeylen, childsplitkey, childsplitkeylen)<=0) {
r=push_kvpair_down_only_if_it_wont_push_more_else_put_here(t, node, childa, skey, skeylen, sval, svallen, childnum);
DBT skd, svd;
fill_dbt(&skd, skey, skeylen); skd.app_private=childsplitk->app_private;
fill_dbt(&svd, sval, svallen);
if (t->compare_fun(db, &skd, childsplitk)<=0) {
r=push_kvpair_down_only_if_it_wont_push_more_else_put_here(t, node, childa, &skd, &svd, childnum, db);
} else {
r=push_kvpair_down_only_if_it_wont_push_more_else_put_here(t, node, childb, skey, skeylen, sval, svallen, childnum+1);
r=push_kvpair_down_only_if_it_wont_push_more_else_put_here(t, node, childb, &skd, &svd, childnum+1, db);
}
if (r!=0) return r;
}));
......@@ -610,7 +627,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
if (node->u.n.n_children>TREE_FANOUT) {
//printf("%s:%d about to split having pushed %d out of %d keys\n", __FILE__, __LINE__, i, n_pairs);
brt_nonleaf_split(t, node, nodea, nodeb, splitkey, splitkeylen);
brt_nonleaf_split(t, node, nodea, nodeb, splitk);
//printf("%s:%d did split\n", __FILE__, __LINE__);
split_count++;
*did_split=1;
......@@ -630,8 +647,10 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
}
static int push_some_kvpairs_down (BRT t, BRTNODE node, int childnum,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec *splitkey, ITEMLEN *splitkeylen,
int debug) {
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk,
int debug,
DB *db) {
void *childnode_v;
BRTNODE child;
int r;
......@@ -654,18 +673,24 @@ static int push_some_kvpairs_down (BRT t, BRTNODE node, int childnum,
bytevec key,val;
ITEMLEN keylen, vallen;
while(0==hashtable_random_pick(node->u.n.htables[childnum], &key, &keylen, &val, &vallen)) {
int child_did_split=0; BRTNODE childa, childb; bytevec childsplitkey; ITEMLEN childsplitkeylen;
int child_did_split=0; BRTNODE childa, childb;
DBT hk,hv;
DBT childsplitk;
init_dbt(&childsplitk);
childsplitk.app_private = splitk->app_private;
if (debug) printf("%s:%d %*spush down %s\n", __FILE__, __LINE__, debug, "", (char*)key);
r = push_a_kvpair_down (t, node, child, childnum,
key, keylen, val, vallen,
&child_did_split, &childa, &childb, &childsplitkey, &childsplitkeylen);
fill_dbt(&hk, key, keylen), fill_dbt(&hv, val, vallen),
&child_did_split, &childa, &childb,
&childsplitk,
db);
if (r!=0) return r;
if (child_did_split) {
// If the child splits, we don't push down any further.
if (debug) printf("%s:%d %*shandle split splitkey=%s\n", __FILE__, __LINE__, debug, "", (char*)childsplitkey);
if (debug) printf("%s:%d %*shandle split splitkey=%s\n", __FILE__, __LINE__, debug, "", (char*)childsplitk.data);
r=handle_split_of_child (t, node, childnum,
childa, childb, childsplitkey, childsplitkeylen,
did_split, nodea, nodeb, splitkey, splitkeylen);
childa, childb, &childsplitk,
did_split, nodea, nodeb, splitk, db);
return r; /* Don't do any more pushing if the child splits. */
}
}
......@@ -681,7 +706,7 @@ int debugp1 (int debug) {
return debug ? debug+1 : 0;
}
static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec *splitkey, ITEMLEN *splitkeylen, int debug)
static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int debug, DB *db)
/* If the buffer is too full, then push down. Possibly the child will split. That may make us split. */
{
assert(node->height>0);
......@@ -697,7 +722,7 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE
find_heaviest_child(node, &childnum);
if (debug) printf("%s:%d %*spush some down from %lld into %lld\n", __FILE__, __LINE__, debug, "", node->thisnodename, node->u.n.children[childnum]);
assert(node->u.n.children[childnum]!=0);
int r = push_some_kvpairs_down(t, node, childnum, did_split, nodea, nodeb, splitkey, splitkeylen, debugp1(debug));
int r = push_some_kvpairs_down(t, node, childnum, did_split, nodea, nodeb, splitk, debugp1(debug), db);
if (r!=0) return r;
assert(*did_split==0 || *did_split==1);
if (debug) printf("%s:%d %*sdid push_some_kvpairs_down did_split=%d\n", __FILE__, __LINE__, debug, "", *did_split);
......@@ -719,22 +744,22 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE
return 0;
}
static int brt_leaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen,
int debug) {
bytevec olddata;
ITEMLEN olddatalen;
enum pma_errors pma_status = pma_lookup(node->u.l.buffer, key, keylen, &olddata, &olddatalen);
static int brt_leaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug,
DB *db) {
DBT v2;
enum pma_errors pma_status = pma_lookup(node->u.l.buffer, k, init_dbt(&v2), db);
if (pma_status==BRT_OK) {
pma_status = pma_delete(node->u.l.buffer, key, keylen);
pma_status = pma_delete(node->u.l.buffer, k, db);
assert(pma_status==BRT_OK);
node->u.l.n_bytes_in_buffer -= keylen + olddatalen + KEY_VALUE_OVERHEAD;
node->u.l.n_bytes_in_buffer -= k->size + v2.size + KEY_VALUE_OVERHEAD;
}
pma_status = pma_insert(node->u.l.buffer, key, keylen, val, vallen);
node->u.l.n_bytes_in_buffer += keylen + vallen + KEY_VALUE_OVERHEAD;
pma_status = pma_insert(node->u.l.buffer, k, v, db);
node->u.l.n_bytes_in_buffer += k->size + v->size + KEY_VALUE_OVERHEAD;
// If it doesn't fit, then split the leaf.
if (serialize_brtnode_size(node) > node->nodesize) {
int r = brtleaf_split (t, node, nodea, nodeb, splitkey, splitkeylen);
int r = brtleaf_split (t, node, nodea, nodeb, splitk, db);
if (r!=0) return r;
//printf("%s:%d splitkey=%s\n", __FILE__, __LINE__, (char*)*splitkey);
split_count++;
......@@ -749,14 +774,28 @@ static int brt_leaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, by
return 0;
}
static int brt_nonleaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen,
int debug) {
static unsigned int brtnode_which_child (BRTNODE node , DBT *k, BRT t, DB *db) {
int i;
assert(node->height>0);
for (i=0; i<node->u.n.n_children-1; i++) {
DBT k2;
if (t->compare_fun(db, k, fill_dbt(&k2, node->u.n.childkeys[i], node->u.n.childkeylens[i]))<=0) {
return i;
}
}
return node->u.n.n_children-1;
}
static int brt_nonleaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk,
int debug,
DB *db) {
bytevec olddata;
ITEMLEN olddatalen;
unsigned int childnum = brtnode_which_child(node, key, keylen);
int found = !hash_find(node->u.n.htables[childnum], key, keylen, &olddata, &olddatalen);
unsigned int childnum = brtnode_which_child(node, k, t, db);
int found = !hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen);
if (0) { // It is faster to do this, except on yobiduck where things grind to a halt.
void *child_v;
......@@ -765,8 +804,8 @@ static int brt_nonleaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen,
/* If the child is in memory, then go ahead and put it in the child. */
BRTNODE child = child_v;
if (found) {
int diff = keylen + olddatalen + KEY_VALUE_OVERHEAD;
int r = hash_delete(node->u.n.htables[childnum], key, keylen);
int diff = k->size + olddatalen + KEY_VALUE_OVERHEAD;
int r = hash_delete(node->u.n.htables[childnum], k->data, k->size);
assert(r==0);
node->u.n.n_bytes_in_hashtables -= diff;
node->u.n.n_bytes_in_hashtable[childnum] -= diff;
......@@ -774,15 +813,14 @@ static int brt_nonleaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen,
{
int child_did_split;
BRTNODE childa, childb;
bytevec childsplitkey;
ITEMLEN childsplitkeylen;
int r = brtnode_insert(t, child, key, keylen, val, vallen,
&child_did_split, &childa, &childb, &childsplitkey, &childsplitkeylen, 0);
DBT childsplitk;
int r = brtnode_insert(t, child, k, v,
&child_did_split, &childa, &childb, &childsplitk, 0, db);
if (r!=0) return r;
if (child_did_split) {
r=handle_split_of_child(t, node, childnum,
childa, childb, childsplitkey, childsplitkeylen,
did_split, nodea, nodeb, splitkey, splitkeylen);
childa, childb, &childsplitk,
did_split, nodea, nodeb, splitk, db);
if (r!=0) return r;
} else {
cachetable_unpin(t->cf, child->thisnodename, 1);
......@@ -796,23 +834,23 @@ static int brt_nonleaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen,
if (debug) printf("%s:%d %*sDoing hash_insert\n", __FILE__, __LINE__, debug, "");
verify_counts(node);
if (found) {
int r = hash_delete(node->u.n.htables[childnum], key, keylen);
int diff = keylen + olddatalen + KEY_VALUE_OVERHEAD;
int r = hash_delete(node->u.n.htables[childnum], k->data, k->size);
int diff = k->size + olddatalen + KEY_VALUE_OVERHEAD;
assert(r==0);
node->u.n.n_bytes_in_hashtables -= diff;
node->u.n.n_bytes_in_hashtable[childnum] -= diff;
//printf("%s:%d deleted %d bytes\n", __FILE__, __LINE__, diff);
}
{
int diff = keylen + vallen + KEY_VALUE_OVERHEAD;
int r=hash_insert(node->u.n.htables[childnum], key, keylen, val, vallen);
int diff = k->size + v->size + KEY_VALUE_OVERHEAD;
int r=hash_insert(node->u.n.htables[childnum], k->data, k->size, v->data, v->size);
assert(r==0);
node->u.n.n_bytes_in_hashtables += diff;
node->u.n.n_bytes_in_hashtable[childnum] += diff;
}
if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
int r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitkey, splitkeylen, debugp1(debug));
int r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), db);
if (r!=0) return r;
if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
if (*did_split) {
......@@ -832,17 +870,20 @@ static int brt_nonleaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen,
}
static int brtnode_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen,
int debug) {
static int brtnode_insert (BRT t, BRTNODE node, DBT *k, DBT *v,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug,
DB *db) {
if (node->height==0) {
return brt_leaf_insert(t, node, key, keylen, val, vallen,
did_split, nodea, nodeb, splitkey, splitkeylen,
debug);
return brt_leaf_insert(t, node, k, v,
did_split, nodea, nodeb, splitk,
debug,
db);
} else {
return brt_nonleaf_insert(t, node, key, keylen, val, vallen,
did_split, nodea, nodeb, splitkey, splitkeylen,
debug);
return brt_nonleaf_insert(t, node, k, v,
did_split, nodea, nodeb, splitk,
debug,
db);
}
}
......@@ -889,7 +930,8 @@ static int setup_brt_root_node (BRT t, diskoff offset) {
#define WHEN_BRTTRACE(x) ((void)0)
#endif
int open_brt (const char *fname, const char *dbname, int is_create, BRT *newbrt, int nodesize, CACHETABLE cachetable) {
int open_brt (const char *fname, const char *dbname, int is_create, BRT *newbrt, int nodesize, CACHETABLE cachetable,
int (*compare_fun)(DB*,DBT*,DBT*)) {
/* If dbname is NULL then we setup to hold a single tree. Otherwise we setup an array. */
int r;
BRT t;
......@@ -903,6 +945,8 @@ int open_brt (const char *fname, const char *dbname, int is_create, BRT *newbrt,
if (0) { died0: toku_free(t); }
return r;
}
t->compare_fun = compare_fun;
t->skey = t->sval = 0;
if (dbname) {
malloced_name = mystrdup(dbname);
if (malloced_name==0) {
......@@ -1003,6 +1047,8 @@ int close_brt (BRT brt) {
//printf("%s:%d closing cachetable\n", __FILE__, __LINE__);
if ((r = cachefile_close(brt->cf))!=0) return r;
if (brt->database_name) toku_free(brt->database_name);
free(brt->skey);
free(brt->sval);
toku_free(brt);
return 0;
}
......@@ -1023,12 +1069,13 @@ CACHEKEY* calculate_root_offset_pointer (BRT brt) {
abort();
}
int brt_insert (BRT brt, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen) {
int brt_insert (BRT brt, DBT *k, DBT *v, DB* db) {
void *node_v;
BRTNODE node;
CACHEKEY *rootp;
int r;
int did_split; BRTNODE nodea=0, nodeb=0; bytevec splitkey; ITEMLEN splitkeylen;
int did_split; BRTNODE nodea=0, nodeb=0;
DBT splitk;
int debug = brt_debug_mode;//strcmp(key,"hello387")==0;
//assert(0==cachetable_assert_all_unpinned(brt->cachetable));
if ((r = read_and_pin_brt_header(brt->cf, &brt->h))) {
......@@ -1043,9 +1090,9 @@ int brt_insert (BRT brt, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN valle
}
node=node_v;
if (debug) printf("%s:%d node inserting\n", __FILE__, __LINE__);
r = brtnode_insert(brt, node, key, keylen, val, vallen,
&did_split, &nodea, &nodeb, &splitkey, &splitkeylen,
debug);
r = brtnode_insert(brt, node, k, v,
&did_split, &nodea, &nodeb, &splitk,
debug, db);
if (r!=0) return r;
if (debug) printf("%s:%d did_insert\n", __FILE__, __LINE__);
if (did_split) {
......@@ -1064,9 +1111,9 @@ int brt_insert (BRT brt, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN valle
initialize_brtnode (brt, newroot, newroot_diskoff, nodea->height+1);
newroot->u.n.n_children=2;
//printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey);
newroot->u.n.childkeys[0] = splitkey;
newroot->u.n.childkeylens[0] = splitkeylen;
newroot->u.n.totalchildkeylens=splitkeylen;
newroot->u.n.childkeys[0] = splitk.data;
newroot->u.n.childkeylens[0] = splitk.size;
newroot->u.n.totalchildkeylens=splitk.size;
newroot->u.n.children[0]=nodea->thisnodename;
newroot->u.n.children[1]=nodeb->thisnodename;
r=hashtable_create(&newroot->u.n.htables[0]); if (r!=0) return r;
......@@ -1090,12 +1137,11 @@ int brt_insert (BRT brt, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN valle
// This is pretty ugly.
static unsigned char lookup_result[1000000];
int brt_lookup_node (BRT brt, diskoff off, bytevec key, ITEMLEN keylen, bytevec *val, ITEMLEN *vallen) {
int brt_lookup_node (BRT brt, diskoff off, DBT *k, DBT *v, DB *db) {
void *node_v;
int r = cachetable_get_and_pin(brt->cf, off, &node_v,
brtnode_flush_callback, brtnode_fetch_callback, (void*)brt->h->nodesize);
bytevec answer;
ITEMLEN answerlen;
DBT answer;
BRTNODE node;
int childnum;
if (r!=0) {
......@@ -1107,36 +1153,38 @@ int brt_lookup_node (BRT brt, diskoff off, bytevec key, ITEMLEN keylen, bytevec
}
node=node_v;
if (node->height==0) {
r = pma_lookup(node->u.l.buffer, key, keylen, &answer, &answerlen);
r = pma_lookup(node->u.l.buffer, k, &answer, db);
//printf("%s:%d looked up something, got answerlen=%d\n", __FILE__, __LINE__, answerlen);
if (r!=0) goto died0;
if (r==0) {
*val = answer;
*vallen = answerlen;
*v = answer;
}
r = cachetable_unpin(brt->cf, off, 0);
return r;
}
childnum = brtnode_which_child(node, key, keylen);
childnum = brtnode_which_child(node, k, brt, db);
// Leaves have a single mdict, where the data is found.
if (hash_find (node->u.n.htables[childnum], key, keylen, &answer, vallen)==0) {
{
bytevec hanswer;
ITEMLEN hanswerlen;
if (hash_find (node->u.n.htables[childnum], k->data, k->size, &hanswer, &hanswerlen)==0) {
//printf("Found %d bytes\n", *vallen);
assert(*vallen<=(int)(sizeof(lookup_result)));
memcpy(lookup_result, answer, *vallen);
assert(hanswerlen<=(int)(sizeof(lookup_result)));
ybt_set_value(v, hanswer, hanswerlen, &brt->sval);
//printf("Returning %s\n", lookup_result);
*val = lookup_result;
r = cachetable_unpin(brt->cf, off, 0);
assert(r==0);
return 0;
}
}
if (node->height==0) {
r = cachetable_unpin(brt->cf, off, 0);
if (r==0) return DB_NOTFOUND;
else return r;
}
{
int result = brt_lookup_node(brt, node->u.n.children[childnum], key, keylen, val, vallen);
int result = brt_lookup_node(brt, node->u.n.children[childnum], k, v, db);
r = cachetable_unpin(brt->cf, off, 0);
if (r!=0) return r;
return result;
......@@ -1144,7 +1192,7 @@ int brt_lookup_node (BRT brt, diskoff off, bytevec key, ITEMLEN keylen, bytevec
}
int brt_lookup (BRT brt, bytevec key, unsigned int keylen, bytevec*val, unsigned int *vallen) {
int brt_lookup (BRT brt, DBT *k, DBT *v, DB *db) {
int r;
CACHEKEY *rootp;
assert(0==cachefile_count_pinned(brt->cf, 1));
......@@ -1156,7 +1204,7 @@ int brt_lookup (BRT brt, bytevec key, unsigned int keylen, bytevec*val, unsigned
return r;
}
rootp = calculate_root_offset_pointer(brt);
if ((r = brt_lookup_node(brt, *rootp, key, keylen, val, vallen))) {
if ((r = brt_lookup_node(brt, *rootp, k, v, db))) {
printf("%s:%d\n", __FILE__, __LINE__);
goto died0;
}
......
......@@ -9,11 +9,11 @@
#include "../include/ydb-constants.h"
#include "cachetable.h"
typedef struct brt *BRT;
int open_brt (const char *fname, const char *dbname, int is_create, BRT *, int nodesize, CACHETABLE);
int open_brt (const char *fname, const char *dbname, int is_create, BRT *, int nodesize, CACHETABLE, int(*)(DB*,DBT*,DBT*));
//int brt_create (BRT **, int nodesize, int n_nodes_in_cache); /* the nodesize and n_nodes in cache really should be separately configured. */
//int brt_open (BRT *, char *fname, char *dbname);
int brt_insert (BRT brt, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen);
int brt_lookup (BRT brt, bytevec key, ITEMLEN keylen, bytevec*val, ITEMLEN *vallen);
int brt_insert (BRT brt, DBT *k, DBT *v, DB*db);
int brt_lookup (BRT brt, DBT *k, DBT *v, DB*db);
int close_brt (BRT);
int dump_brt (BRT brt);
void brt_fsync (BRT); /* fsync, but don't clear the caches. */
......
#include "ybt.h"
#include "brttypes.h"
int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
......
......@@ -268,7 +268,7 @@ static void test_find_insert (void) {
r=pma_insert(pma, fill_dbt(&k, "aaa", 3), fill_dbt(&v, "aaadata", 7), 0);
assert(r==BRT_OK);
ybt_init(&v);
init_dbt(&v);
r=pma_lookup(pma, fill_dbt(&k, "aaa", 3), &v, 0);
assert(r==BRT_OK);
assert(v.size==7);
......@@ -278,12 +278,12 @@ static void test_find_insert (void) {
r=pma_insert(pma, fill_dbt(&k, "bbb", 4), fill_dbt(&v, "bbbdata", 8), 0);
assert(r==BRT_OK);
ybt_init(&v);
init_dbt(&v);
r=pma_lookup(pma, fill_dbt(&k, "aaa", 3), &v, 0);
assert(r==BRT_OK);
assert(keycompare(v.data,v.size,"aaadata", 7)==0);
ybt_init(&v);
init_dbt(&v);
r=pma_lookup(pma, fill_dbt(&k, "bbb", 4), &v, 0);
assert(r==BRT_OK);
assert(keycompare(v.data,v.size,"bbbdata", 8)==0);
......@@ -415,8 +415,8 @@ void test_pma_cursor_2 (void) {
PMA_CURSOR c=0;
int r;
DBT key,val;
ybt_init(&key); key.flags=DB_DBT_REALLOC;
ybt_init(&val); val.flags=DB_DBT_REALLOC;
init_dbt(&key); key.flags=DB_DBT_REALLOC;
init_dbt(&val); val.flags=DB_DBT_REALLOC;
r=pma_create(&pma, default_compare_fun); assert(r==0);
r=pma_cursor(pma, &c); assert(r==0); assert(c!=0);
r=pma_cursor_set_position_last(c); assert(r==DB_NOTFOUND);
......@@ -434,8 +434,8 @@ void test_pma_cursor_3 (void) {
r=pma_insert(pma, fill_dbt(&k, "x", 2), fill_dbt(&v, "xx", 3), 0); assert(r==BRT_OK);
r=pma_insert(pma, fill_dbt(&k, "m", 2), fill_dbt(&v, "mm", 3), 0); assert(r==BRT_OK);
r=pma_insert(pma, fill_dbt(&k, "aa", 3), fill_dbt(&v,"a", 2), 0); assert(r==BRT_OK);
ybt_init(&key); key.flags=DB_DBT_REALLOC;
ybt_init(&val); val.flags=DB_DBT_REALLOC;
init_dbt(&key); key.flags=DB_DBT_REALLOC;
init_dbt(&val); val.flags=DB_DBT_REALLOC;
r=pma_cursor(pma, &c); assert(r==0); assert(c!=0);
r=pma_cursor_set_position_first(c); assert(r==0);
......@@ -508,8 +508,8 @@ void test_pma_compare_fun (int wrong_endian_p) {
r = pma_insert(pma, fill_dbt(&k, "00", 3), fill_dbt(&v, "00v", 4), 0); assert(r==BRT_OK);
r = pma_insert(pma, fill_dbt(&k, "01", 3), fill_dbt(&v, "01v", 4), 0); assert(r==BRT_OK);
r = pma_insert(pma, fill_dbt(&k, "11", 3), fill_dbt(&v, "11v", 4), 0); assert(r==BRT_OK);
ybt_init(&key); key.flags=DB_DBT_REALLOC;
ybt_init(&val); val.flags=DB_DBT_REALLOC;
init_dbt(&key); key.flags=DB_DBT_REALLOC;
init_dbt(&val); val.flags=DB_DBT_REALLOC;
r=pma_cursor(pma, &c); assert(r==0); assert(c!=0);
for (i=0; i<4; i++) {
......
......@@ -8,8 +8,8 @@
void ybt_test0 (void) {
void *v0=0,*v1=0;
DBT t0,t1;
ybt_init(&t0);
ybt_init(&t1);
init_dbt(&t0);
init_dbt(&t1);
ybt_set_value(&t0, "hello", 6, &v0);
ybt_set_value(&t1, "foo", 4, &v1);
assert(t0.size==6);
......@@ -25,7 +25,7 @@ void ybt_test0 (void) {
memory_check_all_free();
/* See if we can probe to find out how big something is by setting ulen=0 with YBT_USERMEM */
ybt_init(&t0);
init_dbt(&t0);
t0.flags = DB_DBT_USERMEM;
t0.ulen = 0;
ybt_set_value(&t0, "hello", 6, 0);
......@@ -33,7 +33,7 @@ void ybt_test0 (void) {
assert(t0.size==6);
/* Check realloc. */
ybt_init(&t0);
init_dbt(&t0);
t0.flags = DB_DBT_REALLOC;
v0 = 0;
ybt_set_value(&t0, "internationalization", 21, &v0);
......
......@@ -4,13 +4,13 @@
#include <errno.h>
#include <string.h>
int ybt_init (DBT *ybt) {
DBT *init_dbt (DBT *ybt) {
memset(ybt, 0, sizeof(*ybt));
return 0;
return ybt;
}
DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) {
ybt_init(dbt);
init_dbt(dbt);
dbt->size=len;
dbt->data=(char*)k;
return dbt;
......
......@@ -6,7 +6,7 @@
#include "../include/db.h"
int ybt_init (DBT *);
DBT* init_dbt (DBT *);
DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len);
int ybt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment