Commit d02f12e5, authored by Bradley C. Kuszmaul, committed by Yoni Fogel

#3099 Merge 3099 onto the main line.

{{{
svn merge -r28775:29048 https://svn.tokutek.com/tokudb/toku/tokudb.3099
}}}


git-svn-id: file:///svn/toku/tokudb@29055 c7de825b-a66e-492c-adef-691d508d4ae1
parent fe0eb782
......@@ -75,7 +75,6 @@ enum {
TOKUDB_UPGRADE_FAILURE = -100011,
TOKUDB_TRY_AGAIN = -100012,
TOKUDB_NEEDS_REPAIR = -100013,
TOKUDB_FINGERPRINT_ERROR = -100014,
};
static void print_defines (void) {
......@@ -227,7 +226,6 @@ static void print_defines (void) {
dodefine(TOKUDB_UPGRADE_FAILURE);
dodefine(TOKUDB_TRY_AGAIN);
dodefine(TOKUDB_NEEDS_REPAIR);
dodefine(TOKUDB_FINGERPRINT_ERROR);
/* LOADER flags */
printf("/* LOADER flags */\n");
......
......@@ -55,7 +55,6 @@ BRT_SOURCES = \
checkpoint \
dbufio \
fifo \
fingerprint \
key \
leafentry \
leaflock \
......
......@@ -82,7 +82,6 @@ add_estimates (struct subtree_estimates *a, struct subtree_estimates *b) {
struct brtnode_nonleaf_childinfo {
u_int32_t subtree_fingerprint;
struct subtree_estimates subtree_estimates;
BLOCKNUM blocknum;
BOOL have_fullhash; // do we have the full hash?
......@@ -102,20 +101,16 @@ struct brtnode {
int layout_version_read_from_disk; // transient, not serialized to disk, (useful for debugging)
uint32_t build_id; // build_id (svn rev number) of software that wrote this node to disk
int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */
u_int32_t rand4fingerprint;
u_int32_t local_fingerprint; /* For leaves this is everything in the buffer. For nonleaves, this is everything in the buffers, but does not include child subtree fingerprints. */
int dirty;
u_int32_t fullhash;
union node {
struct nonleaf {
// Don't actually store the subtree fingerprint in the in-memory data structure.
int n_children; /* if n_children==TREE_FANOUT+1 then the tree needs to be rebalanced. */
unsigned int totalchildkeylens;
unsigned int n_bytes_in_buffers;
struct brtnode_nonleaf_childinfo *childinfos; /* One extra so we can grow */
#define BNC_SUBTREE_FINGERPRINT(node,i) ((node)->u.n.childinfos[i].subtree_fingerprint)
#define BNC_SUBTREE_ESTIMATES(node,i) ((node)->u.n.childinfos[i].subtree_estimates)
#define BNC_BLOCKNUM(node,i) ((node)->u.n.childinfos[i].blocknum)
#define BNC_BUFFER(node,i) ((node)->u.n.childinfos[i].buffer)
......@@ -259,7 +254,7 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/,
unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
void toku_verify_or_set_counts(BRTNODE, BOOL);
void toku_verify_or_set_counts(BRTNODE);
int toku_serialize_brt_header_size (struct brt_header *h);
int toku_serialize_brt_header_to (int fd, struct brt_header *h);
......@@ -296,11 +291,6 @@ extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt, u_int32_t *root_ha
static const BRTNODE null_brtnode=0;
//extern u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, int vallen);
//extern u_int32_t toku_calccrc32_kvpair_struct (const struct kv_pair *kvp);
extern u_int32_t toku_calc_fingerprint_cmd (u_int32_t type, XIDS xids, const void *key, u_int32_t keylen, const void *val, u_int32_t vallen);
extern u_int32_t toku_calc_fingerprint_cmdstruct (BRT_MSG cmd);
// How long is the pivot key?
unsigned int toku_brt_pivot_key_len (struct kv_pair *);
......@@ -345,11 +335,11 @@ unsigned int toku_brtnode_which_child (BRTNODE node , DBT *k, BRT t);
/* Stuff for testing */
int toku_testsetup_leaf(BRT brt, BLOCKNUM *);
int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *diskoff, int n_children, BLOCKNUM *children, u_int32_t *subtree_fingerprints, char **keys, int *keylens);
int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *diskoff, int n_children, BLOCKNUM *children, char **keys, int *keylens);
int toku_testsetup_root(BRT brt, BLOCKNUM);
int toku_testsetup_get_sersize(BRT brt, BLOCKNUM); // Return the size on disk.
int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM, char *key, int keylen, char *val, int vallen, u_int32_t *leaf_fingerprint);
int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM, enum brt_msg_type, char *key, int keylen, char *val, int vallen, u_int32_t *subtree_fingerprint);
int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM, char *key, int keylen, char *val, int vallen);
int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM, enum brt_msg_type, char *key, int keylen, char *val, int vallen);
// These two go together to do lookups in a brtnode using the keys in a command.
struct cmd_leafval_heaviside_extra {
......
......@@ -196,9 +196,7 @@ enum {
extended_node_header_overhead = (4+ // nodesize
4+ // flags
4+ // height
4+ // random for fingerprint
4), // localfingerprint
4), // height
};
#include "sub_block.h"
......@@ -219,12 +217,11 @@ toku_serialize_brtnode_size_slow (BRTNODE node) {
unsigned int hsize=0;
unsigned int csize=0;
size += 4; /* n_children */
size += 4; /* subtree fingerprint. */
size += 4*(node->u.n.n_children-1); /* key lengths*/
for (int i=0; i<node->u.n.n_children-1; i++) {
csize += toku_brt_pivot_key_len(node->u.n.childkeys[i]);
}
size += (8+4+4+1+3*8)*(node->u.n.n_children); /* For each child, a child offset, a count for the number of hash table entries, the subtree fingerprint, and 3*8 for the subtree estimates and 1 for the exact bit for the estimates. */
size += (8+4+1+3*8)*(node->u.n.n_children); /* For each child, a child offset, a count for the number of hash table entries, and 3*8 for the subtree estimates and 1 for the exact bit for the estimates. */
int n_buffers = node->u.n.n_children;
invariant(0 <= n_buffers && n_buffers < TREE_FANOUT+1);
for (int i=0; i< n_buffers; i++) {
......@@ -257,12 +254,11 @@ toku_serialize_brtnode_size (BRTNODE node) {
unsigned int result = node_header_overhead + extended_node_header_overhead;
invariant(sizeof(toku_off_t)==8);
if (node->height > 0) {
result += 4; /* subtree fingerpirnt */
result += 4; /* n_children */
result += 4*(node->u.n.n_children-1); /* key lengths*/
invariant(node->u.n.totalchildkeylens < (1<<30));
result += node->u.n.totalchildkeylens; /* the lengths of the pivot keys, without their key lengths. */
result += (8+4+4+1+3*8)*(node->u.n.n_children); /* For each child, a child offset, a count for the number of hash table entries, the subtree fingerprint, and 3*8 for the subtree estimates and one for the exact bit. */
result += (8+4+1+3*8)*(node->u.n.n_children); /* For each child, a child offset, a count for the number of hash table entries, and 3*8 for the subtree estimates and one for the exact bit. */
result += node->u.n.n_bytes_in_buffers;
result += node->u.n.n_children*stored_sub_block_map_size;
} else {
......@@ -301,11 +297,7 @@ serialize_node_header(BRTNODE node, struct wbuf *wbuf) {
wbuf_nocrc_uint(wbuf, node->nodesize);
wbuf_nocrc_uint(wbuf, node->flags);
wbuf_nocrc_int(wbuf, node->height);
//printf("%s:%d %lld rand=%08x sum=%08x height=%d\n", __FILE__, __LINE__, node->thisnodename, node->rand4fingerprint, node->subtree_fingerprint, node->height);
wbuf_nocrc_uint(wbuf, node->rand4fingerprint);
wbuf_nocrc_uint(wbuf, node->local_fingerprint);
//printf("%s:%d wrote %08x for node %lld\n", __FILE__, __LINE__, node->local_fingerprint, (long long)node->thisnodename);
//printf("%s:%d local_fingerprint=%8x\n", __FILE__, __LINE__, node->local_fingerprint);
//printf("%s:%d %lld height=%d\n", __FILE__, __LINE__, node->thisnodename, node->height);
//printf("%s:%d w.ndone=%d n_children=%d\n", __FILE__, __LINE__, w.ndone, node->n_children);
}
......@@ -313,18 +305,8 @@ static void
serialize_nonleaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[], struct wbuf *wbuf) {
// serialize the nonleaf header
invariant(node->u.n.n_children>0);
// The subtree fingerprint is not actually stored while in main memory. Must calculate it.
// Add the child subtree fingerprints to the local fingerprint to get the subtree fingerprint.
{
u_int32_t subtree_fingerprint = node->local_fingerprint;
for (int i = 0; i < node->u.n.n_children; i++) {
subtree_fingerprint += BNC_SUBTREE_FINGERPRINT(node, i);
}
wbuf_nocrc_uint(wbuf, subtree_fingerprint);
}
wbuf_nocrc_int(wbuf, node->u.n.n_children);
for (int i = 0; i < node->u.n.n_children; i++) {
wbuf_nocrc_uint(wbuf, BNC_SUBTREE_FINGERPRINT(node, i));
struct subtree_estimates *se = &(BNC_SUBTREE_ESTIMATES(node, i));
wbuf_nocrc_ulonglong(wbuf, se->nkeys);
wbuf_nocrc_ulonglong(wbuf, se->ndata);
......@@ -359,7 +341,6 @@ serialize_nonleaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[],
// serialize the child buffers
{
int n_buffers = node->u.n.n_children;
u_int32_t check_local_fingerprint = 0;
for (int i = 0; i < n_buffers; i++) {
//printf("%s:%d p%d=%p n_entries=%d\n", __FILE__, __LINE__, i, node->mdicts[i], mdict_n_entries(node->mdicts[i]));
// invariant(child_buffer_map[i].offset == wbuf_get_woffset(wbuf));
......@@ -371,12 +352,8 @@ serialize_nonleaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[],
wbuf_nocrc_xids(wbuf, xids);
wbuf_nocrc_bytes(wbuf, key, keylen);
wbuf_nocrc_bytes(wbuf, data, datalen);
check_local_fingerprint+=node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xids, key, keylen, data, datalen);
});
}
//printf("%s:%d check_local_fingerprint=%8x\n", __FILE__, __LINE__, check_local_fingerprint);
if (check_local_fingerprint!=node->local_fingerprint) printf("%s:%d node=%" PRId64 " fingerprint expected=%08x actual=%08x\n", __FILE__, __LINE__, node->thisnodename.b, check_local_fingerprint, node->local_fingerprint);
invariant(check_local_fingerprint==node->local_fingerprint);
}
}
......@@ -572,8 +549,6 @@ struct deserialize_child_buffer_work {
BRTNODE node; // in node pointer
int cnum; // in child number
struct rbuf rb; // in child rbuf
uint32_t local_fingerprint; // out node fingerprint
};
static void
......@@ -584,8 +559,7 @@ deserialize_child_buffer_init(struct deserialize_child_buffer_work *dw, BRTNODE
}
static void
deserialize_child_buffer(BRTNODE node, int cnum, struct rbuf *rbuf, u_int32_t *local_fingerprint_ret) {
uint32_t local_fingerprint = 0;
deserialize_child_buffer(BRTNODE node, int cnum, struct rbuf *rbuf) {
int n_bytes_in_buffer = 0;
int n_in_this_buffer = rbuf_int(rbuf);
for (int i = 0; i < n_in_this_buffer; i++) {
......@@ -597,7 +571,6 @@ deserialize_child_buffer(BRTNODE node, int cnum, struct rbuf *rbuf, u_int32_t *l
xids_create_from_buffer(rbuf, &xids);
rbuf_bytes(rbuf, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(rbuf, &val, &vallen);
local_fingerprint += node->rand4fingerprint * toku_calc_fingerprint_cmd(type, xids, key, keylen, val, vallen);
//printf("Found %s,%s\n", (char*)key, (char*)val);
int r = toku_fifo_enq(BNC_BUFFER(node, cnum), key, keylen, val, vallen, type, xids); /* Copies the data into the fifo */
lazy_assert_zero(r);
......@@ -608,7 +581,6 @@ deserialize_child_buffer(BRTNODE node, int cnum, struct rbuf *rbuf, u_int32_t *l
invariant(rbuf->ndone == rbuf->size);
BNC_NBYTESINBUF(node, cnum) = n_bytes_in_buffer;
*local_fingerprint_ret = local_fingerprint;
}
static void *
......@@ -618,14 +590,14 @@ deserialize_child_buffer_worker(void *arg) {
struct deserialize_child_buffer_work *dw = (struct deserialize_child_buffer_work *) workset_get(ws);
if (dw == NULL)
break;
deserialize_child_buffer(dw->node, dw->cnum, &dw->rb, &dw->local_fingerprint);
deserialize_child_buffer(dw->node, dw->cnum, &dw->rb);
}
workset_release_ref(ws);
return arg;
}
static void
deserialize_all_child_buffers(BRTNODE result, struct rbuf *rbuf, struct sub_block_map child_buffer_map[], int my_num_cores, uint32_t *check_local_fingerprint_ret) {
deserialize_all_child_buffers(BRTNODE result, struct rbuf *rbuf, struct sub_block_map child_buffer_map[], int my_num_cores) {
int n_nonempty_fifos = 0; // how many fifos are nonempty?
for(int i = 0; i < result->u.n.n_children; i++) {
if (child_buffer_map[i].size > 4)
......@@ -656,21 +628,19 @@ deserialize_all_child_buffers(BRTNODE result, struct rbuf *rbuf, struct sub_bloc
deserialize_child_buffer_worker(&ws);
workset_join(&ws);
// combine the fingerprints and update the buffer counts
uint32_t check_local_fingerprint = 0;
// Update the buffer counts
for (int i = 0; i < result->u.n.n_children; i++) {
check_local_fingerprint += work[i].local_fingerprint;
result->u.n.n_bytes_in_buffers += BNC_NBYTESINBUF(result, i);
}
// cleanup
workset_destroy(&ws);
*check_local_fingerprint_ret = check_local_fingerprint;
}
static int
deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *rb) {
// Note that result->layout_version_read_from_disk is initialized before this is read
int r;
if (memcmp(magic, "tokunode", 8)!=0) {
......@@ -679,17 +649,14 @@ deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbu
}
result->u.n.totalchildkeylens=0;
u_int32_t subtree_fingerprint = rbuf_int(rb);
u_int32_t check_subtree_fingerprint = 0;
if (result->layout_version_read_from_disk <= BRT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) rbuf_int(rb); // ignore this int. It's a fingerprint.
result->u.n.n_children = rbuf_int(rb);
MALLOC_N(result->u.n.n_children+1, result->u.n.childinfos);
MALLOC_N(result->u.n.n_children, result->u.n.childkeys);
//printf("n_children=%d\n", result->n_children);
invariant(result->u.n.n_children>=0);
for (int i=0; i<result->u.n.n_children; i++) {
u_int32_t childfp = rbuf_int(rb);
BNC_SUBTREE_FINGERPRINT(result, i)= childfp;
check_subtree_fingerprint += childfp;
if (result->layout_version_read_from_disk <= BRT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) rbuf_int(rb); // ignore child fingerprint.
struct subtree_estimates *se = &(BNC_SUBTREE_ESTIMATES(result, i));
se->nkeys = rbuf_ulonglong(rb);
se->ndata = rbuf_ulonglong(rb);
......@@ -728,17 +695,7 @@ deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbu
}
// deserialize all child buffers, like the function says
uint32_t check_local_fingerprint;
deserialize_all_child_buffers(result, rb, child_buffer_map, num_cores, &check_local_fingerprint);
if (check_local_fingerprint != result->local_fingerprint) {
fprintf(stderr, "%s:%d local fingerprint is wrong (found %8x calcualted %8x\n", __FILE__, __LINE__, result->local_fingerprint, check_local_fingerprint);
return toku_db_badformat();
}
if (check_subtree_fingerprint+check_local_fingerprint != subtree_fingerprint) {
fprintf(stderr, "%s:%d subtree fingerprint is wrong\n", __FILE__, __LINE__);
return toku_db_badformat();
}
deserialize_all_child_buffers(result, rb, child_buffer_map, num_cores);
return 0;
}
......@@ -788,7 +745,6 @@ deserialize_brtnode_leaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *
//printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer);
toku_mempool_init(&result->u.l.buffer_mempool, rb->buf, rb->size);
u_int32_t actual_sum = 0;
u_int32_t start_of_data = rb->ndone;
OMTVALUE *MALLOC_N(n_in_buf, array);
......@@ -799,7 +755,6 @@ deserialize_brtnode_leaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *
rb->ndone += disksize;
invariant(rb->ndone<=rb->size);
array[i]=(OMTVALUE)le;
actual_sum += x1764_memory(le, disksize);
}
}
else if (result->layout_version == BRT_LAYOUT_VERSION_13) {
......@@ -812,7 +767,6 @@ deserialize_brtnode_leaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *
invariant(rb->ndone<=rb->size);
array[i]=(OMTVALUE)le;
actual_sum += x1764_memory(le, disksize);
}
}
else {
......@@ -821,7 +775,6 @@ deserialize_brtnode_leaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *
toku_trace("fill array");
u_int32_t end_of_data = rb->ndone;
result->u.l.n_bytes_in_buffer += end_of_data-start_of_data + n_in_buf*OMT_ITEM_OVERHEAD;
actual_sum *= result->rand4fingerprint;
r = toku_omt_create_steal_sorted_array(&result->u.l.buffer, &array, n_in_buf, n_in_buf);
toku_trace("create omt");
if (r!=0) {
......@@ -836,13 +789,6 @@ deserialize_brtnode_leaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *
result->u.l.buffer_mempool.free_offset = end_of_data;
if (r!=0) goto died_1;
if (actual_sum!=result->local_fingerprint) {
//fprintf(stderr, "%s:%d Corrupted checksum stored=%08x rand=%08x actual=%08x height=%d n_keys=%d\n", __FILE__, __LINE__, result->rand4fingerprint, result->local_fingerprint, actual_sum, result->height, n_in_buf);
r = toku_db_badformat();
goto died_1;
} else {
//fprintf(stderr, "%s:%d Good checksum=%08x height=%d\n", __FILE__, __LINE__, actual_sum, result->height);
}
//toku_verify_counts(result);
......@@ -883,8 +829,10 @@ deserialize_brtnode_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *b
result->thisnodename = blocknum;
result->flags = rbuf_int(rb);
result->height = rbuf_int(rb);
result->rand4fingerprint = rbuf_int(rb);
result->local_fingerprint = rbuf_int(rb);
if (result->layout_version_read_from_disk <= BRT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
rbuf_int(rb); // ignore rand4fingerprint
rbuf_int(rb); // ignore localfingerprint
}
// printf("%s:%d read %08x\n", __FILE__, __LINE__, result->local_fingerprint);
result->dirty = 0;
result->fullhash = fullhash;
......@@ -1045,7 +993,6 @@ deserialize_brtnode_from_rbuf_versioned (u_int32_t version, BLOCKNUM blocknum, u
LEAFENTRY *XCALLOC_N(num_les, new_les);
OMTVALUE v;
u_int32_t incremental_fingerprint = 0;
u_int32_t incremental_size = 0;
for (i = 0; i < num_les; i++) {
r = toku_omt_fetch(omt, i, &v, NULL);
......@@ -1056,10 +1003,7 @@ deserialize_brtnode_from_rbuf_versioned (u_int32_t version, BLOCKNUM blocknum, u
invariant(r==0);
invariant(new_memsize == new_disksize);
incremental_size += OMT_ITEM_OVERHEAD + new_memsize;
incremental_fingerprint += toku_le_crc(new_les[i]);
}
//Regenerate fingerprint.
node->local_fingerprint = node->rand4fingerprint * incremental_fingerprint;
//Set buffer size.
node->u.l.n_bytes_in_buffer = incremental_size;
......@@ -1249,7 +1193,6 @@ struct sum_info {
unsigned int dsum;
unsigned int msum;
unsigned int count;
u_int32_t fp;
};
static int
......@@ -1259,43 +1202,25 @@ sum_item (OMTVALUE lev, u_int32_t UU(idx), void *vsi) {
si->count++;
si->dsum += OMT_ITEM_OVERHEAD + leafentry_disksize(le);
si->msum += leafentry_memsize(le);
si->fp += toku_le_crc(le);
return 0;
}
void
toku_verify_or_set_counts (BRTNODE node, BOOL set_fingerprints) {
toku_verify_or_set_counts (BRTNODE node) {
/*foo*/
if (node->height==0) {
lazy_assert(node->u.l.buffer);
struct sum_info sum_info = {0,0,0,0};
struct sum_info sum_info = {0,0,0};
toku_omt_iterate(node->u.l.buffer, sum_item, &sum_info);
lazy_assert(sum_info.count==toku_omt_size(node->u.l.buffer));
lazy_assert(sum_info.dsum==node->u.l.n_bytes_in_buffer);
lazy_assert(sum_info.msum == node->u.l.buffer_mempool.free_offset - node->u.l.buffer_mempool.frag_size);
u_int32_t fps = node->rand4fingerprint * sum_info.fp;
if (set_fingerprints) {
node->local_fingerprint = fps;
}
lazy_assert(fps==node->local_fingerprint);
} else {
unsigned int sum = 0;
for (int i=0; i<node->u.n.n_children; i++)
sum += BNC_NBYTESINBUF(node,i);
// We don't really care if the later buffers have garbage in them. Valgrind would do a better job noticing if we leave it uninitialized.
// But for now the code always initializes the later tables so they are 0.
uint32_t fp = 0;
int i;
for (i=0; i<node->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
{
fp += node->rand4fingerprint * toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
});
if (set_fingerprints) {
node->local_fingerprint = fp;
}
lazy_assert(fp==node->local_fingerprint);
lazy_assert(sum==node->u.n.n_bytes_in_buffers);
}
}
......@@ -1665,7 +1590,7 @@ deserialize_brtheader_versioned (int fd, struct rbuf *rb, struct brt_header **br
toku_sync_fetch_and_increment_uint64(&upgrade_status.header_13); // how many header nodes upgraded from v13
upgrade++;
//Fall through on purpose
case BRT_LAYOUT_VERSION:
case BRT_LAYOUT_VERSION_14:
invariant(h->layout_version == BRT_LAYOUT_VERSION);
h->upgrade_brt_performed = FALSE;
if (upgrade) {
......
......@@ -19,7 +19,7 @@ int toku_testsetup_leaf(BRT brt, BLOCKNUM *blocknum) {
}
// Don't bother to clean up carefully if something goes wrong. (E.g., it's OK to have malloced stuff that hasn't been freed.)
int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *blocknum, int n_children, BLOCKNUM *children, u_int32_t *subtree_fingerprints, char **keys, int *keylens) {
int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *blocknum, int n_children, BLOCKNUM *children, char **keys, int *keylens) {
BRTNODE node;
assert(n_children<=BRT_FANOUT);
int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, MAX_LSN, &brt->h, &ignore_if_was_already_open);
......@@ -32,8 +32,7 @@ int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *blocknum, int n_child
node->u.n.n_bytes_in_buffers=0;
int i;
for (i=0; i<n_children; i++) {
node->u.n.childinfos[i] = (struct brtnode_nonleaf_childinfo){ .subtree_fingerprint = subtree_fingerprints[i],
.subtree_estimates = zero_estimates,
node->u.n.childinfos[i] = (struct brtnode_nonleaf_childinfo){ .subtree_estimates = zero_estimates,
.blocknum = children[i],
.n_bytes_in_buffer = 0 };
r = toku_fifo_create(&BNC_BUFFER(node,i)); if (r!=0) return r;
......@@ -66,14 +65,14 @@ int toku_testsetup_get_sersize(BRT brt, BLOCKNUM diskoff) // Return the size on
return size;
}
int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int keylen, char *val, int vallen, u_int32_t *subtree_fingerprint) {
int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int keylen, char *val, int vallen) {
void *node_v;
int r;
r = toku_cachetable_get_and_pin(brt->cf, blocknum, toku_cachetable_hash(brt->cf, blocknum), &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
BRTNODE node=node_v;
toku_verify_or_set_counts(node, FALSE);
toku_verify_or_set_counts(node);
assert(node->height==0);
size_t lesize, disksize;
......@@ -99,7 +98,6 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
LEAFENTRY storeddata=storeddatav;
// It's already there. So now we have to remove it and put the new one back in.
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(storeddata);
node->local_fingerprint -= node->rand4fingerprint*toku_le_crc(storeddata);
toku_mempool_mfree(&node->u.l.buffer_mempool, storeddata, leafentry_memsize(storeddata));
// Now put the new kv in.
toku_omt_set_at(node->u.l.buffer, leafentry, idx);
......@@ -109,18 +107,16 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
}
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + disksize;
node->local_fingerprint += node->rand4fingerprint*toku_le_crc(leafentry);
node->dirty=1;
*subtree_fingerprint = node->local_fingerprint;
toku_verify_or_set_counts(node, FALSE);
toku_verify_or_set_counts(node);
r = toku_unpin_brtnode(brt, node_v);
return r;
}
int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_type cmdtype, char *key, int keylen, char *val, int vallen, u_int32_t *subtree_fingerprint) {
int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_type cmdtype, char *key, int keylen, char *val, int vallen) {
void *node_v;
int r;
r = toku_cachetable_get_and_pin(brt->cf, blocknum, toku_cachetable_hash(brt->cf, blocknum), &node_v, NULL,
......@@ -137,9 +133,6 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t
XIDS xids_0 = xids_get_root_xids();
r = toku_fifo_enq(BNC_BUFFER(node, childnum), key, keylen, val, vallen, cmdtype, xids_0);
assert(r==0);
u_int32_t fdelta = node->rand4fingerprint * toku_calc_fingerprint_cmd(cmdtype, xids_0, key, keylen, val, vallen);
node->local_fingerprint += fdelta;
*subtree_fingerprint += fdelta;
int sizediff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids_0);
node->u.n.n_bytes_in_buffers += sizediff;
BNC_NBYTESINBUF(node, childnum) += sizediff;
......
......@@ -5,8 +5,6 @@
/* Verify a BRT. */
/* Check:
* The fingerprint of every node (local check)
* The child's fingerprint matches the parent's copy (probably don't actually do this yet)
* The tree is of uniform depth (and the height is correct at every node)
* For each pivot key: the max of the stuff to the left is <= the pivot key < the min of the stuff to the right.
* For each leaf node: All the keys are in strictly increasing order.
......@@ -15,31 +13,6 @@
#include "includes.h"
static int verify_local_fingerprint (BRTNODE node, int verbose) __attribute__ ((warn_unused_result));
/* Check the locally stored fingerprint of NODE.
 *
 * Nonleaf node (height>0): recompute the fingerprint by summing, over every
 * message in every child buffer, rand4fingerprint * toku_calc_fingerprint_cmd(...),
 * then compare the sum with node->local_fingerprint.  On a mismatch, print a
 * diagnostic to stderr when VERBOSE is nonzero and return TOKUDB_FINGERPRINT_ERROR.
 *
 * Leaf node (height==0): delegate to toku_verify_or_set_counts(node, FALSE).
 *
 * Returns 0 when no mismatch is detected by this function. */
static int
verify_local_fingerprint (BRTNODE node, int verbose) {
u_int32_t fp=0; // recomputed fingerprint (unsigned, so the sum wraps on overflow)
int i;
int r = 0; // stays 0 unless the nonleaf comparison below fails
if (node->height>0) {
for (i=0; i<node->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
{
fp += node->rand4fingerprint * toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
});
if (fp!=node->local_fingerprint) {
if (verbose) {
fprintf(stderr, "%s:%d local fingerprints don't match\n", __FILE__, __LINE__);
}
r = TOKUDB_FINGERPRINT_ERROR;
}
} else {
// Leaf: the leaf-side checks live in toku_verify_or_set_counts; FALSE means verify, do not overwrite.
toku_verify_or_set_counts(node, FALSE);
}
return r;
}
static int
compare_pairs (BRT brt, struct kv_pair *a, struct kv_pair *b) {
DBT x,y;
......@@ -143,13 +116,6 @@ toku_verify_brtnode (BRT brt, BLOCKNUM blocknum, int height,
assert(node->fullhash == fullhash); // this is a bad failure if wrong
if (height >= 0)
invariant(height == node->height); // this is a bad failure if wrong
{
int r = verify_local_fingerprint(node, verbose);
if (r) {
result = r;
if (!keep_going_on_failure) goto done;
}
}
if (node->height > 0) {
// Verify that all the pivot keys are in order.
for (int i = 0; i < node->u.n.n_children-2; i++) {
......
......@@ -320,22 +320,18 @@ brt_leaf_check_leaf_stats (BRTNODE node)
// This should be done incrementally in most cases.
static void
fixup_child_fingerprint (BRTNODE node, int childnum_of_node, BRTNODE child)
// Effect: Sum the child fingerprint (and leafentry estimates) and store them in NODE.
fixup_child_estimates (BRTNODE node, int childnum_of_node, BRTNODE child)
// Effect: Sum the child leafentry estimates and store them in NODE.
// Parameters:
// node The node to modify
// childnum_of_node Which child changed (PERFORMANCE: Later we could compute this incrementally)
// child The child that changed.
// brt The brt (not used now but it will be for logger)
// logger The logger (not used now but it will be for logger)
{
struct subtree_estimates estimates = zero_estimates;
u_int32_t sum = child->local_fingerprint;
estimates.exact = TRUE;
if (child->height>0) {
int i;
for (i=0; i<child->u.n.n_children; i++) {
sum += BNC_SUBTREE_FINGERPRINT(child,i);
struct subtree_estimates *child_se = &BNC_SUBTREE_ESTIMATES(child,i);
estimates.nkeys += child_se->nkeys;
estimates.ndata += child_se->ndata;
......@@ -351,31 +347,11 @@ fixup_child_fingerprint (BRTNODE node, int childnum_of_node, BRTNODE child)
toku_omt_iterate(child->u.l.buffer, fill_leafnode_estimates, &s);
#endif
}
// Don't try to get fancy about not modifying the fingerprint if it didn't change.
// We only call this function if we have reason to believe that the child's fingerprint did change.
BNC_SUBTREE_FINGERPRINT(node,childnum_of_node)=sum;
// We only call this function if we have reason to believe that the child changed.
BNC_SUBTREE_ESTIMATES(node,childnum_of_node)=estimates;
node->dirty=1;
}
/* Debug-only consistency check of a nonleaf node's local fingerprint.
 *
 * The entire body is wrapped in `if (0)`, so as written this function does
 * nothing at run time.  If enabled, it would return immediately for leaf
 * nodes (height==0); otherwise it would sum toku_calc_fingerprint_cmd(...)
 * over every message in every child buffer, multiply the sum by
 * node->rand4fingerprint, and lazy_assert that the result equals
 * node->local_fingerprint. */
static inline void
verify_local_fingerprint_nonleaf (BRTNODE node)
{
if (0) { // check is compiled in but permanently disabled
//brt_leaf_check_leaf_stats(node);
static int count=0; count++; // call counter; not read anywhere in this function
u_int32_t fp=0;
int i;
if (node->height==0) return;
for (i=0; i<node->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xids,
fp += toku_calc_fingerprint_cmd(type, xids, key, keylen, data, datalen);
);
// Scale once after summing, rather than per message.
fp *= node->rand4fingerprint;
lazy_assert(fp==node->local_fingerprint);
}
}
static inline void
toku_verify_estimates (BRT t, BRTNODE node) {
if (node->height>0) {
......@@ -448,7 +424,6 @@ int toku_unpin_brtnode (BRT brt, BRTNODE node) {
// node->log_lsn = toku_txn_get_last_lsn(txn);
// //if (node->log_lsn.lsn>33320) printf("%s:%d node%lld lsn=%lld\n", __FILE__, __LINE__, node->thisnodename, node->log_lsn.lsn);
// }
verify_local_fingerprint_nonleaf(node);
VERIFY_NODE(brt,node);
return toku_cachetable_unpin(brt->cf, node->thisnodename, node->fullhash, (enum cachetable_dirty) node->dirty, brtnode_memory_size(node));
}
......@@ -457,9 +432,6 @@ int toku_unpin_brtnode (BRT brt, BRTNODE node) {
void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size __attribute__((unused)), BOOL write_me, BOOL keep_me, BOOL for_checkpoint) {
struct brt_header *h = extraargs;
BRTNODE brtnode = brtnode_v;
// if ((write_me || keep_me) && (brtnode->height==0)) {
// toku_pma_verify_fingerprint(brtnode->u.l.buffer, brtnode->rand4fingerprint, brtnode->subtree_fingerprint);
// }
if (0) {
printf("%s:%d toku_brtnode_flush_callback %p thisnodename=%" PRId64 " keep_me=%u height=%d", __FILE__, __LINE__, brtnode, brtnode->thisnodename.b, (unsigned)keep_me, brtnode->height);
if (brtnode->height==0) printf(" buf=%p mempool-base=%p", brtnode->u.l.buffer, brtnode->u.l.buffer_mempool.base);
......@@ -657,8 +629,6 @@ initialize_empty_brtnode (BRT t, BRTNODE n, BLOCKNUM nodename, int height, size_
n->layout_version_original = t->h->layout_version;
n->layout_version_read_from_disk = t->h->layout_version;
n->height = height;
n->rand4fingerprint = random();
n->local_fingerprint = 0;
n->dirty = 1;
lazy_assert(height>=0);
if (height>0) {
......@@ -700,7 +670,6 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
// Store the new root's identity in *ROOTP, and the node in *NEWROOTP.
// Unpin nodea and nodeb.
// Leave the new root pinned.
// Stores the sum of the fingerprints of the children into the new node. (LAZY: Later we'll only store the fingerprints when evicting.)
{
BRTNODE MALLOC(newroot);
int r;
......@@ -726,14 +695,10 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
r=toku_fifo_create(&BNC_BUFFER(newroot,1)); if (r!=0) return r;
BNC_NBYTESINBUF(newroot, 0)=0;
BNC_NBYTESINBUF(newroot, 1)=0;
BNC_SUBTREE_FINGERPRINT(newroot, 0)=0;
BNC_SUBTREE_FINGERPRINT(newroot, 1)=0;
BNC_SUBTREE_ESTIMATES(newroot, 0)=zero_estimates;
BNC_SUBTREE_ESTIMATES(newroot, 1)=zero_estimates;
verify_local_fingerprint_nonleaf(nodea);
verify_local_fingerprint_nonleaf(nodeb);
fixup_child_fingerprint(newroot, 0, nodea);
fixup_child_fingerprint(newroot, 1, nodeb);
fixup_child_estimates(newroot, 0, nodea);
fixup_child_estimates(newroot, 1, nodeb);
r = toku_unpin_brtnode(brt, nodea);
if (r!=0) return r;
r = toku_unpin_brtnode(brt, nodeb);
......@@ -775,7 +740,6 @@ init_childinfo(BRTNODE node, int childnum, BRTNODE child) {
BNC_BLOCKNUM(node,childnum) = child->thisnodename;
BNC_HAVE_FULLHASH(node,childnum) = FALSE;
BNC_NBYTESINBUF(node,childnum) = 0;
BNC_SUBTREE_FINGERPRINT(node,childnum) = 0;
BNC_SUBTREE_ESTIMATES(node,childnum) = zero_estimates;
int r = toku_fifo_create(&BNC_BUFFER(node,childnum));
resource_assert_zero(r);
......@@ -882,7 +846,6 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk)
toku_omt_destroy(&B->u.l.buffer); // Destroy B's empty OMT, so I can rebuild it from an array
{
u_int32_t i;
u_int32_t diff_fp = 0;
u_int32_t diff_size = 0;
struct subtree_estimates diff_est = zero_estimates;
LEAFENTRY *MALLOC_N(n_leafentries-split_at, free_us);
......@@ -894,7 +857,6 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk)
diff_est.ndata++;
diff_est.dsize += le_keylen(oldle) + le_latest_vallen(oldle);
//printf("%s:%d Added %u got %lu\n", __FILE__, __LINE__, le_keylen(oldle)+ le_latest_vallen(oldle), diff_est.dsize);
diff_fp += toku_le_crc(oldle);
diff_size += OMT_ITEM_OVERHEAD + leafentry_disksize(oldle);
memcpy(newle, oldle, leafentry_memsize(oldle));
free_us[i-split_at] = oldle; // don't free the old leafentries yet, since we compare them in the other iterations of the loops
......@@ -905,8 +867,6 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk)
toku_mempool_mfree(&node->u.l.buffer_mempool, oldle, leafentry_memsize(oldle));
}
toku_free(free_us);
node->local_fingerprint -= node->rand4fingerprint * diff_fp;
B ->local_fingerprint += B ->rand4fingerprint * diff_fp;
node->u.l.n_bytes_in_buffer -= diff_size;
B ->u.l.n_bytes_in_buffer += diff_size;
subtract_estimates(&node->u.l.leaf_stats, &diff_est);
......@@ -993,12 +953,9 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
int r = toku_fifo_create(&BNC_BUFFER(B,i));
if (r!=0) return r;
BNC_NBYTESINBUF(B,i)=0;
BNC_SUBTREE_FINGERPRINT(B,i)=0;
BNC_SUBTREE_ESTIMATES(B,i)=zero_estimates;
}
verify_local_fingerprint_nonleaf(node);
for (i=n_children_in_a; i<old_n_children; i++) {
int targchild = i-n_children_in_a;
......@@ -1018,25 +975,18 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
int fr = toku_fifo_peek(from_htab, &key, &keylen, &data, &datalen, &type, &xids);
if (fr!=0) break;
int n_bytes_moved = keylen+datalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids);
u_int32_t old_from_fingerprint = node->local_fingerprint;
u_int32_t delta = toku_calc_fingerprint_cmd(type, xids, key, keylen, data, datalen);
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
B->local_fingerprint += B->rand4fingerprint*delta;
int r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xids);
if (r!=0) return r;
toku_fifo_deq(from_htab);
// key and data will no longer be valid
node->local_fingerprint = new_from_fingerprint;
B->u.n.n_bytes_in_buffers += n_bytes_moved;
BNC_NBYTESINBUF(B, targchild) += n_bytes_moved;
node->u.n.n_bytes_in_buffers -= n_bytes_moved;
BNC_NBYTESINBUF(node, i) -= n_bytes_moved;
// verify_local_fingerprint_nonleaf(B);
// verify_local_fingerprint_nonleaf(node);
}
// Delete a child, removing it's fingerprint, and also the preceeding pivot key. The child number must be > 0
// Delete a child, removing the preceding pivot key. The child number must be > 0
{
lazy_assert(i>0);
if (i>n_children_in_a) {
......@@ -1049,16 +999,12 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
BNC_BLOCKNUM(node, i) = make_blocknum(0);
BNC_HAVE_FULLHASH(node, i) = FALSE;
BNC_SUBTREE_FINGERPRINT(B, targchild) = BNC_SUBTREE_FINGERPRINT(node, i);
BNC_SUBTREE_FINGERPRINT(node, i) = 0;
BNC_SUBTREE_ESTIMATES(B, targchild) = BNC_SUBTREE_ESTIMATES(node, i);
BNC_SUBTREE_ESTIMATES(node, i) = zero_estimates;
lazy_assert(BNC_NBYTESINBUF(node, i) == 0);
}
// Drop the n_children now (not earlier) so that we can do the fingerprint verification at any time.
node->u.n.n_children=n_children_in_a;
for (i=n_children_in_a; i<old_n_children; i++) {
toku_fifo_free(&BNC_BUFFER(node,i));
......@@ -1071,8 +1017,6 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
REALLOC_N(n_children_in_a+1, node->u.n.childinfos);
REALLOC_N(n_children_in_a, node->u.n.childkeys);
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(B);
}
node->dirty = 1;
......@@ -1120,12 +1064,9 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->dirty = 1;
verify_local_fingerprint_nonleaf(node);
XREALLOC_N(node->u.n.n_children+2, node->u.n.childinfos);
XREALLOC_N(node->u.n.n_children+1, node->u.n.childkeys);
// Slide the children over.
BNC_SUBTREE_FINGERPRINT (node, node->u.n.n_children+1)=0;
BNC_SUBTREE_ESTIMATES (node, node->u.n.n_children+1)=zero_estimates;
for (cnum=node->u.n.n_children; cnum>childnum+1; cnum--) {
node->u.n.childinfos[cnum] = node->u.n.childinfos[cnum-1];
......@@ -1136,21 +1077,14 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
BNC_BLOCKNUM(node, childnum+1) = childb->thisnodename;
BNC_HAVE_FULLHASH(node, childnum+1) = TRUE;
BNC_FULLHASH(node, childnum+1) = childb->fullhash;
// BNC_SUBTREE_FINGERPRINT(node, childnum)=0; // leave the subtreefingerprint alone for the child, so we can log the change
BNC_SUBTREE_FINGERPRINT(node, childnum+1)=0;
BNC_SUBTREE_ESTIMATES (node, childnum+1)=zero_estimates;
fixup_child_fingerprint(node, childnum, childa);
fixup_child_fingerprint(node, childnum+1, childb);
fixup_child_estimates(node, childnum, childa);
fixup_child_estimates(node, childnum+1, childb);
r=toku_fifo_create(&BNC_BUFFER(node,childnum+1)); lazy_assert_zero(r);
verify_local_fingerprint_nonleaf(node); // The fingerprint hasn't changed and everhything is still there.
r=toku_fifo_create(&BNC_BUFFER(node,childnum)); lazy_assert_zero(r); // ??? SHould handle this error case
BNC_NBYTESINBUF(node, childnum) = 0;
BNC_NBYTESINBUF(node, childnum+1) = 0;
// Remove all the cmds from the local fingerprint. Some may get added in again when we try to push to the child.
verify_local_fingerprint_nonleaf(node);
// Slide the keys over
{
struct kv_pair *pivot = splitk->data;
......@@ -1172,17 +1106,11 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
}
)
verify_local_fingerprint_nonleaf(node);
node->u.n.n_bytes_in_buffers -= old_count; /* By default, they are all removed. We might add them back in. */
/* Keep pushing to the children, but not if the children would require a pushdown */
toku_fifo_free(&old_h);
verify_local_fingerprint_nonleaf(childa);
verify_local_fingerprint_nonleaf(childb);
verify_local_fingerprint_nonleaf(node);
VERIFY_NODE(t, node);
VERIFY_NODE(t, childa);
VERIFY_NODE(t, childb);
......@@ -1229,9 +1157,6 @@ brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react)
VERIFY_NODE(t,child);
}
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(child);
BRTNODE nodea, nodeb;
DBT splitk;
// printf("%s:%d node %" PRIu64 "->u.n.n_children=%d height=%d\n", __FILE__, __LINE__, node->thisnodename.b, node->u.n.n_children, node->height);
......@@ -1287,7 +1212,6 @@ brt_leaf_apply_clean_xids_once (BRTNODE node, LEAFENTRY le)
olddisksize = leafentry_disksize(le);
lazy_assert(oldmemsize == olddisksize);
#endif
u_int32_t old_crc = toku_le_crc(le);
size_t newmemsize;
size_t newdisksize;
......@@ -1311,10 +1235,7 @@ brt_leaf_apply_clean_xids_once (BRTNODE node, LEAFENTRY le)
toku_mempool_mfree(&node->u.l.buffer_mempool, p, size_reclaimed);
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + olddisksize;
node->local_fingerprint -= node->rand4fingerprint * old_crc;
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint * toku_le_crc(le);
//le_* functions (accessors) would return same key/keylen/val/vallen.
//Therefore no cursor invalidation is needed.
......@@ -1346,7 +1267,6 @@ brt_leaf_apply_full_promotion_once (BRTNODE node, LEAFENTRY le)
olddisksize = leafentry_disksize(le);
lazy_assert(oldmemsize == olddisksize);
#endif
u_int32_t old_crc = toku_le_crc(le);
size_t newmemsize;
size_t newdisksize;
......@@ -1370,10 +1290,8 @@ brt_leaf_apply_full_promotion_once (BRTNODE node, LEAFENTRY le)
toku_mempool_mfree(&node->u.l.buffer_mempool, p, size_reclaimed);
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + olddisksize;
node->local_fingerprint -= node->rand4fingerprint * old_crc;
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint * toku_le_crc(le);
//le_* functions (accessors) would return same key/keylen/val/vallen.
//Therefore no cursor invalidation is needed.
......@@ -1425,7 +1343,6 @@ brt_leaf_delete_leafentry (BRTNODE node, u_int32_t idx, LEAFENTRY le)
if ((r = toku_omt_delete_at(node->u.l.buffer, idx))) goto return_r;
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le);
node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le);
{
u_int32_t oldlen = le_latest_vallen(le) + le_keylen(le);
......@@ -1438,7 +1355,6 @@ brt_leaf_delete_leafentry (BRTNODE node, u_int32_t idx, LEAFENTRY le)
toku_mempool_mfree(&node->u.l.buffer_mempool, 0, leafentry_memsize(le)); // Must pass 0, since le may be no good any more.
r=0;
// printf("%s:%d rand4=%08x local_fingerprint=%08x this=%08x\n", __FILE__, __LINE__, node->rand4fingerprint, node->local_fingerprint, toku_calccrc32_kvpair_struct(kv));
return_r:
// brt_leaf_check_leaf_stats(node);
......@@ -1491,7 +1407,6 @@ brt_leaf_apply_cmd_once (BRTNODE node, const BRT_MSG cmd,
}
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le);
node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le);
//printf("%s:%d Added %u-%u got %lu\n", __FILE__, __LINE__, le_keylen(new_le), le_latest_vallen(le), node->u.l.leaf_stats.dsize);
// the ndata and nkeys remains unchanged
......@@ -1503,7 +1418,6 @@ brt_leaf_apply_cmd_once (BRTNODE node, const BRT_MSG cmd,
toku_mempool_mfree(&node->u.l.buffer_mempool, 0, size); // Must pass 0, since le may be no good any more.
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint*toku_le_crc(new_le);
if ((r = toku_omt_set_at(node->u.l.buffer, new_le, idx))) goto return_r;
......@@ -1518,7 +1432,6 @@ brt_leaf_apply_cmd_once (BRTNODE node, const BRT_MSG cmd,
if ((r = toku_omt_insert_at(node->u.l.buffer, new_le, idx))) goto return_r;
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint*toku_le_crc(new_le);
node->u.l.leaf_stats.dsize += le_latest_vallen(new_le) + le_keylen(new_le);
lazy_assert(node->u.l.leaf_stats.dsize < (1U<<31)); // make sure we didn't underflow
......@@ -1528,7 +1441,6 @@ brt_leaf_apply_cmd_once (BRTNODE node, const BRT_MSG cmd,
}
}
r=0;
// printf("%s:%d rand4=%08x local_fingerprint=%08x this=%08x\n", __FILE__, __LINE__, node->rand4fingerprint, node->local_fingerprint, toku_calccrc32_kvpair_struct(kv));
return_r:
if (maybe_free) toku_free(maybe_free);
......@@ -1546,7 +1458,6 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd,
// Return the serialization size in *new_size.
// The leaf could end up "too big" or "too small". It is up to the caller to fix that up.
{
// toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->subtree_fingerprint);
TOKULOGGER logger = toku_cachefile_logger(t->cf);
VERIFY_NODE(t, node);
lazy_assert(node->height==0);
......@@ -1761,7 +1672,6 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd,
// node->dirty = 1;
// toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->subtree_fingerprint);
*re = get_leaf_reactivity(node);
VERIFY_NODE(t, node);
return 0;
......@@ -1770,9 +1680,6 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd,
static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int childnum, BRT_MSG cmd,
enum reactivity re_array[], BOOL *did_io)
{
verify_local_fingerprint_nonleaf(node);
// if the fifo is empty and the child is in main memory and the child isn't gorged, then put it in the child
if (BNC_NBYTESINBUF(node, childnum) == 0) {
BLOCKNUM childblocknum = BNC_BLOCKNUM(node, childnum);
......@@ -1786,34 +1693,25 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil
// The child is in main memory.
BRTNODE child = child_v;
verify_local_fingerprint_nonleaf(child);
r = brtnode_put_cmd (t, child, cmd, &re_array[childnum], did_io);
fixup_child_fingerprint(node, childnum, child);
fixup_child_estimates(node, childnum, child);
VERIFY_NODE(t, node);
verify_local_fingerprint_nonleaf(child);
int rr = toku_unpin_brtnode(t, child);
lazy_assert_zero(rr);
verify_local_fingerprint_nonleaf(node);
return r;
}
put_in_fifo:
toku_brt_append_to_child_buffer(node, childnum, cmd->type, cmd->xids, cmd->u.id.key, cmd->u.id.val);
verify_local_fingerprint_nonleaf(node);
return 0;
}
// append a cmd to a nonleaf node's child buffer
void
toku_brt_append_to_child_buffer(BRTNODE node, int childnum, int type, XIDS xids, DBT *key, DBT *val) {
node->local_fingerprint += node->rand4fingerprint * toku_calc_fingerprint_cmd(type, xids, key->data, key->size, val->data, val->size);
int diff = key->size + val->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids);
int r = toku_fifo_enq(BNC_BUFFER(node,childnum), key->data, key->size, val->data, val->size, type, xids);
lazy_assert_zero(r);
......@@ -1887,16 +1785,12 @@ static int brt_nonleaf_cmd_once (BRT t, BRTNODE node, BRT_MSG cmd,
// The re_array[i] gets set to reactivity of any modified child.
{
verify_local_fingerprint_nonleaf(node);
/* find the right subtree */
//TODO: accesses key, val directly
unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, t);
int r = brt_nonleaf_cmd_once_to_child (t, node, childnum, cmd, re_array, did_io);
verify_local_fingerprint_nonleaf(node);
return r;
}
......@@ -1926,9 +1820,6 @@ brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd,
// The re_array[i] gets set to the reactivity of any modified child i. (And there may be several such children.)
//
{
verify_local_fingerprint_nonleaf(node);
//TODO: Accessing type directly
switch (cmd->type) {
case BRT_INSERT_NO_OVERWRITE:
......@@ -1964,7 +1855,6 @@ merge_leaf_nodes (BRTNODE a, BRTNODE b) {
while (toku_omt_size(omtb)>0) {
LEAFENTRY le = fetch_from_buf(omtb, 0);
u_int32_t le_size = leafentry_memsize(le);
u_int32_t le_crc = toku_le_crc(le);
{
LEAFENTRY new_le = mempool_malloc_from_omt(omta, &a->u.l.buffer_mempool, le_size, 0);
lazy_assert(new_le);
......@@ -1973,7 +1863,6 @@ merge_leaf_nodes (BRTNODE a, BRTNODE b) {
int r = toku_omt_insert_at(omta, new_le, idx);
lazy_assert_zero(r);
a->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + le_size; //This should be disksize
a->local_fingerprint += a->rand4fingerprint * le_crc;
a->u.l.leaf_stats.ndata++;
maybe_bump_nkeys(a, +1);
......@@ -1985,7 +1874,6 @@ merge_leaf_nodes (BRTNODE a, BRTNODE b) {
int r = toku_omt_delete_at(omtb, 0);
lazy_assert_zero(r);
b->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + le_size;
b->local_fingerprint -= b->rand4fingerprint * le_crc;
b->u.l.leaf_stats.ndata--;
b->u.l.leaf_stats.dsize-= le_keylen(le) + le_latest_vallen(le);
......@@ -2022,7 +1910,6 @@ balance_leaf_nodes (BRTNODE a, BRTNODE b, struct kv_pair **splitk)
int to_idx = move_from_right ? toku_omt_size(omtto) : 0;
LEAFENTRY le = fetch_from_buf(omtfrom, from_idx);
u_int32_t le_size = leafentry_memsize(le);
u_int32_t le_crc = toku_le_crc(le);
{
LEAFENTRY new_le = mempool_malloc_from_omt(omtto, &to->u.l.buffer_mempool, le_size, 0);
lazy_assert(new_le);
......@@ -2031,7 +1918,6 @@ balance_leaf_nodes (BRTNODE a, BRTNODE b, struct kv_pair **splitk)
lazy_assert_zero(r);
maybe_bump_nkeys(to, +1);
to ->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + le_size;
to ->local_fingerprint += to->rand4fingerprint * le_crc;
to->u.l.leaf_stats.ndata++;
to->u.l.leaf_stats.dsize+= le_keylen(le) + le_latest_vallen(le);
......@@ -2042,7 +1928,6 @@ balance_leaf_nodes (BRTNODE a, BRTNODE b, struct kv_pair **splitk)
int r = toku_omt_delete_at(omtfrom, from_idx);
lazy_assert_zero(r);
from->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + le_size;
from->local_fingerprint -= from->rand4fingerprint * le_crc;
from->u.l.leaf_stats.ndata--;
from->u.l.leaf_stats.dsize-= le_keylen(le) + le_latest_vallen(le);
......@@ -2100,8 +1985,8 @@ maybe_merge_pinned_leaf_nodes (BRTNODE parent, int childnum_of_parent,
int r = merge_leaf_nodes(a, b);
if (r != 0) return r;
}
fixup_child_fingerprint(parent, childnum_of_parent, a);
fixup_child_fingerprint(parent, childnum_of_parent+1, b);
fixup_child_estimates(parent, childnum_of_parent, a);
fixup_child_estimates(parent, childnum_of_parent+1, b);
return 0;
}
......@@ -2110,7 +1995,6 @@ maybe_merge_pinned_nonleaf_nodes (BRTNODE parent, int childnum_of_parent, struct
BRTNODE a, BRTNODE b,
BOOL *did_merge, BOOL *did_rebalance, struct kv_pair **splitk)
{
verify_local_fingerprint_nonleaf(a);
lazy_assert(parent_splitk);
int old_n_children = a->u.n.n_children;
int new_n_children = old_n_children + b->u.n.n_children;
......@@ -2131,29 +2015,14 @@ maybe_merge_pinned_nonleaf_nodes (BRTNODE parent, int childnum_of_parent, struct
b->u.n.n_children = 0;
b->u.n.n_bytes_in_buffers = 0;
{
static int count=0; count++;
u_int32_t fp = 0;
int i;
for (i=0; i<a->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(a,i), key, keylen, data, datalen, type, xid,
fp += toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
);
a->local_fingerprint = a->rand4fingerprint * fp;
//printf("%s:%d fp=%u\n", __FILE__, __LINE__, a->local_fingerprint);
verify_local_fingerprint_nonleaf(a);
}
b->local_fingerprint = 0;
a->dirty = 1;
b->dirty = 1;
fixup_child_fingerprint(parent, childnum_of_parent, a);
fixup_child_estimates(parent, childnum_of_parent, a);
// abort(); // don't forget to reuse blocknums
*did_merge = TRUE;
*did_rebalance = FALSE;
*splitk = NULL;
verify_local_fingerprint_nonleaf(a);
return 0;
}
......@@ -2181,13 +2050,11 @@ maybe_merge_pinned_nodes (BRTNODE parent, int childnum_of_parent, struct kv_pair
// splitk (OUT): If the two nodes did not get merged, the new pivot key between the two nodes.
{
lazy_assert(a->height == b->height);
verify_local_fingerprint_nonleaf(a);
parent->dirty = 1; // just to make sure
if (a->height == 0) {
return maybe_merge_pinned_leaf_nodes(parent, childnum_of_parent, a, b, parent_splitk, did_merge, did_rebalance, splitk);
} else {
int r = maybe_merge_pinned_nonleaf_nodes(parent, childnum_of_parent, parent_splitk, a, b, did_merge, did_rebalance, splitk);
verify_local_fingerprint_nonleaf(a);
return r;
}
}
......@@ -2195,7 +2062,6 @@ maybe_merge_pinned_nodes (BRTNODE parent, int childnum_of_parent, struct kv_pair
static int
brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, BOOL *did_react)
{
verify_local_fingerprint_nonleaf(node);
if (node->u.n.n_children < 2) return 0; // if no siblings, we are merged as best we can.
int childnuma,childnumb;
......@@ -2233,7 +2099,6 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, BOOL
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
if (r!=0) return r;
childa = childnode_v;
verify_local_fingerprint_nonleaf(childa);
}
{
void *childnode_v;
......@@ -2255,9 +2120,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, BOOL
struct kv_pair *splitk_kvpair = 0;
struct kv_pair *old_split_key = node->u.n.childkeys[childnuma];
unsigned int deleted_size = toku_brt_pivot_key_len(old_split_key);
verify_local_fingerprint_nonleaf(childa);
r = maybe_merge_pinned_nodes(node, childnuma, node->u.n.childkeys[childnuma], childa, childb, &did_merge, &did_rebalance, &splitk_kvpair);
verify_local_fingerprint_nonleaf(childa);
if (childa->height>0) { int i; for (i=0; i+1<childa->u.n.n_children; i++) lazy_assert(childa->u.n.childkeys[i]); }
//(toku_verify_counts(childa), toku_verify_estimates(t,childa));
// the tree did react if a merge (did_merge) or rebalance (new split key) occurred
......@@ -2267,8 +2130,6 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, BOOL
node->u.n.totalchildkeylens -= deleted_size; // The key was free()'d inside the maybe_merge_pinned_nodes.
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childa);
if (did_merge) {
toku_fifo_free(&BNC_BUFFER(node, childnumb));
node->u.n.n_children--;
......@@ -2280,10 +2141,8 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, BOOL
&node->u.n.childkeys[childnuma+1],
(node->u.n.n_children-childnumb)*sizeof(node->u.n.childkeys[0]));
REALLOC_N(node->u.n.n_children-1, node->u.n.childkeys);
fixup_child_fingerprint(node, childnuma, childa);
fixup_child_estimates(node, childnuma, childa);
lazy_assert(node->u.n.childinfos[childnuma].blocknum.b == childa->thisnodename.b);
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childa);
childa->dirty = 1; // just to make sure
childb->dirty = 1; // just to make sure
} else {
......@@ -2291,7 +2150,6 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, BOOL
// If we didn't merge the nodes, then we need the correct pivot.
node->u.n.childkeys[childnuma] = splitk_kvpair;
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(node->u.n.childkeys[childnuma]);
verify_local_fingerprint_nonleaf(node);
node->dirty = 1;
}
}
......@@ -2312,7 +2170,6 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, BOOL
if (rra) return rra;
if (rrb) return rrb;
}
verify_local_fingerprint_nonleaf(node);
return r;
}
......@@ -2348,8 +2205,6 @@ brt_handle_maybe_reactive_child_at_root (BRT brt, CACHEKEY *rootp, BRTNODE *node
int r = brt_nonleaf_split(brt, node, &nodea, &nodeb, &splitk);
if (r!=0) return r;
}
//verify_local_fingerprint_nonleaf(nodea);
//verify_local_fingerprint_nonleaf(nodeb);
return brt_init_new_root(brt, nodea, nodeb, splitk, rootp, nodep);
}
case RE_FUSIBLE:
......@@ -2414,9 +2269,6 @@ flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re,
toku_fill_dbt(&hv, val, vallen)} };
int n_bytes_removed = (hk.size + hv.size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids));
u_int32_t old_from_fingerprint = node->local_fingerprint;
u_int32_t delta = toku_calc_fingerprint_cmd(type, xids, key, keylen, val, vallen);
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
//printf("%s:%d random_picked\n", __FILE__, __LINE__);
r = brtnode_put_cmd (t, child, &brtcmd, child_re, did_io);
......@@ -2428,7 +2280,6 @@ flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re,
//printf("%s:%d deleted status=%d\n", __FILE__, __LINE__, r);
if (r!=0) goto return_r;
node->local_fingerprint = new_from_fingerprint;
node->u.n.n_bytes_in_buffers -= n_bytes_removed;
BNC_NBYTESINBUF(node, childnum) -= n_bytes_removed;
node->dirty = 1;
......@@ -2436,9 +2287,8 @@ flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re,
}
if (0) printf("%s:%d done random picking\n", __FILE__, __LINE__);
}
verify_local_fingerprint_nonleaf(node);
return_r:
fixup_child_fingerprint(node, childnum, child);
fixup_child_estimates(node, childnum, child);
{
int rr=toku_unpin_brtnode(t, child);
if (rr!=0) return rr;
......@@ -2469,18 +2319,13 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd, enum reactivity *re, BOOL *di
// If we perform I/O then set *did_io to true.
// If a nonleaf node becomes overfull then we will flush some child.
{
verify_local_fingerprint_nonleaf(node);
if (node->height==0) {
return brt_leaf_put_cmd(t, node, cmd, re);
} else {
verify_local_fingerprint_nonleaf(node);
enum reactivity child_re[node->u.n.n_children];
{ int i; for (i=0; i<node->u.n.n_children; i++) child_re[i]=RE_STABLE; }
int r = brt_nonleaf_put_cmd(t, node, cmd, child_re, did_io);
if (r!=0) goto return_r;
verify_local_fingerprint_nonleaf(node);
// Now we may have overfilled node. So we'll flush the heaviest child until we are happy.
while (!*did_io // Don't flush if we've done I/O.
&& nonleaf_node_is_gorged(node) // Don't flush if the node is small enough.
......@@ -2530,7 +2375,6 @@ static int push_something_at_root (BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT
BOOL did_io = FALSE;
{
int r = brtnode_put_cmd(brt, node, cmd, &re, &did_io);
verify_local_fingerprint_nonleaf(node);
if (r!=0) return r;
//if (should_split) printf("%s:%d Pushed something simple, should_split=1\n", __FILE__, __LINE__);
}
......@@ -2538,7 +2382,6 @@ static int push_something_at_root (BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT
{
int r = brt_handle_maybe_reactive_child_at_root(brt, rootp, nodep, re);
verify_local_fingerprint_nonleaf(*nodep);
return r;
}
}
......@@ -2599,12 +2442,10 @@ int toku_brt_root_put_cmd(BRT brt, BRT_MSG cmd)
lazy_assert(node->fullhash==fullhash);
brt_verify_flags(brt, node);
verify_local_fingerprint_nonleaf(node);
if ((r = push_something_at_root(brt, &node, rootp, cmd))) {
toku_unpin_brtnode(brt, node); // ignore any error code on the unpin.
return r;
}
verify_local_fingerprint_nonleaf(node);
r = toku_unpin_brtnode(brt, node);
lazy_assert(r == 0);
return 0;
......@@ -3033,7 +2874,6 @@ static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum) {
toku_free(node);
return r;
}
// verify_local_fingerprint_nonleaf(node);
r = toku_unpin_brtnode(t, node);
if (r!=0) {
toku_free(node);
......@@ -4606,9 +4446,6 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
// Effect: Search in a node's child.
// If we change the shape, set *did_react = TRUE. Else set *did_react = FALSE.
{
verify_local_fingerprint_nonleaf(node);
/* if the child's buffer is not empty then empty it */
if (BNC_NBYTESINBUF(node, childnum) > 0) {
BOOL did_io = FALSE;
......@@ -4632,14 +4469,10 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
struct unlock_brtnode_extra unlock_extra = {brt,childnode};
struct unlockers next_unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, unlockers};
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childnode);
enum reactivity child_re = RE_STABLE;
int r = brt_search_node(brt, childnode, search, getf, getf_v, &child_re, doprefetch, brtcursor, &next_unlockers);
if (r!=TOKUDB_TRY_AGAIN) {
// Even if r is reactive, we want to handle the maybe reactive child.
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childnode);
#if TOKU_DO_PREFETCH
// maybe prefetch the next child
......@@ -4660,7 +4493,6 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
*parent_re = get_nonleaf_reactivity(node);
verify_local_fingerprint_nonleaf(node);
} else {
// try again.
assert(!next_unlockers.locked);
......@@ -4675,7 +4507,6 @@ brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CAL
int count=0;
again:
count++;
verify_local_fingerprint_nonleaf(node);
{
int c;
......@@ -4694,7 +4525,6 @@ brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CAL
if (search->compare(search,
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)))) {
BOOL did_change_shape = FALSE;
verify_local_fingerprint_nonleaf(node);
int r = brt_search_child(brt, node, child[c], search, getf, getf_v, re, doprefetch, brtcursor, &did_change_shape, unlockers);
lazy_assert(r != EAGAIN);
if (r == 0) return r; //Success
......@@ -4705,7 +4535,6 @@ brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CAL
/* check the first (left) or last (right) node if nothing has been found */
BOOL ignore_did_change_shape; // ignore this
verify_local_fingerprint_nonleaf(node);
return brt_search_child(brt, node, child[c], search, getf, getf_v, re, doprefetch, brtcursor, &ignore_did_change_shape, unlockers);
}
}
......@@ -4713,7 +4542,6 @@ brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CAL
static int
brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, BRT_CURSOR brtcursor, UNLOCKERS unlockers)
{
verify_local_fingerprint_nonleaf(node);
if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, getf, getf_v, re, doprefetch, brtcursor, unlockers);
else {
......
......@@ -17,10 +17,12 @@ enum brt_layout_version_e {
BRT_LAYOUT_VERSION_11 = 11, // Diff from 10 to 11: Nested transaction leafentries (completely redesigned). BRT_CMDs on disk now support XIDS (multiple txnids) instead of exactly one.
BRT_LAYOUT_VERSION_12 = 12, // Diff from 11 to 12: Added BRT_CMD 'BRT_INSERT_NO_OVERWRITE', compressed block format, num old blocks
BRT_LAYOUT_VERSION_13 = 13, // Diff from 12 to 13: Fixed loader pivot bug, added build_id to every node, timestamps to brtheader
BRT_LAYOUT_VERSION_14 = 14, // Diff from 13 to 14: Added MVCC, deprecated TOKU_DB_VALCMP_BUILTIN(_13)
BRT_LAYOUT_VERSION_14 = 14, // Diff from 13 to 14: Added MVCC, deprecated TOKU_DB_VALCMP_BUILTIN(_13), Remove fingerprints
BRT_NEXT_VERSION, // the version after the current version
BRT_LAYOUT_VERSION = BRT_NEXT_VERSION-1, // A hack so I don't have to change this line.
BRT_LAYOUT_MIN_SUPPORTED_VERSION = BRT_LAYOUT_VERSION_13 // Minimum version supported
};
// Define this symbolically so the knowledge of exactly which layout version got rid of fingerprints isn't spread all over the code.
#define BRT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT BRT_LAYOUT_VERSION_13
#endif
......@@ -149,19 +149,11 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
printf(" layout_version_original=%d\n", n->layout_version_original);
printf(" layout_version_read_from_disk=%d\n", n->layout_version_read_from_disk);
printf(" build_id=%d\n", n->build_id);
printf(" rand4fp =%08x\n", n->rand4fingerprint);
printf(" localfp =%08x\n", n->local_fingerprint);
if (n->height>0) {
printf(" n_children=%d\n", n->u.n.n_children);
printf(" total_childkeylens=%u\n", n->u.n.totalchildkeylens);
printf(" n_bytes_in_buffers=%u\n", n->u.n.n_bytes_in_buffers);
int i;
printf(" subfingerprints={");
for (i=0; i<n->u.n.n_children; i++) {
if (i>0) printf(" ");
printf("%08x", BNC_SUBTREE_FINGERPRINT(n, i));
}
printf("}\n");
printf(" subleafentry_estimates={");
for (i=0; i<n->u.n.n_children; i++) {
if (i>0) printf(" ");
......
......@@ -1988,7 +1988,6 @@ int merge_files (struct merge_fileset *fs,
// One record per subtree produced by the loader; allocate_node() appends these
// to a struct subtrees_info array.
struct subtree_info {
int64_t block; // block number passed to allocate_node() for this subtree (a leaf's block, or the block allocated for a nonleaf)
struct subtree_estimates subtree_estimates; // estimates for the subtree; for nonleaf blocks the callers pass the sum of the children's estimates
int32_t fingerprint; // fingerprint passed to allocate_node(); NOTE(review): the newer allocate_node() signature no longer takes this value
};
struct subtrees_info {
......@@ -2008,14 +2007,13 @@ static void subtrees_info_destroy(struct subtrees_info *p) {
p->subtrees = NULL;
}
// Append one subtree record to STS.
//   sts: growable array of subtree_info records (n_subtrees used out of n_subtrees_limit).
//   b:   block number to record for the new subtree.
//   est: subtree estimates to record for the new subtree.
// NOTE(review): two signature lines appear below (one with a trailing `fingerprint`
// parameter, one without); confirm which one the surrounding file actually uses.
static void allocate_node (struct subtrees_info *sts, int64_t b, const struct subtree_estimates est, const int fingerprint) {
static void allocate_node (struct subtrees_info *sts, int64_t b, const struct subtree_estimates est) {
// Array is full: double the capacity before writing the new slot.
// NOTE(review): doubling only grows the array if the limit is already nonzero -- confirm the initializer guarantees that.
if (sts->n_subtrees >= sts->n_subtrees_limit) {
sts->n_subtrees_limit *= 2;
XREALLOC_N(sts->n_subtrees_limit, sts->subtrees);
}
// Fill in the next free slot, then publish it by bumping the count.
sts->subtrees[sts->n_subtrees].subtree_estimates = est;
sts->subtrees[sts->n_subtrees].block = b;
sts->subtrees[sts->n_subtrees].fingerprint = fingerprint;
sts->n_subtrees++;
}
......@@ -2029,9 +2027,6 @@ struct dbuf {
struct leaf_buf {
int64_t blocknum;
struct dbuf dbuf;
unsigned int rand4fingerprint;
unsigned int local_fingerprint;
int local_fingerprint_p;
int nkeys, ndata, dsize, n_in_buf;
int nkeys_p, ndata_p, dsize_p, partitions_p, n_in_buf_p;
TXNID xid;
......@@ -2231,14 +2226,10 @@ static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc)
putbuf_int32(&lbuf->dbuf, target_nodesize);
putbuf_int32(&lbuf->dbuf, flags);
putbuf_int32(&lbuf->dbuf, height);
lbuf->rand4fingerprint = loader_random();
putbuf_int32(&lbuf->dbuf, lbuf->rand4fingerprint);
lbuf->local_fingerprint = 0;
lbuf->nkeys = lbuf->ndata = lbuf->dsize = 0;
lbuf->n_in_buf = 0;
// leave these uninitialized for now.
lbuf->local_fingerprint_p = lbuf->dbuf.off; lbuf->dbuf.off+=4;
lbuf->nkeys_p = lbuf->dbuf.off; lbuf->dbuf.off+=8;
lbuf->ndata_p = lbuf->dbuf.off; lbuf->dbuf.off+=8;
lbuf->dsize_p = lbuf->dbuf.off; lbuf->dbuf.off+=8;
......@@ -2415,7 +2406,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
old_n_rows_remaining = n_rows_remaining;
struct subtree_estimates est = make_subtree_estimates(lbuf->nkeys, lbuf->ndata, lbuf->dsize, TRUE);
allocate_node(&sts, lblock, est, lbuf->local_fingerprint);
allocate_node(&sts, lblock, est);
n_pivots++;
......@@ -2458,7 +2449,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
if (lbuf) {
struct subtree_estimates est = make_subtree_estimates(lbuf->nkeys, lbuf->ndata, lbuf->dsize, TRUE);
allocate_node(&sts, lblock, est, lbuf->local_fingerprint);
allocate_node(&sts, lblock, est);
{
int p = progress_allocation/2;
finish_leafnode(&out, lbuf, p, bl);
......@@ -2826,13 +2817,8 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
if (!lbuf->dbuf.error) {
invariant(le_off + le_len == lbuf->dbuf.off);
u_int32_t this_x = x1764_memory(lbuf->dbuf.buf + le_off, le_len);
u_int32_t this_prod = lbuf->rand4fingerprint * this_x;
lbuf->local_fingerprint += this_prod;
if (0) {
printf("%s:%d x1764(buf+%d, %d)=%8x\n", __FILE__, __LINE__, le_off, le_len, this_x);
printf("%s:%d rand4fingerprint=%8x\n", __FILE__, __LINE__, lbuf->rand4fingerprint);
printf("%s:%d this_prod=%8x\n", __FILE__, __LINE__, this_prod);
printf("%s:%d local_fingerprint=%8x\n", __FILE__, __LINE__, lbuf->local_fingerprint);
}
}
}
......@@ -2850,8 +2836,6 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
int result = 0;
//printf(" finishing leaf node progress=%d fin at %d\n", bl->progress, bl->progress+progress_allocation);
//printf("local_fingerprint=%8x\n", lbuf->local_fingerprint);
putbuf_int32_at(&lbuf->dbuf, lbuf->local_fingerprint_p, lbuf->local_fingerprint);
putbuf_int64_at(&lbuf->dbuf, lbuf->nkeys_p, lbuf->nkeys);
putbuf_int64_at(&lbuf->dbuf, lbuf->ndata_p, lbuf->ndata);
putbuf_int64_at(&lbuf->dbuf, lbuf->dsize_p, lbuf->dsize);
......@@ -3098,12 +3082,10 @@ static int setup_nonleaf_block (int n_children,
struct subtree_estimates new_subtree_estimates = zero_estimates;
struct subtree_info *XMALLOC_N(n_children, subtrees_array);
int32_t fingerprint = 0;
for (int i = 0; i < n_children; i++) {
int64_t from_blocknum = first_child_offset_in_subtrees + i;
subtrees_array[i] = subtrees->subtrees[from_blocknum];
add_estimates(&new_subtree_estimates, &subtrees->subtrees[from_blocknum].subtree_estimates);
fingerprint += subtrees->subtrees[from_blocknum].fingerprint;
}
int r = allocate_block(out, blocknum);
......@@ -3111,7 +3093,7 @@ static int setup_nonleaf_block (int n_children,
toku_free(subtrees_array);
result = r;
} else {
allocate_node(next_subtrees, *blocknum, new_subtree_estimates, fingerprint);
allocate_node(next_subtrees, *blocknum, new_subtree_estimates);
*pivots_p = pivots;
*subtrees_info_p = subtrees_array;
......@@ -3147,8 +3129,6 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
node->height=height;
node->u.n.n_children = n_children;
node->flags = 0;
node->local_fingerprint = 0;
node->rand4fingerprint = loader_random();
XMALLOC_N(n_children-1, node->u.n.childkeys);
for (int i=0; i<n_children-1; i++)
......@@ -3168,7 +3148,6 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
XMALLOC_N(n_children, node->u.n.childinfos);
for (int i=0; i<n_children; i++) {
struct brtnode_nonleaf_childinfo *ci = &node->u.n.childinfos[i];
ci->subtree_fingerprint = subtree_info[i].fingerprint;
ci->subtree_estimates = subtree_info[i].subtree_estimates;
ci->blocknum = make_blocknum(subtree_info[i].block);
ci->have_fullhash = FALSE;
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
// Add one length-prefixed item to the running x1764 state: first the length
// (passed through toku_htod32) as a 4-byte field, then the item's bytes.
static void murmur_add_sized_bytes (struct x1764 *mm, const void *bytes, int len) {
    int len_field = toku_htod32(len);
    x1764_add(mm, (void*)&len_field, 4);
    x1764_add(mm, bytes, len);
}

// Extend the running x1764 state with a key/value pair: the key (length, then
// bytes) followed by the value (length, then bytes).
static void toku_calc_more_murmur_kvpair (struct x1764 *mm, const void *key, int keylen, const void *val, int vallen) {
    murmur_add_sized_bytes(mm, key, keylen);
    murmur_add_sized_bytes(mm, val, vallen);
}
// Disabled code: everything between #if 0 and #endif is compiled out.
// NOTE(review): toku_calc_more_crc32_kvpair and toku_null_crc are not defined in the
// visible code -- presumably leftovers from a CRC32-based variant; confirm before re-enabling.
#if 0
u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, int vallen) {
return toku_calc_more_crc32_kvpair(toku_null_crc, key, keylen, val, vallen);
}
#endif
// Compute the x1764 value of a command.  The state is fed, in order:
//   1. the command type, truncated to a single byte,
//   2. the xids (via toku_calc_more_murmur_xids),
//   3. the key/value pair (via toku_calc_more_murmur_kvpair).
// Returns whatever x1764_finish reports for that input.
u_int32_t toku_calc_fingerprint_cmd (u_int32_t type, XIDS xids, const void *key, u_int32_t keylen, const void *val, u_int32_t vallen) {
    struct x1764 state;
    x1764_init(&state);

    // Only the low byte of the type takes part in the computation.
    unsigned char type_byte = (unsigned char)type;
    x1764_add(&state, &type_byte, 1);

    toku_calc_more_murmur_xids(&state, xids);
    toku_calc_more_murmur_kvpair(&state, key, keylen, val, vallen);

    u_int32_t fingerprint = x1764_finish(&state);
    return fingerprint;
}
......@@ -4,11 +4,6 @@
#include "includes.h"
// Return the x1764 value of a leafentry, computed over leafentry_memsize(v)
// bytes starting at v.
u_int32_t toku_le_crc(LEAFENTRY v) {
    size_t n_bytes = leafentry_memsize(v);
    return x1764_memory(v, n_bytes);
}
// Write a leafentry into the wbuf as literal bytes; the byte count is
// leafentry_disksize(le).
void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le) {
    size_t n_bytes = leafentry_disksize(le);
    wbuf_literal_bytes(w, le, n_bytes);
}
......
......@@ -116,8 +116,6 @@ struct __attribute__ ((__packed__)) leafentry {
typedef struct leafentry *LEAFENTRY;
typedef struct leafentry_13 *LEAFENTRY_13;
u_int32_t toku_le_crc(LEAFENTRY v);
size_t leafentry_memsize (LEAFENTRY le); // the size of a leafentry in memory.
size_t leafentry_disksize (LEAFENTRY le); // this is the same as logsizeof_LEAFENTRY. The size of a leafentry on disk.
void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le);
......
......@@ -516,7 +516,7 @@ generate_logprint (void) {
fprintf(pf, " u_int32_t actual_murmur = x1764_finish(&checksum);\n");
fprintf(pf, " r = toku_fread_u_int32_t_nocrclen (f, &crc_in_file); len+=4; if (r!=0) return r;\n");
fprintf(pf, " fprintf(outf, \" crc=%%08x\", crc_in_file);\n");
fprintf(pf, " if (crc_in_file!=actual_murmur) fprintf(outf, \" actual_fingerprint=%%08x\", actual_murmur);\n");
fprintf(pf, " if (crc_in_file!=actual_murmur) fprintf(outf, \" checksum=%%08x\", actual_murmur);\n");
fprintf(pf, " r = toku_fread_u_int32_t_nocrclen (f, &len_in_file); len+=4; if (r!=0) return r;\n");
fprintf(pf, " fprintf(outf, \" len=%%u\", len_in_file);\n");
fprintf(pf, " if (len_in_file!=len) fprintf(outf, \" actual_len=%%u\", len);\n");
......
......@@ -23,7 +23,6 @@ test_serialize_nonleaf(void) {
int fd = open(__FILE__ ".brt", O_RDWR|O_CREAT|O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO); assert(fd >= 0);
int r;
const u_int32_t randval = random();
// source_brt.fd=fd;
char *hello_string;
......@@ -34,8 +33,6 @@ test_serialize_nonleaf(void) {
sn.layout_version = BRT_LAYOUT_VERSION;
sn.layout_version_original = BRT_LAYOUT_VERSION;
sn.height = 1;
sn.rand4fingerprint = randval;
sn.local_fingerprint = 0;
sn.u.n.n_children = 2;
hello_string = toku_strdup("hello");
MALLOC_N(2, sn.u.n.childinfos);
......@@ -44,8 +41,6 @@ test_serialize_nonleaf(void) {
sn.u.n.totalchildkeylens = 6;
BNC_BLOCKNUM(&sn, 0).b = 30;
BNC_BLOCKNUM(&sn, 1).b = 35;
BNC_SUBTREE_FINGERPRINT(&sn, 0) = random();
BNC_SUBTREE_FINGERPRINT(&sn, 1) = random();
BNC_SUBTREE_ESTIMATES(&sn, 0).ndata = random() + (((long long)random())<<32);
BNC_SUBTREE_ESTIMATES(&sn, 1).ndata = random() + (((long long)random())<<32);
BNC_SUBTREE_ESTIMATES(&sn, 0).nkeys = random() + (((long long)random())<<32);
......@@ -65,9 +60,9 @@ test_serialize_nonleaf(void) {
r = xids_create_child(xids_123, &xids_234, (TXNID)234);
CKERR(r);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "a", 2, "aval", 5, BRT_NONE, xids_0); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, xids_0, "a", 2, "aval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "b", 2, "bval", 5, BRT_NONE, xids_123); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, xids_123, "b", 2, "bval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,1), "x", 2, "xval", 5, BRT_NONE, xids_234); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, xids_234, "x", 2, "xval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "a", 2, "aval", 5, BRT_NONE, xids_0); assert(r==0);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "b", 2, "bval", 5, BRT_NONE, xids_123); assert(r==0);
r = toku_fifo_enq(BNC_BUFFER(&sn,1), "x", 2, "xval", 5, BRT_NONE, xids_234); assert(r==0);
BNC_NBYTESINBUF(&sn, 0) = 2*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_0) + xids_get_serialize_size(xids_123);
BNC_NBYTESINBUF(&sn, 1) = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_234);
sn.u.n.n_bytes_in_buffers = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_0) + xids_get_serialize_size(xids_123) + xids_get_serialize_size(xids_234);
......@@ -112,23 +107,12 @@ test_serialize_nonleaf(void) {
assert(dn->layout_version_original ==BRT_LAYOUT_VERSION);
assert(dn->layout_version_read_from_disk ==BRT_LAYOUT_VERSION);
assert(dn->height == 1);
assert(dn->rand4fingerprint==randval);
assert(dn->u.n.n_children==2);
assert(strcmp(kv_pair_key(dn->u.n.childkeys[0]), "hello")==0);
assert(toku_brt_pivot_key_len(dn->u.n.childkeys[0])==6);
assert(dn->u.n.totalchildkeylens==6);
assert(BNC_BLOCKNUM(dn,0).b==30);
assert(BNC_BLOCKNUM(dn,1).b==35);
{
int i;
for (i=0; i<2; i++) {
assert(BNC_SUBTREE_FINGERPRINT(dn, i)==BNC_SUBTREE_FINGERPRINT(&sn, i));
assert(BNC_SUBTREE_ESTIMATES(dn, i).nkeys==BNC_SUBTREE_ESTIMATES(&sn, i).nkeys);
assert(BNC_SUBTREE_ESTIMATES(dn, i).ndata==BNC_SUBTREE_ESTIMATES(&sn, i).ndata);
assert(BNC_SUBTREE_ESTIMATES(dn, i).dsize==BNC_SUBTREE_ESTIMATES(&sn, i).dsize);
}
assert(dn->local_fingerprint==sn.local_fingerprint);
}
toku_brtnode_free(&dn);
kv_pair_free(sn.u.n.childkeys[0]);
......
......@@ -17,7 +17,6 @@ char *fname;
static void
doit (void) {
BLOCKNUM nodea,nodeb;
u_int32_t fingerprinta=0;
int r;
......@@ -35,11 +34,10 @@ doit (void) {
r = toku_testsetup_leaf(t, &nodea);
assert(r==0);
r = toku_testsetup_nonleaf(t, 1, &nodeb, 1, &nodea, &fingerprinta, 0, 0);
r = toku_testsetup_nonleaf(t, 1, &nodeb, 1, &nodea, 0, 0);
assert(r==0);
u_int32_t fingerprint=0;
r = toku_testsetup_insert_to_nonleaf(t, nodeb, BRT_DELETE_ANY, "hello", 6, 0, 0, &fingerprint);
r = toku_testsetup_insert_to_nonleaf(t, nodeb, BRT_DELETE_ANY, "hello", 6, 0, 0);
assert(r==0);
r = toku_testsetup_root(t, nodeb);
......
......@@ -43,7 +43,6 @@ char *fname;
static void
doit (int ksize __attribute__((__unused__))) {
BLOCKNUM cnodes[BRT_FANOUT], bnode, anode;
u_int32_t fingerprints[BRT_FANOUT];
char *keys[BRT_FANOUT-1];
int keylens[BRT_FANOUT-1];
......@@ -64,12 +63,11 @@ doit (int ksize __attribute__((__unused__))) {
for (i=0; i<BRT_FANOUT; i++) {
r=toku_testsetup_leaf(t, &cnodes[i]);
assert(r==0);
fingerprints[i]=0;
char key[KSIZE+10];
int keylen = 1+snprintf(key, KSIZE, "%08d%0*d", i*10000+1, KSIZE-9, 0);
char val[1];
char vallen=0;
r=toku_testsetup_insert_to_leaf(t, cnodes[i], key, keylen, val, vallen, &fingerprints[i]);
r=toku_testsetup_insert_to_leaf(t, cnodes[i], key, keylen, val, vallen);
assert(r==0);
}
......@@ -80,14 +78,13 @@ doit (int ksize __attribute__((__unused__))) {
keys[i]=toku_strdup(key);
}
r = toku_testsetup_nonleaf(t, 1, &bnode, BRT_FANOUT, cnodes, fingerprints, keys, keylens);
r = toku_testsetup_nonleaf(t, 1, &bnode, BRT_FANOUT, cnodes, keys, keylens);
assert(r==0);
for (i=0; i+1<BRT_FANOUT; i++) {
toku_free(keys[i]);
}
u_int32_t bfingerprint=0;
{
const int magic_size = (NODESIZE-toku_testsetup_get_sersize(t, bnode))/2-25;
//printf("magic_size=%d\n", magic_size);
......@@ -95,22 +92,22 @@ doit (int ksize __attribute__((__unused__))) {
int keylen = 1+snprintf(key, KSIZE, "%08d%0*d", 150002, magic_size, 0);
char val[1];
char vallen=0;
r=toku_testsetup_insert_to_nonleaf(t, bnode, BRT_INSERT, key, keylen, val, vallen, &bfingerprint);
r=toku_testsetup_insert_to_nonleaf(t, bnode, BRT_INSERT, key, keylen, val, vallen);
keylen = 1+snprintf(key, KSIZE, "%08d%0*d", 2, magic_size-1, 0);
r=toku_testsetup_insert_to_nonleaf(t, bnode, BRT_INSERT, key, keylen, val, vallen, &bfingerprint);
r=toku_testsetup_insert_to_nonleaf(t, bnode, BRT_INSERT, key, keylen, val, vallen);
}
//printf("%lld sersize=%d\n", bnode, toku_testsetup_get_sersize(t, bnode));
// Now we have an internal node which has full children and the buffers are nearly full
r = toku_testsetup_nonleaf(t, 2, &anode, 1, &bnode, &bfingerprint, 0, 0);
r = toku_testsetup_nonleaf(t, 2, &anode, 1, &bnode, 0, 0);
assert(r==0);
{
char key[20];
int keylen = 1+snprintf(key, 20, "%08d", 3);
char val[1];
char vallen=0;
r=toku_testsetup_insert_to_nonleaf(t, anode, BRT_INSERT, key, keylen, val, vallen, &bfingerprint);
r=toku_testsetup_insert_to_nonleaf(t, anode, BRT_INSERT, key, keylen, val, vallen);
}
if (0)
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment