Commit 987c03b9 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge tokudb revisions 7776-7883 to tokudb.1032b. closes #1260

git-svn-id: file:///svn/toku/tokudb.1032b@7888 c7de825b-a66e-492c-adef-691d508d4ae1
parent add8a16f
@@ -2,7 +2,7 @@
 #include <assert.h>
 #include <db_cxx.h>
-static void hexdump(Dbt *d) {
+static inline void hexdump(Dbt *d) {
 unsigned char *cp = (unsigned char *) d->get_data();
 int n = d->get_size();
 printf(" ");
...
@@ -11,10 +11,9 @@
 #include "list.h"
 #include "mempool.h"
 #include "kv-pair.h"
-#include "leafentry.h"
 typedef void *OMTVALUE;
 #include "omt.h"
+#include "leafentry.h"
 #ifndef BRT_FANOUT
 #define BRT_FANOUT 16
@@ -263,7 +262,13 @@ int toku_cmd_leafval_heaviside (OMTVALUE leafentry, void *extra);
 int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger);
 int toku_cachefile_root_put_cmd (CACHEFILE cf, BRT_CMD cmd, TOKULOGGER logger);
-void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size);
+void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size, void **maybe_free);
+// Effect: Allocate a new object of size SIZE in MP. If MP runs out of space, allocate a new mempool
+//  and copy all the items from the OMT (which refer to items in the old mempool) into the new mempool.
+//  If MAYBE_FREE is NULL then free the old mempool's space.
+//  Otherwise, store the old mempool's space in *maybe_free.
+void mempool_release(struct mempool *); // release anything that was not released when the ..._norelease function was called.
 void toku_verify_all_in_mempool(BRTNODE node);
@@ -281,6 +286,7 @@ enum brt_layout_version_e {
 void toku_brtheader_free (struct brt_header *h);
 int toku_brtheader_close (CACHEFILE cachefile, void *header_v);
+int toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v);
 #define BLOCK_ALLOCATOR_ALIGNMENT 4096
 // How much must be reserved at the beginning for the block?
...
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* Buffered repository tree.
* Observation: The in-memory representation of a node doesn't have to be the same as the on-disk representation.
* Goal for the in-memory representation: fast
* Goal for on-disk: small
*
* So to get this running fast, I'll make a version that doesn't do range queries:
* use a hash table for in-memory
* simply write the strings on disk.
* Later I'll do a PMA or a skiplist for the in-memory version.
* Also, later I'll convert the format to network order from host order.
* Later, for on disk, I'll compress it (perhaps with gzip, perhaps with the bzip2 algorithm.)
*
* The collection of nodes forms a data structure like a B-tree. The complexities of keeping it balanced apply.
*
* We always write nodes to a new location on disk.
* The nodes themselves contain the information about the tree structure.
* Q: During recovery, how do we find the root node without looking at every block on disk?
* A: The root node is the designated root near the front of the freelist.
* The freelist is updated infrequently. Before updating the stable copy of the freelist, we make sure that
* the root is up-to-date. We can make the freelist-and-root update be an arbitrarily small fraction of disk bandwidth.
*
*/
#include "includes.h"
long long n_items_malloced;
static void verify_local_fingerprint_nonleaf (BRTNODE node);
static int toku_dump_brtnode (BRT brt, BLOCKNUM blocknum, int depth, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen);
typedef struct kvpair {
bytevec key;
unsigned int keylen;
bytevec val;
unsigned int vallen;
} *KVPAIR;
// Simple LCG random number generator. Not high quality, but good enough.
static int r_seeded=0;
static u_int32_t rstate=1;
static inline void mysrandom (int s) {
rstate=s;
r_seeded=1;
}
static inline u_int32_t myrandom (void) {
if (!r_seeded) {
struct timeval tv;
gettimeofday(&tv, 0);
mysrandom(tv.tv_sec);
}
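// Multiplicative LCG step: rstate <- (279470275 * rstate) mod 4294967291,
// where 4294967291 is the largest prime below 2^32.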
rstate = (279470275ull*(u_int64_t)rstate)%4294967291ull;
return rstate;
}
static int
handle_split_of_child_simple (BRT t, BRTNODE node, int childnum,
BRTNODE childa, BRTNODE childb,
DBT *splitk, /* the data in the childsplitk is previously alloc'd and is consumed by this call. */
TOKULOGGER logger);
static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger);
//#define MAX_PATHLEN_TO_ROOT 40
static const char *unparse_cmd_type (enum brt_cmd_type typ) __attribute__((__unused__));
static const char *unparse_cmd_type (enum brt_cmd_type typ) {
switch (typ) {
case BRT_NONE: return "NONE";
case BRT_INSERT: return "INSERT";
case BRT_DELETE_ANY: return "DELETE_ANY";
case BRT_DELETE_BOTH: return "DELETE_BOTH";
case BRT_ABORT_ANY: return "ABORT_ANY";
case BRT_ABORT_BOTH: return "ABORT_BOTH";
case BRT_COMMIT_ANY: return "COMMIT_ANY";
case BRT_COMMIT_BOTH: return "COMMIT_BOTH";
}
return "?";
}
static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *split,
TOKULOGGER);
static int
brtnode_put_cmd_simple (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
BOOL *should_split, BOOL *should_merge);
// The maximum row size is 16KB according to the PRD. That means the max pivot key size is 16KB.
#define MAX_PIVOT_KEY_SIZE (1<<14)
/* key is not in the buffer. Either put the key-value pair in the child, or put it in the node. */
static int push_brt_cmd_down_only_if_it_wont_push_more_else_put_here (BRT t, BRTNODE node, BRTNODE child,
BRT_CMD cmd,
int childnum_of_node,
TOKULOGGER logger) {
assert(node->height>0); /* Not a leaf. */
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
unsigned int oldsize = toku_serialize_brtnode_size(child);
unsigned int newsize_bounded = oldsize + k->size + v->size + KEY_VALUE_OVERHEAD + LE_OVERHEAD_BOUND + MAX_PIVOT_KEY_SIZE;
newsize_bounded += (child->height > 0) ? BRT_CMD_OVERHEAD : OMT_ITEM_OVERHEAD;
int to_child = newsize_bounded <= child->nodesize;
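// to_child is true only when the command is guaranteed to fit in the child even under this
// worst-case bound, so pushing it down cannot force the child to push down or split in turn.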
if (0) {
printf("%s:%d pushing %s to %s %d", __FILE__, __LINE__, (char*)k->data, to_child? "child" : "hash", childnum_of_node);
if (childnum_of_node+1<node->u.n.n_children) {
DBT k2;
printf(" nextsplitkey=%s\n", (char*)node->u.n.childkeys[childnum_of_node]);
assert(t->compare_fun(t->db, k, toku_fill_dbt(&k2, node->u.n.childkeys[childnum_of_node], toku_brt_pivot_key_len(t, node->u.n.childkeys[childnum_of_node])))<=0);
} else {
printf("\n");
}
}
int r;
if (to_child) {
int again_split=-1; BRTNODE againa,againb;
DBT againk;
toku_init_dbt(&againk);
//printf("%s:%d hello!\n", __FILE__, __LINE__);
r = brtnode_put_cmd(t, child, cmd,
&again_split, &againa, &againb, &againk,
logger);
if (r!=0) return r;
assert(again_split==0); /* I only did the insert if I knew it wouldn't push down, and hence wouldn't split. */
} else {
r=insert_to_buffer_in_nonleaf(node, childnum_of_node, k, v, cmd->type, cmd->xid);
}
if (newsize_bounded < toku_serialize_brtnode_size(child)) {
fprintf(stderr, "%s:%d size estimate is messed up. newsize_bounded=%u actual_size=%u child_height=%d to_child=%d\n",
__FILE__, __LINE__, newsize_bounded, toku_serialize_brtnode_size(child), child->height, to_child);
fprintf(stderr, " cmd->type=%s cmd->xid=%llu\n", unparse_cmd_type(cmd->type), (unsigned long long)cmd->xid);
fprintf(stderr, " oldsize=%u k->size=%u v->size=%u\n", oldsize, k->size, v->size);
assert(toku_serialize_brtnode_size(child)<=child->nodesize);
//assert(newsize_bounded >= toku_serialize_brtnode_size(child)); // Don't abort on this
}
fixup_child_fingerprint(node, childnum_of_node, child, t, logger);
return r;
}
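// Effect: Apply CMD to CHILD, then dequeue it from NODE's buffer for CHILDNUM, updating NODE's
//  local fingerprint, buffer byte counts, and (via fixup_child_fingerprint) the subtree fingerprint.
//  *must_split/*must_merge report whether the child is now too big or too small.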
static int push_a_brt_cmd_down_simple (BRT t, BRTNODE node, BRTNODE child, int childnum,
BRT_CMD cmd,
BOOL *must_split, BOOL *must_merge,
TOKULOGGER logger) {
//if (debug) printf("%s:%d %*sinserting down\n", __FILE__, __LINE__, debug, "");
//printf("%s:%d hello!\n", __FILE__, __LINE__);
assert(node->height>0);
{
int r = brtnode_put_cmd_simple(t, child, cmd, logger,
must_split, must_merge);
if (r!=0) return r;
}
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
//if (debug) printf("%s:%d %*sinserted down child_did_split=%d\n", __FILE__, __LINE__, debug, "", child_did_split);
u_int32_t old_fingerprint = node->local_fingerprint;
u_int32_t new_fingerprint = old_fingerprint - node->rand4fingerprint*toku_calc_fingerprint_cmdstruct(cmd);
node->local_fingerprint = new_fingerprint;
if (t->txn_that_created != cmd->xid) {
int r = toku_log_brtdeq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum);
assert(r==0);
}
{
int r = toku_fifo_deq(BNC_BUFFER(node,childnum));
//printf("%s:%d deleted status=%d\n", __FILE__, __LINE__, r);
if (r!=0) return r;
}
{
int n_bytes_removed = (k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD);
node->u.n.n_bytes_in_buffers -= n_bytes_removed;
BNC_NBYTESINBUF(node, childnum) -= n_bytes_removed;
node->dirty = 1;
}
fixup_child_fingerprint(node, childnum, child, t, logger);
return 0;
}
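// Like push_a_brt_cmd_down_simple, but applying CMD may split the child; a split is reported
//  through *child_did_split, *childa, *childb, and *childsplitk instead of must_split/must_merge.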
static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum,
BRT_CMD cmd,
int *child_did_split, BRTNODE *childa, BRTNODE *childb,
DBT *childsplitk,
TOKULOGGER logger) {
//if (debug) printf("%s:%d %*sinserting down\n", __FILE__, __LINE__, debug, "");
//printf("%s:%d hello!\n", __FILE__, __LINE__);
assert(node->height>0);
{
int r = brtnode_put_cmd(t, child, cmd,
child_did_split, childa, childb, childsplitk,
logger);
if (r!=0) return r;
}
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
//if (debug) printf("%s:%d %*sinserted down child_did_split=%d\n", __FILE__, __LINE__, debug, "", child_did_split);
u_int32_t old_fingerprint = node->local_fingerprint;
u_int32_t new_fingerprint = old_fingerprint - node->rand4fingerprint*toku_calc_fingerprint_cmdstruct(cmd);
node->local_fingerprint = new_fingerprint;
if (t->txn_that_created != cmd->xid) {
int r = toku_log_brtdeq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum);
assert(r==0);
}
{
int r = toku_fifo_deq(BNC_BUFFER(node,childnum));
//printf("%s:%d deleted status=%d\n", __FILE__, __LINE__, r);
if (r!=0) return r;
}
{
int n_bytes_removed = (k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD);
node->u.n.n_bytes_in_buffers -= n_bytes_removed;
BNC_NBYTESINBUF(node, childnum) -= n_bytes_removed;
node->dirty = 1;
}
if (*child_did_split) {
// Don't try to fix these up.
//fixup_child_fingerprint(node, childnum, *childa, t, logger);
//fixup_child_fingerprint(node, childnum+1, *childb, t, logger);
} else {
fixup_child_fingerprint(node, childnum, child, t, logger);
}
return 0;
}
static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger);
static int split_count=0;
/* NODE is a node with a child.
* childnum was split into two nodes childa, and childb. childa is the same as the original child. childb is a new child.
* We must slide things around, & move things from the old table to the new tables.
* We also move things to the new children as much as we can without doing any pushdowns or splitting of the child.
* We must delete the old buffer (but the old child is already deleted.)
* We also unpin the new children.
*/
static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
BRTNODE childa, BRTNODE childb,
DBT *childsplitk, /* the data in the childsplitk is alloc'd and is consumed by this call. */
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk,
TOKULOGGER logger) {
assert(node->height>0);
assert(0 <= childnum && childnum < node->u.n.n_children);
FIFO old_h = BNC_BUFFER(node,childnum);
int old_count = BNC_NBYTESINBUF(node, childnum);
int cnum;
int r;
assert(node->u.n.n_children<=TREE_FANOUT);
if (toku_brt_debug_mode) {
int i;
printf("%s:%d Child %d did split on %s\n", __FILE__, __LINE__, childnum, (char*)childsplitk->data);
printf("%s:%d oldsplitkeys:", __FILE__, __LINE__);
for(i=0; i<node->u.n.n_children-1; i++) printf(" %s", (char*)node->u.n.childkeys[i]);
printf("\n");
}
node->dirty = 1;
//verify_local_fingerprint_nonleaf(node);
REALLOC_N(node->u.n.n_children+2, node->u.n.childinfos);
REALLOC_N(node->u.n.n_children+1, node->u.n.childkeys);
// Slide the children over.
BNC_SUBTREE_FINGERPRINT (node, node->u.n.n_children+1)=0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, node->u.n.n_children+1)=0;
for (cnum=node->u.n.n_children; cnum>childnum+1; cnum--) {
node->u.n.childinfos[cnum] = node->u.n.childinfos[cnum-1];
}
r = toku_log_addchild(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
node->u.n.n_children++;
assert(BNC_BLOCKNUM(node, childnum).b==childa->thisnodename.b); // use the same child
BNC_BLOCKNUM(node, childnum+1) = childb->thisnodename;
BNC_HAVE_FULLHASH(node, childnum+1) = TRUE;
BNC_FULLHASH(node, childnum+1) = childb->fullhash;
// BNC_SUBTREE_FINGERPRINT(node, childnum)=0; // leave the subtreefingerprint alone for the child, so we can log the change
BNC_SUBTREE_FINGERPRINT (node, childnum+1)=0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, childnum+1)=0;
fixup_child_fingerprint(node, childnum, childa, t, logger);
fixup_child_fingerprint(node, childnum+1, childb, t, logger);
r=toku_fifo_create(&BNC_BUFFER(node,childnum+1)); assert(r==0);
//verify_local_fingerprint_nonleaf(node); // The fingerprint hasn't changed and everything is still there.
r=toku_fifo_create(&BNC_BUFFER(node,childnum)); assert(r==0); // ??? Should handle this error case
BNC_NBYTESINBUF(node, childnum) = 0;
BNC_NBYTESINBUF(node, childnum+1) = 0;
// Remove all the cmds from the local fingerprint. Some may get added in again when we try to push to the child.
FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid,
{
u_int32_t old_fingerprint = node->local_fingerprint;
u_int32_t new_fingerprint = old_fingerprint - node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xid, skey, skeylen, sval, svallen);
if (t->txn_that_created != xid) {
r = toku_log_brtdeq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum);
assert(r==0);
}
node->local_fingerprint = new_fingerprint;
});
//verify_local_fingerprint_nonleaf(node);
// Slide the keys over
{
struct kv_pair *pivot = childsplitk->data;
BYTESTRING bs = { .len = childsplitk->size,
.data = kv_pair_key(pivot) };
r = toku_log_setpivot(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
if (r!=0) return r;
for (cnum=node->u.n.n_children-2; cnum>childnum; cnum--) {
node->u.n.childkeys[cnum] = node->u.n.childkeys[cnum-1];
}
//if (logger) assert((t->flags&TOKU_DB_DUPSORT)==0); // the setpivot is wrong for TOKU_DB_DUPSORT, so recovery will be broken.
node->u.n.childkeys[childnum]= pivot;
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, pivot);
}
if (toku_brt_debug_mode) {
int i;
printf("%s:%d splitkeys:", __FILE__, __LINE__);
for(i=0; i<node->u.n.n_children-2; i++) printf(" %s", (char*)node->u.n.childkeys[i]);
printf("\n");
}
//verify_local_fingerprint_nonleaf(node);
node->u.n.n_bytes_in_buffers -= old_count; /* By default, they are all removed. We might add them back in. */
/* Keep pushing to the children, but not if the children would require a pushdown */
FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid, {
DBT skd; DBT svd;
BRT_CMD_S brtcmd = build_brt_cmd((enum brt_cmd_type)type, xid, toku_fill_dbt(&skd, skey, skeylen), toku_fill_dbt(&svd, sval, svallen));
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
int pusha = 0; int pushb = 0;
switch (type) {
case BRT_INSERT:
case BRT_DELETE_BOTH:
case BRT_DELETE_ANY:
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY:
if ((type!=BRT_DELETE_ANY && type!=BRT_ABORT_ANY && type!=BRT_COMMIT_ANY) || 0==(t->flags&TOKU_DB_DUPSORT)) {
// If it's an INSERT or DELETE_BOTH or there are no duplicates then we just put the command into one subtree
int cmp = brt_compare_pivot(t, &skd, &svd, childsplitk->data);
if (cmp <= 0) pusha = 1;
else pushb = 1;
} else {
assert((type==BRT_DELETE_ANY || type==BRT_ABORT_ANY || type==BRT_COMMIT_ANY) && t->flags&TOKU_DB_DUPSORT);
// It is a DELETE or ABORT_ANY and it's a DUPSORT database,
// in which case if the comparison function comes up 0 we must write the command to both children. (See #201)
int cmp = brt_compare_pivot(t, &skd, 0, childsplitk->data);
if (cmp<=0) pusha=1;
if (cmp>=0) pushb=1; // Could be that both pusha and pushb are set
}
if (pusha) {
// If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order.
if (toku_fifo_n_entries(BNC_BUFFER(node,childnum))==0) {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childa, &brtcmd, childnum, logger);
} else {
r=insert_to_buffer_in_nonleaf(node, childnum, &skd, &svd, type, xid);
}
}
if (pushb) {
// If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order.
if (toku_fifo_n_entries(BNC_BUFFER(node,childnum+1))==0) {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childb, &brtcmd, childnum+1, logger);
} else {
r=insert_to_buffer_in_nonleaf(node, childnum+1, &skd, &svd, type, xid);
}
}
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
if (r!=0) printf("r=%d\n", r);
assert(r==0);
goto ok;
case BRT_NONE:
// Don't have to do anything in this case, can just drop the command
goto ok;
}
printf("Bad type %d\n", type); // Don't use default: because I want a compiler warning if I forget a enum case, and I want a runtime error if the type isn't one of the expected ones.
assert(0);
ok: /*nothing*/;
});
toku_fifo_free(&old_h);
//verify_local_fingerprint_nonleaf(childa);
//verify_local_fingerprint_nonleaf(childb);
//verify_local_fingerprint_nonleaf(node);
VERIFY_NODE(t, node);
VERIFY_NODE(t, childa);
VERIFY_NODE(t, childb);
r=toku_unpin_brtnode(t, childa);
assert(r==0);
r=toku_unpin_brtnode(t, childb);
assert(r==0);
if (node->u.n.n_children>TREE_FANOUT) {
//printf("%s:%d about to split having pushed %d out of %d keys\n", __FILE__, __LINE__, i, n_pairs);
r=brt_nonleaf_split(t, node, nodea, nodeb, splitk, logger);
if (r!=0) return r;
//printf("%s:%d did split\n", __FILE__, __LINE__);
split_count++;
*did_split=1;
assert((*nodea)->height>0);
assert((*nodeb)->height>0);
assert((*nodea)->u.n.n_children>0);
assert((*nodeb)->u.n.n_children>0);
assert(BNC_BLOCKNUM(*nodea, (*nodea)->u.n.n_children-1).b!=0);
assert(BNC_BLOCKNUM(*nodeb, (*nodeb)->u.n.n_children-1).b!=0);
assert(toku_serialize_brtnode_size(*nodea)<=(*nodea)->nodesize);
assert(toku_serialize_brtnode_size(*nodeb)<=(*nodeb)->nodesize);
//verify_local_fingerprint_nonleaf(*nodea);
//verify_local_fingerprint_nonleaf(*nodeb);
} else {
*did_split=0;
if (toku_serialize_brtnode_size(node) > node->nodesize) {
/* lighten the node by pushing down its buffers. this may cause
the current node to split and go away */
r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, logger);
assert(r == 0);
}
if (*did_split == 0) assert(toku_serialize_brtnode_size(node)<=node->nodesize);
}
return 0;
}
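// Effect: Pin the child at CHILDNUM and push every command queued in NODE's buffer for that child
//  down into it, one at a time; the child is unpinned before returning.
//  *must_split/*must_merge are the OR of what each individual push reported.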
static int
push_some_brt_cmds_down_simple (BRT t, BRTNODE node, int childnum, BOOL *must_split, BOOL *must_merge, TOKULOGGER logger) {
int r;
assert(node->height>0);
BLOCKNUM targetchild = BNC_BLOCKNUM(node, childnum);
assert(targetchild.b>=0 && targetchild.b<t->h->unused_blocks.b); // This assertion could fail in a concurrent setting since another process might have bumped unused memory.
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnum);
void *childnode_v;
r = toku_cachetable_get_and_pin(t->cf, targetchild, childfullhash, &childnode_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
if (r!=0) return r;
//printf("%s:%d pin %p\n", __FILE__, __LINE__, childnode_v);
BRTNODE child = childnode_v;
assert(child->thisnodename.b!=0);
//verify_local_fingerprint_nonleaf(child);
VERIFY_NODE(t, child);
//printf("%s:%d height=%d n_bytes_in_buffer = {%d, %d, %d, ...}\n", __FILE__, __LINE__, child->height, child->n_bytes_in_buffer[0], child->n_bytes_in_buffer[1], child->n_bytes_in_buffer[2]);
//printf("%s:%d before pushing into Node %" PRIu64 ", disksize=%d", __FILE__, __LINE__, child->thisnodename.b, toku_serialize_brtnode_size(child));
//if (child->height==0) printf(" omtsize=%d", toku_omt_size(child->u.l.buffer));
//printf("\n");
assert(toku_serialize_brtnode_size(child)<=child->nodesize);
if (child->height>0 && child->u.n.n_children>0) assert(BNC_BLOCKNUM(child, child->u.n.n_children-1).b!=0);
if (0) {
static int count=0;
count++;
printf("%s:%d pushing %d count=%d\n", __FILE__, __LINE__, childnum, count);
}
BOOL some_must_split = FALSE;
BOOL some_must_merge = FALSE;
int pushed_count = 0;
{
bytevec key,val;
ITEMLEN keylen, vallen;
//printf("%s:%d Try random_pick, weight=%d \n", __FILE__, __LINE__, BNC_NBYTESINBUF(node, childnum));
assert(toku_fifo_n_entries(BNC_BUFFER(node,childnum))>0);
u_int32_t type;
TXNID xid;
while(0==toku_fifo_peek(BNC_BUFFER(node,childnum), &key, &keylen, &val, &vallen, &type, &xid)) {
DBT hk,hv;
DBT childsplitk;
BOOL this_must_split, this_must_merge;
BRT_CMD_S brtcmd = { (enum brt_cmd_type)type, xid, .u.id= {toku_fill_dbt(&hk, key, keylen),
toku_fill_dbt(&hv, val, vallen)} };
//printf("%s:%d random_picked\n", __FILE__, __LINE__);
toku_init_dbt(&childsplitk);
pushed_count++;
r = push_a_brt_cmd_down_simple (t, node, child, childnum,
&brtcmd,
&this_must_split, &this_must_merge,
logger);
if (0) {
unsigned int sum=0;
FIFO_ITERATE(BNC_BUFFER(node,childnum), subhk __attribute__((__unused__)), hkl, hd __attribute__((__unused__)), hdl, subtype __attribute__((__unused__)), subxid __attribute__((__unused__)),
sum+=hkl+hdl+KEY_VALUE_OVERHEAD+BRT_CMD_OVERHEAD);
printf("%s:%d sum=%u\n", __FILE__, __LINE__, sum);
assert(sum==BNC_NBYTESINBUF(node, childnum));
}
if (BNC_NBYTESINBUF(node, childnum)>0) assert(toku_fifo_n_entries(BNC_BUFFER(node,childnum))>0);
//printf("%s:%d %d=push_a_brt_cmd_down=(); child_did_split=%d (weight=%d)\n", __FILE__, __LINE__, r, child_did_split, BNC_NBYTESINBUF(node, childnum));
if (r!=0) return r;
some_must_split |= this_must_split;
some_must_merge |= this_must_merge;
}
if (0) printf("%s:%d done random picking\n", __FILE__, __LINE__);
}
assert(toku_serialize_brtnode_size(node)<=node->nodesize);
//verify_local_fingerprint_nonleaf(node);
//printf("%s:%d after pushing %d into Node %" PRIu64 ", disksize=%d", __FILE__, __LINE__, pushed_count, child->thisnodename.b, toku_serialize_brtnode_size(child));
//if (child->height==0) printf(" omtsize=%d", toku_omt_size(child->u.l.buffer));
//printf("\n");
r=toku_unpin_brtnode(t, child);
if (r!=0) return r;
*must_split = some_must_split;
*must_merge = some_must_merge;
return 0;
}
static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk,
TOKULOGGER logger) {
void *childnode_v;
BRTNODE child;
int r;
assert(node->height>0);
BLOCKNUM targetchild = BNC_BLOCKNUM(node, childnum);
assert(targetchild.b>=0 && targetchild.b<t->h->unused_blocks.b); // This assertion could fail in a concurrent setting since another process might have bumped unused memory.
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnum);
r = toku_cachetable_get_and_pin(t->cf, targetchild, childfullhash, &childnode_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
if (r!=0) return r;
//printf("%s:%d pin %p\n", __FILE__, __LINE__, childnode_v);
child=childnode_v;
assert(child->thisnodename.b!=0);
//verify_local_fingerprint_nonleaf(child);
VERIFY_NODE(t, child);
//printf("%s:%d height=%d n_bytes_in_buffer = {%d, %d, %d, ...}\n", __FILE__, __LINE__, child->height, child->n_bytes_in_buffer[0], child->n_bytes_in_buffer[1], child->n_bytes_in_buffer[2]);
if (child->height>0 && child->u.n.n_children>0) assert(BNC_BLOCKNUM(child, child->u.n.n_children-1).b!=0);
if (0) {
static int count=0;
count++;
printf("%s:%d pushing %d count=%d\n", __FILE__, __LINE__, childnum, count);
}
...
assert(toku_serialize_brtnode_size(node)<=node->nodesize);
//verify_local_fingerprint_nonleaf(node);
r=toku_unpin_brtnode(t, child);
if (r!=0) return r;
*did_split=0;
return 0;
}
static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger)
/* If the buffer is too full, then push down. Possibly the child will split. That may make us split. */
{
assert(node->height>0);
if (toku_serialize_brtnode_size(node) > node->nodesize ) {
{
/* Push to a child. */
/* Find the heaviest child, and push stuff to it. Keep pushing to the child until we run out.
* But if the child pushes something to its child and our buffer has gotten small enough, then we stop pushing. */
int childnum;
find_heaviest_child(node, &childnum);
assert(BNC_BLOCKNUM(node, childnum).b!=0);
int r = push_some_brt_cmds_down(t, node, childnum, did_split, nodea, nodeb, splitk, logger);
if (r!=0) return r;
assert(*did_split==0 || *did_split==1);
if (*did_split) {
assert(toku_serialize_brtnode_size(*nodea)<=(*nodea)->nodesize);
assert(toku_serialize_brtnode_size(*nodeb)<=(*nodeb)->nodesize);
assert((*nodea)->u.n.n_children>0);
assert((*nodeb)->u.n.n_children>0);
assert(BNC_BLOCKNUM(*nodea, (*nodea)->u.n.n_children-1).b!=0);
assert(BNC_BLOCKNUM(*nodeb, (*nodeb)->u.n.n_children-1).b!=0);
//verify_local_fingerprint_nonleaf(*nodea);
//verify_local_fingerprint_nonleaf(*nodeb);
} else {
assert(toku_serialize_brtnode_size(node)<=node->nodesize);
}
}
} else {
*did_split=0;
assert(toku_serialize_brtnode_size(node)<=node->nodesize);
}
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
//} else {
// verify_local_fingerprint_nonleaf(node);
//}
return 0;
}
// Whenever anything provisional is happening, its XID must match the cmd's.
static int
brt_leaf_put_cmd_simple (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
u_int64_t *new_size /*OUT*/
)
// Effect: Put a cmd into a leaf.
// Return the serialization size in *new_size.
// The leaf could end up "too big". It is up to the caller to fix that up.
{
// toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->subtree_fingerprint);
VERIFY_NODE(t, node);
assert(node->height==0);
LEAFENTRY storeddata;
OMTVALUE storeddatav=NULL;
u_int32_t idx;
int r;
int compare_both = should_compare_both_keys(node, cmd);
struct cmd_leafval_bessel_extra be = {t, cmd, compare_both};
//static int counter=0;
//counter++;
//printf("counter=%d\n", counter);
switch (cmd->type) {
case BRT_INSERT:
if (node->u.l.seqinsert) {
idx = toku_omt_size(node->u.l.buffer);
r = toku_omt_fetch(node->u.l.buffer, idx-1, &storeddatav, NULL);
if (r != 0) goto fz;
storeddata = storeddatav;
int cmp = toku_cmd_leafval_bessel(storeddata, &be);
if (cmp >= 0) goto fz;
r = DB_NOTFOUND;
} else {
fz:
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be,
&storeddatav, &idx, NULL);
}
if (r==DB_NOTFOUND) {
storeddata = 0;
} else if (r!=0) {
return r;
} else {
storeddata=storeddatav;
}
r = brt_leaf_apply_cmd_once(t, node, cmd, logger, idx, storeddata);
if (r!=0) return r;
// if the insertion point is within a window of the right edge of
// the leaf then it is sequential
// window = min(32, number of leaf entries/16)
u_int32_t s = toku_omt_size(node->u.l.buffer);
u_int32_t w = s / 16;
if (w == 0) w = 1;
if (w > 32) w = 32;
// within the window?
if (s - idx <= w) {
node->u.l.seqinsert += 1;
} else {
node->u.l.seqinsert = 0;
}
break;
case BRT_DELETE_BOTH:
case BRT_ABORT_BOTH:
case BRT_COMMIT_BOTH:
// Delete the one item
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be,
&storeddatav, &idx, NULL);
if (r == DB_NOTFOUND) break;
if (r != 0) return r;
storeddata=storeddatav;
VERIFY_NODE(t, node);
static int count=0;
count++;
r = brt_leaf_apply_cmd_once(t, node, cmd, logger, idx, storeddata);
if (r!=0) return r;
VERIFY_NODE(t, node);
break;
case BRT_DELETE_ANY:
case BRT_ABORT_ANY:
case BRT_COMMIT_ANY:
// Delete all the matches
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be,
&storeddatav, &idx, NULL);
if (r == DB_NOTFOUND) break;
if (r != 0) return r;
storeddata=storeddatav;
while (1) {
int vallen = le_any_vallen(storeddata);
void *save_val = toku_memdup(le_any_val(storeddata), vallen);
r = brt_leaf_apply_cmd_once(t, node, cmd, logger, idx, storeddata);
if (r!=0) return r;
// Now we must find the next one.
DBT valdbt;
BRT_CMD_S ncmd = { cmd->type, cmd->xid, .u.id={cmd->u.id.key, toku_fill_dbt(&valdbt, save_val, vallen)}};
struct cmd_leafval_bessel_extra nbe = {t, &ncmd, 1};
r = toku_omt_find(node->u.l.buffer, toku_cmd_leafval_bessel, &nbe, +1,
&storeddatav, &idx, NULL);
toku_free(save_val);
if (r!=0) break;
storeddata=storeddatav;
{ // Continue only if the next record that we found has the same key.
DBT adbt;
if (t->compare_fun(t->db,
toku_fill_dbt(&adbt, le_any_key(storeddata), le_any_keylen(storeddata)),
cmd->u.id.key) != 0)
break;
}
}
break;
case BRT_NONE: return EINVAL;
}
/// All done doing the work
node->dirty = 1;
// toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->subtree_fingerprint);
VERIFY_NODE(t, node);
*new_size = toku_serialize_brtnode_size(node);
return 0;
}
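// Effect: Apply CMD to leaf NODE. If the resulting serialized size exceeds the nodesize,
//  split the leaf and report the split through *did_split, *nodea, *nodeb, and *splitk.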
static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
TOKULOGGER logger) {
u_int64_t leaf_size MAYBE_INIT(0);
int r = brt_leaf_put_cmd_simple(t, node, cmd, logger, &leaf_size);
if (r!=0) return r;
// If it doesn't fit, then split the leaf.
if (leaf_size > node->nodesize) {
FILENUM filenum = toku_cachefile_filenum(t->cf);
r = brtleaf_split (logger, filenum, t, node, nodea, nodeb, splitk);
if (r!=0) return r;
//printf("%s:%d splitkey=%s\n", __FILE__, __LINE__, (char*)*splitkey);
split_count++;
*did_split = 1;
assert(toku_serialize_brtnode_size(*nodea)<=(*nodea)->nodesize);
assert(toku_serialize_brtnode_size(*nodeb)<=(*nodeb)->nodesize);
VERIFY_NODE(t, *nodea);
VERIFY_NODE(t, *nodeb);
} else {
*did_split = 0;
}
return 0;
}
/* put a cmd into a node's child */
static int
brt_nonleaf_put_cmd_child_node_simple (BRT t, BRTNODE node, int childnum, BOOL maybe, BRT_CMD cmd, TOKULOGGER logger,
BOOL *should_split /* OUT */,
BOOL *should_merge)
// Effect: Put CMD into the child of node.
// If MAYBE is false and the child is not in main memory, then don't do anything.
// If we return 0, then store *should_split and *should_merge appropriately.
{
int r;
void *child_v;
BRTNODE child;
int child_did_split;
BLOCKNUM childblocknum=BNC_BLOCKNUM(node, childnum);
u_int32_t fullhash = compute_child_fullhash(t->cf, node, childnum);
if (maybe)
r = toku_cachetable_maybe_get_and_pin(t->cf, childblocknum, fullhash, &child_v);
else
r = toku_cachetable_get_and_pin(t->cf, childblocknum, fullhash, &child_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
if (r != 0)
return r;
child = child_v;
child_did_split = 0;
r = brtnode_put_cmd_simple(t, child, cmd, logger, should_split, should_merge);
if (r != 0) {
/* putting to the child failed for some reason, so unpin the child and return the error code */
int rr = toku_unpin_brtnode(t, child);
assert(rr == 0);
return r;
}
{
//verify_local_fingerprint_nonleaf(child);
fixup_child_fingerprint(node, childnum, child, t, logger);
int rr = toku_unpin_brtnode(t, child);
assert(rr == 0);
}
return r;
}
/* put a cmd into a node's child */
static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
TOKULOGGER logger, int childnum, int maybe) {
int r;
void *child_v;
BRTNODE child;
int child_did_split;
BRTNODE childa, childb;
DBT childsplitk;
*did_split = 0;
BLOCKNUM childblocknum=BNC_BLOCKNUM(node, childnum);
u_int32_t fullhash = compute_child_fullhash(t->cf, node, childnum);
if (maybe)
r = toku_cachetable_maybe_get_and_pin(t->cf, childblocknum, fullhash, &child_v);
else
r = toku_cachetable_get_and_pin(t->cf, childblocknum, fullhash, &child_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
if (r != 0)
return r;
child = child_v;
child_did_split = 0;
r = brtnode_put_cmd(t, child, cmd,
&child_did_split, &childa, &childb, &childsplitk, logger);
if (r != 0) {
/* putting to the child failed for some reason, so unpin the child and return the error code */
int rr = toku_unpin_brtnode(t, child);
assert(rr == 0);
return r;
}
if (child_did_split) {
if (0) printf("brt_nonleaf_insert child_split %p\n", child);
r = handle_split_of_child(t, node, childnum,
childa, childb, &childsplitk,
did_split, nodea, nodeb, splitk,
logger);
assert(r == 0);
} else {
//verify_local_fingerprint_nonleaf(child);
fixup_child_fingerprint(node, childnum, child, t, logger);
int rr = toku_unpin_brtnode(t, child);
assert(rr == 0);
}
return r;
}
int toku_brt_do_push_cmd = 1;
/* put a cmd into a node at childnum */
static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
TOKULOGGER logger, unsigned int childnum, int can_push, int *do_push_down) {
//verify_local_fingerprint_nonleaf(node);
/* try to push the cmd to the subtree if the buffer is empty and pushes are enabled */
if (BNC_NBYTESINBUF(node, childnum) == 0 && can_push && toku_brt_do_push_cmd) {
int r = brt_nonleaf_put_cmd_child_node(t, node, cmd, did_split, nodea, nodeb, splitk, logger, childnum, 1);
if (r == 0)
return r;
}
//verify_local_fingerprint_nonleaf(node);
/* append the cmd to the child buffer */
{
int type = cmd->type;
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
int r = log_and_save_brtenq(logger, t, node, childnum, cmd->xid, type, k->data, k->size, v->data, v->size, &node->local_fingerprint);
if (r!=0) return r;
int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
r=toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, cmd->xid);
assert(r==0);
node->u.n.n_bytes_in_buffers += diff;
BNC_NBYTESINBUF(node, childnum) += diff;
node->dirty = 1;
}
*do_push_down = 1;
return 0;
}
static int
merge (void) {
static int printcount=0;
printcount++;
if (0==(printcount & (printcount-1))) {// is printcount a power of two?
printf("%s:%d %s not ready (%d invocations)\n", __FILE__, __LINE__, __func__, printcount);
}
return 0;
}
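// Pin the child at CHILDNUM, return its serialized size, and report its fanout
//  (0 for a leaf) through *fanout; the child is unpinned before returning.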
static inline int
brt_serialize_size_of_child (BRT t, BRTNODE node, int childnum, int *fanout) {
assert(node->height>0);
BLOCKNUM childblocknum = BNC_BLOCKNUM(node, childnum);
u_int32_t fullhash = compute_child_fullhash(t->cf, node, childnum);
void *childnode_v;
int r = toku_cachetable_get_and_pin(t->cf, childblocknum, fullhash, &childnode_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
BRTNODE childnode = childnode_v;
int size = toku_serialize_brtnode_size(childnode);
assert(r==0);
*fanout = (childnode->height==0) ? 0 : childnode->u.n.n_children;
r = toku_cachetable_unpin(t->cf, childnode->thisnodename, childnode->fullhash, 0, brtnode_memory_size(childnode));
assert(r==0);
return size;
}
// Split or merge the child, if the child is too large or too small.
// Return the new fanout of node.
static int
brt_nonleaf_maybe_split_or_merge (BRT t, BRTNODE node, int childnum, BOOL should_split, BOOL should_merge, TOKULOGGER logger, u_int32_t *new_fanout) {
//printf("%s:%d Node %" PRIu64 " is size %d, child %d is Node %" PRIu64 " size is %d\n", __FILE__, __LINE__, node->thisnodename.b, toku_serialize_brtnode_size(node), childnum, BNC_BLOCKNUM(node, childnum).b, brt_serialize_size_of_child(t, node, childnum));
assert(!(should_split && should_merge));
if (should_split) { int r = brt_split_child(t, node, childnum, logger); if (r!=0) return r; }
if (should_merge) { int r = merge(); if (r!=0) return r; }
*new_fanout = node->u.n.n_children;
return 0;
}
/* Put a cmd into a node at childnum */
/* May result in the data being pushed to a child.
* Which may cause that child to split, which may cause the fanout to become larger.
* Return the new fanout. */
static int
brt_nonleaf_put_cmd_child_simple (BRT t, BRTNODE node, unsigned int childnum, BRT_CMD cmd, TOKULOGGER logger, u_int32_t *new_fanout) {
//verify_local_fingerprint_nonleaf(node);
/* Push the cmd to the subtree if the buffer is empty */
//printf("%s:%d %s\n",__FILE__,__LINE__,__func__);
if (BNC_NBYTESINBUF(node, childnum) == 0) {
BOOL must_split MAYBE_INIT(FALSE);
BOOL must_merge MAYBE_INIT(FALSE);
//printf("%s:%d fix up fingerprint?\n", __FILE__, __LINE__);
int r = brt_nonleaf_put_cmd_child_node_simple(t, node, childnum, TRUE, cmd, logger, &must_split, &must_merge);
//printf("%s:%d Put in child, must_split=%d must_merge=%d\n", __FILE__, __LINE__, must_split, must_merge);
if (r==0) {
return brt_nonleaf_maybe_split_or_merge(t, node, childnum, must_split, must_merge, logger, new_fanout);
}
// Otherwise fall out and append it to the child buffer.
//printf("%s:%d fall out\n", __FILE__, __LINE__);
}
//verify_local_fingerprint_nonleaf(node);
/* append the cmd to the child buffer */
{
int type = cmd->type;
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
int r = log_and_save_brtenq(logger, t, node, childnum, cmd->xid, type, k->data, k->size, v->data, v->size, &node->local_fingerprint);
if (r!=0) return r;
int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
r=toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, cmd->xid);
assert(r==0);
node->u.n.n_bytes_in_buffers += diff;
BNC_NBYTESINBUF(node, childnum) += diff;
node->dirty = 1;
}
if (toku_serialize_brtnode_size(node) > node->nodesize) {
int biggest_child;
BOOL must_split MAYBE_INIT(FALSE);
BOOL must_merge MAYBE_INIT(FALSE);
find_heaviest_child(node, &biggest_child);
{
int cfan;
int csize;
csize = brt_serialize_size_of_child(t, node, biggest_child, &cfan);
if (0) printf("%s:%d Node %" PRIu64 " fanout=%d Pushing into child %d (Node %" PRIu64 ", size=%d, fanout=%d estimate=%" PRId64 ")\n", __FILE__, __LINE__,
node->thisnodename.b, node->u.n.n_children,
biggest_child, BNC_BLOCKNUM(node, biggest_child).b, csize, cfan, BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, biggest_child));
}
// printf("%s:%d fix up fingerprint?\n", __FILE__, __LINE__);
int r = push_some_brt_cmds_down_simple(t, node, biggest_child, &must_split, &must_merge, logger);
if (r!=0) return r;
return brt_nonleaf_maybe_split_or_merge(t, node, biggest_child, must_split, must_merge, logger, new_fanout);
}
*new_fanout = node->u.n.n_children;
if (0) {
printf("%s:%d Done pushing Node %" PRIu64 " n_children=%d: estimates=", __FILE__, __LINE__, node->thisnodename.b, node->u.n.n_children);
int i;
int64_t total=0;
for (i=0; i<node->u.n.n_children; i++) {
int64_t v = BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i);
total+=v;
printf(" %" PRId64, v);
}
printf(" total=%" PRId64 " \n", total);
}
return 0;
}
static int
brt_nonleaf_cmd_once_simple (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
u_int32_t *new_fanout) {
//verify_local_fingerprint_nonleaf(node);
/* find the right subtree */
unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, cmd->u.id.val, t);
/* put the cmd in the subtree */
return brt_nonleaf_put_cmd_child_simple(t, node, childnum, cmd, logger, new_fanout);
}
/* delete in all subtrees starting from the left most one which contains the key */
static int brt_nonleaf_cmd_many (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
TOKULOGGER logger) {
int r;
/* find all children that need a copy of the command */
int sendchild[TREE_FANOUT], delidx = 0;
#define sendchild_append(i) \
if (delidx == 0 || sendchild[delidx-1] != i) sendchild[delidx++] = i;
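// sendchild_append(i) records child i unless it equals the most recently recorded child,
// so sendchild[] ends up with each target child listed once, in order.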
int i;
for (i = 0; i < node->u.n.n_children-1; i++) {
int cmp = brt_compare_pivot(t, cmd->u.id.key, 0, node->u.n.childkeys[i]);
if (cmp > 0) {
continue;
} else if (cmp < 0) {
sendchild_append(i);
break;
} else if (t->flags & TOKU_DB_DUPSORT) {
sendchild_append(i);
sendchild_append(i+1);
} else {
sendchild_append(i);
break;
}
}
if (delidx == 0)
sendchild_append(node->u.n.n_children-1);
#undef sendchild_append
/* issue the cmd to all of the children found previously */
int do_push_down = 0;
for (i=0; i<delidx; i++) {
r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, logger, sendchild[i], delidx == 1, &do_push_down);
assert(r == 0);
}
if (do_push_down) {
/* maybe push down */
//verify_local_fingerprint_nonleaf(node);
r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, logger);
if (r!=0) return r;
if (*did_split) {
assert(toku_serialize_brtnode_size(*nodea)<=(*nodea)->nodesize);
assert(toku_serialize_brtnode_size(*nodeb)<=(*nodeb)->nodesize);
assert((*nodea)->u.n.n_children>0);
assert((*nodeb)->u.n.n_children>0);
assert(BNC_BLOCKNUM(*nodea,(*nodea)->u.n.n_children-1).b!=0);
assert(BNC_BLOCKNUM(*nodeb,(*nodeb)->u.n.n_children-1).b!=0);
} else {
assert(toku_serialize_brtnode_size(node)<=node->nodesize);
}
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
//} else {
// verify_local_fingerprint_nonleaf(node);
//}
}
return 0;
}
static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk,
TOKULOGGER logger) {
switch (cmd->type) {
case BRT_INSERT:
case BRT_DELETE_BOTH:
case BRT_ABORT_BOTH:
case BRT_COMMIT_BOTH:
do_once:
return brt_nonleaf_cmd_once(t, node, cmd, did_split, nodea, nodeb, splitk, logger);
case BRT_DELETE_ANY:
case BRT_ABORT_ANY:
case BRT_COMMIT_ANY:
if (0 == (node->flags & TOKU_DB_DUPSORT)) goto do_once; // non-dupsort delete_any is just the do-once case.
return brt_nonleaf_cmd_many(t, node, cmd, did_split, nodea, nodeb, splitk, logger);
case BRT_NONE:
break;
}
return EINVAL;
}
static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
TOKULOGGER logger) {
//static int counter=0; // FOO
//static int oldcounter=0;
//int tmpcounter;
//u_int32_t oldfingerprint=node->local_fingerprint;
int r;
//counter++; tmpcounter=counter;
if (node->height==0) {
// toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->subtree_fingerprint);
r = brt_leaf_put_cmd(t, node, cmd,
did_split, nodea, nodeb, splitk,
logger);
} else {
r = brt_nonleaf_put_cmd(t, node, cmd,
did_split, nodea, nodeb, splitk,
logger);
}
//oldcounter=tmpcounter;
// Watch out. If did_split then the original node is no longer allocated.
if (*did_split) {
assert(toku_serialize_brtnode_size(*nodea)<=(*nodea)->nodesize);
assert(toku_serialize_brtnode_size(*nodeb)<=(*nodeb)->nodesize);
// if ((*nodea)->height==0) {
// toku_pma_verify_fingerprint((*nodea)->u.l.buffer, (*nodea)->rand4fingerprint, (*nodea)->subtree_fingerprint);
// toku_pma_verify_fingerprint((*nodeb)->u.l.buffer, (*nodeb)->rand4fingerprint, (*nodeb)->subtree_fingerprint);
// }
} else {
assert(toku_serialize_brtnode_size(node)<=node->nodesize);
// if (node->height==0) {
// toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->local_fingerprint);
// } else {
// verify_local_fingerprint_nonleaf(node);
// }
}
//if (node->local_fingerprint==3522421844U) {
// if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
// }
return r;
}
//strcmp(key,"hello387")==0;
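// Effect: Apply CMD to the node currently pinned at the root. If the node then needs to split,
//  split it (leaf or nonleaf) and install a new root via brt_init_new_root; a merge request at
//  the root is ignored, since there is nothing to merge with.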
static int push_something_simple(BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT_CMD cmd, TOKULOGGER logger) {
BRTNODE node = *nodep;
BOOL should_split =-1;
BOOL should_merge =-1;
{
int r = brtnode_put_cmd_simple(brt, node, cmd, logger, &should_split, &should_merge);
if (r!=0) return r;
//if (should_split) printf("%s:%d Pushed something simple, should_split=1\n", __FILE__, __LINE__);
}
assert(should_split!=(BOOL)-1 && should_merge!=(BOOL)-1);
assert(!(should_split && should_merge));
//printf("%s:%d should_split=%d node_size=%" PRIu64 "\n", __FILE__, __LINE__, should_split, brtnode_memory_size(node));
if (should_split) {
BRTNODE nodea,nodeb;
DBT splitk;
if (node->height==0) {
int r = brtleaf_split(logger, toku_cachefile_filenum(brt->cf), brt, node, &nodea, &nodeb, &splitk);
if (r!=0) return r;
} else {
int r = brt_nonleaf_split(brt, node, &nodea, &nodeb, &splitk, logger);
if (r!=0) return r;
}
return brt_init_new_root(brt, nodea, nodeb, splitk, rootp, logger, nodep);
} else if (should_merge) {
return 0; // Cannot merge anything at the root, so return happy.
} else {
return 0;
}
}
#if 0
static int push_something(BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT_CMD cmd, TOKULOGGER logger) {
int did_split = 0;
BRTNODE nodea=0, nodeb=0;
DBT splitk;
int result = brtnode_put_cmd(brt, *nodep, cmd,
&did_split, &nodea, &nodeb, &splitk,
logger);
int r;
if (did_split) {
// node is unpinned, so now we have to proceed to update the root with a new node.
//printf("%s:%d did_split=%d nodeb=%p nodeb->thisnodename=%lld nodeb->nodesize=%d\n", __FILE__, __LINE__, did_split, nodeb, nodeb->thisnodename, nodeb->nodesize);
//printf("Did split, splitkey=%s\n", splitkey);
if (nodeb->height>0) assert(BNC_BLOCKNUM(nodeb,nodeb->u.n.n_children-1).b!=0);
assert(nodeb->nodesize>0);
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp, logger, nodep);
assert(r == 0);
} else {
if ((*nodep)->height>0)
assert((*nodep)->u.n.n_children<=TREE_FANOUT);
}
//assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable));
return result;
}
#endif
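// Effect: Pin the root, first drain any commands still queued in the header's fifo into the tree,
//  then apply CMD itself; push_something_simple handles any root split. The root is unpinned on return.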
int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) {
void *node_v;
BRTNODE node;
CACHEKEY *rootp;
int r;
//assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable));
assert(brt->h);
brt->h->root_put_counter = global_root_put_counter++;
u_int32_t fullhash;
rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
//assert(fullhash==toku_cachetable_hash(brt->cf, *rootp));
if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, fullhash, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h))) {
return r;
}
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
assert(node->fullhash==fullhash);
// push the fifo stuff
{
DBT okey,odata;
BRT_CMD_S ocmd;
while (0==toku_fifo_peek_cmdstruct(brt->h->fifo, &ocmd, &okey, &odata)) {
if ((r = push_something_simple(brt, &node, rootp, &ocmd, logger))) return r;
r = toku_fifo_deq(brt->h->fifo);
assert(r==0);
}
}
if ((r = push_something_simple(brt, &node, rootp, cmd, logger))) return r;
r = toku_unpin_brtnode(brt, node);
assert(r == 0);
return 0;
}
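// Effect: Insert (key,val). For a transaction that did not create this brt, first log a rollback
//  entry for the insert and note the brt in the txn, then enqueue a BRT_INSERT command at the root.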
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r;
if (txn && (brt->txn_that_created != toku_txn_get_txnid(txn))) {
toku_cachefile_refup(brt->cf);
BYTESTRING keybs = {key->size, toku_memdup_in_rollback(txn, key->data, key->size)};
BYTESTRING databs = {val->size, toku_memdup_in_rollback(txn, val->data, val->size)};
r = toku_logger_save_rollback_cmdinsert(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs, databs);
if (r!=0) return r;
r = toku_txn_note_brt(txn, brt);
if (r!=0) return r;
}
BRT_CMD_S brtcmd = { BRT_INSERT, toku_txn_get_txnid(txn), .u.id={key,val}};
r = toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
if (r!=0) return r;
return r;
}
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
//{ unsigned i; printf("del %p keylen=%d key={", brt->db, key->size); for(i=0; i<key->size; i++) printf("%d,", ((char*)key->data)[i]); printf("} datalen=%d data={", val->size); for(i=0; i<val->size; i++) printf("%d,", ((char*)val->data)[i]); printf("}\n"); }
int r;
if (txn && (brt->txn_that_created != toku_txn_get_txnid(txn))) {
BYTESTRING keybs = {key->size, toku_memdup_in_rollback(txn, key->data, key->size)};
BYTESTRING databs = {val->size, toku_memdup_in_rollback(txn, val->data, val->size)};
toku_cachefile_refup(brt->cf);
r = toku_logger_save_rollback_cmddeleteboth(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs, databs);
if (r!=0) return r;
r = toku_txn_note_brt(txn, brt);
if (r!=0) return r;
}
BRT_CMD_S brtcmd = { BRT_DELETE_BOTH, toku_txn_get_txnid(txn), .u.id={key,val}};
r = toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
#if 0
static int show_brtnode_blocknumbers (BRT brt, DISKOFF off) {
BRTNODE node;
void *node_v;
int i,r;
assert(off%brt->h->nodesize==0);
if ((r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h))) {
if (0) { died0: toku_cachetable_unpin(brt->cf, off, 0, 0); }
return r;
}
printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
printf(" %lld", off/brt->h->nodesize);
if (node->height>0) {
for (i=0; i<node->u.n.n_children; i++) {
if ((r=show_brtnode_blocknumbers(brt, BNC_BLOCKNUM(node, i)))) goto died0;
}
}
r = toku_cachetable_unpin(brt->cf, off, 0, 0);
return r;
}
int show_brt_blocknumbers (BRT brt) {
int r;
CACHEKEY *rootp;
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
rootp = toku_calculate_root_offset_pointer(brt);
printf("BRT %p has blocks:", brt);
if ((r=show_brtnode_blocknumbers (brt, *rootp, 0))) goto died0;
printf("\n");
if ((r = toku_unpin_brt_header(brt))!=0) return r;
return 0;
}
#endif
typedef struct brt_split {
int did_split;
BRTNODE nodea;
BRTNODE nodeb;
DBT splitk;
} BRT_SPLIT;
static inline void brt_split_init(BRT_SPLIT *split) {
split->did_split = 0;
split->nodea = split->nodeb = 0;
toku_init_dbt(&split->splitk);
}
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, OMTCURSOR);
int toku_brt_height_of_root(BRT brt, int *height) {
// for an open brt, return the current height.
int r;
assert(brt->h);
u_int32_t fullhash;
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
void *node_v;
//assert(fullhash == toku_cachetable_hash(brt->cf, *rootp));
if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, fullhash, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h))) {
return r;
}
BRTNODE node = node_v;
*height = node->height;
r = toku_unpin_brtnode(brt, node); assert(r==0);
return 0;
}
@@ -271,7 +271,7 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct
 printf("%s:%d w.done=%u calculated_size=%u\n", __FILE__, __LINE__, w.ndone, calculated_size);
 assert(calculated_size==w.ndone);
-// The uncompressed part of the header is
+// The uncompressed part of the block header is
 //  tokuleaf(8),
 //  version(4),
 //  lsn(8),
...
@@ -71,12 +71,8 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
 assert(node->height==0);
 u_int32_t lesize, disksize;
-LEAFENTRY tmp_leafentry;
-r = le_committed(keylen, key, vallen, val, &lesize, &disksize, &tmp_leafentry);
-LEAFENTRY leafentry = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, lesize);
-memcpy(leafentry, tmp_leafentry, lesize);
-toku_free(tmp_leafentry);
+LEAFENTRY leafentry;
+r = le_committed(keylen, key, vallen, val, &lesize, &disksize, &leafentry, node->u.l.buffer, &node->u.l.buffer_mempool, 0);
 OMTVALUE storeddatav;
 u_int32_t idx;
...
...@@ -1256,7 +1256,8 @@ should_compare_both_keys (BRTNODE node, BRT_CMD cmd) ...@@ -1256,7 +1256,8 @@ should_compare_both_keys (BRTNODE node, BRT_CMD cmd)
static int apply_cmd_to_le_committed (u_int32_t klen, void *kval, static int apply_cmd_to_le_committed (u_int32_t klen, void *kval,
u_int32_t dlen, void *dval, u_int32_t dlen, void *dval,
BRT_CMD cmd, BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data,
OMT omt, struct mempool *mp, void **maybe_free) {
//assert(cmd->u.id.key->size == klen); //assert(cmd->u.id.key->size == klen);
//assert(memcmp(cmd->u.id.key->data, kval, klen)==0); //assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
switch (cmd->type) { switch (cmd->type) {
...@@ -1265,20 +1266,23 @@ static int apply_cmd_to_le_committed (u_int32_t klen, void *kval, ...@@ -1265,20 +1266,23 @@ static int apply_cmd_to_le_committed (u_int32_t klen, void *kval,
klen, kval, klen, kval,
dlen, dval, dlen, dval,
cmd->u.id.val->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
case BRT_DELETE_ANY: case BRT_DELETE_ANY:
case BRT_DELETE_BOTH: case BRT_DELETE_BOTH:
return le_provdel(cmd->xid, return le_provdel(cmd->xid,
klen, kval, klen, kval,
dlen, dval, dlen, dval,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
case BRT_ABORT_BOTH: case BRT_ABORT_BOTH:
case BRT_ABORT_ANY: case BRT_ABORT_ANY:
case BRT_COMMIT_BOTH: case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY: case BRT_COMMIT_ANY:
// Just return the original committed record // Just return the original committed record
return le_committed(klen, kval, dlen, dval, return le_committed(klen, kval, dlen, dval,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
case BRT_NONE: break; case BRT_NONE: break;
} }
abort(); return 0; abort(); return 0;
...@@ -1289,7 +1293,8 @@ static int apply_cmd_to_le_both (TXNID xid, ...@@ -1289,7 +1293,8 @@ static int apply_cmd_to_le_both (TXNID xid,
u_int32_t clen, void *cval, u_int32_t clen, void *cval,
u_int32_t plen, void *pval, u_int32_t plen, void *pval,
BRT_CMD cmd, BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data,
OMT omt, struct mempool *mp, void *maybe_free) {
u_int32_t prev_len; u_int32_t prev_len;
void *prev_val; void *prev_val;
if (xid==cmd->xid) { if (xid==cmd->xid) {
...@@ -1308,25 +1313,29 @@ static int apply_cmd_to_le_both (TXNID xid, ...@@ -1308,25 +1313,29 @@ static int apply_cmd_to_le_both (TXNID xid,
klen, kval, klen, kval,
prev_len, prev_val, prev_len, prev_val,
cmd->u.id.val->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
case BRT_DELETE_ANY: case BRT_DELETE_ANY:
case BRT_DELETE_BOTH: case BRT_DELETE_BOTH:
return le_provdel(cmd->xid, return le_provdel(cmd->xid,
klen, kval, klen, kval,
prev_len, prev_val, prev_len, prev_val,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
case BRT_ABORT_BOTH: case BRT_ABORT_BOTH:
case BRT_ABORT_ANY: case BRT_ABORT_ANY:
// I don't see how you could have an abort where the xids don't match. But do it anyway. // I don't see how you could have an abort where the xids don't match. But do it anyway.
return le_committed(klen, kval, return le_committed(klen, kval,
prev_len, prev_val, prev_len, prev_val,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
case BRT_COMMIT_BOTH: case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY: case BRT_COMMIT_ANY:
// In the future we won't even have these commit messages. // In the future we won't even have these commit messages.
return le_committed(klen, kval, return le_committed(klen, kval,
plen, pval, plen, pval,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
case BRT_NONE: break; case BRT_NONE: break;
} }
abort(); return 0; abort(); return 0;
...@@ -1336,7 +1345,9 @@ static int apply_cmd_to_le_provdel (TXNID xid, ...@@ -1336,7 +1345,9 @@ static int apply_cmd_to_le_provdel (TXNID xid,
u_int32_t klen, void *kval, u_int32_t klen, void *kval,
u_int32_t clen, void *cval, u_int32_t clen, void *cval,
BRT_CMD cmd, BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data,
OMT omt, struct mempool *mp, void *maybe_free)
{
// keep the committed value for rollback // keep the committed value for rollback
//assert(cmd->u.id.key->size == klen); //assert(cmd->u.id.key->size == klen);
//assert(memcmp(cmd->u.id.key->data, kval, klen)==0); //assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
...@@ -1347,13 +1358,15 @@ static int apply_cmd_to_le_provdel (TXNID xid, ...@@ -1347,13 +1358,15 @@ static int apply_cmd_to_le_provdel (TXNID xid,
klen, kval, klen, kval,
clen, cval, clen, cval,
cmd->u.id.val->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
} else { } else {
// It's an insert, but the committed value is deleted (since the xids don't match, we assume the delete took effect) // It's an insert, but the committed value is deleted (since the xids don't match, we assume the delete took effect)
return le_provpair(cmd->xid, return le_provpair(cmd->xid,
klen, kval, klen, kval,
cmd->u.id.val->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
} }
case BRT_DELETE_ANY: case BRT_DELETE_ANY:
case BRT_DELETE_BOTH: case BRT_DELETE_BOTH:
...@@ -1363,7 +1376,8 @@ static int apply_cmd_to_le_provdel (TXNID xid, ...@@ -1363,7 +1376,8 @@ static int apply_cmd_to_le_provdel (TXNID xid,
return le_provdel(cmd->xid, return le_provdel(cmd->xid,
klen, kval, klen, kval,
clen, cval, clen, cval,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
} else { } else {
// The committed value is deleted, and we are deleting, so treat as a delete. // The committed value is deleted, and we are deleting, so treat as a delete.
*new_data = 0; *new_data = 0;
...@@ -1374,7 +1388,8 @@ static int apply_cmd_to_le_provdel (TXNID xid, ...@@ -1374,7 +1388,8 @@ static int apply_cmd_to_le_provdel (TXNID xid,
// I don't see how the xids could not match... // I don't see how the xids could not match...
return le_committed(klen, kval, return le_committed(klen, kval,
clen, cval, clen, cval,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
case BRT_COMMIT_BOTH: case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY: case BRT_COMMIT_ANY:
*new_data = 0; *new_data = 0;
...@@ -1388,7 +1403,8 @@ static int apply_cmd_to_le_provpair (TXNID xid, ...@@ -1388,7 +1403,8 @@ static int apply_cmd_to_le_provpair (TXNID xid,
u_int32_t klen, void *kval, u_int32_t klen, void *kval,
u_int32_t plen , void *pval, u_int32_t plen , void *pval,
BRT_CMD cmd, BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data,
OMT omt, struct mempool *mp, void **maybe_free) {
//assert(cmd->u.id.key->size == klen); //assert(cmd->u.id.key->size == klen);
//assert(memcmp(cmd->u.id.key->data, kval, klen)==0); //assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
switch (cmd->type) { switch (cmd->type) {
...@@ -1398,14 +1414,16 @@ static int apply_cmd_to_le_provpair (TXNID xid, ...@@ -1398,14 +1414,16 @@ static int apply_cmd_to_le_provpair (TXNID xid,
return le_provpair(cmd->xid, return le_provpair(cmd->xid,
klen, kval, klen, kval,
cmd->u.id.val->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
} else { } else {
// the old prov was actually committed. // the old prov was actually committed.
return le_both(cmd->xid, return le_both(cmd->xid,
klen, kval, klen, kval,
plen, pval, plen, pval,
cmd->u.id.val->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
} }
case BRT_DELETE_BOTH: case BRT_DELETE_BOTH:
case BRT_DELETE_ANY: case BRT_DELETE_ANY:
...@@ -1418,7 +1436,8 @@ static int apply_cmd_to_le_provpair (TXNID xid, ...@@ -1418,7 +1436,8 @@ static int apply_cmd_to_le_provpair (TXNID xid,
return le_provdel(cmd->xid, return le_provdel(cmd->xid,
klen, kval, klen, kval,
plen, pval, plen, pval,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
} }
case BRT_ABORT_BOTH: case BRT_ABORT_BOTH:
case BRT_ABORT_ANY: case BRT_ABORT_ANY:
...@@ -1429,7 +1448,8 @@ static int apply_cmd_to_le_provpair (TXNID xid, ...@@ -1429,7 +1448,8 @@ static int apply_cmd_to_le_provpair (TXNID xid,
case BRT_COMMIT_BOTH: case BRT_COMMIT_BOTH:
return le_committed(klen, kval, return le_committed(klen, kval,
plen, pval, plen, pval,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
case BRT_NONE: break; case BRT_NONE: break;
} }
abort(); return 0; abort(); return 0;
...@@ -1438,7 +1458,8 @@ static int apply_cmd_to_le_provpair (TXNID xid, ...@@ -1438,7 +1458,8 @@ static int apply_cmd_to_le_provpair (TXNID xid,
static int static int
apply_cmd_to_leaf (BRT_CMD cmd, apply_cmd_to_leaf (BRT_CMD cmd,
void *stored_data, // NULL if there was no stored data. void *stored_data, // NULL if there was no stored data.
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data,
OMT omt, struct mempool *mp, void **maybe_free)
{ {
if (stored_data==0) { if (stored_data==0) {
switch (cmd->type) { switch (cmd->type) {
...@@ -1448,7 +1469,8 @@ apply_cmd_to_leaf (BRT_CMD cmd, ...@@ -1448,7 +1469,8 @@ apply_cmd_to_leaf (BRT_CMD cmd,
int r = le_provpair(cmd->xid, int r = le_provpair(cmd->xid,
cmd->u.id.key->size, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.key->data,
cmd->u.id.val->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, &le); newlen, disksize, &le,
omt, mp, maybe_free);
if (r==0) *new_data=le; if (r==0) *new_data=le;
return r; return r;
} }
...@@ -1466,7 +1488,8 @@ apply_cmd_to_leaf (BRT_CMD cmd, ...@@ -1466,7 +1488,8 @@ apply_cmd_to_leaf (BRT_CMD cmd,
abort(); abort();
} else { } else {
LESWITCHCALL(stored_data, apply_cmd_to, cmd, LESWITCHCALL(stored_data, apply_cmd_to, cmd,
newlen, disksize, new_data); newlen, disksize, new_data,
omt, mp, maybe_free);
} }
abort(); return 0; abort(); return 0;
} }
...@@ -1480,10 +1503,14 @@ brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, ...@@ -1480,10 +1503,14 @@ brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
{ {
FILENUM filenum = toku_cachefile_filenum(t->cf); FILENUM filenum = toku_cachefile_filenum(t->cf);
u_int32_t newlen=0, newdisksize=0; u_int32_t newlen=0, newdisksize=0;
LEAFENTRY newdata=0; LEAFENTRY new_le=0;
int r = apply_cmd_to_leaf(cmd, le, &newlen, &newdisksize, &newdata); void *maybe_free = 0;
// apply_cmd_to_leaf() below may allocate more space in the node's mempool (via mempool_malloc_from_omt()).
// If the mempool has to be compressed into a new block, the old block is handed back through maybe_free instead of being freed,
// so the old pointers (such as le) are guaranteed to still be good even though the data may have been copied into a new mempool.
// We must free the old block ourselves at return_r below.
int r = apply_cmd_to_leaf(cmd, le, &newlen, &newdisksize, &new_le, node->u.l.buffer, &node->u.l.buffer_mempool, &maybe_free);
if (r!=0) return r; if (r!=0) return r;
if (newdata) assert(newdisksize == leafentry_disksize(newdata)); if (new_le) assert(newdisksize == leafentry_disksize(new_le));
//printf("Applying command: %s xid=%lld ", unparse_cmd_type(cmd->type), (long long)cmd->xid); //printf("Applying command: %s xid=%lld ", unparse_cmd_type(cmd->type), (long long)cmd->xid);
//toku_print_BYTESTRING(stdout, cmd->u.id.key->size, cmd->u.id.key->data); //toku_print_BYTESTRING(stdout, cmd->u.id.key->size, cmd->u.id.key->data);
...@@ -1491,12 +1518,12 @@ brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, ...@@ -1491,12 +1518,12 @@ brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
//toku_print_BYTESTRING(stdout, cmd->u.id.val->size, cmd->u.id.val->data); //toku_print_BYTESTRING(stdout, cmd->u.id.val->size, cmd->u.id.val->data);
//printf(" to \n"); //printf(" to \n");
//print_leafentry(stdout, le); printf("\n"); //print_leafentry(stdout, le); printf("\n");
//printf(" got "); print_leafentry(stdout, newdata); printf("\n"); //printf(" got "); print_leafentry(stdout, new_le); printf("\n");
if (le && newdata) { if (le && new_le) {
if (t->txn_that_created != cmd->xid) { if (t->txn_that_created != cmd->xid) {
if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r; if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) goto return_r;
if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r; if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, new_le))) goto return_r;
} }
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le); node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le);
...@@ -1504,29 +1531,24 @@ brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, ...@@ -1504,29 +1531,24 @@ brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
u_int32_t size = leafentry_memsize(le); u_int32_t size = leafentry_memsize(le);
LEAFENTRY new_le = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, newlen);
assert(new_le);
memcpy(new_le, newdata, newlen);
// This mfree must occur after the mempool_malloc so that when the mempool is compressed everything is accounted for. // This mfree must occur after the mempool_malloc so that when the mempool is compressed everything is accounted for.
// But we must compute the size before doing the mempool malloc because otherwise the le pointer is no good. // But we must compute the size before doing the mempool malloc because otherwise the le pointer is no good.
toku_mempool_mfree(&node->u.l.buffer_mempool, 0, size); // Must pass 0, since le may be no good any more. toku_mempool_mfree(&node->u.l.buffer_mempool, 0, size); // Must pass 0, since le may be no good any more.
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize; node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint*toku_le_crc(newdata); node->local_fingerprint += node->rand4fingerprint*toku_le_crc(new_le);
toku_free(newdata);
if ((r = toku_omt_set_at(node->u.l.buffer, new_le, idx))) return r; if ((r = toku_omt_set_at(node->u.l.buffer, new_le, idx))) goto return_r;
} else { } else {
if (le) { if (le) {
// It's there, note that it's gone and remove it from the mempool // It's there, note that it's gone and remove it from the mempool
if (t->txn_that_created != cmd->xid) { if (t->txn_that_created != cmd->xid) {
if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r; if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) goto return_r;
} }
if ((r = toku_omt_delete_at(node->u.l.buffer, idx))) return r; if ((r = toku_omt_delete_at(node->u.l.buffer, idx))) goto return_r;
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le); node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le);
node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le); node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le);
...@@ -1534,23 +1556,24 @@ brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, ...@@ -1534,23 +1556,24 @@ brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
toku_mempool_mfree(&node->u.l.buffer_mempool, 0, leafentry_memsize(le)); // Must pass 0, since le may be no good any more. toku_mempool_mfree(&node->u.l.buffer_mempool, 0, leafentry_memsize(le)); // Must pass 0, since le may be no good any more.
} }
if (newdata) { if (new_le) {
LEAFENTRY new_le = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, newlen); if ((r = toku_omt_insert_at(node->u.l.buffer, new_le, idx))) goto return_r;
assert(new_le);
memcpy(new_le, newdata, newlen);
if ((r = toku_omt_insert_at(node->u.l.buffer, new_le, idx))) return r;
if (t->txn_that_created != cmd->xid) { if (t->txn_that_created != cmd->xid) {
if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r; if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, new_le))) goto return_r;
} }
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize; node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint*toku_le_crc(newdata); node->local_fingerprint += node->rand4fingerprint*toku_le_crc(new_le);
toku_free(newdata);
} }
} }
r=0;
// printf("%s:%d rand4=%08x local_fingerprint=%08x this=%08x\n", __FILE__, __LINE__, node->rand4fingerprint, node->local_fingerprint, toku_calccrc32_kvpair_struct(kv)); // printf("%s:%d rand4=%08x local_fingerprint=%08x this=%08x\n", __FILE__, __LINE__, node->rand4fingerprint, node->local_fingerprint, toku_calccrc32_kvpair_struct(kv));
return 0; return_r:
if (maybe_free) toku_free(maybe_free);
return r;
} }
static int static int
...@@ -1884,7 +1907,7 @@ merge_leaf_nodes (BRTNODE a, BRTNODE b) { ...@@ -1884,7 +1907,7 @@ merge_leaf_nodes (BRTNODE a, BRTNODE b) {
u_int32_t le_size = leafentry_memsize(le); u_int32_t le_size = leafentry_memsize(le);
u_int32_t le_crc = toku_le_crc(le); u_int32_t le_crc = toku_le_crc(le);
{ {
LEAFENTRY new_le = mempool_malloc_from_omt(omta, &a->u.l.buffer_mempool, le_size); LEAFENTRY new_le = mempool_malloc_from_omt(omta, &a->u.l.buffer_mempool, le_size, 0);
assert(new_le); assert(new_le);
memcpy(new_le, le, le_size); memcpy(new_le, le, le_size);
int r = toku_omt_insert_at(omta, new_le, toku_omt_size(a->u.l.buffer)); int r = toku_omt_insert_at(omta, new_le, toku_omt_size(a->u.l.buffer));
...@@ -1924,7 +1947,7 @@ balance_leaf_nodes (BRTNODE a, BRTNODE b, struct kv_pair **splitk) ...@@ -1924,7 +1947,7 @@ balance_leaf_nodes (BRTNODE a, BRTNODE b, struct kv_pair **splitk)
u_int32_t le_size = leafentry_memsize(le); u_int32_t le_size = leafentry_memsize(le);
u_int32_t le_crc = toku_le_crc(le); u_int32_t le_crc = toku_le_crc(le);
{ {
LEAFENTRY new_le = mempool_malloc_from_omt(omtto, &to->u.l.buffer_mempool, le_size); LEAFENTRY new_le = mempool_malloc_from_omt(omtto, &to->u.l.buffer_mempool, le_size, 0);
assert(new_le); assert(new_le);
memcpy(new_le, le, le_size); memcpy(new_le, le, le_size);
int r = toku_omt_insert_at(omtto, new_le, to_idx); int r = toku_omt_insert_at(omtto, new_le, to_idx);
...@@ -2564,7 +2587,7 @@ static int move_it (OMTVALUE lev, u_int32_t idx, void *v) { ...@@ -2564,7 +2587,7 @@ static int move_it (OMTVALUE lev, u_int32_t idx, void *v) {
} }
// Compress things, and grow the mempool if needed. // Compress things, and grow the mempool if needed.
static int omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_size) { static int omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_size, void **maybe_free) {
u_int32_t total_size_needed = memp->free_offset-memp->frag_size + added_size; u_int32_t total_size_needed = memp->free_offset-memp->frag_size + added_size;
if (total_size_needed+total_size_needed/4 >= memp->size) { if (total_size_needed+total_size_needed/4 >= memp->size) {
memp->size = total_size_needed+total_size_needed/4; memp->size = total_size_needed+total_size_needed/4;
...@@ -2577,15 +2600,19 @@ static int omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_siz ...@@ -2577,15 +2600,19 @@ static int omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_siz
struct omt_compressor_state oc = { &new_kvspace, omt }; struct omt_compressor_state oc = { &new_kvspace, omt };
toku_omt_iterate(omt, move_it, &oc); toku_omt_iterate(omt, move_it, &oc);
if (maybe_free) {
*maybe_free = memp->base;
} else {
toku_free(memp->base); toku_free(memp->base);
}
*memp = new_kvspace; *memp = new_kvspace;
return 0; return 0;
} }
void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size) { void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size, void **maybe_free) {
void *v = toku_mempool_malloc(mp, size, 1); void *v = toku_mempool_malloc(mp, size, 1);
if (v==0) { if (v==0) {
if (0 == omt_compress_kvspace(omt, mp, size)) { if (0 == omt_compress_kvspace(omt, mp, size, maybe_free)) {
v = toku_mempool_malloc(mp, size, 1); v = toku_mempool_malloc(mp, size, 1);
assert(v); assert(v);
} }
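The new maybe_free argument changes who disposes of the old mempool block: when omt_compress_kvspace has to copy everything into a bigger block, it either frees the old base immediately (maybe_free==NULL, the old behavior) or hands the old base back to the caller, so that pointers into it, such as the le that brt_leaf_apply_cmd_once is still logging and un-fingerprinting, stay readable until the caller does the toku_free at return_r. Below is a minimal self-contained sketch of that contract using a toy bump allocator; pool, pool_alloc and pool_grow are illustrative names, not TokuDB APIs.

/* Toy model of the maybe_free contract (illustrative only; "pool", "pool_alloc"
 * and "pool_grow" are not TokuDB names).  When the pool must be replaced, the old
 * block is either freed at once (maybe_free == NULL) or handed back to the caller,
 * so stale pointers into it stay valid until the caller frees it. */
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct pool { char *base; size_t size, used; };

static void *pool_alloc(struct pool *p, size_t n) {
    if (p->used + n > p->size) return NULL;      /* out of space */
    void *v = p->base + p->used;
    p->used += n;
    return v;
}

/* Replace the pool with a bigger one; dispose of the old block per maybe_free. */
static void pool_grow(struct pool *p, size_t need, void **maybe_free) {
    struct pool np;
    np.size = (p->size + need) * 2;
    np.base = malloc(np.size);
    assert(np.base);
    memcpy(np.base, p->base, p->used);           /* like move_it copying the live entries */
    np.used = p->used;
    if (maybe_free) *maybe_free = p->base;       /* caller frees the old block later */
    else free(p->base);                          /* old behavior: free immediately */
    *p = np;
}

int main(void) {
    struct pool p = { malloc(16), 16, 0 };
    char *old_entry = pool_alloc(&p, 8);
    strcpy(old_entry, "old-le");

    void *maybe_free = NULL;
    if (pool_alloc(&p, 64) == NULL) pool_grow(&p, 64, &maybe_free);
    char *new_entry = pool_alloc(&p, 64);
    assert(new_entry);

    /* old_entry points into the displaced block, which has not been freed,
     * so it can still be read (e.g., for logging or fingerprint updates). */
    printf("still readable: %s\n", old_entry);

    if (maybe_free) free(maybe_free);            /* the deferred release */
    free(p.base);
    return 0;
}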
...@@ -2714,7 +2741,7 @@ static int brt_init_header(BRT t, TOKUTXN txn) { ...@@ -2714,7 +2741,7 @@ static int brt_init_header(BRT t, TOKUTXN txn) {
if ((r=setup_initial_brt_root_node(t, root, toku_txn_logger(txn)))!=0) { return r; } if ((r=setup_initial_brt_root_node(t, root, toku_txn_logger(txn)))!=0) { return r; }
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0); //printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
assert(t->h->free_blocks.b==-1); assert(t->h->free_blocks.b==-1);
toku_cachefile_set_userdata(t->cf, t->h, toku_brtheader_close); toku_cachefile_set_userdata(t->cf, t->h, toku_brtheader_close, toku_brtheader_checkpoint);
return r; return r;
} }
...@@ -2766,7 +2793,7 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header ...@@ -2766,7 +2793,7 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header
int r = toku_deserialize_brtheader_from(toku_cachefile_fd(cf), make_blocknum(0), &h); int r = toku_deserialize_brtheader_from(toku_cachefile_fd(cf), make_blocknum(0), &h);
if (r!=0) return r; if (r!=0) return r;
h->root_put_counter = global_root_put_counter++; h->root_put_counter = global_root_put_counter++;
toku_cachefile_set_userdata(cf, (void*)h, toku_brtheader_close); toku_cachefile_set_userdata(cf, (void*)h, toku_brtheader_close, toku_brtheader_checkpoint);
*header = h; *header = h;
return 0; return 0;
} }
...@@ -2983,7 +3010,9 @@ int toku_brt_create_cachetable(CACHETABLE *ct, long cachesize, LSN initial_lsn, ...@@ -2983,7 +3010,9 @@ int toku_brt_create_cachetable(CACHETABLE *ct, long cachesize, LSN initial_lsn,
return toku_create_cachetable(ct, cachesize, initial_lsn, logger); return toku_create_cachetable(ct, cachesize, initial_lsn, logger);
} }
int toku_brtheader_close (CACHEFILE cachefile, void *header_v) { int
toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v)
{
struct brt_header *h = header_v; struct brt_header *h = header_v;
//printf("%s:%d allocated_limit=%lu writing queue to %lu\n", __FILE__, __LINE__, //printf("%s:%d allocated_limit=%lu writing queue to %lu\n", __FILE__, __LINE__,
// block_allocator_allocated_limit(h->block_allocator), h->unused_blocks.b*h->nodesize); // block_allocator_allocated_limit(h->block_allocator), h->unused_blocks.b*h->nodesize);
...@@ -2992,7 +3021,16 @@ int toku_brtheader_close (CACHEFILE cachefile, void *header_v) { ...@@ -2992,7 +3021,16 @@ int toku_brtheader_close (CACHEFILE cachefile, void *header_v) {
u_int64_t write_to = block_allocator_allocated_limit(h->block_allocator); // Must compute this after writing the header. u_int64_t write_to = block_allocator_allocated_limit(h->block_allocator); // Must compute this after writing the header.
//printf("%s:%d fifo written to %lu\n", __FILE__, __LINE__, write_to); //printf("%s:%d fifo written to %lu\n", __FILE__, __LINE__, write_to);
toku_serialize_fifo_at(toku_cachefile_fd(cachefile), write_to, h->fifo); toku_serialize_fifo_at(toku_cachefile_fd(cachefile), write_to, h->fifo);
h->dirty = 0;
} }
return 0;
}
int
toku_brtheader_close (CACHEFILE cachefile, void *header_v)
{
struct brt_header *h = header_v;
toku_brtheader_checkpoint(cachefile, h);
toku_brtheader_free(h); toku_brtheader_free(h);
return 0; return 0;
} }
......
...@@ -125,6 +125,7 @@ struct cachefile { ...@@ -125,6 +125,7 @@ struct cachefile {
void *userdata; void *userdata;
int (*close_userdata)(CACHEFILE cf, void *userdata); // when closing the last reference to a cachefile, first call this function. int (*close_userdata)(CACHEFILE cf, void *userdata); // when closing the last reference to a cachefile, first call this function.
int (*checkpoint_userdata)(CACHEFILE cf, void *userdata); // when checkpointing a cachefile, call this function.
}; };
int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER logger) { int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER logger) {
...@@ -229,6 +230,7 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, const char *fna ...@@ -229,6 +230,7 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, const char *fna
newcf->userdata = 0; newcf->userdata = 0;
newcf->close_userdata = 0; newcf->close_userdata = 0;
newcf->checkpoint_userdata = 0;
*cf = newcf; *cf = newcf;
return 0; return 0;
...@@ -252,6 +254,7 @@ int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname) { ...@@ -252,6 +254,7 @@ int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname) {
return r; return r;
} }
cf->close_userdata = NULL; cf->close_userdata = NULL;
cf->checkpoint_userdata = NULL;
cf->userdata = NULL; cf->userdata = NULL;
close(cf->fd); close(cf->fd);
...@@ -313,6 +316,7 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger) { ...@@ -313,6 +316,7 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger) {
return r; return r;
} }
cf->close_userdata = NULL; cf->close_userdata = NULL;
cf->checkpoint_userdata = NULL;
cf->userdata = NULL; cf->userdata = NULL;
cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles); cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles);
cachetable_unlock(ct); cachetable_unlock(ct);
...@@ -1081,6 +1085,16 @@ int toku_cachetable_checkpoint (CACHETABLE ct) { ...@@ -1081,6 +1085,16 @@ int toku_cachetable_checkpoint (CACHETABLE ct) {
cachetable_complete_write_pair(ct, p, FALSE); cachetable_complete_write_pair(ct, p, FALSE);
} }
{
CACHEFILE cf;
for (cf = ct->cachefiles; cf; cf=cf->next) {
if (cf->checkpoint_userdata) {
int r = cf->checkpoint_userdata(cf, cf->userdata);
assert(r==0);
}
}
}
ct->checkpointing = 0; // clear the checkpoint in progress flag ct->checkpointing = 0; // clear the checkpoint in progress flag
} }
...@@ -1219,9 +1233,15 @@ int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, vo ...@@ -1219,9 +1233,15 @@ int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, vo
return r; return r;
} }
void toku_cachefile_set_userdata (CACHEFILE cf, void *userdata, int (*close_userdata)(CACHEFILE, void*)) { void
toku_cachefile_set_userdata (CACHEFILE cf,
void *userdata,
int (*close_userdata)(CACHEFILE, void*),
int (*checkpoint_userdata)(CACHEFILE, void*))
{
cf->userdata = userdata; cf->userdata = userdata;
cf->close_userdata = close_userdata; cf->close_userdata = close_userdata;
cf->checkpoint_userdata = checkpoint_userdata;
} }
void *toku_cachefile_get_userdata(CACHEFILE cf) { void *toku_cachefile_get_userdata(CACHEFILE cf) {
......
...@@ -61,8 +61,9 @@ typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, CACHEKEY key, void *value, ...@@ -61,8 +61,9 @@ typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, CACHEKEY key, void *value,
// associated with the key are returned. // associated with the key are returned.
typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn); typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn);
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, int (*close_userdata)(CACHEFILE, void*)); void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, int (*close_userdata)(CACHEFILE, void*), int (*checkpoint_userdata)(CACHEFILE, void*));
// Effect: Store some cachefile-specific user data. When the last reference to a cachefile is closed, we call close_userdata. // Effect: Store some cachefile-specific user data. When the last reference to a cachefile is closed, we call close_userdata().
// When the cachefile needs to be checkpointed, we call checkpoint_userdata().
// If userdata is already non-NULL, then we simply overwrite it. // If userdata is already non-NULL, then we simply overwrite it.
void *toku_cachefile_get_userdata(CACHEFILE); void *toku_cachefile_get_userdata(CACHEFILE);
// Effect: Get the user data. // Effect: Get the user data.
......
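A sketch of the new callback plumbing: a cachefile now carries both a close hook and a checkpoint hook, toku_cachetable_checkpoint walks every open cachefile and invokes the checkpoint hook, and the brt layer registers toku_brtheader_close/toku_brtheader_checkpoint as that pair. The self-contained model below uses toy names (cachefile_t, set_userdata, checkpoint_all, header_checkpoint, header_close) rather than the real types, just to show the shape of the protocol.

/* Toy model of the close/checkpoint userdata callbacks (illustrative names only). */
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct cachefile {
    struct cachefile *next;
    void *userdata;
    int (*close_userdata)(struct cachefile *, void *);
    int (*checkpoint_userdata)(struct cachefile *, void *);
} cachefile_t;

struct header { int dirty; };

/* Like toku_brtheader_checkpoint: write out state and clear the dirty bit. */
static int header_checkpoint(cachefile_t *cf, void *userdata) {
    (void)cf;
    struct header *h = userdata;
    if (h->dirty) { printf("writing header\n"); h->dirty = 0; }
    return 0;
}

/* Like toku_brtheader_close: checkpoint first, then free. */
static int header_close(cachefile_t *cf, void *userdata) {
    header_checkpoint(cf, userdata);
    free(userdata);
    return 0;
}

static void set_userdata(cachefile_t *cf, void *userdata,
                         int (*close_cb)(cachefile_t *, void *),
                         int (*checkpoint_cb)(cachefile_t *, void *)) {
    cf->userdata = userdata;
    cf->close_userdata = close_cb;
    cf->checkpoint_userdata = checkpoint_cb;
}

/* Like the new loop in toku_cachetable_checkpoint: call every registered hook. */
static void checkpoint_all(cachefile_t *files) {
    for (cachefile_t *cf = files; cf; cf = cf->next) {
        if (cf->checkpoint_userdata) {
            int r = cf->checkpoint_userdata(cf, cf->userdata);
            assert(r == 0);
        }
    }
}

int main(void) {
    struct header *h = malloc(sizeof *h);
    h->dirty = 1;
    cachefile_t cf = { .next = NULL };
    set_userdata(&cf, h, header_close, header_checkpoint);

    checkpoint_all(&cf);                 /* writes the header, clears dirty */
    cf.close_userdata(&cf, cf.userdata); /* close: checkpoint again (no-op), then free */
    return 0;
}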
...@@ -6,9 +6,20 @@ u_int32_t toku_le_crc(LEAFENTRY v) { ...@@ -6,9 +6,20 @@ u_int32_t toku_le_crc(LEAFENTRY v) {
return x1764_memory(v, leafentry_memsize(v)); return x1764_memory(v, leafentry_memsize(v));
} }
int le_committed (u_int32_t klen, void* kval, u_int32_t dlen, void* dval, u_int32_t *resultsize, u_int32_t *disksize, LEAFENTRY *result) { static void *
le_malloc(OMT omt, struct mempool *mp, size_t size, void **maybe_free)
{
if (omt)
return mempool_malloc_from_omt(omt, mp, size, maybe_free);
else
return toku_malloc(size);
}
int
le_committed (u_int32_t klen, void* kval, u_int32_t dlen, void* dval, u_int32_t *resultsize, u_int32_t *disksize, LEAFENTRY *result,
OMT omt, struct mempool *mp, void **maybe_free) {
size_t size = 9+klen+dlen; size_t size = 9+klen+dlen;
unsigned char *lec=toku_malloc(size); unsigned char *lec=le_malloc(omt, mp, size, maybe_free);
assert(lec); assert(lec);
lec[0] = LE_COMMITTED; lec[0] = LE_COMMITTED;
putint(lec+1, klen); putint(lec+1, klen);
...@@ -20,10 +31,12 @@ int le_committed (u_int32_t klen, void* kval, u_int32_t dlen, void* dval, u_int3 ...@@ -20,10 +31,12 @@ int le_committed (u_int32_t klen, void* kval, u_int32_t dlen, void* dval, u_int3
*result=(LEAFENTRY)lec; *result=(LEAFENTRY)lec;
return 0; return 0;
} }
int le_both (TXNID xid, u_int32_t klen, void* kval, u_int32_t clen, void* cval, u_int32_t plen, void* pval, int le_both (TXNID xid, u_int32_t klen, void* kval, u_int32_t clen, void* cval, u_int32_t plen, void* pval,
u_int32_t *resultsize, u_int32_t *disksize, LEAFENTRY *result) { u_int32_t *resultsize, u_int32_t *disksize, LEAFENTRY *result,
OMT omt, struct mempool *mp, void **maybe_free) {
size_t size = 1+8+4*3+klen+clen+plen; size_t size = 1+8+4*3+klen+clen+plen;
unsigned char *lec=toku_malloc(size); unsigned char *lec=le_malloc(omt, mp, size, maybe_free);
assert(lec); assert(lec);
lec[0] = LE_BOTH; lec[0] = LE_BOTH;
putint64(lec+1, xid); putint64(lec+1, xid);
...@@ -39,10 +52,13 @@ int le_both (TXNID xid, u_int32_t klen, void* kval, u_int32_t clen, void* cval, ...@@ -39,10 +52,13 @@ int le_both (TXNID xid, u_int32_t klen, void* kval, u_int32_t clen, void* cval,
return 0; return 0;
} }
int le_provdel (TXNID xid, u_int32_t klen, void* kval, u_int32_t dlen, void* dval,
u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result) { int
le_provdel (TXNID xid, u_int32_t klen, void* kval, u_int32_t dlen, void* dval,
u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result,
OMT omt, struct mempool *mp, void **maybe_free) {
size_t size = 1 + 8 + 2*4 + klen + dlen; size_t size = 1 + 8 + 2*4 + klen + dlen;
unsigned char *lec=toku_malloc(size); unsigned char *lec= le_malloc(omt, mp, size, maybe_free);
assert(lec); assert(lec);
lec[0] = LE_PROVDEL; lec[0] = LE_PROVDEL;
putint64(lec+1, xid); putint64(lec+1, xid);
...@@ -55,9 +71,12 @@ int le_provdel (TXNID xid, u_int32_t klen, void* kval, u_int32_t dlen, void* dv ...@@ -55,9 +71,12 @@ int le_provdel (TXNID xid, u_int32_t klen, void* kval, u_int32_t dlen, void* dv
*result=(LEAFENTRY)lec; *result=(LEAFENTRY)lec;
return 0; return 0;
} }
int le_provpair (TXNID xid, u_int32_t klen, void* kval, u_int32_t plen, void* pval, u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result) {
int
le_provpair (TXNID xid, u_int32_t klen, void* kval, u_int32_t plen, void* pval, u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result,
OMT omt, struct mempool *mp, void **maybe_free) {
size_t size = 1 + 8 + 2*4 + klen + plen; size_t size = 1 + 8 + 2*4 + klen + plen;
unsigned char *lec=toku_malloc(size); unsigned char *lec= le_malloc(omt, mp, size, maybe_free);
assert(lec); assert(lec);
lec[0] = LE_PROVPAIR; lec[0] = LE_PROVPAIR;
putint64(lec+1, xid); putint64(lec+1, xid);
...@@ -154,7 +173,8 @@ int toku_fread_LEAFENTRY(FILE *f, LEAFENTRY *le, struct x1764 *checksum, u_int32 ...@@ -154,7 +173,8 @@ int toku_fread_LEAFENTRY(FILE *f, LEAFENTRY *le, struct x1764 *checksum, u_int32
r = toku_fread_BYTESTRING(f, &a, checksum, len); if (r!=0) return r; r = toku_fread_BYTESTRING(f, &a, checksum, len); if (r!=0) return r;
r = toku_fread_BYTESTRING(f, &b, checksum, len); if (r!=0) return r; r = toku_fread_BYTESTRING(f, &b, checksum, len); if (r!=0) return r;
r = le_committed(a.len, a.data, b.len, b.data, r = le_committed(a.len, a.data, b.len, b.data,
&memsize, &disksize, le); &memsize, &disksize, le,
0, 0, 0);
toku_free_BYTESTRING(a); toku_free_BYTESTRING(a);
toku_free_BYTESTRING(b); toku_free_BYTESTRING(b);
return r; return r;
...@@ -164,7 +184,8 @@ int toku_fread_LEAFENTRY(FILE *f, LEAFENTRY *le, struct x1764 *checksum, u_int32 ...@@ -164,7 +184,8 @@ int toku_fread_LEAFENTRY(FILE *f, LEAFENTRY *le, struct x1764 *checksum, u_int32
r = toku_fread_BYTESTRING(f, &b, checksum, len); if (r!=0) return r; r = toku_fread_BYTESTRING(f, &b, checksum, len); if (r!=0) return r;
r = toku_fread_BYTESTRING(f, &c, checksum, len); if (r!=0) return r; r = toku_fread_BYTESTRING(f, &c, checksum, len); if (r!=0) return r;
r = le_both(xid, a.len, a.data, b.len, b.data, c.len, c.data, r = le_both(xid, a.len, a.data, b.len, b.data, c.len, c.data,
&memsize, &disksize, le); &memsize, &disksize, le,
0, 0, 0);
toku_free_BYTESTRING(a); toku_free_BYTESTRING(a);
toku_free_BYTESTRING(b); toku_free_BYTESTRING(b);
toku_free_BYTESTRING(c); toku_free_BYTESTRING(c);
...@@ -174,7 +195,8 @@ int toku_fread_LEAFENTRY(FILE *f, LEAFENTRY *le, struct x1764 *checksum, u_int32 ...@@ -174,7 +195,8 @@ int toku_fread_LEAFENTRY(FILE *f, LEAFENTRY *le, struct x1764 *checksum, u_int32
r = toku_fread_BYTESTRING(f, &a, checksum, len); if (r!=0) return r; r = toku_fread_BYTESTRING(f, &a, checksum, len); if (r!=0) return r;
r = toku_fread_BYTESTRING(f, &b, checksum, len); if (r!=0) return r; r = toku_fread_BYTESTRING(f, &b, checksum, len); if (r!=0) return r;
r = le_provdel(xid, a.len, a.data, b.len, b.data, r = le_provdel(xid, a.len, a.data, b.len, b.data,
&memsize, &disksize, le); &memsize, &disksize, le,
0, 0, 0);
toku_free_BYTESTRING(a); toku_free_BYTESTRING(a);
toku_free_BYTESTRING(b); toku_free_BYTESTRING(b);
return r; return r;
...@@ -183,7 +205,8 @@ int toku_fread_LEAFENTRY(FILE *f, LEAFENTRY *le, struct x1764 *checksum, u_int32 ...@@ -183,7 +205,8 @@ int toku_fread_LEAFENTRY(FILE *f, LEAFENTRY *le, struct x1764 *checksum, u_int32
r = toku_fread_BYTESTRING(f, &a, checksum, len); if (r!=0) return r; r = toku_fread_BYTESTRING(f, &a, checksum, len); if (r!=0) return r;
r = toku_fread_BYTESTRING(f, &b, checksum, len); if (r!=0) return r; r = toku_fread_BYTESTRING(f, &b, checksum, len); if (r!=0) return r;
r = le_provpair(xid, a.len, a.data, b.len, b.data, r = le_provpair(xid, a.len, a.data, b.len, b.data,
&memsize, &disksize, le); &memsize, &disksize, le,
0, 0, 0);
toku_free_BYTESTRING(a); toku_free_BYTESTRING(a);
toku_free_BYTESTRING(b); toku_free_BYTESTRING(b);
return r; return r;
......
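With le_malloc() in place, every le_* constructor has two allocation modes: callers that hold a leaf node pass the node's OMT, its buffer_mempool and a &maybe_free so the entry is built directly in the node's mempool, while callers with no node context, such as toku_fread_LEAFENTRY above and the unit tests, pass 0, 0, 0 and get an ordinary toku_malloc'd entry they must free themselves. A small fragment in the style of the tests, assuming this tree's headers; leafentry_heap_path is just an illustrative name.

/* A fragment in the style of the unit tests (assumes this tree's headers). */
#include "portability.h"
#include <assert.h>
#include "brttypes.h"
#include "includes.h"

static void leafentry_heap_path (void) {
    LEAFENTRY le;
    u_int32_t memsize, disksize;
    /* omt==0, mp==0, maybe_free==0: le_malloc() falls back to toku_malloc(). */
    int r = le_committed(4, "abc", 3, "xy", &memsize, &disksize, &le, 0, 0, 0);
    assert(r==0);
    assert(disksize == leafentry_disksize(le));
    toku_free(le);   /* the caller owns the heap copy */
}

/* Mempool path (sketch): inside the tree a caller passes the node's OMT and
 * mempool plus &maybe_free, as brt_leaf_apply_cmd_once does:
 *   void *maybe_free = 0;
 *   r = le_committed(klen, kval, dlen, dval, &memsize, &disksize, &le,
 *                    node->u.l.buffer, &node->u.l.buffer_mempool, &maybe_free);
 *   ...
 *   if (maybe_free) toku_free(maybe_free);
 */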
...@@ -33,13 +33,16 @@ ...@@ -33,13 +33,16 @@
u_int32_t toku_le_crc(LEAFENTRY v); u_int32_t toku_le_crc(LEAFENTRY v);
int le_committed (u_int32_t klen, void* kval, u_int32_t dlen, void* dval, u_int32_t *resultsize, u_int32_t *disksize, LEAFENTRY *result); int le_committed (u_int32_t klen, void* kval, u_int32_t dlen, void* dval, u_int32_t *resultsize, u_int32_t *disksize, LEAFENTRY *result,
OMT, struct mempool *, void **maybe_free);
int le_both (TXNID xid, u_int32_t cklen, void* ckval, u_int32_t cdlen, void* cdval, u_int32_t pdlen, void* pdval, int le_both (TXNID xid, u_int32_t cklen, void* ckval, u_int32_t cdlen, void* cdval, u_int32_t pdlen, void* pdval,
u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result); u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result,
OMT, struct mempool *, void **maybe_free);
int le_provdel (TXNID xid, u_int32_t klen, void* kval, u_int32_t dlen, void* dval, int le_provdel (TXNID xid, u_int32_t klen, void* kval, u_int32_t dlen, void* dval,
u_int32_t *resultsize, u_int32_t *memsize, LEAFENTRY *result); u_int32_t *resultsize, u_int32_t *memsize, LEAFENTRY *result,
int le_provpair (TXNID xid, u_int32_t klen, void* kval, u_int32_t dlen, void* dval, OMT, struct mempool *, void **maybe_free);
u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result); int le_provpair (TXNID xid, u_int32_t klen, void* kval, u_int32_t plen, void* pval, u_int32_t *memsize, u_int32_t *disksize, LEAFENTRY *result,
OMT omt, struct mempool *mp, void **maybe_free);
enum le_state { LE_COMMITTED=1, // A committed pair. enum le_state { LE_COMMITTED=1, // A committed pair.
LE_BOTH, // A committed pair and a provisional pair. LE_BOTH, // A committed pair and a provisional pair.
......
...@@ -158,7 +158,7 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L ...@@ -158,7 +158,7 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L
pair->brt->h = h; pair->brt->h = h;
pair->brt->nodesize = h->nodesize; pair->brt->nodesize = h->nodesize;
pair->brt->flags = h->nodesize; pair->brt->flags = h->nodesize;
toku_cachefile_set_userdata(pair->cf, pair->brt->h, toku_brtheader_close); toku_cachefile_set_userdata(pair->cf, pair->brt->h, toku_brtheader_close, toku_brtheader_checkpoint);
} }
static void static void
...@@ -613,7 +613,7 @@ toku_recover_insertleafentry (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int ...@@ -613,7 +613,7 @@ toku_recover_insertleafentry (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int
node->log_lsn = lsn; node->log_lsn = lsn;
{ {
int memsize = leafentry_memsize(newleafentry); int memsize = leafentry_memsize(newleafentry);
void *mem = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, memsize); void *mem = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, memsize, 0);
assert(mem); assert(mem);
memcpy(mem, newleafentry, memsize); memcpy(mem, newleafentry, memsize);
r = toku_omt_insert_at(node->u.l.buffer, mem, idx); r = toku_omt_insert_at(node->u.l.buffer, mem, idx);
......
#include "portability.h" #include "portability.h"
#include <string.h> #include <string.h>
#include "brttypes.h" #include "brttypes.h"
#include "leafentry.h" #include "includes.h"
static void test_leafentry_1 (void) { static void test_leafentry_1 (void) {
LEAFENTRY l; LEAFENTRY l;
int r; int r;
u_int32_t msize, dsize; u_int32_t msize, dsize;
r = le_committed(4, "abc", 3, "xy", &msize, &dsize, &l); r = le_committed(4, "abc", 3, "xy", &msize, &dsize, &l, 0, 0, 0);
assert(r==0); assert(r==0);
char expect[] = {LE_COMMITTED, char expect[] = {LE_COMMITTED,
0, 0, 0, 4, 0, 0, 0, 4,
...@@ -24,7 +24,7 @@ static void test_leafentry_2 (void) { ...@@ -24,7 +24,7 @@ static void test_leafentry_2 (void) {
LEAFENTRY l; LEAFENTRY l;
int r; int r;
u_int32_t msize, dsize; u_int32_t msize, dsize;
r = le_both(0x0123456789abcdef0LL, 3, "ab", 4, "xyz", 5, "lmno", &msize, &dsize, &l); r = le_both(0x0123456789abcdef0LL, 3, "ab", 4, "xyz", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
assert(r==0); assert(r==0);
char expect[] = {LE_BOTH, char expect[] = {LE_BOTH,
0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0,
...@@ -41,7 +41,7 @@ static void test_leafentry_3 (void) { ...@@ -41,7 +41,7 @@ static void test_leafentry_3 (void) {
LEAFENTRY l; LEAFENTRY l;
int r; int r;
u_int32_t msize, dsize; u_int32_t msize, dsize;
r = le_provdel(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l); r = le_provdel(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
assert(r==0); assert(r==0);
char expect[] = {LE_PROVDEL, char expect[] = {LE_PROVDEL,
0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0,
...@@ -57,7 +57,7 @@ static void test_leafentry_4 (void) { ...@@ -57,7 +57,7 @@ static void test_leafentry_4 (void) {
LEAFENTRY l; LEAFENTRY l;
int r; int r;
u_int32_t msize, dsize; u_int32_t msize, dsize;
r = le_provpair(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l); r = le_provpair(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
assert(r==0); assert(r==0);
char expect[] = {LE_PROVPAIR, char expect[] = {LE_PROVPAIR,
0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0,
...@@ -89,7 +89,7 @@ static void test_leafentry_3long (void) { ...@@ -89,7 +89,7 @@ static void test_leafentry_3long (void) {
LEAFENTRY l; LEAFENTRY l;
int r; int r;
u_int32_t msize, dsize; u_int32_t msize, dsize;
r = le_provdel(0x0123456789abcdef0LL, 301, zeros, 1025, zeros, &msize, &dsize, &l); r = le_provdel(0x0123456789abcdef0LL, 301, zeros, 1025, zeros, &msize, &dsize, &l, 0, 0, 0);
assert(r==0); assert(r==0);
assert(sizeof(expect_3long)==msize); assert(sizeof(expect_3long)==msize);
assert(msize==dsize); assert(msize==dsize);
......