Commit 44b64807 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Candidate fix for #3173. Refs #3173. [t:3173]

git-svn-id: file:///svn/toku/tokudb@27114 c7de825b-a66e-492c-adef-691d508d4ae1
parent 79cc9245
...@@ -351,7 +351,7 @@ void mempool_release(struct mempool *); // release anything that was not release ...@@ -351,7 +351,7 @@ void mempool_release(struct mempool *); // release anything that was not release
void toku_verify_all_in_mempool(BRTNODE node); void toku_verify_all_in_mempool(BRTNODE node);
int toku_verify_brtnode (BRT brt, BLOCKNUM blocknum, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, int recurse) ; int toku_verify_brtnode (BRT brt, BLOCKNUM blocknum, int height, struct kv_pair *lesser_pivot, struct kv_pair *greatereq_pivot, int recurse);
void toku_brtheader_free (struct brt_header *h); void toku_brtheader_free (struct brt_header *h);
int toku_brtheader_close (CACHEFILE cachefile, int fd, void *header_v, char **error_string, BOOL oplsn_valid, LSN oplsn); int toku_brtheader_close (CACHEFILE cachefile, int fd, void *header_v, char **error_string, BOOL oplsn_valid, LSN oplsn);
......
...@@ -3,19 +3,14 @@ ...@@ -3,19 +3,14 @@
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
/* Verify a BRT. */ /* Verify a BRT. */
/* Check: /* Check:
* the fingerprint of every node (local check) * The fingerprint of every node (local check)
* the child's fingerprint matches the parent's copy * The child's fingerprint matches the parent's copy (probably don't actually do thi syet)
* the tree is of uniform depth (and the height is correct at every node) * The tree is of uniform depth (and the height is correct at every node)
* For non-dup trees: the values to the left are < the values to the right * For each pivot key: the max of the stuff to the left is <= the pivot key < the min of the stuff to the right.
* and < the pivot * For each leaf node: All the keys are in strictly increasing order.
* For dup trees: the values to the left are <= the values to the right * For each nonleaf node: All the messages have keys that are between the associated pivot keys ( left_pivot_key < message <= right_pivot_key)
* the pivots are < or <= left values (according to the PresentL bit)
* the pivots are > or >= right values (according to the PresentR bit)
*
* Note: We don't yet have DUP trees, so thee checks on duplicate trees are unimplemented. (Nov 1 2007)
*/ */
#include "includes.h" #include "includes.h"
...@@ -49,94 +44,99 @@ static int compare_leafentries (BRT brt, LEAFENTRY a, LEAFENTRY b) { ...@@ -49,94 +44,99 @@ static int compare_leafentries (BRT brt, LEAFENTRY a, LEAFENTRY b) {
toku_fill_dbt(&y, le_key(b), le_keylen(b))); toku_fill_dbt(&y, le_key(b), le_keylen(b)));
return cmp; return cmp;
} }
static int compare_pair_to_leafentry (BRT brt, struct kv_pair *a, LEAFENTRY b) {
// all this because we dont have nested functions DBT x,y;
struct verify_pair_arg { int cmp = brt->compare_fun(brt->db,
BRT brt; toku_fill_dbt(&x, kv_pair_key(a), kv_pair_keylen(a)),
int i; toku_fill_dbt(&y, le_key(b), le_keylen(b)));
bytevec thislorange; return cmp;
ITEMLEN thislolen;
bytevec thishirange;
ITEMLEN thishilen;
int *resultp;
};
struct check_increasing_arg {
BRT brt;
LEAFENTRY prev;
};
// Make sure that they are in increasing order.
static int check_increasing (OMTVALUE lev, u_int32_t idx, void *arg) {
struct check_increasing_arg *ciarg = (struct check_increasing_arg *)arg;
LEAFENTRY v=lev;
LEAFENTRY prev = ciarg->prev;
if (idx>0)
assert(compare_leafentries(ciarg->brt, prev, v)<0);
ciarg->prev=v;
return 0;
} }
int toku_verify_brtnode (BRT brt, BLOCKNUM blocknum, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, int recurse) { int toku_verify_brtnode (BRT brt, BLOCKNUM blocknum, int height,
struct kv_pair *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
struct kv_pair *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
int recurse)
{
int result=0; int result=0;
BRTNODE node; BRTNODE node;
void *node_v; void *node_v;
int r;
u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum); u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum);
if ((r = toku_cachetable_get_and_pin(brt->cf, blocknum, fullhash, &node_v, NULL, {
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h))) int r = toku_cachetable_get_and_pin(brt->cf, blocknum, fullhash, &node_v, NULL,
return r; toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
if (r) return r;
}
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v); //printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v; node=node_v;
assert(node->fullhash==fullhash); assert(node->fullhash==fullhash);
if (height==-1) {
height = node->height;
}
assert(node->height ==height);
verify_local_fingerprint(node); verify_local_fingerprint(node);
if (node->height>0) { if (node->height>0) {
int i; // Verify that all the pivot keys are in order.
for (i=0; i< node->u.n.n_children; i++) { for (int i=0; i<node->u.n.n_children-2; i++) {
bytevec thislorange,thishirange; int compare = compare_pairs(brt, node->u.n.childkeys[i], node->u.n.childkeys[i+1]);
ITEMLEN thislolen, thishilen; assert(compare<0);
if (node->u.n.n_children==0 || i==0) {
thislorange=lorange;
thislolen =lolen;
} else {
thislorange=kv_pair_key(node->u.n.childkeys[i-1]);
thislolen =toku_brt_pivot_key_len(node->u.n.childkeys[i-1]);
}
if (node->u.n.n_children==0 || i+1>=node->u.n.n_children) {
thishirange=hirange;
thishilen =hilen;
} else {
thishirange=kv_pair_key(node->u.n.childkeys[i]);
thishilen =toku_brt_pivot_key_len(node->u.n.childkeys[i]);
} }
// Verify that all the pivot keys are lesser_pivot < pivot <= greatereq_pivot
for (int i=0; i<node->u.n.n_children-1; i++) {
if (lesser_pivot) {
int compare = compare_pairs(brt, lesser_pivot, node->u.n.childkeys[i]);
assert(compare < 0);
} }
//if (lorange) printf("%s:%d lorange=%s\n", __FILE__, __LINE__, (char*)lorange); if (greatereq_pivot) {
//if (hirange) printf("%s:%d lorange=%s\n", __FILE__, __LINE__, (char*)hirange); int compare = compare_pairs(brt, greatereq_pivot, node->u.n.childkeys[i]);
for (i=0; i<node->u.n.n_children-2; i++) { assert(compare >= 0);
assert(compare_pairs(brt, node->u.n.childkeys[i], node->u.n.childkeys[i+1])<0);
} }
for (i=0; i<node->u.n.n_children; i++) {
if (i>0) {
//printf(" %s:%d i=%d %p v=%s\n", __FILE__, __LINE__, i, node->u.n.childkeys[i-1], (char*)kv_pair_key(node->u.n.childkeys[i-1]));
DBT k1,k2,k3;
toku_fill_dbt(&k2, kv_pair_key(node->u.n.childkeys[i-1]), toku_brt_pivot_key_len(node->u.n.childkeys[i-1]));
if (lorange) assert(brt->compare_fun(brt->db, toku_fill_dbt(&k1, lorange, lolen), &k2) <0);
if (hirange) assert(brt->compare_fun(brt->db, &k2, toku_fill_dbt(&k3, hirange, hilen)) <=0);
} }
// Verify that messages in the buffers are in the right place.
{/*nothing*/} // To do later.
// Verify that the subtrees have the right properties.
if (recurse) { if (recurse) {
result|=toku_verify_brtnode(brt, BNC_BLOCKNUM(node, i), for (int i=0; i<node->u.n.n_children; i++) {
(i==0) ? lorange : kv_pair_key(node->u.n.childkeys[i-1]), int r = toku_verify_brtnode(brt, BNC_BLOCKNUM(node, i), height-1,
(i==0) ? lolen : toku_brt_pivot_key_len(node->u.n.childkeys[i-1]), (i==0) ? lesser_pivot : node->u.n.childkeys[i-1],
(i==node->u.n.n_children-1) ? hirange : kv_pair_key(node->u.n.childkeys[i]), (i==node->u.n.n_children-1) ? greatereq_pivot : node->u.n.childkeys[i],
(i==node->u.n.n_children-1) ? hilen : toku_brt_pivot_key_len(node->u.n.childkeys[i]),
recurse); recurse);
assert(r==0);
} }
} }
} else { } else {
struct check_increasing_arg ciarg = { brt , 0 }; /* It's a leaf. Make sure every leaf value is between the pivots, and that the leaf values are sorted. */
toku_omt_iterate(node->u.l.buffer, check_increasing, &ciarg); for (u_int32_t i=0; i<toku_omt_size(node->u.l.buffer); i++) {
OMTVALUE le_v;
{
int r = toku_omt_fetch(node->u.l.buffer, i, &le_v, NULL);
assert(r==0);
}
LEAFENTRY le = le_v;
if (lesser_pivot) {
int compare = compare_pair_to_leafentry(brt, lesser_pivot, le);
assert(compare < 0);
}
if (greatereq_pivot) {
int compare = compare_pair_to_leafentry(brt, greatereq_pivot, le);
assert(compare >= 0);
}
if (0<i) {
OMTVALUE prev_le_v;
int r = toku_omt_fetch(node->u.l.buffer, i-1, &prev_le_v, NULL);
assert(r==0);
LEAFENTRY prev_le = prev_le_v;
int compare = compare_leafentries(brt, prev_le, le);
assert(compare<0);
}
}
}
{
int r = toku_cachetable_unpin(brt->cf, blocknum, fullhash, CACHETABLE_CLEAN, 0);
if (r) return r;
} }
if ((r = toku_cachetable_unpin(brt->cf, blocknum, fullhash, CACHETABLE_CLEAN, 0))) return r;
return result; return result;
} }
...@@ -145,8 +145,10 @@ int toku_verify_brt (BRT brt) { ...@@ -145,8 +145,10 @@ int toku_verify_brt (BRT brt) {
assert(brt->h); assert(brt->h);
u_int32_t root_hash; u_int32_t root_hash;
rootp = toku_calculate_root_offset_pointer(brt, &root_hash); rootp = toku_calculate_root_offset_pointer(brt, &root_hash);
int n_pinned = toku_cachefile_count_pinned(brt->cf, 0); int n_pinned_before = toku_cachefile_count_pinned(brt->cf, 0);
int r = toku_verify_brtnode(brt, *rootp, 0, 0, 0, 0, 1); int r = toku_verify_brtnode(brt, *rootp, -1, NULL, NULL, 1);
assert(n_pinned == toku_cachefile_count_pinned(brt->cf, 0)); int n_pinned_after = toku_cachefile_count_pinned(brt->cf, 0);
assert(n_pinned_before==n_pinned_after); // this may stop working if we release the ydb lock (in some future version of the code).
return r; return r;
} }
...@@ -5298,7 +5298,7 @@ int toku_brt_stat64 (BRT brt, TOKUTXN UU(txn), struct brtstat64_s *s) { ...@@ -5298,7 +5298,7 @@ int toku_brt_stat64 (BRT brt, TOKUTXN UU(txn), struct brtstat64_s *s) {
/* ********************* debugging dump ************************ */ /* ********************* debugging dump ************************ */
static int static int
toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen) { toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_pair *lorange, struct kv_pair *hirange) {
int result=0; int result=0;
BRTNODE node; BRTNODE node;
void *node_v; void *node_v;
...@@ -5310,11 +5310,11 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, bytevec lo ...@@ -5310,11 +5310,11 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, bytevec lo
fprintf(file, "%s:%d pin %p\n", __FILE__, __LINE__, node_v); fprintf(file, "%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v; node=node_v;
lazy_assert(node->fullhash==fullhash); lazy_assert(node->fullhash==fullhash);
result=toku_verify_brtnode(brt, blocknum, lorange, lolen, hirange, hilen, 0); result=toku_verify_brtnode(brt, blocknum, -1, lorange, hirange, 0);
fprintf(file, "%*sNode=%p\n", depth, "", node); fprintf(file, "%*sNode=%p\n", depth, "", node);
if (node->height>0) { if (node->height>0) {
fprintf(file, "%*sNode %"PRId64" nodesize=%u height=%d n_children=%d n_bytes_in_buffers=%u keyrange=%s %s\n", fprintf(file, "%*sNode %"PRId64" nodesize=%u height=%d n_children=%d n_bytes_in_buffers=%u keyrange=%s %s\n",
depth, "", blocknum.b, node->nodesize, node->height, node->u.n.n_children, node->u.n.n_bytes_in_buffers, (char*)lorange, (char*)hirange); depth, "", blocknum.b, node->nodesize, node->height, node->u.n.n_children, node->u.n.n_bytes_in_buffers, (char*)kv_pair_key(lorange), (char*)kv_pair_key(hirange));
//printf("%s %s\n", lorange ? lorange : "NULL", hirange ? hirange : "NULL"); //printf("%s %s\n", lorange ? lorange : "NULL", hirange ? hirange : "NULL");
{ {
int i; int i;
...@@ -5346,18 +5346,15 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, bytevec lo ...@@ -5346,18 +5346,15 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, bytevec lo
fprintf(file, "%*spivot %d len=%u %u\n", depth+1, "", i-1, node->u.n.childkeys[i-1]->keylen, (unsigned)toku_dtoh32(*(int*)key)); fprintf(file, "%*spivot %d len=%u %u\n", depth+1, "", i-1, node->u.n.childkeys[i-1]->keylen, (unsigned)toku_dtoh32(*(int*)key));
} }
toku_dump_brtnode(file, brt, BNC_BLOCKNUM(node, i), depth+4, toku_dump_brtnode(file, brt, BNC_BLOCKNUM(node, i), depth+4,
(i==0) ? lorange : node->u.n.childkeys[i-1]->key, (i==0) ? lorange : node->u.n.childkeys[i-1],
(i==0) ? lolen : toku_brt_pivot_key_len(node->u.n.childkeys[i-1]), (i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i]);
(i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i]->key,
(i==node->u.n.n_children-1) ? hilen : toku_brt_pivot_key_len(node->u.n.childkeys[i])
);
} }
} }
} else { } else {
fprintf(file, "%*sNode %" PRId64 " nodesize=%u height=%d n_bytes_in_buffer=%u keyrange (key only)=", fprintf(file, "%*sNode %" PRId64 " nodesize=%u height=%d n_bytes_in_buffer=%u keyrange (key only)=",
depth, "", blocknum.b, node->nodesize, node->height, node->u.l.n_bytes_in_buffer); depth, "", blocknum.b, node->nodesize, node->height, node->u.l.n_bytes_in_buffer);
if (lorange) { toku_print_BYTESTRING(file, lolen, (void*)lorange); } else { fprintf(file, "-\\infty"); } fprintf(file, " "); if (lorange) { toku_print_BYTESTRING(file, kv_pair_keylen(lorange), kv_pair_key(lorange)); } else { fprintf(file, "-\\infty"); } fprintf(file, " ");
if (hirange) { toku_print_BYTESTRING(file, hilen, (void*)hirange); } else { fprintf(file, "\\infty"); } if (hirange) { toku_print_BYTESTRING(file, kv_pair_keylen(hirange), kv_pair_key(hirange)); } else { fprintf(file, "\\infty"); }
fprintf(file, " est={n=%" PRIu64 " k=%" PRIu64 " s=%" PRIu64 " e=%d}", fprintf(file, " est={n=%" PRIu64 " k=%" PRIu64 " s=%" PRIu64 " e=%d}",
node->u.l.leaf_stats.ndata, node->u.l.leaf_stats.nkeys, node->u.l.leaf_stats.dsize, (int)node->u.l.leaf_stats.exact); node->u.l.leaf_stats.ndata, node->u.l.leaf_stats.nkeys, node->u.l.leaf_stats.dsize, (int)node->u.l.leaf_stats.exact);
fprintf(file, "\n"); fprintf(file, "\n");
...@@ -5386,7 +5383,7 @@ int toku_dump_brt (FILE *f, BRT brt) { ...@@ -5386,7 +5383,7 @@ int toku_dump_brt (FILE *f, BRT brt) {
u_int32_t fullhash; u_int32_t fullhash;
toku_dump_translation_table(f, brt->h->blocktable); toku_dump_translation_table(f, brt->h->blocktable);
rootp = toku_calculate_root_offset_pointer(brt, &fullhash); rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
return toku_dump_brtnode(f, brt, *rootp, 0, 0, 0, 0, 0); return toku_dump_brtnode(f, brt, *rootp, 0, 0, 0);
} }
int toku_brt_truncate (BRT brt) { int toku_brt_truncate (BRT brt) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment