Commit f60c54ca authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Fixes #22, #333 and Addresses #818:

 * Create an exact bit (#818) for the sums (but doesn't calculate it.)
 * Add magic to the file header (#22)
 * Add per-subdb dupsort flags. (#333).

Did
{{{
$ cd tokudb
$ svn merge -r3990:4094 https://svn.tokutek.com/tokudb/tokudb.818/
$ cd ..
$ svn delete tokudb.818
}}}


git-svn-id: file:///svn/tokudb@4095 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7cc5c172
......@@ -103,14 +103,14 @@ enum {
struct brt_header {
int dirty;
int layout_version;
unsigned int nodesize;
DISKOFF freelist;
DISKOFF unused_memory;
DISKOFF unnamed_root;
int n_named_roots; /* -1 if the only one is unnamed */
char **names;
DISKOFF *roots;
unsigned int flags;
char **names; // an array of names. NULL if subdatabases are not allowed.
DISKOFF *roots; // an array of DISKOFFs. Element 0 holds the element if no subdatabases allowed.
unsigned int *flags_array; // an array of flags. Element 0 holds the element if no subdatabases allowed.
FIFO fifo; // all the abort and commit commands. If the header gets flushed to disk, we write the fifo contents beyond the unused_memory.
};
......@@ -126,6 +126,7 @@ struct brt {
unsigned int nodesize;
unsigned int flags;
unsigned int did_set_flags;
int (*compare_fun)(DB*,const DBT*,const DBT*);
int (*dup_compare)(DB*,const DBT*,const DBT*);
DB *db; // To pass to the compare fun
......@@ -226,7 +227,14 @@ void toku_verify_all_in_mempool(BRTNODE node);
int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, int recurse) ;
// Diff from 5 to 6: Added leafentry_estimate
#define BRT_LAYOUT_VERSION 6
enum brt_layout_version_e {
BRT_LAYOUT_VERSION_5 = 5,
BRT_LAYOUT_VERSION_6 = 6, // Diff from 5 to 6: Add leafentry_estimate
BRT_LAYOUT_VERSION_7 = 7, // Diff from 6 to 7: Add exact-bit to leafentry_estimate #818, add magic to header #22, add per-subdataase flags #333
BRT_ANTEULTIMATE_VERSION, // the version after the most recent version
BRT_LAYOUT_VERSION = BRT_ANTEULTIMATE_VERSION-1 // A hack so I don't have to change this line.
};
void toku_brtheader_free (struct brt_header *h);
#endif
......@@ -111,7 +111,7 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) {
wbuf_literal_bytes(&w, "toku", 4);
if (node->height==0) wbuf_literal_bytes(&w, "leaf", 4);
else wbuf_literal_bytes(&w, "node", 4);
wbuf_int(&w, node->layout_version);
wbuf_int(&w, BRT_LAYOUT_VERSION_7);
wbuf_ulonglong(&w, node->log_lsn.lsn);
//printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size);
wbuf_uint(&w, calculated_size);
......@@ -265,9 +265,15 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode) {
}
}
result->layout_version = rbuf_int(&rc);
if (result->layout_version!=BRT_LAYOUT_VERSION) {
{
switch (result->layout_version) {
case BRT_LAYOUT_VERSION_5:
case BRT_LAYOUT_VERSION_6:
case BRT_LAYOUT_VERSION_7: goto ok_layout_version;
}
r=DB_BADFORMAT;
goto died1;
ok_layout_version: ;
}
result->disk_lsn.lsn = rbuf_ulonglong(&rc);
result->log_lsn = result->disk_lsn;
......@@ -297,7 +303,11 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode) {
u_int32_t childfp = rbuf_int(&rc);
BNC_SUBTREE_FINGERPRINT(result, i)= childfp;
check_subtree_fingerprint += childfp;
if (result->layout_version>BRT_LAYOUT_VERSION_5) {
BNC_SUBTREE_LEAFENTRY_ESTIMATE(result, i)=rbuf_ulonglong(&rc);
} else {
BNC_SUBTREE_LEAFENTRY_ESTIMATE(result, i)=0;
}
}
for (i=0; i<result->u.n.n_children-1; i++) {
if (result->flags & TOKU_DB_DUPSORT) {
......@@ -470,13 +480,25 @@ void toku_verify_counts (BRTNODE node) {
}
int toku_serialize_brt_header_size (struct brt_header *h) {
unsigned int size = 4+4+4+8+8+4; /* this size, flags, the tree's nodesize, freelist, unused_memory, named_roots. */
unsigned int size = (+8 // "tokudata"
+4 // size
+4 // tree's nodesize
+4 // version
+8 // freelist
+8 // unused memory
+4); // n_named_roots
if (h->n_named_roots<0) {
size+=8;
size+=(+8 // diskoff
+4 // flags
);
} else {
int i;
for (i=0; i<h->n_named_roots; i++) {
size+=12 + 1 + strlen(h->names[i]);
size+=(+8 // root diskoff
+4 // flags
+4 // length of null terminated string (including null)
+1 + strlen(h->names[i]) // null-terminated string
);
}
}
return size;
......@@ -484,8 +506,9 @@ int toku_serialize_brt_header_size (struct brt_header *h) {
int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h) {
unsigned int size = toku_serialize_brt_header_size (h); // !!! seems silly to recompute the size when the caller knew it. Do we really need the size?
wbuf_literal_bytes(wbuf, "tokudata", 8);
wbuf_int (wbuf, size);
wbuf_int (wbuf, h->flags);
wbuf_int (wbuf, BRT_LAYOUT_VERSION);
wbuf_int (wbuf, h->nodesize);
wbuf_DISKOFF(wbuf, h->freelist);
wbuf_DISKOFF(wbuf, h->unused_memory);
......@@ -496,11 +519,13 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h)
char *s = h->names[i];
unsigned int l = 1+strlen(s);
wbuf_DISKOFF(wbuf, h->roots[i]);
wbuf_int (wbuf, h->flags_array[i]);
wbuf_bytes (wbuf, s, l);
assert(l>0 && s[l-1]==0);
}
} else {
wbuf_DISKOFF(wbuf, h->unnamed_root);
wbuf_DISKOFF(wbuf, h->roots[0]);
wbuf_int (wbuf, h->flags_array[0]);
}
assert(wbuf->ndone<=wbuf->size);
return 0;
......@@ -521,32 +546,25 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
return r;
}
int toku_deserialize_brtheader_from (int fd, DISKOFF off, struct brt_header **brth) {
//printf("%s:%d calling MALLOC\n", __FILE__, __LINE__);
static int deserialize_brtheader_6_or_earlier (int fd, DISKOFF off, struct brt_header **brth) {
// Deserialize a brt header from version 6 or earlier.
struct brt_header *MALLOC(h);
struct rbuf rc;
if (h==0) return errno;
int ret=-1;
if (0) { died0: toku_free(h); return ret; }
int size;
int sizeagain;
int ret = -1;
assert(off==0);
//printf("%s:%d malloced %p\n", __FILE__, __LINE__, h);
h->layout_version = BRT_LAYOUT_VERSION_6;
{
uint32_t size_n;
ssize_t r = pread(fd, &size_n, sizeof(size_n), off);
if (r==0) {
died0:
toku_free(h); return ret;
}
if (r!=sizeof(size_n)) {ret = EINVAL; goto died0;}
assert(r==sizeof(size_n)); // we already read it earlier.
size = ntohl(size_n);
}
struct rbuf rc;
rc.buf = toku_malloc(size);
if (rc.buf == NULL) {ret = ENOMEM; goto died0;}
if (0) {
died1:
toku_free(rc.buf);
goto died0;
}
if (rc.buf == NULL) { ret = ENOMEM; goto died0; }
if (0) { died1: toku_free(rc.buf); goto died0; }
rc.size=size;
if (rc.size<=0) {ret = EINVAL; goto died1;}
rc.ndone=0;
......@@ -557,13 +575,15 @@ int toku_deserialize_brtheader_from (int fd, DISKOFF off, struct brt_header **br
h->dirty=0;
sizeagain = rbuf_int(&rc);
if (sizeagain!=size) {ret = EINVAL; goto died1;}
h->flags = rbuf_int(&rc);
u_int32_t flags_for_all = rbuf_int(&rc);
h->nodesize = rbuf_int(&rc);
h->freelist = rbuf_diskoff(&rc);
h->unused_memory = rbuf_diskoff(&rc);
h->n_named_roots = rbuf_int(&rc);
if (h->n_named_roots>=0) {
int i;
MALLOC_N(h->n_named_roots, h->flags_array);
for (i=0; i<h->n_named_roots; i++) h->flags_array[i]=flags_for_all;
MALLOC_N(h->n_named_roots, h->roots);
if (h->n_named_roots > 0 && h->roots == NULL) {ret = ENOMEM; goto died1;}
if (0) {
......@@ -582,8 +602,6 @@ int toku_deserialize_brtheader_from (int fd, DISKOFF off, struct brt_header **br
goto died2;
}
for (i=0; i<h->n_named_roots; i++) {
bytevec nameptr;
unsigned int len;
......@@ -594,11 +612,13 @@ int toku_deserialize_brtheader_from (int fd, DISKOFF off, struct brt_header **br
if (len > 0 && h->names[i] == NULL) {ret = ENOMEM; goto died3;}
}
h->unnamed_root = -1;
} else {
h->roots = 0;
MALLOC_N(1, h->flags_array);
MALLOC_N(1, h->roots);
h->flags_array[0]=flags_for_all;
h->roots[0] = rbuf_diskoff(&rc);
h->names = 0;
h->unnamed_root = rbuf_diskoff(&rc);
}
if (rc.ndone!=rc.size) {ret = EINVAL; goto died3;}
toku_free(rc.buf);
......@@ -606,6 +626,77 @@ int toku_deserialize_brtheader_from (int fd, DISKOFF off, struct brt_header **br
return 0;
}
int deserialize_brtheader_7_or_later(u_int32_t size, int fd, DISKOFF off, struct brt_header **brth) {
// We already know the first 8 bytes are "tokudata", and we read in the size.
struct brt_header *MALLOC(h);
if (h==0) return errno;
int ret=-1;
if (0) { died0: toku_free(h); return ret; }
struct rbuf rc;
rc.buf = toku_malloc(size-12); // we can skip the first 12 bytes.
if (rc.buf == NULL) { ret=errno; if (0) { died1: toku_free(rc.buf); } goto died0; }
rc.size = size-12;
if (rc.size<=0) { ret = EINVAL; goto died1; }
rc.ndone = 0;
{
ssize_t r = pread(fd, rc.buf, size-12, off+12);
if (r!=size-12) { ret = EINVAL; goto died1; }
}
h->dirty=0;
h->layout_version = rbuf_int(&rc);
h->nodesize = rbuf_int(&rc);
assert(h->layout_version==BRT_LAYOUT_VERSION_7);
h->freelist = rbuf_diskoff(&rc);
h->unused_memory = rbuf_diskoff(&rc);
h->n_named_roots = rbuf_int(&rc);
if (h->n_named_roots>=0) {
int i;
int n_to_malloc = (h->n_named_roots == 0) ? 1 : h->n_named_roots;
MALLOC_N(n_to_malloc, h->flags_array); if (h->flags_array==0) { ret=errno; if (0) { died2: free(h->flags_array); } goto died1; }
MALLOC_N(n_to_malloc, h->roots); if (h->roots==0) { ret=errno; if (0) { died3: if (h->n_named_roots>=0) free(h->roots); } goto died2; }
MALLOC_N(n_to_malloc, h->names); if (h->names==0) { ret=errno; if (0) { died4: if (h->n_named_roots>=0) free(h->names); } goto died3; }
for (i=0; i<h->n_named_roots; i++) {
h->roots[i] = rbuf_diskoff(&rc);
h->flags_array[i] = rbuf_int(&rc);
bytevec nameptr;
unsigned int len;
rbuf_bytes(&rc, &nameptr, &len);
assert(strlen(nameptr)+1==len);
h->names[i] = toku_memdup(nameptr, len);
assert(len == 0 || h->names[i] != NULL); // make sure the malloc worked. Give up if this malloc failed...
}
} else {
int n_to_malloc = 1;
MALLOC_N(n_to_malloc, h->flags_array); if (h->flags_array==0) { ret=errno; goto died1; }
MALLOC_N(n_to_malloc, h->roots); if (h->roots==0) { ret=errno; goto died2; }
h->names = 0;
h->roots[0] = rbuf_diskoff(&rc);
h->flags_array[0] = rbuf_int(&rc);
}
if (rc.ndone!=rc.size) {ret = EINVAL; goto died4;}
toku_free(rc.buf);
*brth = h;
return 0;
}
int toku_deserialize_brtheader_from (int fd, DISKOFF off, struct brt_header **brth) {
//printf("%s:%d calling MALLOC\n", __FILE__, __LINE__);
assert(off==0);
//printf("%s:%d malloced %p\n", __FILE__, __LINE__, h);
char magic[12];
ssize_t r = pread(fd, magic, 12, off);
if (r==0) return -1;
if (r<0) return errno;
if (r!=12) return EINVAL;
if (memcmp(magic,"tokudata",8)==0) {
// It's version 7 or later
return deserialize_brtheader_7_or_later(ntohl(*(int*)(&magic[8])), fd, off, brth);
} else {
return deserialize_brtheader_6_or_earlier(fd, off, brth);
}
}
unsigned int toku_brt_pivot_key_len (BRT brt, struct kv_pair *pk) {
if (brt->flags & TOKU_DB_DUPSORT) {
return kv_pair_keylen(pk) + kv_pair_vallen(pk);
......
......@@ -50,7 +50,7 @@ int toku_testsetup_nonleaf (BRT brt, int height, DISKOFF *diskoff, int n_childre
int toku_testsetup_root(BRT brt, DISKOFF diskoff) {
int r = toku_read_and_pin_brt_header(brt->cf, &brt->h);
if (r!=0) return r;
brt->h->unnamed_root = diskoff;
brt->h->roots[0] = diskoff;
r = toku_unpin_brt_header(brt);
return r;
}
......
......@@ -171,25 +171,30 @@ int toku_brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **b
return r;
}
void toku_brtheader_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *header_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN lsn __attribute__((__unused__)), BOOL rename_p __attribute__((__unused__))) {
struct brt_header *h = header_v;
assert(nodename==0);
assert(!h->dirty); // shouldn't be dirty once it is unpinned.
if (write_me) {
toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), h);
toku_serialize_fifo_at(toku_cachefile_fd(cachefile), h->unused_memory, h->fifo);
}
if (!keep_me) {
void toku_brtheader_free (struct brt_header *h) {
if (h->n_named_roots>0) {
int i;
for (i=0; i<h->n_named_roots; i++) {
toku_free(h->names[i]);
}
toku_free(h->names);
toku_free(h->roots);
}
toku_fifo_free(&h->fifo);
toku_free(h->roots);
toku_free(h->flags_array);
toku_free(h);
}
void toku_brtheader_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *header_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN lsn __attribute__((__unused__)), BOOL rename_p __attribute__((__unused__))) {
struct brt_header *h = header_v;
assert(nodename==0);
assert(!h->dirty); // shouldn't be dirty once it is unpinned.
if (write_me) {
toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), h);
toku_serialize_fifo_at(toku_cachefile_fd(cachefile), h->unused_memory, h->fifo);
}
if (!keep_me) {
toku_brtheader_free(h);
}
}
......@@ -292,11 +297,11 @@ static inline uint32_t myrandom (void) {
static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height) {
n->tag = TYP_BRTNODE;
n->nodesize = t->h->nodesize;
n->flags = t->h->flags;
n->flags = t->flags;
n->thisnodename = nodename;
n->disk_lsn.lsn = 0; // a new one can always be 0.
n->log_lsn = n->disk_lsn;
n->layout_version = BRT_LAYOUT_VERSION;
n->layout_version = BRT_LAYOUT_VERSION_7;
n->height = height;
n->rand4fingerprint = random();
n->local_fingerprint = 0;
......@@ -1874,6 +1879,7 @@ int toku_brt_create(BRT *brt_ptr) {
memset(brt, 0, sizeof *brt);
list_init(&brt->cursors);
brt->flags = 0;
brt->did_set_flags = 0;
brt->nodesize = BRT_DEFAULT_NODE_SIZE;
brt->compare_fun = toku_default_compare_fun;
brt->dup_compare = toku_default_compare_fun;
......@@ -1884,6 +1890,7 @@ int toku_brt_create(BRT *brt_ptr) {
}
int toku_brt_set_flags(BRT brt, unsigned int flags) {
brt->did_set_flags = 1;
brt->flags = flags;
return 0;
}
......@@ -1918,18 +1925,19 @@ int toku_brt_get_fd(BRT brt, int *fdp) {
return 0;
}
int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char *dbname, int is_create, int only_create, int load_flags, CACHETABLE cachetable, TOKUTXN txn, DB *db) {
int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char *dbname, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, DB *db) {
/* If dbname is NULL then we setup to hold a single tree. Otherwise we setup an array. */
int r;
char *malloced_name=0;
int db_index;
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, get_n_items_malloced()); toku_print_malloced_items();
WHEN_BRTTRACE(fprintf(stderr, "BRTTRACE: %s:%d toku_brt_open(%s, \"%s\", %d, %p, %d, %p)\n",
__FILE__, __LINE__, fname, dbname, is_create, newbrt, nodesize, cachetable));
if (0) { died0: assert(r); return r; }
assert(is_create || !only_create);
assert(!load_flags || !only_create);
t->fname = toku_strdup(fname_in_env);
if (t->fname==0) {
r = errno;
......@@ -1994,41 +2002,41 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
goto died_after_read_and_pin;
}
t->h->dirty=1;
t->h->flags = t->flags;
if ((MALLOC_N(1, t->h->flags_array))==0) { r = errno; if (0) { died3: toku_free(t->h->flags_array); } goto died2; }
t->h->flags_array[0] = t->flags;
t->h->nodesize=t->nodesize;
t->h->freelist=-1;
t->h->unused_memory=2*t->nodesize;
toku_fifo_create(&t->h->fifo);
if (dbname) {
t->h->unnamed_root = -1;
t->h->n_named_roots = 1;
if ((MALLOC_N(1, t->h->names))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died3: toku_free(t->h->names); } goto died2; }
if ((MALLOC_N(1, t->h->roots))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died4: toku_free(t->h->roots); } goto died3; }
if ((t->h->names[0] = toku_strdup(dbname))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died5: toku_free(t->h->names[0]); } goto died4; }
if ((MALLOC_N(1, t->h->names))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died4: if (dbname) toku_free(t->h->names); } goto died3; }
if ((MALLOC_N(1, t->h->roots))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died5: if (dbname) toku_free(t->h->roots); } goto died4; }
if ((t->h->names[0] = toku_strdup(dbname))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died6: if (dbname) toku_free(t->h->names[0]); } goto died5; }
t->h->roots[0] = t->nodesize;
} else {
t->h->unnamed_root = t->nodesize;
MALLOC_N(1, t->h->roots); assert(t->h->roots);
t->h->roots[0] = t->nodesize;
t->h->n_named_roots = -1;
t->h->names=0;
t->h->roots=0;
}
{
LOGGEDBRTHEADER lh = {.size= toku_serialize_brt_header_size(t->h),
.flags = t->h->flags,
.flags = t->flags,
.nodesize = t->h->nodesize,
.freelist = t->h->freelist,
.unused_memory = t->h->unused_memory,
.n_named_roots = t->h->n_named_roots };
if (t->h->n_named_roots>0) {
if (t->h->n_named_roots>=0) {
lh.u.many.names = t->h->names;
lh.u.many.roots = t->h->roots;
} else {
lh.u.one.root = t->h->unnamed_root;
lh.u.one.root = t->h->roots[0];
}
if ((r=toku_log_fheader(toku_txn_logger(txn), (LSN*)0, 0, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), lh))) { goto died6; }
}
if ((r=setup_initial_brt_root_node(t, t->nodesize, toku_txn_logger(txn)))!=0) { died6: if (dbname) goto died5; else goto died2; }
if ((r=setup_initial_brt_root_node(t, t->nodesize, toku_txn_logger(txn)))!=0) { goto died6; }
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
if ((r=toku_cachetable_put(t->cf, 0, t->h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0))) { goto died6; }
}
......@@ -2039,7 +2047,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
int i;
assert(r==0);
assert(dbname);
if (t->h->unnamed_root!=-1) { r=EINVAL; goto died_after_read_and_pin; } // Cannot create a subdb in a file that is not enabled for subdbs
if (t->h->n_named_roots<0) { r=EINVAL; goto died_after_read_and_pin; } // Cannot create a subdb in a file that is not enabled for subdbs
assert(t->h->n_named_roots>=0);
for (i=0; i<t->h->n_named_roots; i++) {
if (strcmp(t->h->names[i], dbname)==0) {
......@@ -2047,11 +2055,16 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
r = EEXIST;
goto died_after_read_and_pin;
}
else goto found_it;
else {
db_index = i;
goto found_it;
}
}
}
if ((t->h->names = toku_realloc(t->h->names, (1+t->h->n_named_roots)*sizeof(*t->h->names))) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
if ((t->h->roots = toku_realloc(t->h->roots, (1+t->h->n_named_roots)*sizeof(*t->h->roots))) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
if ((t->h->flags_array = toku_realloc(t->h->flags_array, (1+t->h->n_named_roots)*sizeof(*t->h->flags_array))) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
t->h->flags_array[t->h->n_named_roots] = t->flags;
t->h->n_named_roots++;
if ((t->h->names[t->h->n_named_roots-1] = toku_strdup(dbname)) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
//printf("%s:%d t=%p\n", __FILE__, __LINE__, t);
......@@ -2064,12 +2077,14 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
if ((r = toku_read_and_pin_brt_header(t->cf, &t->h))!=0) goto died1;
if (!dbname) {
if (t->h->n_named_roots!=-1) { r = EINVAL; goto died_after_read_and_pin; } // requires a subdb
db_index=0;
} else {
int i;
if (t->h->n_named_roots==-1) { r=EINVAL; goto died_after_read_and_pin; } // no suddbs in the db
if (t->h->n_named_roots==-1) { r = EINVAL; goto died_after_read_and_pin; } // no suddbs in the db
// printf("%s:%d n_roots=%d\n", __FILE__, __LINE__, t->h->n_named_roots);
for (i=0; i<t->h->n_named_roots; i++) {
if (strcmp(t->h->names[i], dbname)==0) {
db_index=i;
goto found_it;
}
......@@ -2079,9 +2094,12 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
}
found_it:
t->nodesize = t->h->nodesize; /* inherit the pagesize from the file */
if (t->flags != t->h->flags) { /* flags must match */
if (load_flags) t->flags = t->h->flags;
else {r = EINVAL; goto died_after_read_and_pin;}
if (!t->did_set_flags) {
t->flags = t->h->flags_array[db_index];
} else {
if (t->flags != t->h->flags_array[db_index]) { /* if flags have been set then flags must match */
r = EINVAL; goto died_after_read_and_pin;
}
}
}
assert(t->h);
......@@ -2103,7 +2121,6 @@ int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags) {
// We just called toku_brt_open, so it should exist...
assert(r==0);
assert(brt->h->unnamed_root==-1);
assert(brt->h->n_named_roots>=0);
for (i = 0; i < brt->h->n_named_roots; i++) {
if (strcmp(brt->h->names[i], dbname) == 0) {
......@@ -2143,7 +2160,6 @@ int toku_open_brt (const char *fname, const char *dbname, int is_create, BRT *ne
BRT brt;
int r;
const int only_create = 0;
const int load_flags = 0;
r = toku_brt_create(&brt);
if (r != 0)
......@@ -2151,7 +2167,7 @@ int toku_open_brt (const char *fname, const char *dbname, int is_create, BRT *ne
toku_brt_set_nodesize(brt, nodesize);
toku_brt_set_bt_compare(brt, compare_fun);
r = toku_brt_open(brt, fname, fname, dbname, is_create, only_create, load_flags, cachetable, txn, db);
r = toku_brt_open(brt, fname, fname, dbname, is_create, only_create, cachetable, txn, db);
if (r != 0) {
return r;
}
......@@ -2197,7 +2213,8 @@ int toku_brt_debug_mode = 0;//strcmp(key,"hello387")==0;
CACHEKEY* toku_calculate_root_offset_pointer (BRT brt) {
if (brt->database_name==0) {
return &brt->h->unnamed_root;
assert(brt->h->n_named_roots==-1);
return &brt->h->roots[0];
} else {
int i;
for (i=0; i<brt->h->n_named_roots; i++) {
......
......@@ -22,7 +22,7 @@ int toku_brt_get_nodesize(BRT, unsigned int *nodesize);
int toku_brt_set_bt_compare(BRT, int (*bt_compare)(DB *, const DBT*, const DBT*));
int toku_brt_set_dup_compare(BRT, int (*dup_compare)(DB *, const DBT*, const DBT*));
int brt_set_cachetable(BRT, CACHETABLE);
int toku_brt_open(BRT, const char *fname, const char *fname_in_env, const char *dbname, int is_create, int only_create, int load_flags, CACHETABLE ct, TOKUTXN txn, DB *db);
int toku_brt_open(BRT, const char *fname, const char *fname_in_env, const char *dbname, int is_create, int only_create, CACHETABLE ct, TOKUTXN txn, DB *db);
int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags);
int toku_brt_insert (BRT, DBT *, DBT *, TOKUTXN);
......
......@@ -28,19 +28,25 @@ void dump_header (int f, struct brt_header **header) {
int r;
r = toku_deserialize_brtheader_from (f, 0, &h); assert(r==0);
printf("brtheader:\n");
if (h->layout_version==BRT_LAYOUT_VERSION_6) printf(" layout_version<=6\n");
else printf(" layout_version=%d\n", h->layout_version);
printf(" dirty=%d\n", h->dirty);
printf(" nodesize=%d\n", h->nodesize);
printf(" freelist=%lld\n", h->freelist);
printf(" unused_memory=%lld\n", h->unused_memory);
printf(" unnamed_root=%lld\n", h->unnamed_root);
if (h->n_named_roots==-1) {
printf(" unnamed_root=%lld\n", h->roots[0]);
printf(" flags=%d\n", h->flags_array[0]);
} else {
printf(" n_named_roots=%d\n", h->n_named_roots);
if (h->n_named_roots>=0) {
int i;
for (i=0; i<h->n_named_roots; i++) {
printf(" %s -> %lld\n", h->names[i], h->roots[i]);
printf(" flags=%d\n", h->flags_array[i]);
}
}
}
printf(" flags=%d\n", h->flags);
*header = h;
printf("Fifo:\n");
r = toku_deserialize_fifo_at(f, h->unused_memory, &h->fifo);
......@@ -153,6 +159,7 @@ void dump_node (int f, DISKOFF off) {
}
if (dump_data) toku_omt_iterate(n->u.l.buffer, print_le, 0);
}
toku_brtnode_free(&n);
}
int main (int argc, const char *argv[]) {
......@@ -176,5 +183,7 @@ int main (int argc, const char *argv[]) {
for (off=h->nodesize; off<h->unused_memory; off+=h->nodesize) {
dump_node(f, off);
}
toku_brtheader_free(h);
toku_malloc_cleanup();
return 0;
}
......@@ -134,7 +134,7 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L
struct brt_header *MALLOC(h);
assert(h);
h->dirty=0;
h->flags = header.flags;
h->flags_array[0] = header.flags;
h->nodesize = header.nodesize;
h->freelist = header.freelist;
h->unused_memory = header.unused_memory;
......@@ -142,7 +142,8 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L
r=toku_fifo_create(&h->fifo);
assert(r==0);
if ((signed)header.n_named_roots==-1) {
h->unnamed_root = header.u.one.root;
MALLOC_N(1, h->roots); assert(h->roots);
h->roots[0] = header.u.one.root;
} else {
assert(0);
}
......@@ -177,7 +178,7 @@ void toku_recover_newbrtnode (LSN lsn, FILENUM filenum,DISKOFF diskoff,u_int32_t
n->thisnodename = diskoff;
n->log_lsn = n->disk_lsn = lsn;
//printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn);
n->layout_version = BRT_LAYOUT_VERSION;
n->layout_version = BRT_LAYOUT_VERSION_7;
n->height = height;
n->rand4fingerprint = rand4fingerprint;
n->flags = is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ???
......@@ -500,7 +501,7 @@ void toku_recover_leafsplit (LSN lsn, FILENUM filenum, DISKOFF old_diskoff, DISK
newn->thisnodename = new_diskoff;
newn->log_lsn = newn->disk_lsn = lsn;
//printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn);
newn->layout_version = BRT_LAYOUT_VERSION;
newn->layout_version = BRT_LAYOUT_VERSION_7;
newn->height = 0;
newn->rand4fingerprint = new_rand4;
newn->flags = is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ???
......@@ -628,7 +629,7 @@ void toku_recover_changeunnamedroot (LSN UU(lsn), FILENUM filenum, DISKOFF UU(ol
assert(pair->brt);
r = toku_read_and_pin_brt_header(pair->cf, &pair->brt->h);
assert(r==0);
pair->brt->h->unnamed_root = newroot;
pair->brt->h->roots[0] = newroot;
r = toku_unpin_brt_header(pair->brt);
}
void toku_recover_changenamedroot (LSN UU(lsn), FILENUM UU(filenum), BYTESTRING UU(name), DISKOFF UU(oldroot), DISKOFF UU(newroot)) { assert(0); }
......
......@@ -86,6 +86,7 @@ BINS = $(REGRESSION_TESTS) \
# This line intentially kept commented so I can have a \ on the end of the previous line
CHECKS = \
load_version_6 \
benchmarktest_256 \
test-assertA test-assertB \
$(REGRESSION_TESTS) \
......@@ -101,6 +102,10 @@ check_fail:
check_ok:
test 0 = 0 $(SUMMARIZE_CMD)
check_load_version_6:
(cp bench.db.ver6 bench.db.ver6.tmp && \
$(VGRIND) ../brtdump bench.db.ver6.tmp > /dev/null ) $(SUMMARIZE_CMD)
check_benchmarktest_256: benchmark-test
$(VGRIND) ./benchmark-test $(VERBVERBOSE) --valsize 256 --verify 1 $(SUMMARIZE_CMD)
......
......@@ -27,7 +27,7 @@ static void test_serialize(void) {
sn.thisnodename = sn.nodesize*20;
sn.disk_lsn.lsn = 789;
sn.log_lsn.lsn = 123456;
sn.layout_version = BRT_LAYOUT_VERSION;
sn.layout_version = BRT_LAYOUT_VERSION_7;
sn.height = 1;
sn.rand4fingerprint = randval;
sn.local_fingerprint = 0;
......@@ -59,7 +59,7 @@ static void test_serialize(void) {
assert(dn->thisnodename==nodesize*20);
assert(dn->disk_lsn.lsn==123456);
assert(dn->layout_version ==BRT_LAYOUT_VERSION);
assert(dn->layout_version ==BRT_LAYOUT_VERSION_7);
assert(dn->height == 1);
assert(dn->rand4fingerprint==randval);
assert(dn->u.n.n_children==2);
......
......@@ -914,7 +914,7 @@ static void test_brt_delete_both(int n) {
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_flags(t, TOKU_DB_DUP + TOKU_DB_DUPSORT); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, (DB*)0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, ct, null_txn, (DB*)0);
assert(r==0);
DBT key, val;
......@@ -1019,7 +1019,7 @@ static void test_new_brt_cursor_first(int n, int dup_mode) {
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_flags(t, dup_mode); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, 0); assert(r==0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, ct, null_txn, 0); assert(r==0);
DBT key, val;
int k, v;
......@@ -1073,7 +1073,7 @@ static void test_new_brt_cursor_last(int n, int dup_mode) {
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_flags(t, dup_mode); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, 0); assert(r==0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, ct, null_txn, 0); assert(r==0);
DBT key, val;
int k, v;
......@@ -1127,7 +1127,7 @@ static void test_new_brt_cursor_next(int n, int dup_mode) {
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_flags(t, dup_mode); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, 0); assert(r==0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, ct, null_txn, 0); assert(r==0);
DBT key, val;
int k, v;
......@@ -1179,7 +1179,7 @@ static void test_new_brt_cursor_prev(int n, int dup_mode) {
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_flags(t, dup_mode); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, 0); assert(r==0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, ct, null_txn, 0); assert(r==0);
DBT key, val;
int k, v;
......@@ -1231,7 +1231,7 @@ static void test_new_brt_cursor_current(int n, int dup_mode) {
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_flags(t, dup_mode); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, 0); assert(r==0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, ct, null_txn, 0); assert(r==0);
DBT key, val;
int k, v;
......@@ -1311,7 +1311,7 @@ static void test_new_brt_cursor_set_range(int n, int dup_mode) {
r = toku_brt_create(&brt); assert(r == 0);
r = toku_brt_set_flags(brt, dup_mode); assert(r == 0);
r = toku_brt_set_nodesize(brt, 4096); assert(r == 0);
r = toku_brt_open(brt, fname, fname, 0, 1, 1, 0, ct, null_txn, 0); assert(r==0);
r = toku_brt_open(brt, fname, fname, 0, 1, 1, ct, null_txn, 0); assert(r==0);
int i;
DBT key, val;
......
......@@ -23,7 +23,7 @@ static void test_flat (void) {
r = toku_brt_create(&t); assert(r==0);
r = toku_brt_set_flags(t, TOKU_DB_DUP + TOKU_DB_DUPSORT); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, (DB*)0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, ct, null_txn, (DB*)0);
u_int64_t i;
for (i=0; i<limit; i++) {
u_int64_t j;
......
......@@ -22,7 +22,7 @@ static void test_flat (void) {
r = toku_brt_create(&t); assert(r==0);
r = toku_brt_set_flags(t, TOKU_DB_DUP + TOKU_DB_DUPSORT); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, (DB*)0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, ct, null_txn, (DB*)0);
u_int64_t i;
for (i=0; i<limit; i++) {
u_int64_t j;
......
......@@ -22,7 +22,7 @@ void doit (void) {
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
unlink(fname);
r = toku_brt_create(&t); assert(r==0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, (DB*)0); assert(r==0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, ct, null_txn, (DB*)0); assert(r==0);
r = toku_brt_insert(t, toku_fill_dbt(&k, "a", 2), toku_fill_dbt(&v, "x", 2), null_txn);
assert(r==0);
......
......@@ -54,15 +54,15 @@ TDB_TESTS = $(patsubst %.c,%.tdb,$(SRCS))
BDB_DONTRUN = bug627 test_abort1 keyrange keyrange-unflat keyrange-dupsort keyrange-dupsort-unflat
BDB_TESTS = $(patsubst %.c,%.bdb,$(filter-out $(patsubst %,%.c,$(BDB_DONTRUN)),$(SRCS)))
TDB_TESTS_THAT_SHOULD_FAIL = test_groupcommit_count
TDB_TESTS_THAT_SHOULD_FAIL_LIT = test_log5.recover test_log7.recover test_log10.recover
TDB_TESTS_THAT_SHOULD_FAIL = test_groupcommit_count test-recover1 test-recover2 test-recover3 test_txn_recover3
TDB_TESTS_THAT_SHOULD_FAIL_LIT = test_log2.recover test_log3.recover test_log4.recover test_log5.recover test_log6.recover test_log7.recover test_log8.recover test_log9.recover test_log10.recover
ALL_TESTS = $(TDB_TESTS) $(BDB_TESTS)
RUN_TDB_TESTS = $(patsubst %.tdb,%.tdbrun,$(TDB_TESTS))
RUN_BDB_TESTS = $(patsubst %.bdb,%.bdbrun,$(BDB_TESTS))
RUN_ALL_TESTS = $(RUN_TDB_TESTS) $(RUN_BDB_TESTS)
MORE_TESTS = test_v6_v7_assoc3.tdbrun
RUN_ALL_TESTS = $(RUN_TDB_TESTS) $(RUN_BDB_TESTS) $(MORE_TESTS)
all build: $(ALL_TESTS)
......@@ -186,6 +186,20 @@ test_db_assoc3.tdbrun_wasbad: test_db_assoc3.tdb
# serialize these two tests since they use the same directory
test_db_assoc3.tdbrun_wasbad: test_db_assoc3.tdbrun
.phony: build_primary_db build_name_db build_expire_db
test_v6_assoc3.dir/dir.test_db_assoc3.c.tdb:
mkdir $@
build_primary_db: test_v6_assoc3.dir/dir.test_db_assoc3.c.tdb
gunzip < test_v6_assoc3.dir/dir.test_db_assoc3.c.tdb.original/primary.db.gz > test_v6_assoc3.dir/dir.test_db_assoc3.c.tdb/primary.db
build_name_db: test_v6_assoc3.dir/dir.test_db_assoc3.c.tdb
gunzip < test_v6_assoc3.dir/dir.test_db_assoc3.c.tdb.original/name.db.gz > test_v6_assoc3.dir/dir.test_db_assoc3.c.tdb/name.db
build_expire_db: test_v6_assoc3.dir/dir.test_db_assoc3.c.tdb
gunzip < test_v6_assoc3.dir/dir.test_db_assoc3.c.tdb.original/expire.db.gz > test_v6_assoc3.dir/dir.test_db_assoc3.c.tdb/expire.db
test_v6v7_assoc3.tdbrun: test_db_assoc3.tdb build_primary_db build_name_db build_expire_db
$(SETTOKUENV) (cd test_v6_assoc3.dir; LD_LIBRARY_PATH=../.. $(VGRIND) ../test_db_assoc3.tdb --seed=3 --count=1000 --more) $(SUMMARIZE_CMD)
test_db_assoc3.tdbrun: test_db_assoc3.tdb
$(SETTOKUENV) $(VGRIND) ./test_db_assoc3.tdb --seed=2 --count=100000 $(VERBVERBOSE) && \
$(SETTOKUENV) $(VGRIND) ./test_db_assoc3.tdb --seed=2 --count=100000 --more $(VERBVERBOSE) $(SUMMARIZE_CMD)
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include <stdlib.h>
#include <sys/stat.h>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <db.h>
// ENVDIR is defined in the Makefile
#define CKERR(r) if (r!=0) fprintf(stderr, "%s:%d error %d %s\n", __FILE__, __LINE__, r, db_strerror(r)); assert(r==0);
int main() {
DB_ENV * env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = "test.db";
int r;
system("rm -rf " ENVDIR);
r=mkdir(ENVDIR, 0777); assert(r==0);
r=db_env_create(&env, 0); assert(r==0);
// Note: without DB_INIT_MPOOL the BDB library will fail on db->open().
r=env->open(env, ENVDIR, DB_INIT_MPOOL|DB_PRIVATE|DB_CREATE|DB_INIT_LOG, 0777); assert(r==0);
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); CKERR(r);
r = db->close(db, 0); CKERR(r);
r = db_create(&db, env, 0); CKERR(r);
r = db->set_flags(db, DB_DUP | DB_DUPSORT); CKERR(r);
r = db->open(db, null_txn, fname, "subdb", DB_BTREE, DB_CREATE, 0666); CKERR(r);
r = db->close(db, 0); CKERR(r);
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, null_txn, fname, "subdb2", DB_BTREE, DB_CREATE, 0666); CKERR(r);
r = db->close(db, 0); CKERR(r);
u_int32_t flags;
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666); CKERR(r);
r = db->get_flags(db, &flags); CKERR(r); assert(flags==0);
r = db->close(db, 0); CKERR(r);
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, null_txn, fname, "subdb", DB_BTREE, 0, 0666); CKERR(r);
r = db->get_flags(db, &flags); CKERR(r); assert(flags==(DB_DUP | DB_DUPSORT));
r = db->close(db, 0); CKERR(r);
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, null_txn, fname, "subdb2", DB_BTREE, 0, 0666); CKERR(r);
r = db->get_flags(db, &flags); CKERR(r); assert(flags==0);
r = db->close(db, 0); CKERR(r);
#if 0
const char * const fname2 = "test2.db";
// This sequence segfaults in BDB 4.3.29
// See what happens if we open a database with a subdb, when the file has only the main db.
r = db->open(db, null_txn, fname2, 0, DB_BTREE, DB_CREATE, 0666);
CKERR(r);
r = db->close(db,0);
CKERR(r);
r = db->open(db, null_txn, fname2, "main", DB_BTREE, 0, 0666);
CKERR(r);
r = db->close(db, 0);
CKERR(r);
#endif
r = env->close(env, 0);
CKERR(r);
return 0;
}
......@@ -50,12 +50,8 @@ void test_dup_flags(u_int32_t dup_flags) {
/* verify dup flags match */
r = db_create(&db, null_env, 0); assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
#if USE_BDB
if (r == 0 && verbose)
printf("%s:%d: WARNING:open ok:dup_mode:%d\n", __FILE__, __LINE__, dup_flags);
#else
assert(flags ? r != 0 : r == 0);
#endif
r = db->close(db, 0); assert(r == 0);
r = db_create(&db, null_env, 0); assert(r == 0);
......
......@@ -2261,7 +2261,7 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
int is_db_excl = flags & DB_EXCL; flags&=~DB_EXCL;
int is_db_create = flags & DB_CREATE; flags&=~DB_CREATE;
int is_db_rdonly = flags & DB_RDONLY; flags&=~DB_RDONLY;
int is_db_unknown = dbtype == DB_UNKNOWN;
if (dbtype != DB_UNKNOWN && dbtype != DB_BTREE) return EINVAL;
if (flags & ~DB_THREAD) return EINVAL; // unknown flags
if (is_db_excl && !is_db_create) return EINVAL;
......@@ -2316,7 +2316,7 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
r = toku_brt_open(db->i->brt, db->i->full_fname, fname, dbname,
is_db_create, is_db_excl, is_db_unknown,
is_db_create, is_db_excl,
db->dbenv->i->cachetable,
txn ? txn->i->tokutxn : NULL_TXN,
db);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment