Commit f3e09918 authored by Rich Prohaska's avatar Rich Prohaska

Add a dirty bit to the BRT node object. This bit is set by code

that modifies the state of a BRT node object.  A node is unpinned with
its dirty bit passed to the cache table.



git-svn-id: file:///svn/tokudb@316 c7de825b-a66e-492c-adef-691d508d4ae1
parent 8c958bb6
......@@ -27,6 +27,7 @@ struct brtnode {
unsigned int nodesize;
diskoff thisnodename;
int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */
int dirty;
union node {
struct nonleaf {
int n_children; /* if n_children==TREE_FANOUT+1 then the tree needs to be rebalanced. */
......@@ -91,6 +92,10 @@ void brtnode_free (BRTNODE *node);
//int write_brt_header (int fd, struct brt_header *header);
static inline void brtnode_set_dirty(BRTNODE node) {
node->dirty = 1;
}
#if 1
#define DEADBEEF ((void*)0xDEADBEEF)
#else
......
......@@ -173,6 +173,7 @@ int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesiz
result->nodesize = nodesize; // How to compute the nodesize?
result->thisnodename = off;
result->height = rbuf_int(&rc);
result->dirty = 0;
//printf("height==%d\n", result->height);
if (result->height>0) {
result->u.n.totalchildkeylens=0;
......
......@@ -202,6 +202,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, diskoff nodename, int height)
n->nodesize = t->h->nodesize;
n->thisnodename = nodename;
n->height = height;
brtnode_set_dirty(n);
assert(height>=0);
if (height>0) {
n->u.n.n_children = 0;
......@@ -283,6 +284,7 @@ static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v
if (r!=0) return r;
node->u.n.n_bytes_in_hashtable[childnum] += n_bytes_added;
node->u.n.n_bytes_in_hashtables += n_bytes_added;
brtnode_set_dirty(node);
return 0;
}
......@@ -571,6 +573,7 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum
int n_bytes_removed = (k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD);
node->u.n.n_bytes_in_hashtables -= n_bytes_removed;
node->u.n.n_bytes_in_hashtable[childnum] -= n_bytes_removed;
brtnode_set_dirty(node);
}
return 0;
......@@ -608,6 +611,8 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
printf("\n");
}
brtnode_set_dirty(node);
// Slide the children over.
for (cnum=node->u.n.n_children; cnum>childnum+1; cnum--) {
node->u.n.children[cnum] = node->u.n.children[cnum-1];
......@@ -658,9 +663,9 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
toku_hashtable_free(&old_h);
r=cachetable_unpin(t->cf, childa->thisnodename, 1);
r=cachetable_unpin(t->cf, childa->thisnodename, childa->dirty);
assert(r==0);
r=cachetable_unpin(t->cf, childb->thisnodename, 1);
r=cachetable_unpin(t->cf, childb->thisnodename, childb->dirty);
assert(r==0);
......@@ -771,7 +776,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
if (0) printf("%s:%d done random picking\n", __FILE__, __LINE__);
}
if (debug) printf("%s:%d %*sdone push_some_brt_cmds_down, unpinning %lld\n", __FILE__, __LINE__, debug, "", targetchild);
r=cachetable_unpin(t->cf, targetchild, 1);
r=cachetable_unpin(t->cf, targetchild, child->dirty);
if (r!=0) return r;
*did_split=0;
assert(serialize_brtnode_size(node)<=node->nodesize);
......@@ -850,6 +855,7 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
pma_status = pma_insert(node->u.l.buffer, k, v, db);
node->u.l.n_bytes_in_buffer += k->size + v->size + KEY_VALUE_OVERHEAD;
#endif
brtnode_set_dirty(node);
// If it doesn't fit, then split the leaf.
if (serialize_brtnode_size(node) > node->nodesize) {
int r = brtleaf_split (t, node, nodea, nodeb, splitk, k->app_private, db);
......@@ -878,6 +884,7 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
r = pma_delete(node->u.l.buffer, cmd->u.id.key, cmd->u.id.db);
assert(r == BRT_OK);
node->u.l.n_bytes_in_buffer -= cmd->u.id.key->size + val.size + KEY_VALUE_OVERHEAD;
brtnode_set_dirty(node);
}
*did_split = 0;
return r;
......@@ -946,7 +953,7 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
k->app_private, db);
assert(r == 0);
} else {
r = cachetable_unpin(t->cf, child->thisnodename, 1);
r = cachetable_unpin(t->cf, child->thisnodename, child->dirty);
assert(r == 0);
}
......@@ -967,6 +974,7 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
assert(r==0);
node->u.n.n_bytes_in_hashtables -= diff;
node->u.n.n_bytes_in_hashtable[childnum] -= diff;
brtnode_set_dirty(node);
}
{
int child_did_split;
......@@ -982,7 +990,7 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
k->app_private, db);
if (r!=0) return r;
} else {
cachetable_unpin(t->cf, child->thisnodename, 1);
cachetable_unpin(t->cf, child->thisnodename, child->dirty);
*did_split = 0;
}
}
......@@ -997,7 +1005,8 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
int diff = k->size + olddatalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
assert(r==0);
node->u.n.n_bytes_in_hashtables -= diff;
node->u.n.n_bytes_in_hashtable[childnum] -= diff;
node->u.n.n_bytes_in_hashtable[childnum] -= diff;
brtnode_set_dirty(node);
//printf("%s:%d deleted %d bytes\n", __FILE__, __LINE__, diff);
}
{
......@@ -1006,6 +1015,7 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
assert(r==0);
node->u.n.n_bytes_in_hashtables += diff;
node->u.n.n_bytes_in_hashtable[childnum] += diff;
brtnode_set_dirty(node);
}
if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
int r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), k->app_private, db);
......@@ -1053,7 +1063,7 @@ int brt_create_cachetable (CACHETABLE *ct, int cachelines) {
static int setup_brt_root_node (BRT t, diskoff offset) {
int r;
BRTNODE MALLOC(node);
TAGMALLOC(BRTNODE, node);
assert(node);
//printf("%s:%d\n", __FILE__, __LINE__);
initialize_brtnode(t, node,
......@@ -1071,7 +1081,7 @@ static int setup_brt_root_node (BRT t, diskoff offset) {
}
//printf("%s:%d created %lld\n", __FILE__, __LINE__, node->thisnodename);
verify_counts(node);
r=cachetable_unpin(t->cf, node->thisnodename, 1);
r=cachetable_unpin(t->cf, node->thisnodename, node->dirty);
if (r!=0) {
toku_free(node);
return r;
......@@ -1227,7 +1237,7 @@ CACHEKEY* calculate_root_offset_pointer (BRT brt) {
}
int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *rootp) {
BRTNODE MALLOC(newroot);
TAGMALLOC(BRTNODE, newroot);
int r;
diskoff newroot_diskoff=malloc_diskblock(brt, brt->h->nodesize);
assert(newroot);
......@@ -1245,8 +1255,8 @@ int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKE
r=toku_hashtable_create(&newroot->u.n.htables[0]); if (r!=0) return r;
r=toku_hashtable_create(&newroot->u.n.htables[1]); if (r!=0) return r;
verify_counts(newroot);
r=cachetable_unpin(brt->cf, nodea->thisnodename, 1); if (r!=0) return r;
r=cachetable_unpin(brt->cf, nodeb->thisnodename, 1); if (r!=0) return r;
r=cachetable_unpin(brt->cf, nodea->thisnodename, nodea->dirty); if (r!=0) return r;
r=cachetable_unpin(brt->cf, nodeb->thisnodename, nodeb->dirty); if (r!=0) return r;
//printf("%s:%d put %lld\n", __FILE__, __LINE__, brt->root);
cachetable_put(brt->cf, newroot_diskoff, newroot,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
......@@ -1287,14 +1297,17 @@ int brt_root_put_cmd(BRT brt, BRT_CMD *cmd) {
if (nodeb->height>0) assert(nodeb->u.n.children[nodeb->u.n.n_children-1]!=0);
assert(nodeb->nodesize>0);
}
int dirty;
if (did_split) {
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp);
assert(r == 0);
dirty = 1;
} else {
if (node->height>0)
assert(node->u.n.n_children<=TREE_FANOUT);
dirty = node->dirty;
}
cachetable_unpin(brt->cf, *rootp, 1);
cachetable_unpin(brt->cf, *rootp, dirty);
r = unpin_brt_header(brt);
assert(r == 0);
//assert(0==cachetable_assert_all_unpinned(brt->cachetable));
......@@ -1775,7 +1788,7 @@ void brt_cursor_leaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE le
if (0) printf("brt_cursor_leaf_split %p oldnode %lld newnode %lld\n", cursor,
oldnode->thisnodename, newnode->thisnodename);
r = cachetable_unpin(t->cf, oldnode->thisnodename, 1);
r = cachetable_unpin(t->cf, oldnode->thisnodename, oldnode->dirty);
assert(r == 0);
r = cachetable_maybe_get_and_pin(t->cf, newnode->thisnodename, &v);
assert(r == 0 && v == newnode);
......@@ -1855,7 +1868,7 @@ void brt_cursor_nonleaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE
if (0) printf("brt_cursor_nonleaf_split %p oldnode %lld newnode %lld\n",
cursor, oldnode->thisnodename, newnode->thisnodename);
r = cachetable_unpin(t->cf, oldnode->thisnodename, 1);
r = cachetable_unpin(t->cf, oldnode->thisnodename, oldnode->dirty);
assert(r == 0);
r = cachetable_maybe_get_and_pin(t->cf, newnode->thisnodename, &v);
assert(r == 0 && v == newnode);
......@@ -2062,7 +2075,7 @@ int brtcurs_set_position_next2(BRT_CURSOR cursor) {
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
cursor->path_len -= 1;
cachetable_unpin(cursor->brt->cf, node->thisnodename, 1);
cachetable_unpin(cursor->brt->cf, node->thisnodename, node->dirty);
if (brt_cursor_path_empty(cursor))
return DB_NOTFOUND;
......@@ -2122,7 +2135,7 @@ int brtcurs_set_position_prev2(BRT_CURSOR cursor) {
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
cursor->path_len -= 1;
cachetable_unpin(cursor->brt->cf, node->thisnodename, 1);
cachetable_unpin(cursor->brt->cf, node->thisnodename, node->dirty);
if (brt_cursor_path_empty(cursor))
return DB_NOTFOUND;
......@@ -2223,7 +2236,7 @@ int brtcurs_set_key(BRT_CURSOR cursor, diskoff off, DBT *key, DBT *val, int flag
if (r != 0) {
cursor->path_len -= 1;
cachetable_unpin(brt->cf, off, 1);
cachetable_unpin(brt->cf, off, node->dirty);
}
return r;
}
......@@ -2285,7 +2298,7 @@ int brtcurs_set_range(BRT_CURSOR cursor, diskoff off, DBT *key, DB *db) {
if (r != 0) {
cursor->path_len -= 1;
cachetable_unpin(brt->cf, off, 1);
cachetable_unpin(brt->cf, off, node->dirty);
}
return r;
}
......@@ -2297,7 +2310,7 @@ static int unpin_cursor (BRT_CURSOR cursor) {
for (i=0; i<cursor->path_len; i++) {
BRTNODE node = cursor->path[i];
brt_node_remove_cursor(node, cursor->pathcnum[i], cursor);
int r2 = cachetable_unpin(brt->cf, cursor->path[i]->thisnodename, 1);
int r2 = cachetable_unpin(brt->cf, node->thisnodename, node->dirty);
if (r==0) r=r2;
}
if (cursor->pmacurs) {
......@@ -2414,8 +2427,10 @@ int brt_cursor_delete(BRT_CURSOR cursor, int flags __attribute__((__unused__)))
assert(node->height == 0);
int kvsize;
r = pma_cursor_delete_under(cursor->pmacurs, &kvsize);
if (r == 0)
if (r == 0) {
node->u.l.n_bytes_in_buffer -= KEY_VALUE_OVERHEAD + kvsize;
brtnode_set_dirty(node);
}
} else
r = DB_NOTFOUND;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment